test_streaming_order_consistency.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. """
  2. Deterministic streaming order test with two agents and custom tools.
  3. """
  4. import logging
  5. import os
  6. from typing import Any
  7. import pytest
  8. from agents import ModelSettings, function_tool
  9. from agents.models.fake_id import FAKE_RESPONSES_ID
  10. from openai.types.shared import Reasoning
  11. from agency_swarm import Agency, Agent
# Module-level logger; test_parallel_subagent_calls uses it to log a stream
# mismatch before its assertion fails.
logger = logging.getLogger(__name__)
  13. def _assert_sanitized_history(messages: list[dict[str, Any]]) -> None:
  14. """Validate persisted conversation order matches sanitized tool semantics."""
  15. seen_ids: set[str] = set()
  16. for index, message in enumerate(messages):
  17. msg_type = message.get("type")
  18. msg_id = message.get("id")
  19. if isinstance(msg_id, str) and msg_id and msg_id != FAKE_RESPONSES_ID:
  20. assert msg_id not in seen_ids, f"Duplicate message id detected: {msg_id}"
  21. seen_ids.add(msg_id)
  22. if msg_type != "function_call":
  23. continue
  24. call_id = message.get("call_id")
  25. assert isinstance(call_id, str) and call_id, f"Missing call_id for function_call at index {index}"
  26. output_index = None
  27. for candidate in range(index + 1, len(messages)):
  28. if (
  29. messages[candidate].get("type") == "function_call_output"
  30. and messages[candidate].get("call_id") == call_id
  31. ):
  32. output_index = candidate
  33. break
  34. assert output_index is not None, f"No function_call_output found for call_id {call_id}"
  35. between = messages[index + 1 : output_index]
  36. assistants = [m for m in between if m.get("role") == "assistant"]
  37. assert not assistants, (
  38. f"Intermediate assistant message found between function_call and output for call_id {call_id}: {assistants}"
  39. )
  40. def _strip_optional_initial_message_output(
  41. flow: list[tuple[str, str, str | None]],
  42. agent_name: str,
  43. ) -> list[tuple[str, str, str | None]]:
  44. """Allow optional initial agent message_output_item after first tool_call."""
  45. if len(flow) >= 2 and flow[1] == ("message_output_item", agent_name, None):
  46. return [flow[0], *flow[2:]]
  47. return flow
  48. def _normalize_optional_agent_message_outputs(
  49. flow: list[tuple[str, str, str | None]],
  50. agent_name: str,
  51. ) -> list[tuple[str, str, str | None]]:
  52. """Allow optional top-level assistant message items while preserving tool order."""
  53. normalized = _strip_optional_initial_message_output(flow, agent_name)
  54. final_message = ("message_output_item", agent_name, None)
  55. if len(normalized) >= 2 and normalized[-2:] == [final_message, final_message]:
  56. return normalized[:-1]
  57. return normalized
# Additional tools for complex scenarios.
# NOTE(review): these tools have no docstrings; @function_tool likely derives the
# tool description from a docstring, so adding one would change the tool schema the
# model sees. Keep documentation in comments.
@function_tool
def process_data(data: str) -> str:
    # Deterministic stub: returns the input prefixed with "PROCESSED:".
    return f"PROCESSED:{data}"


@function_tool
def validate_result(result: str) -> str:
    # Deterministic stub: returns the input prefixed with "VALID:".
    return f"VALID:{result}"


@function_tool
def combine_results(results: str) -> str:
    # Deterministic stub: returns the input prefixed with "COMBINED:".
    return f"COMBINED:{results}"
