client.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. """
  2. FastAPI Client Example for Agency Swarm v1.x
  3. This client demonstrates how to interact with the FastAPI server,
  4. including both regular and streaming responses, cancellation, and how to properly
  5. handle the agent/callerAgent fields.
  6. To run:
  7. 1. Start the server: python server.py
  8. 2. Run this client: python client.py
  9. """
  10. import json
  11. import threading
  12. import time
  13. from typing import Literal
  14. import requests
# Controls how test_streaming_endpoint prints the SSE stream:
#   True  -> parse each "data: " line as JSON and print only the text deltas.
#   False -> print every non-empty raw SSE line exactly as received.
PARSE_STREAM = True
  17. def test_regular_endpoint():
  18. """Test the regular (non-streaming) endpoint."""
  19. print("\n" + "=" * 60)
  20. print("Testing Regular Endpoint: /my-agency/get_response")
  21. print("=" * 60)
  22. url = "http://localhost:8080/my-agency/get_response"
  23. # Initial request
  24. chat_history = []
  25. payload = {
  26. "message": "Hi, I'm John, can you ask the second agent to call ExampleTool?",
  27. "chat_history": chat_history,
  28. }
  29. print(f"\n📤 Request: {payload['message']}")
  30. response = requests.post(url, json=payload)
  31. if response.status_code == 200:
  32. data = response.json()
  33. print(f"✅ Response: {data.get('response', 'No response')}")
  34. # Show new messages with agent metadata
  35. new_messages = data.get("new_messages", [])
  36. print(f"\nNew messages added ({len(new_messages)} total):")
  37. for i, msg in enumerate(new_messages, 1):
  38. print(f"\n Message {i}:")
  39. print(f" Agent: {msg.get('agent', 'N/A')}")
  40. print(f" CallerAgent: {msg.get('callerAgent', 'N/A')}")
  41. print(f" Type: {msg.get('type', msg.get('role', 'unknown'))}")
  42. if "content" in msg:
  43. content = msg["content"]
  44. if isinstance(content, list) and content:
  45. text = content[0].get("text", "") if isinstance(content[0], dict) else str(content[0])
  46. else:
  47. text = str(content)
  48. print(f" Content: {text[:100]}...")
  49. elif "name" in msg:
  50. print(f" Tool: {msg['name']}")
  51. if "arguments" in msg:
  52. print(f" Arguments: {msg['arguments'][:100]}...")
  53. # Update chat history for next request
  54. chat_history.extend(new_messages)
  55. else:
  56. print(f"❌ Error: {response.status_code}")
  57. print(response.text)
  58. def cancel_stream(run_id: str, cancel_mode: str | None = None):
  59. """Cancel an active streaming run."""
  60. cancel_url = "http://localhost:8080/my-agency/cancel_response_stream"
  61. print(f"\n🛑 Cancelling run: {run_id} (mode={cancel_mode or 'immediate'})")
  62. try:
  63. payload = {"run_id": run_id}
  64. if cancel_mode is not None:
  65. payload["cancel_mode"] = cancel_mode
  66. response = requests.post(cancel_url, json=payload)
  67. if response.status_code == 200:
  68. data = response.json()
  69. print(f"✅ Cancel response: ok={data.get('ok')}, cancelled={data.get('cancelled')}")
  70. new_messages = data.get("new_messages", [])
  71. print(f" Messages before cancel: {new_messages}")
  72. return data
  73. else:
  74. print(f"❌ Cancel failed: {response.status_code}")
  75. print(response.text)
  76. except Exception as e:
  77. print(f"❌ Cancel error: {e}")
  78. return None
  79. # Shared state for streaming thread
  80. streaming_state = {
  81. "run_id": None,
  82. "completed": False,
  83. "cancelled": False,
  84. "accumulated_text": "",
  85. }
  86. def test_streaming_endpoint(message: str):
  87. """Test the streaming SSE endpoint."""
  88. print("\n" + "=" * 60)
  89. print("Testing Streaming Endpoint: /my-agency/get_response_stream")
  90. print("=" * 60)
  91. url = "http://localhost:8080/my-agency/get_response_stream"
  92. payload = {
  93. "message": message,
  94. "chat_history": [],
  95. }
  96. print(f"\n📤 Request: {payload['message']}")
  97. print("\nStreaming events:")
  98. response = requests.post(url, json=payload, stream=True)
  99. if response.status_code == 200:
  100. print("Streaming response:")
  101. accumulated_text = ""
  102. add_newline = False
  103. for line in response.iter_lines():
  104. if line:
  105. line_str = line.decode("utf-8")
  106. if not PARSE_STREAM:
  107. print(line_str)
  108. else:
  109. if line_str.startswith("event: meta"):
  110. continue
  111. if line_str.startswith("data: "):
  112. data_str = line_str[6:]
  113. if data_str == "[DONE]":
  114. print("\n\n✅ Stream complete")
  115. break
  116. try:
  117. data = json.loads(data_str)
  118. if "run_id" in data:
  119. streaming_state["run_id"] = data["run_id"]
  120. continue
  121. if "new_messages" in data:
  122. print(f"\n📨 Final messages: {len(data.get('new_messages', []))} messages")
  123. continue
  124. if "data" in data and isinstance(data["data"], dict):
  125. nested_data = data["data"]
  126. if "data" in nested_data and isinstance(nested_data["data"], dict):
  127. inner_data = nested_data["data"]
  128. if "type" in inner_data and ".done" in inner_data["type"]:
  129. add_newline = True
  130. elif "delta" in inner_data:
  131. delta_text = inner_data["delta"]
  132. if isinstance(delta_text, str):
  133. if add_newline:
  134. print("\n")
  135. print(delta_text, end="", flush=True)
  136. accumulated_text += delta_text
  137. add_newline = False
  138. except json.JSONDecodeError:
  139. pass
  140. print(f"\nSummary: Received {len(accumulated_text)} characters")
  141. else:
  142. print(f"❌ Error: {response.status_code}")
  143. print(response.text)
  144. def test_cancel_endpoint(cancel_mode: Literal["immediate", "after_turn"] | None = None):
  145. """Test cancelling a specific run_id (prompting the user when not provided)."""
  146. print("\n" + "=" * 60)
  147. print("Testing Cancel Endpoint")
  148. print("=" * 60)
  149. stream_thread = None
  150. print("⚙️ Starting a streaming run to capture run_id automatically...")
  151. streaming_state["run_id"] = None
  152. streaming_state["completed"] = False
  153. streaming_state["cancelled"] = False
  154. streaming_state["accumulated_text"] = ""
  155. stream_thread = threading.Thread(target=test_streaming_endpoint, args=("Write a 500 word poem.",))
  156. stream_thread.start()
  157. print("⏳ Waiting for run_id...")
  158. timeout = 10
  159. elapsed = 0.0
  160. while streaming_state["run_id"] is None and elapsed < timeout:
  161. time.sleep(0.1)
  162. elapsed += 0.1
  163. run_id = streaming_state["run_id"]
  164. if run_id is None:
  165. print("❌ Timeout waiting for run_id; cannot demonstrate cancel endpoint.")
  166. stream_thread.join(timeout=5)
  167. return
  168. print(f"✅ Captured run_id: {run_id}")
  169. cancel_mode = cancel_mode or "immediate"
  170. cancel_url = "http://localhost:8080/my-agency/cancel_response_stream"
  171. payload = {"run_id": run_id}
  172. if cancel_mode is not None:
  173. payload["cancel_mode"] = cancel_mode
  174. # Delay to wait for delta events to start coming in
  175. time.sleep(3)
  176. print(f"\n📤 Attempting to cancel run {run_id} (mode={cancel_mode or 'immediate'})")
  177. response = requests.post(cancel_url, json=payload)
  178. if response.status_code == 404:
  179. print(f"✅ Correctly returned 404: {response.json()}")
  180. elif response.status_code == 200:
  181. payload = response.json()
  182. print("✅ Cancelled run; response payload:")
  183. print(json.dumps(payload, indent=2))
  184. else:
  185. print(f"❌ Unexpected status: {response.status_code}")
  186. print(response.text)
  187. if stream_thread is not None:
  188. stream_thread.join(timeout=5)
  189. print("🛑 Streaming helper thread stopped.")
  190. def test_metadata_endpoint():
  191. """Test the metadata endpoint."""
  192. print("\n" + "=" * 60)
  193. print("Testing Metadata Endpoint: /my-agency/get_metadata")
  194. print("=" * 60)
  195. url = "http://localhost:8080/my-agency/get_metadata"
  196. response = requests.get(url)
  197. if response.status_code == 200:
  198. metadata = response.json()
  199. print("\nAgency Structure:")
  200. print(json.dumps(metadata, indent=2))
  201. else:
  202. print(f"❌ Error: {response.status_code}")
  203. print(response.text)
  204. def main():
  205. """Run all tests."""
  206. print("🧪 Agency Swarm FastAPI Client Test")
  207. print("=" * 60)
  208. print("Make sure the server is running on http://localhost:8080")
  209. print("=" * 60)
  210. # Wait a moment for server to be ready
  211. time.sleep(1)
  212. try:
  213. # Test all endpoints
  214. # test_regular_endpoint()
  215. # test_streaming_endpoint(message="Hi, I'm John, can you ask the second agent to call ExampleTool?")
  216. # Change mode to "after_turn" to see alternative cancellation behavior
  217. test_cancel_endpoint(cancel_mode="immediate")
  218. # test_metadata_endpoint()
  219. print("\nDemo completed!")
  220. except requests.exceptions.ConnectionError:
  221. print("\n❌ Could not connect to server. Make sure it's running:")
  222. print(" python server.py")
  223. except Exception as e:
  224. print(f"\n❌ Error: {e}")
  225. if __name__ == "__main__":
  226. main()