""" FastAPI Client Example for Agency Swarm v1.x This client demonstrates how to interact with the FastAPI server, including both regular and streaming responses, cancellation, and how to properly handle the agent/callerAgent fields. To run: 1. Start the server: python server.py 2. Run this client: python client.py """ import json import threading import time from typing import Literal import requests # Set to False to print raw SSE stream PARSE_STREAM = True def test_regular_endpoint(): """Test the regular (non-streaming) endpoint.""" print("\n" + "=" * 60) print("Testing Regular Endpoint: /my-agency/get_response") print("=" * 60) url = "http://localhost:8080/my-agency/get_response" # Initial request chat_history = [] payload = { "message": "Hi, I'm John, can you ask the second agent to call ExampleTool?", "chat_history": chat_history, } print(f"\n๐Ÿ“ค Request: {payload['message']}") response = requests.post(url, json=payload) if response.status_code == 200: data = response.json() print(f"โœ… Response: {data.get('response', 'No response')}") # Show new messages with agent metadata new_messages = data.get("new_messages", []) print(f"\nNew messages added ({len(new_messages)} total):") for i, msg in enumerate(new_messages, 1): print(f"\n Message {i}:") print(f" Agent: {msg.get('agent', 'N/A')}") print(f" CallerAgent: {msg.get('callerAgent', 'N/A')}") print(f" Type: {msg.get('type', msg.get('role', 'unknown'))}") if "content" in msg: content = msg["content"] if isinstance(content, list) and content: text = content[0].get("text", "") if isinstance(content[0], dict) else str(content[0]) else: text = str(content) print(f" Content: {text[:100]}...") elif "name" in msg: print(f" Tool: {msg['name']}") if "arguments" in msg: print(f" Arguments: {msg['arguments'][:100]}...") # Update chat history for next request chat_history.extend(new_messages) else: print(f"โŒ Error: {response.status_code}") print(response.text) def cancel_stream(run_id: str, cancel_mode: str | None = None): """Cancel an active streaming run.""" cancel_url = "http://localhost:8080/my-agency/cancel_response_stream" print(f"\n๐Ÿ›‘ Cancelling run: {run_id} (mode={cancel_mode or 'immediate'})") try: payload = {"run_id": run_id} if cancel_mode is not None: payload["cancel_mode"] = cancel_mode response = requests.post(cancel_url, json=payload) if response.status_code == 200: data = response.json() print(f"โœ… Cancel response: ok={data.get('ok')}, cancelled={data.get('cancelled')}") new_messages = data.get("new_messages", []) print(f" Messages before cancel: {new_messages}") return data else: print(f"โŒ Cancel failed: {response.status_code}") print(response.text) except Exception as e: print(f"โŒ Cancel error: {e}") return None # Shared state for streaming thread streaming_state = { "run_id": None, "completed": False, "cancelled": False, "accumulated_text": "", } def test_streaming_endpoint(message: str): """Test the streaming SSE endpoint.""" print("\n" + "=" * 60) print("Testing Streaming Endpoint: /my-agency/get_response_stream") print("=" * 60) url = "http://localhost:8080/my-agency/get_response_stream" payload = { "message": message, "chat_history": [], } print(f"\n๐Ÿ“ค Request: {payload['message']}") print("\nStreaming events:") response = requests.post(url, json=payload, stream=True) if response.status_code == 200: print("Streaming response:") accumulated_text = "" add_newline = False for line in response.iter_lines(): if line: line_str = line.decode("utf-8") if not PARSE_STREAM: print(line_str) else: if line_str.startswith("event: meta"): continue if line_str.startswith("data: "): data_str = line_str[6:] if data_str == "[DONE]": print("\n\nโœ… Stream complete") break try: data = json.loads(data_str) if "run_id" in data: streaming_state["run_id"] = data["run_id"] continue if "new_messages" in data: print(f"\n๐Ÿ“จ Final messages: {len(data.get('new_messages', []))} messages") continue if "data" in data and isinstance(data["data"], dict): nested_data = data["data"] if "data" in nested_data and isinstance(nested_data["data"], dict): inner_data = nested_data["data"] if "type" in inner_data and ".done" in inner_data["type"]: add_newline = True elif "delta" in inner_data: delta_text = inner_data["delta"] if isinstance(delta_text, str): if add_newline: print("\n") print(delta_text, end="", flush=True) accumulated_text += delta_text add_newline = False except json.JSONDecodeError: pass print(f"\nSummary: Received {len(accumulated_text)} characters") else: print(f"โŒ Error: {response.status_code}") print(response.text) def test_cancel_endpoint(cancel_mode: Literal["immediate", "after_turn"] | None = None): """Test cancelling a specific run_id (prompting the user when not provided).""" print("\n" + "=" * 60) print("Testing Cancel Endpoint") print("=" * 60) stream_thread = None print("โš™๏ธ Starting a streaming run to capture run_id automatically...") streaming_state["run_id"] = None streaming_state["completed"] = False streaming_state["cancelled"] = False streaming_state["accumulated_text"] = "" stream_thread = threading.Thread(target=test_streaming_endpoint, args=("Write a 500 word poem.",)) stream_thread.start() print("โณ Waiting for run_id...") timeout = 10 elapsed = 0.0 while streaming_state["run_id"] is None and elapsed < timeout: time.sleep(0.1) elapsed += 0.1 run_id = streaming_state["run_id"] if run_id is None: print("โŒ Timeout waiting for run_id; cannot demonstrate cancel endpoint.") stream_thread.join(timeout=5) return print(f"โœ… Captured run_id: {run_id}") cancel_mode = cancel_mode or "immediate" cancel_url = "http://localhost:8080/my-agency/cancel_response_stream" payload = {"run_id": run_id} if cancel_mode is not None: payload["cancel_mode"] = cancel_mode # Delay to wait for delta events to start coming in time.sleep(3) print(f"\n๐Ÿ“ค Attempting to cancel run {run_id} (mode={cancel_mode or 'immediate'})") response = requests.post(cancel_url, json=payload) if response.status_code == 404: print(f"โœ… Correctly returned 404: {response.json()}") elif response.status_code == 200: payload = response.json() print("โœ… Cancelled run; response payload:") print(json.dumps(payload, indent=2)) else: print(f"โŒ Unexpected status: {response.status_code}") print(response.text) if stream_thread is not None: stream_thread.join(timeout=5) print("๐Ÿ›‘ Streaming helper thread stopped.") def test_metadata_endpoint(): """Test the metadata endpoint.""" print("\n" + "=" * 60) print("Testing Metadata Endpoint: /my-agency/get_metadata") print("=" * 60) url = "http://localhost:8080/my-agency/get_metadata" response = requests.get(url) if response.status_code == 200: metadata = response.json() print("\nAgency Structure:") print(json.dumps(metadata, indent=2)) else: print(f"โŒ Error: {response.status_code}") print(response.text) def main(): """Run all tests.""" print("๐Ÿงช Agency Swarm FastAPI Client Test") print("=" * 60) print("Make sure the server is running on http://localhost:8080") print("=" * 60) # Wait a moment for server to be ready time.sleep(1) try: # Test all endpoints # test_regular_endpoint() # test_streaming_endpoint(message="Hi, I'm John, can you ask the second agent to call ExampleTool?") # Change mode to "after_turn" to see alternative cancellation behavior test_cancel_endpoint(cancel_mode="immediate") # test_metadata_endpoint() print("\nDemo completed!") except requests.exceptions.ConnectionError: print("\nโŒ Could not connect to server. Make sure it's running:") print(" python server.py") except Exception as e: print(f"\nโŒ Error: {e}") if __name__ == "__main__": main()