# Hardcoded expected flow: one (stream item type, agent name, tool name) triple per
# streamed item, as collected by test_full_streaming_flow_hardcoded_sequence. The
# tool name is only filled in for tool_call_item entries.
#
# Starting with openai-agents 0.2.10, tool calls are emitted as soon as the
# model finalizes the tool call item (via ResponseOutputItemDoneEvent), so the
# semantic `tool_call_item` arrives before the agent's own message output.
# Preserve the deterministic order we now observe so that the tests confirm the
# integration stays in step with SDK streaming semantics.
#
# SubAgent's items (its analyze_risk call/output and its reply) are nested between
# MainAgent's send_message tool_call_item and the matching tool_call_output_item.
EXPECTED_FLOW_DEFAULT: list[tuple[str, str, str | None]] = [
    ("tool_call_item", "MainAgent", "get_market_data"),
    ("tool_call_output_item", "MainAgent", None),
    ("tool_call_item", "MainAgent", "send_message"),
    ("tool_call_item", "SubAgent", "analyze_risk"),
    ("tool_call_output_item", "SubAgent", None),
    ("message_output_item", "SubAgent", None),
    ("tool_call_output_item", "MainAgent", None),
    ("message_output_item", "MainAgent", None),
]
# LiteLLM model identifier used for the Anthropic parametrization (LitellmModel(model=...)).
ANTHROPIC_MODEL_NAME = "anthropic/claude-sonnet-4-20250514"
# Anthropic variant: same tool-call order as above, plus extra message_output_item
# entries interleaved with the tool calls (e.g. SubAgent text before its tool output).
# The MainAgent message at index 1 is optional: the test also accepts this list with
# that entry removed via _strip_optional_initial_message_output.
EXPECTED_FLOW_ANTHROPIC: list[tuple[str, str, str | None]] = [
    ("tool_call_item", "MainAgent", "get_market_data"),
    ("message_output_item", "MainAgent", None),
    ("tool_call_output_item", "MainAgent", None),
    ("tool_call_item", "MainAgent", "send_message"),
    ("tool_call_item", "SubAgent", "analyze_risk"),
    ("message_output_item", "SubAgent", None),
    ("tool_call_output_item", "SubAgent", None),
    ("message_output_item", "SubAgent", None),
    ("message_output_item", "MainAgent", None),
    ("tool_call_output_item", "MainAgent", None),
    ("message_output_item", "MainAgent", None),
]
# Fixture tools shared by several tests below. Both ignore their argument and return
# fixed strings, so tool outputs are deterministic.
# NOTE(review): no docstrings on purpose here; @function_tool likely uses a docstring
# as the tool description, which would change the schema the model sees.
@function_tool
def get_market_data(symbol: str) -> str:
    # `symbol` is accepted but unused; the quote is constant.
    return "AAPL:PRICE=150"


