| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- """Integration test for run lineage metadata in multi-level orchestration.
- Validates agent_run_id/parent_run_id propagation across nested agent calls so
- the delegation chain (CEO → Manager → Worker) can be reconstructed. This proves
- the necessity of enriching streaming events and saved messages with run IDs as
- documented (observability, streaming docs).
- """
- import asyncio
- import json
- import logging
- from typing import Any
- import pytest
- from agency_swarm import Agency, Agent
- logger = logging.getLogger(__name__)
- def _assert_non_empty_agent_run_ids(messages: list[dict[str, Any]]) -> None:
- agent_messages = [msg for msg in messages if msg.get("agent")]
- assert agent_messages, "Expected persisted messages with agent metadata"
- missing_or_invalid = [
- msg for msg in agent_messages if not isinstance(msg.get("agent_run_id"), str) or not msg.get("agent_run_id")
- ]
- assert not missing_or_invalid, f"Found messages with missing or invalid agent_run_id: {missing_or_invalid}"
- @pytest.fixture(scope="function")
- def three_level_agency():
- """Create a shared 3-level agency for testing parent_run_id tracking."""
- # Track messages and their metadata
- captured_messages = []
- def capture_message(messages: list[dict[str, Any]]):
- """Callback to capture persisted messages with metadata."""
- captured_messages.extend(messages)
- # Create Worker agent (bottom of hierarchy)
- worker = Agent(
- name="Worker",
- instructions="You are a Worker. When asked to do work, respond with 'Work completed by Worker'.",
- model="gpt-5.4-mini",
- )
- # Create Manager agent (middle layer - orchestrates Worker)
- manager = Agent(
- name="Manager",
- instructions=(
- "You are a Manager. When asked to manage a task, delegate the actual work to the Worker agent. "
- "Use send_message to ask Worker to 'Please do the work'. "
- "After receiving Worker's response, summarize as 'Manager coordinated: [Worker's response]'."
- ),
- model="gpt-5.4-mini",
- )
- # Create CEO agent (top layer - orchestrates Manager)
- ceo = Agent(
- name="CEO",
- instructions=(
- "You are the CEO. When asked to execute a project, delegate to the Manager. "
- "Use send_message to ask Manager to 'Please manage this task'. "
- "After receiving Manager's response, summarize as 'CEO executed: [Manager's response]'."
- ),
- model="gpt-5.4-mini",
- )
- # Create agency with orchestration flows
- agency = Agency(
- ceo, # CEO is the entry point
- communication_flows=[
- ceo > manager, # CEO can orchestrate Manager
- manager > worker, # Manager can orchestrate Worker
- ],
- save_threads_callback=capture_message,
- )
- return agency, captured_messages
- @pytest.mark.asyncio
- async def test_parent_run_id_three_level_orchestration(three_level_agency) -> None:
- """Test parent_run_id tracking through CEO → Manager → Worker orchestration.
- Verifies:
- 1. CEO's initial execution has no parent_run_id
- 2. Manager receives the call_id from CEO's send_message as parent_run_id
- 3. Worker receives the call_id from Manager's send_message as parent_run_id
- 4. All messages are properly tagged with parent_run_id
- 5. The delegation chain can be fully traversed via send_message call_ids
- """
- agency, captured_messages = three_level_agency
- delegation_chain = {} # Maps agent_run_id to parent_run_id
- # Build delegation chain from captured messages
- def update_delegation_chain():
- for msg in captured_messages:
- if "agent_run_id" in msg:
- agent_run_id = msg["agent_run_id"]
- parent_run_id = msg.get("parent_run_id")
- delegation_chain[agent_run_id] = parent_run_id
- # Execute a project that triggers the full delegation chain
- result = await agency.get_response(message="Execute project Alpha", agent_name="CEO")
- # Give async operations time to complete
- await asyncio.sleep(1)
- # Update delegation chain from captured messages
- update_delegation_chain()
- _assert_non_empty_agent_run_ids(captured_messages)
- # Verify response contains evidence of delegation
- response_text = result.final_output if hasattr(result, "final_output") else str(result)
- assert "CEO executed" in response_text or "Manager coordinated" in response_text or "Work" in response_text, (
- f"Response should show delegation occurred: {response_text}"
- )
- # Analyze captured messages for parent_run_id tracking
- # Find CEO's initial agent_run_id (should have no parent)
- ceo_messages = [msg for msg in captured_messages if msg.get("agent") == "CEO" and msg.get("role") == "user"]
- assert ceo_messages, "Should have CEO's initial message"
- ceo_run_id = None
- for msg in ceo_messages:
- if "agent_run_id" in msg:
- ceo_run_id = msg["agent_run_id"]
- # CEO's initial execution should have no parent_run_id
- assert msg.get("parent_run_id") is None, f"CEO's initial execution should have no parent_run_id: {msg}"
- break
- # Find send_message calls from CEO to Manager
- ceo_send_messages = [
- msg
- for msg in captured_messages
- if msg.get("type") == "function_call" and msg.get("name") == "send_message" and msg.get("agent") == "CEO"
- ]
- # Manager's parent_run_id should be the call_id of a send_message from CEO
- manager_messages = [msg for msg in captured_messages if msg.get("agent") == "Manager" and "agent_run_id" in msg]
- manager_run_id = None
- manager_parent_call_id = None
- if manager_messages:
- # Get Manager's parent_run_id (should be a call_id from CEO's send_message)
- for msg in manager_messages:
- parent_id = msg.get("parent_run_id")
- if parent_id and parent_id.startswith("call_"):
- manager_run_id = msg["agent_run_id"]
- manager_parent_call_id = parent_id
- logger.info(f"Found Manager execution with parent_run_id={parent_id} (send_message call_id)")
- break
- # Verify Manager's parent_run_id is a valid send_message call_id from CEO
- ceo_call_ids = [msg.get("call_id") for msg in ceo_send_messages]
- assert manager_parent_call_id in ceo_call_ids, (
- f"Manager's parent_run_id ({manager_parent_call_id}) should be a send_message call_id from CEO"
- )
- # Find send_message calls from Manager to Worker
- manager_send_messages = [
- msg
- for msg in captured_messages
- if msg.get("type") == "function_call" and msg.get("name") == "send_message" and msg.get("agent") == "Manager"
- ]
- # Worker's parent_run_id should be the call_id of a send_message from Manager
- worker_messages = [msg for msg in captured_messages if msg.get("agent") == "Worker" and "agent_run_id" in msg]
- worker_parent_call_id = None
- if worker_messages and manager_send_messages:
- # Get Worker's parent_run_id (should be a call_id from Manager's send_message)
- for msg in worker_messages:
- parent_id = msg.get("parent_run_id")
- if parent_id and parent_id.startswith("call_"):
- worker_parent_call_id = parent_id
- logger.info(f"Found Worker execution with parent_run_id={parent_id} (send_message call_id)")
- break
- # Verify Worker's parent_run_id is a valid send_message call_id from Manager
- manager_call_ids = [msg.get("call_id") for msg in manager_send_messages]
- assert worker_parent_call_id in manager_call_ids, (
- f"Worker's parent_run_id ({worker_parent_call_id}) should be a send_message call_id from Manager"
- )
- # Verify delegation chain can be traversed
- assert len(delegation_chain) >= 2, f"Should have at least 2 levels in delegation chain, got {len(delegation_chain)}"
- # Log the delegation chain for debugging
- logger.info(f"Delegation chain: {json.dumps(delegation_chain, indent=2)}")
- # Verify we can trace the delegation chain through call_ids
- if manager_parent_call_id and worker_parent_call_id:
- # We should be able to trace:
- # 1. Worker's parent_run_id -> Manager's send_message call_id
- # 2. Manager's parent_run_id -> CEO's send_message call_id
- # 3. CEO has no parent_run_id
- assert ceo_run_id is not None, "Should have found CEO's run_id"
- assert delegation_chain.get(ceo_run_id) is None, "CEO should have no parent"
- logger.info(
- f"Successfully traced delegation chain:\n"
- f" CEO (run_id={ceo_run_id}, parent=None)\n"
- f" └─> send_message (call_id={manager_parent_call_id})\n"
- f" └─> Manager (run_id={manager_run_id}, parent={manager_parent_call_id})\n"
- f" └─> send_message (call_id={worker_parent_call_id})\n"
- f" └─> Worker (parent={worker_parent_call_id})"
- )
- @pytest.mark.asyncio
- async def test_parent_run_id_in_streaming(three_level_agency):
- """Test that parent_run_id is propagated correctly in streaming mode."""
- agency, captured_messages = three_level_agency
- captured_events = []
- # Stream the response and capture events
- ceo_run_id = None
- manager_run_id = None
- worker_run_id = None
- async for event in agency.get_response_stream(message="Execute streaming project Beta", agent_name="CEO"):
- # Capture agent_run_id and parent_run_id from events
- if hasattr(event, "agent_run_id"):
- agent_name = getattr(event, "agent", None)
- if agent_name == "CEO":
- # Check if this is the initial CEO event (not a sub-agent call result)
- # CEO's initial execution has no parent, but CEO can appear in events from sub-agents returning
- if not hasattr(event, "parent_run_id") or event.parent_run_id is None:
- ceo_run_id = event.agent_run_id
- elif agent_name == "Manager":
- manager_run_id = event.agent_run_id
- # Manager's parent should be a send_message call_id from CEO
- if hasattr(event, "parent_run_id") and event.parent_run_id:
- # Verify it's a call_id format
- assert event.parent_run_id.startswith("call_"), (
- f"Manager's parent_run_id should be a send_message call_id, got: {event.parent_run_id}"
- )
- logger.info(f"Streaming: Manager has parent_run_id={event.parent_run_id} (send_message call_id)")
- elif agent_name == "Worker":
- worker_run_id = event.agent_run_id
- # Worker's parent should be a send_message call_id from Manager
- if hasattr(event, "parent_run_id") and event.parent_run_id:
- # Verify it's a call_id format
- assert event.parent_run_id.startswith("call_"), (
- f"Worker's parent_run_id should be a send_message call_id, got: {event.parent_run_id}"
- )
- logger.info(f"Streaming: Worker has parent_run_id={event.parent_run_id} (send_message call_id)")
- captured_events.append(event)
- # Verify we captured run IDs
- assert ceo_run_id is not None, "Should have captured CEO's run_id from stream"
- _assert_non_empty_agent_run_ids(captured_messages)
- # Log streaming results
- logger.info(
- f"Streaming test: CEO run_id={ceo_run_id}, Manager run_id={manager_run_id}, Worker run_id={worker_run_id}"
- )
|