test_parent_run_id_tracking.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
"""Integration test for run lineage metadata in multi-level orchestration.

Validates agent_run_id/parent_run_id propagation across nested agent calls so
the delegation chain (CEO → Manager → Worker) can be reconstructed. This
demonstrates why streaming events and saved messages need to be enriched with
run IDs, as described in the observability and streaming docs.
"""
  7. import asyncio
  8. import json
  9. import logging
  10. from typing import Any
  11. import pytest
  12. from agency_swarm import Agency, Agent
  13. logger = logging.getLogger(__name__)
  14. def _assert_non_empty_agent_run_ids(messages: list[dict[str, Any]]) -> None:
  15. agent_messages = [msg for msg in messages if msg.get("agent")]
  16. assert agent_messages, "Expected persisted messages with agent metadata"
  17. missing_or_invalid = [
  18. msg for msg in agent_messages if not isinstance(msg.get("agent_run_id"), str) or not msg.get("agent_run_id")
  19. ]
  20. assert not missing_or_invalid, f"Found messages with missing or invalid agent_run_id: {missing_or_invalid}"
  21. @pytest.fixture(scope="function")
  22. def three_level_agency():
  23. """Create a shared 3-level agency for testing parent_run_id tracking."""
  24. # Track messages and their metadata
  25. captured_messages = []
  26. def capture_message(messages: list[dict[str, Any]]):
  27. """Callback to capture persisted messages with metadata."""
  28. captured_messages.extend(messages)
  29. # Create Worker agent (bottom of hierarchy)
  30. worker = Agent(
  31. name="Worker",
  32. instructions="You are a Worker. When asked to do work, respond with 'Work completed by Worker'.",
  33. model="gpt-5.4-mini",
  34. )
  35. # Create Manager agent (middle layer - orchestrates Worker)
  36. manager = Agent(
  37. name="Manager",
  38. instructions=(
  39. "You are a Manager. When asked to manage a task, delegate the actual work to the Worker agent. "
  40. "Use send_message to ask Worker to 'Please do the work'. "
  41. "After receiving Worker's response, summarize as 'Manager coordinated: [Worker's response]'."
  42. ),
  43. model="gpt-5.4-mini",
  44. )
  45. # Create CEO agent (top layer - orchestrates Manager)
  46. ceo = Agent(
  47. name="CEO",
  48. instructions=(
  49. "You are the CEO. When asked to execute a project, delegate to the Manager. "
  50. "Use send_message to ask Manager to 'Please manage this task'. "
  51. "After receiving Manager's response, summarize as 'CEO executed: [Manager's response]'."
  52. ),
  53. model="gpt-5.4-mini",
  54. )
  55. # Create agency with orchestration flows
  56. agency = Agency(
  57. ceo, # CEO is the entry point
  58. communication_flows=[
  59. ceo > manager, # CEO can orchestrate Manager
  60. manager > worker, # Manager can orchestrate Worker
  61. ],
  62. save_threads_callback=capture_message,
  63. )
  64. return agency, captured_messages
  65. @pytest.mark.asyncio
  66. async def test_parent_run_id_three_level_orchestration(three_level_agency) -> None:
  67. """Test parent_run_id tracking through CEO → Manager → Worker orchestration.
  68. Verifies:
  69. 1. CEO's initial execution has no parent_run_id
  70. 2. Manager receives the call_id from CEO's send_message as parent_run_id
  71. 3. Worker receives the call_id from Manager's send_message as parent_run_id
  72. 4. All messages are properly tagged with parent_run_id
  73. 5. The delegation chain can be fully traversed via send_message call_ids
  74. """
  75. agency, captured_messages = three_level_agency
  76. delegation_chain = {} # Maps agent_run_id to parent_run_id
  77. # Build delegation chain from captured messages
  78. def update_delegation_chain():
  79. for msg in captured_messages:
  80. if "agent_run_id" in msg:
  81. agent_run_id = msg["agent_run_id"]
  82. parent_run_id = msg.get("parent_run_id")
  83. delegation_chain[agent_run_id] = parent_run_id
  84. # Execute a project that triggers the full delegation chain
  85. result = await agency.get_response(message="Execute project Alpha", agent_name="CEO")
  86. # Give async operations time to complete
  87. await asyncio.sleep(1)
  88. # Update delegation chain from captured messages
  89. update_delegation_chain()
  90. _assert_non_empty_agent_run_ids(captured_messages)
  91. # Verify response contains evidence of delegation
  92. response_text = result.final_output if hasattr(result, "final_output") else str(result)
  93. assert "CEO executed" in response_text or "Manager coordinated" in response_text or "Work" in response_text, (
  94. f"Response should show delegation occurred: {response_text}"
  95. )
  96. # Analyze captured messages for parent_run_id tracking
  97. # Find CEO's initial agent_run_id (should have no parent)
  98. ceo_messages = [msg for msg in captured_messages if msg.get("agent") == "CEO" and msg.get("role") == "user"]
  99. assert ceo_messages, "Should have CEO's initial message"
  100. ceo_run_id = None
  101. for msg in ceo_messages:
  102. if "agent_run_id" in msg:
  103. ceo_run_id = msg["agent_run_id"]
  104. # CEO's initial execution should have no parent_run_id
  105. assert msg.get("parent_run_id") is None, f"CEO's initial execution should have no parent_run_id: {msg}"
  106. break
  107. # Find send_message calls from CEO to Manager
  108. ceo_send_messages = [
  109. msg
  110. for msg in captured_messages
  111. if msg.get("type") == "function_call" and msg.get("name") == "send_message" and msg.get("agent") == "CEO"
  112. ]
  113. # Manager's parent_run_id should be the call_id of a send_message from CEO
  114. manager_messages = [msg for msg in captured_messages if msg.get("agent") == "Manager" and "agent_run_id" in msg]
  115. manager_run_id = None
  116. manager_parent_call_id = None
  117. if manager_messages:
  118. # Get Manager's parent_run_id (should be a call_id from CEO's send_message)
  119. for msg in manager_messages:
  120. parent_id = msg.get("parent_run_id")
  121. if parent_id and parent_id.startswith("call_"):
  122. manager_run_id = msg["agent_run_id"]
  123. manager_parent_call_id = parent_id
  124. logger.info(f"Found Manager execution with parent_run_id={parent_id} (send_message call_id)")
  125. break
  126. # Verify Manager's parent_run_id is a valid send_message call_id from CEO
  127. ceo_call_ids = [msg.get("call_id") for msg in ceo_send_messages]
  128. assert manager_parent_call_id in ceo_call_ids, (
  129. f"Manager's parent_run_id ({manager_parent_call_id}) should be a send_message call_id from CEO"
  130. )
  131. # Find send_message calls from Manager to Worker
  132. manager_send_messages = [
  133. msg
  134. for msg in captured_messages
  135. if msg.get("type") == "function_call" and msg.get("name") == "send_message" and msg.get("agent") == "Manager"
  136. ]
  137. # Worker's parent_run_id should be the call_id of a send_message from Manager
  138. worker_messages = [msg for msg in captured_messages if msg.get("agent") == "Worker" and "agent_run_id" in msg]
  139. worker_parent_call_id = None
  140. if worker_messages and manager_send_messages:
  141. # Get Worker's parent_run_id (should be a call_id from Manager's send_message)
  142. for msg in worker_messages:
  143. parent_id = msg.get("parent_run_id")
  144. if parent_id and parent_id.startswith("call_"):
  145. worker_parent_call_id = parent_id
  146. logger.info(f"Found Worker execution with parent_run_id={parent_id} (send_message call_id)")
  147. break
  148. # Verify Worker's parent_run_id is a valid send_message call_id from Manager
  149. manager_call_ids = [msg.get("call_id") for msg in manager_send_messages]
  150. assert worker_parent_call_id in manager_call_ids, (
  151. f"Worker's parent_run_id ({worker_parent_call_id}) should be a send_message call_id from Manager"
  152. )
  153. # Verify delegation chain can be traversed
  154. assert len(delegation_chain) >= 2, f"Should have at least 2 levels in delegation chain, got {len(delegation_chain)}"
  155. # Log the delegation chain for debugging
  156. logger.info(f"Delegation chain: {json.dumps(delegation_chain, indent=2)}")
  157. # Verify we can trace the delegation chain through call_ids
  158. if manager_parent_call_id and worker_parent_call_id:
  159. # We should be able to trace:
  160. # 1. Worker's parent_run_id -> Manager's send_message call_id
  161. # 2. Manager's parent_run_id -> CEO's send_message call_id
  162. # 3. CEO has no parent_run_id
  163. assert ceo_run_id is not None, "Should have found CEO's run_id"
  164. assert delegation_chain.get(ceo_run_id) is None, "CEO should have no parent"
  165. logger.info(
  166. f"Successfully traced delegation chain:\n"
  167. f" CEO (run_id={ceo_run_id}, parent=None)\n"
  168. f" └─> send_message (call_id={manager_parent_call_id})\n"
  169. f" └─> Manager (run_id={manager_run_id}, parent={manager_parent_call_id})\n"
  170. f" └─> send_message (call_id={worker_parent_call_id})\n"
  171. f" └─> Worker (parent={worker_parent_call_id})"
  172. )
  173. @pytest.mark.asyncio
  174. async def test_parent_run_id_in_streaming(three_level_agency):
  175. """Test that parent_run_id is propagated correctly in streaming mode."""
  176. agency, captured_messages = three_level_agency
  177. captured_events = []
  178. # Stream the response and capture events
  179. ceo_run_id = None
  180. manager_run_id = None
  181. worker_run_id = None
  182. async for event in agency.get_response_stream(message="Execute streaming project Beta", agent_name="CEO"):
  183. # Capture agent_run_id and parent_run_id from events
  184. if hasattr(event, "agent_run_id"):
  185. agent_name = getattr(event, "agent", None)
  186. if agent_name == "CEO":
  187. # Check if this is the initial CEO event (not a sub-agent call result)
  188. # CEO's initial execution has no parent, but CEO can appear in events from sub-agents returning
  189. if not hasattr(event, "parent_run_id") or event.parent_run_id is None:
  190. ceo_run_id = event.agent_run_id
  191. elif agent_name == "Manager":
  192. manager_run_id = event.agent_run_id
  193. # Manager's parent should be a send_message call_id from CEO
  194. if hasattr(event, "parent_run_id") and event.parent_run_id:
  195. # Verify it's a call_id format
  196. assert event.parent_run_id.startswith("call_"), (
  197. f"Manager's parent_run_id should be a send_message call_id, got: {event.parent_run_id}"
  198. )
  199. logger.info(f"Streaming: Manager has parent_run_id={event.parent_run_id} (send_message call_id)")
  200. elif agent_name == "Worker":
  201. worker_run_id = event.agent_run_id
  202. # Worker's parent should be a send_message call_id from Manager
  203. if hasattr(event, "parent_run_id") and event.parent_run_id:
  204. # Verify it's a call_id format
  205. assert event.parent_run_id.startswith("call_"), (
  206. f"Worker's parent_run_id should be a send_message call_id, got: {event.parent_run_id}"
  207. )
  208. logger.info(f"Streaming: Worker has parent_run_id={event.parent_run_id} (send_message call_id)")
  209. captured_events.append(event)
  210. # Verify we captured run IDs
  211. assert ceo_run_id is not None, "Should have captured CEO's run_id from stream"
  212. _assert_non_empty_agent_run_ids(captured_messages)
  213. # Log streaming results
  214. logger.info(
  215. f"Streaming test: CEO run_id={ceo_run_id}, Manager run_id={manager_run_id}, Worker run_id={worker_run_id}"
  216. )