@function_tool
def analyze_risk(data: str) -> str:
    # `data` is accepted but unused; the verdict is constant.
    return "RISK=LOW"
  105. @pytest.mark.asyncio
  106. @pytest.mark.parametrize(
  107. ("use_anthropic", "expected_flow"),
  108. [
  109. (False, EXPECTED_FLOW_DEFAULT),
  110. pytest.param(
  111. True,
  112. EXPECTED_FLOW_ANTHROPIC,
  113. marks=pytest.mark.skipif(
  114. not os.getenv("ANTHROPIC_API_KEY"),
  115. reason="ANTHROPIC_API_KEY required for Anthropic test",
  116. ),
  117. ),
  118. ],
  119. )
  120. async def test_full_streaming_flow_hardcoded_sequence(
  121. use_anthropic: bool, expected_flow: list[tuple[str, str, str | None]]
  122. ) -> None:
  123. """Proves canonical streaming order for Main→Sub agent with tool calls is deterministic."""
  124. if use_anthropic:
  125. pytest.importorskip("litellm", reason="litellm package is required for Anthropic test")
  126. import litellm
  127. from agents.extensions.models.litellm_model import LitellmModel
  128. litellm.modify_params = True
  129. main_model = LitellmModel(model=ANTHROPIC_MODEL_NAME)
  130. helper_model = LitellmModel(model=ANTHROPIC_MODEL_NAME)
  131. main_model_settings = None
  132. helper_model_settings = None
  133. main_instructions = (
  134. "First send a standalone 'ACK' message before any tool calls. "
  135. "Then call get_market_data('AAPL'). "
  136. "Then use the send_message tool to ask SubAgent to analyze the data and reply. "
  137. "Finally, respond to the user with a brief conclusion."
  138. )
  139. user_message = "Start."
  140. else:
  141. main_model = "gpt-5.4-mini"
  142. helper_model = "gpt-5.4-mini"
  143. main_model_settings = ModelSettings(
  144. reasoning=Reasoning(effort="low"),
  145. tool_choice="get_market_data",
  146. parallel_tool_calls=False,
  147. )
  148. helper_model_settings = ModelSettings(
  149. reasoning=Reasoning(effort="low"),
  150. tool_choice="analyze_risk",
  151. parallel_tool_calls=False,
  152. )
  153. main_instructions = (
  154. "Complete the workflow in this exact order. "
  155. "First call get_market_data with symbol 'AAPL'. Do not send assistant text before this tool call. "
  156. "After get_market_data returns, call send_message to ask SubAgent to analyze the returned market data. "
  157. "Do not send assistant text between get_market_data and send_message. "
  158. "After SubAgent replies, send one brief final conclusion to the user."
  159. )
  160. user_message = (
  161. "Run the streaming-order proof now: call get_market_data for AAPL, then send_message to SubAgent "
  162. "with the returned market data, then provide the final conclusion after SubAgent responds."
  163. )
  164. main = Agent(
  165. name="MainAgent",
  166. description="Coordinator",
  167. instructions=main_instructions,
  168. model=main_model,
  169. model_settings=main_model_settings,
  170. tools=[get_market_data],
  171. )
  172. helper = Agent(
  173. name="SubAgent",
  174. description="Risk analyzer",
  175. instructions=(
  176. "When prompted by MainAgent, first call analyze_risk on the provided data. "
  177. "After analyze_risk returns, reply succinctly."
  178. ),
  179. model=helper_model,
  180. model_settings=helper_model_settings,
  181. tools=[analyze_risk],
  182. )
  183. agency = Agency(
  184. main,
  185. communication_flows=[main > helper],
  186. shared_instructions="",
  187. )
  188. before = len(agency.thread_manager.get_all_messages())
  189. # Collect stream as (type, agent, tool_name)
  190. stream_items: list[tuple[str, str, str | None]] = []
  191. async for event in agency.get_response_stream(message=user_message):
  192. if hasattr(event, "item") and event.item is not None:
  193. item = event.item
  194. evt_type = getattr(item, "type", None)
  195. if evt_type == "reasoning_item":
  196. continue
  197. agent_name = getattr(event, "agent", None)
  198. tool_name = None
  199. if evt_type == "tool_call_item":
  200. raw = getattr(item, "raw_item", None)
  201. tool_name = getattr(raw, "name", None)
  202. if isinstance(evt_type, str) and isinstance(agent_name, str):
  203. stream_items.append((evt_type, agent_name, tool_name))
  204. all_messages = agency.thread_manager.get_all_messages()
  205. new_messages = all_messages[before:]
  206. # Map saved messages to same triple format
  207. comparable: list[dict[str, Any]] = []
  208. for m in new_messages:
  209. t = m.get("type")
  210. role = m.get("role")
  211. if t in {"function_call", "function_call_output"} or role == "assistant":
  212. comparable.append(m)
  213. expected_without_main_message = _strip_optional_initial_message_output(expected_flow, "MainAgent")
  214. assert stream_items in (expected_flow, expected_without_main_message), (
  215. "Stream flow mismatch:\n"
  216. f" got={stream_items}\n"
  217. f" exp={expected_flow}\n"
  218. f" exp_without_initial_message={expected_without_main_message}"
  219. )
  220. _assert_sanitized_history(comparable)
  221. _assert_tool_call_recorded(new_messages, "MainAgent", "get_market_data", context="default streaming workflow")
  222. _assert_tool_call_recorded(new_messages, "MainAgent", "send_message", context="default streaming workflow")
  223. _assert_tool_call_recorded(new_messages, "SubAgent", "analyze_risk", context="default streaming workflow")
# Expected flow for multiple sequential sub-agent calls.
# The Coordinator is instructed to say 'ACK' first. That optional message (when it
# lands at index 1, right after the first tool_call_item) and a duplicated final
# Coordinator message are removed by _normalize_optional_agent_message_outputs before
# comparison, so neither is listed here.
EXPECTED_FLOW_MULTIPLE_CALLS: list[tuple[str, str, str | None]] = [
    ("tool_call_item", "Coordinator", "get_market_data"),  # First data fetch
    ("tool_call_output_item", "Coordinator", None),
    # First sub-agent call: the SDK emits send_message's tool_call_item immediately,
    # before any of Worker's items.
    ("tool_call_item", "Coordinator", "send_message"),
    ("tool_call_item", "Worker", "process_data"),  # Worker processes
    ("tool_call_output_item", "Worker", None),
    ("message_output_item", "Worker", None),  # Worker responds
    ("tool_call_output_item", "Coordinator", None),  # send_message completes
    # Second sub-agent call, same shape as the first.
    ("tool_call_item", "Coordinator", "send_message"),
    ("tool_call_item", "Worker", "validate_result"),  # Worker validates
    ("tool_call_output_item", "Worker", None),
    ("message_output_item", "Worker", None),  # Worker responds again
    ("tool_call_output_item", "Coordinator", None),  # send_message completes
    ("message_output_item", "Coordinator", None),  # Final response
]
  243. @pytest.mark.asyncio
  244. async def test_multiple_sequential_subagent_calls() -> None:
  245. """Proves repeated send_message to same sub-agent streams in strict canonical order."""
  246. coordinator = Agent(
  247. name="Coordinator",
  248. description="Main coordinator",
  249. instructions=(
  250. "First say 'ACK'. Then call get_market_data('TEST'). "
  251. "Then use send_message to ask Worker to process the data. "
  252. "After Worker responds, use send_message again to ask Worker to validate the result. "
  253. "Finally, respond with 'DONE'."
  254. ),
  255. model_settings=ModelSettings(temperature=0.0),
  256. tools=[get_market_data],
  257. )
  258. worker = Agent(
  259. name="Worker",
  260. description="Data processor",
  261. instructions=(
  262. "When asked to process: use process_data tool and respond 'Processed'. "
  263. "When asked to validate: use validate_result tool and respond 'Validated'."
  264. ),
  265. model_settings=ModelSettings(temperature=0.0),
  266. tools=[process_data, validate_result],
  267. )
  268. agency = Agency(
  269. coordinator,
  270. communication_flows=[coordinator > worker],
  271. shared_instructions="",
  272. )
  273. before = len(agency.thread_manager.get_all_messages())
  274. # Collect stream events
  275. stream_items: list[tuple[str, str, str | None]] = []
  276. async for event in agency.get_response_stream(message="Execute multiple tasks."):
  277. if hasattr(event, "item") and event.item is not None:
  278. item = event.item
  279. evt_type = getattr(item, "type", None)
  280. if evt_type == "reasoning_item":
  281. continue
  282. agent_name = getattr(event, "agent", None)
  283. tool_name = None
  284. if evt_type == "tool_call_item":
  285. raw = getattr(item, "raw_item", None)
  286. tool_name = getattr(raw, "name", None)
  287. if isinstance(evt_type, str) and isinstance(agent_name, str):
  288. stream_items.append((evt_type, agent_name, tool_name))
  289. # Verify stream matches expected (allow optional initial message_output from reasoning models)
  290. normalized = _normalize_optional_agent_message_outputs(stream_items, "Coordinator")
  291. assert normalized == EXPECTED_FLOW_MULTIPLE_CALLS, (
  292. f"Multiple calls stream mismatch:\n got={stream_items}\n exp={EXPECTED_FLOW_MULTIPLE_CALLS}"
  293. )
  294. # Verify saved messages
  295. all_messages = agency.thread_manager.get_all_messages()
  296. new_messages = all_messages[before:]
  297. comparable: list[dict[str, Any]] = []
  298. for m in new_messages:
  299. t = m.get("type")
  300. role = m.get("role")
  301. if t in {"function_call", "function_call_output"} or role == "assistant":
  302. comparable.append(m)
  303. _assert_sanitized_history(comparable)
# Expected flow for nested delegation (A->B->C) based on actual execution.
# NOTE(review): this list does not appear to be referenced anywhere in this file;
# test_nested_delegation_streaming checks a required subsequence instead. It also
# disagrees with that test: here analyze_risk is attributed to AgentB, whereas the
# test requires ("tool_call_item", "AgentC", "analyze_risk"). Treat it as possibly
# stale before relying on it.
EXPECTED_FLOW_NESTED: list[tuple[str, str, str | None]] = [
    ("message_output_item", "AgentA", None),
    ("tool_call_item", "AgentA", "send_message"),  # A delegates to B
    ("tool_call_item", "AgentB", "send_message"),  # B delegates to C
    ("tool_call_item", "AgentB", "analyze_risk"),  # C's tool call attributed via B stream
    ("tool_call_output_item", "AgentB", None),
    ("message_output_item", "AgentB", None),
    ("tool_call_output_item", "AgentB", None),
    ("tool_call_item", "AgentB", "process_data"),  # B processes
    ("tool_call_output_item", "AgentB", None),
    ("message_output_item", "AgentB", None),
    ("tool_call_output_item", "AgentA", None),
    ("message_output_item", "AgentA", None),  # Final response
]
  319. @pytest.mark.asyncio
  320. async def test_nested_delegation_streaming() -> None:
  321. """Proves nested A→B→C delegation appears in stream and AgentA completes after sub-chain."""
  322. agent_a = Agent(
  323. name="AgentA",
  324. description="Top-level coordinator",
  325. instructions=(
  326. "First say 'ACK'. "
  327. "Then use send_message to ask AgentB to process and analyze data. "
  328. "Finally respond with 'Complete'."
  329. ),
  330. model="gpt-5.4-mini",
  331. tools=[],
  332. )
  333. agent_b = Agent(
  334. name="AgentB",
  335. description="Middle processor",
  336. instructions=(
  337. "When asked by AgentA: "
  338. "First use send_message to ask AgentC to analyze risk. "
  339. "Then use process_data tool with the response. "
  340. "Finally respond 'Processed'."
  341. ),
  342. model="gpt-5.4-mini",
  343. model_settings=ModelSettings(tool_choice="required"),
  344. tools=[process_data],
  345. )
  346. agent_c = Agent(
  347. name="AgentC",
  348. description="Risk analyzer",
  349. instructions="When asked: use analyze_risk tool and respond 'Risk analyzed'.",
  350. model="gpt-5.4-mini",
  351. model_settings=ModelSettings(tool_choice="required"),
  352. tools=[analyze_risk],
  353. )
  354. agency = Agency(
  355. agent_a,
  356. communication_flows=[agent_a > agent_b, agent_b > agent_c],
  357. shared_instructions="",
  358. )
  359. before = len(agency.thread_manager.get_all_messages())
  360. # Collect stream events
  361. stream_items: list[tuple[str, str, str | None]] = []
  362. async for event in agency.get_response_stream(message="Start nested delegation."):
  363. if hasattr(event, "item") and event.item is not None:
  364. item = event.item
  365. evt_type = getattr(item, "type", None)
  366. if evt_type == "reasoning_item":
  367. continue
  368. agent_name = getattr(event, "agent", None)
  369. tool_name = None
  370. if evt_type == "tool_call_item":
  371. raw = getattr(item, "raw_item", None)
  372. tool_name = getattr(raw, "name", None)
  373. if isinstance(evt_type, str) and isinstance(agent_name, str):
  374. stream_items.append((evt_type, agent_name, tool_name))
  375. # Verify stream contains the required sequence in order and AgentC performs analyze_risk
  376. required_seq = [
  377. ("tool_call_item", "AgentA", "send_message"),
  378. ("tool_call_item", "AgentB", "send_message"),
  379. ("tool_call_item", "AgentC", "analyze_risk"),
  380. ("tool_call_output_item", "AgentA", None),
  381. ("message_output_item", "AgentA", None),
  382. ]
  383. def is_subsequence(needles: list[tuple[str, str, str | None]], haystack: list[tuple[str, str, str | None]]) -> bool:
  384. i = 0
  385. for item in haystack:
  386. if i < len(needles) and item == needles[i]:
  387. i += 1
  388. return i == len(needles)
  389. assert is_subsequence(required_seq, stream_items), (
  390. f"Nested delegation stream mismatch (required subsequence not found):\n got={stream_items}\n req={required_seq}"
  391. )
  392. # Verify saved messages
  393. all_messages = agency.thread_manager.get_all_messages()
  394. new_messages = all_messages[before:]
  395. comparable: list[dict[str, Any]] = []
  396. for m in new_messages:
  397. t = m.get("type")
  398. role = m.get("role")
  399. if t in {"function_call", "function_call_output"} or role == "assistant":
  400. comparable.append(m)
  401. _assert_sanitized_history(comparable)
  402. # Verify stream contains the required sequence in order (for saved messages verification)
  403. required_seq = [
  404. ("tool_call_item", "AgentA", "send_message"),
  405. ("tool_call_item", "AgentB", "send_message"),
  406. ("tool_call_output_item", "AgentA", None),
  407. ("message_output_item", "AgentA", None),
  408. ]
  409. assert is_subsequence(required_seq, stream_items), (
  410. f"Nested delegation stream mismatch (required subsequence not found):\n got={stream_items}\n req={required_seq}"
  411. )
  412. # Helper to confirm specific tool calls were persisted for an agent
  413. def _assert_tool_call_recorded(
  414. messages: list[dict[str, Any]], agent_name: str, tool_name: str, *, context: str
  415. ) -> None:
  416. for message in messages:
  417. if message.get("type") != "function_call":
  418. continue
  419. if message.get("name") != tool_name:
  420. continue
  421. recorded_agent = message.get("agent") or message.get("callerAgent")
  422. if str(recorded_agent) == agent_name:
  423. return
  424. raise AssertionError(f"Expected {context}: agent '{agent_name}' did not record function_call '{tool_name}'")
# Expected flow for sub-agent calls to two different agents (ProcessorA, then ProcessorB).
# Despite the "parallel" name, the Orchestrator is instructed to wait for ProcessorA's
# reply before messaging ProcessorB, so the two delegations appear one after the other.
# NOTE: No ACK message expected - we don't instruct the agent to emit one,
# keeping the expected flow strict and deterministic.
EXPECTED_FLOW_PARALLEL: list[tuple[str, str, str | None]] = [
    ("tool_call_item", "Orchestrator", "get_market_data"),  # Initial data fetch; arrives first (via the tool_called event)
    ("tool_call_output_item", "Orchestrator", None),
    ("tool_call_item", "Orchestrator", "send_message"),
    ("tool_call_item", "ProcessorA", "process_data"),  # ProcessorA works
    ("tool_call_output_item", "ProcessorA", None),
    ("message_output_item", "ProcessorA", None),
    ("tool_call_output_item", "Orchestrator", None),
    ("tool_call_item", "Orchestrator", "send_message"),
    ("tool_call_item", "ProcessorB", "validate_result"),  # ProcessorB works
    ("tool_call_output_item", "ProcessorB", None),
    ("message_output_item", "ProcessorB", None),
    ("tool_call_output_item", "Orchestrator", None),
    ("tool_call_item", "Orchestrator", "combine_results"),
    ("tool_call_output_item", "Orchestrator", None),
    ("message_output_item", "Orchestrator", None),  # Final response
]
  445. @pytest.mark.asyncio
  446. async def test_parallel_subagent_calls() -> None:
  447. """Proves orchestrator issues two sub-agent calls and completion follows canonical order."""
  448. orchestrator = Agent(
  449. name="Orchestrator",
  450. description="Main orchestrator",
  451. instructions=(
  452. "Call get_market_data('DATA'). "
  453. "Then use send_message to ask ProcessorA to process the data. "
  454. "After ProcessorA responds, use send_message to ask ProcessorB to validate. "
  455. "Finally, use combine_results tool and respond 'All done'."
  456. ),
  457. model_settings=ModelSettings(temperature=0.0),
  458. tools=[get_market_data, combine_results],
  459. )
  460. processor_a = Agent(
  461. name="ProcessorA",
  462. description="Data processor",
  463. instructions="When asked: use process_data tool and respond 'ProcessorA complete'.",
  464. model_settings=ModelSettings(temperature=0.0, tool_choice="required"),
  465. tools=[process_data],
  466. )
  467. processor_b = Agent(
  468. name="ProcessorB",
  469. description="Result validator",
  470. instructions="When asked: use validate_result tool and respond 'ProcessorB complete'.",
  471. model_settings=ModelSettings(temperature=0.0, tool_choice="required"),
  472. tools=[validate_result],
  473. )
  474. agency = Agency(
  475. orchestrator,
  476. communication_flows=[orchestrator > processor_a, orchestrator > processor_b],
  477. shared_instructions="",
  478. )
  479. before = len(agency.thread_manager.get_all_messages())
  480. # Collect stream events
  481. stream_items: list[tuple[str, str, str | None]] = []
  482. async for event in agency.get_response_stream(message="Coordinate parallel work."):
  483. if hasattr(event, "item") and event.item is not None:
  484. item = event.item
  485. evt_type = getattr(item, "type", None)
  486. if evt_type == "reasoning_item":
  487. continue
  488. agent_name = getattr(event, "agent", None)
  489. tool_name = None
  490. if evt_type == "tool_call_item":
  491. raw = getattr(item, "raw_item", None)
  492. tool_name = getattr(raw, "name", None)
  493. if isinstance(evt_type, str) and isinstance(agent_name, str):
  494. stream_items.append((evt_type, agent_name, tool_name))
  495. # Verify stream matches expected (allow optional initial message_output from reasoning models)
  496. normalized = _normalize_optional_agent_message_outputs(stream_items, "Orchestrator")
  497. if normalized != EXPECTED_FLOW_PARALLEL:
  498. logger.error(
  499. "Parallel sub-agent stream mismatch",
  500. extra={
  501. "got": stream_items,
  502. "expected": EXPECTED_FLOW_PARALLEL,
  503. },
  504. )
  505. assert normalized == EXPECTED_FLOW_PARALLEL, (
  506. f"Parallel calls stream mismatch:\n got={stream_items}\n exp={EXPECTED_FLOW_PARALLEL}"
  507. )
  508. # Verify saved messages
  509. all_messages = agency.thread_manager.get_all_messages()
  510. new_messages = all_messages[before:]
  511. comparable: list[dict[str, Any]] = []
  512. for m in new_messages:
  513. t = m.get("type")
  514. role = m.get("role")
  515. if t in {"function_call", "function_call_output"} or role == "assistant":
  516. comparable.append(m)
  517. _assert_tool_call_recorded(new_messages, "ProcessorA", "process_data", context="parallel workflow")
  518. _assert_tool_call_recorded(new_messages, "ProcessorB", "validate_result", context="parallel workflow")
  519. _assert_sanitized_history(comparable)