| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- """
- Integration test for SendMessage concurrent blocking behavior.
- Tests both same-agent (blocking) and different-agent (no blocking) scenarios.
- """
- import asyncio
- import json
- from types import MethodType, SimpleNamespace
- import pytest
- from agency_swarm import Agency, Agent
- from agency_swarm.context import MasterContext
- @pytest.mark.asyncio
- async def test_concurrent_messages_to_same_agent():
- """
- Concurrent sends to the same recipient should trigger the pending-recipient guard.
- """
- gate = asyncio.Event()
- sender = Agent(name="Coordinator", instructions="Coordinate tasks", model="gpt-5.4-mini")
- recipient = Agent(name="Worker", instructions="Handle tasks", model="gpt-5.4-mini")
- async def waiting_response(self, **_kwargs):
- await gate.wait()
- return SimpleNamespace(final_output=f"{self.name} result")
- recipient.get_response = MethodType(waiting_response, recipient)
- agency = Agency(sender, recipient, communication_flows=[sender > recipient])
- runtime_state = agency.get_agent_runtime_state(sender.name)
- send_tool = next(iter(runtime_state.send_message_tools.values()))
- wrapper = SimpleNamespace(
- context=MasterContext(
- thread_manager=agency.thread_manager,
- agents=agency.agents,
- shared_instructions=agency.shared_instructions,
- )
- )
- payload = json.dumps(
- {
- "recipient_agent": recipient.name,
- "message": "Task",
- "additional_instructions": "",
- }
- )
- first_task = asyncio.create_task(send_tool.on_invoke_tool(wrapper, payload))
- await asyncio.sleep(0)
- second_result = await send_tool.on_invoke_tool(wrapper, payload)
- gate.set()
- first_result = await first_task
- assert isinstance(first_result, str) and first_result.endswith("result")
- assert (
- "Cannot send another message to 'Worker' while the previous message is still being processed" in second_result
- )
- @pytest.mark.asyncio
- async def test_messages_to_different_agents():
- """
- Concurrent sends to different recipients should not trigger the pending-recipient guard.
- """
- gate = asyncio.Event()
- started = 0
- sender = Agent(name="Coordinator", instructions="Coordinate tasks", model="gpt-5.4-mini")
- recipient_one = Agent(name="Worker1", instructions="Handle tasks", model="gpt-5.4-mini")
- recipient_two = Agent(name="Worker2", instructions="Handle tasks", model="gpt-5.4-mini")
- async def waiting_response(self, **_kwargs):
- nonlocal started
- started += 1
- if started < 2:
- await gate.wait()
- else:
- gate.set()
- return SimpleNamespace(final_output=f"{self.name} result")
- recipient_one.get_response = MethodType(waiting_response, recipient_one)
- recipient_two.get_response = MethodType(waiting_response, recipient_two)
- agency = Agency(
- sender,
- recipient_one,
- recipient_two,
- communication_flows=[sender > recipient_one, sender > recipient_two],
- )
- runtime_state = agency.get_agent_runtime_state(sender.name)
- send_tool_one = next(
- tool for tool in runtime_state.send_message_tools.values() if recipient_one.name.lower() in tool.recipients
- )
- send_tool_two = next(
- tool for tool in runtime_state.send_message_tools.values() if recipient_two.name.lower() in tool.recipients
- )
- wrapper = SimpleNamespace(
- context=MasterContext(
- thread_manager=agency.thread_manager,
- agents=agency.agents,
- shared_instructions=agency.shared_instructions,
- )
- )
- payload_one = json.dumps(
- {
- "recipient_agent": recipient_one.name,
- "message": "Task one",
- "additional_instructions": "",
- }
- )
- payload_two = json.dumps(
- {
- "recipient_agent": recipient_two.name,
- "message": "Task two",
- "additional_instructions": "",
- }
- )
- result_one, result_two = await asyncio.gather(
- send_tool_one.on_invoke_tool(wrapper, payload_one),
- send_tool_two.on_invoke_tool(wrapper, payload_two),
- )
- assert isinstance(result_one, str) and result_one.endswith("result")
- assert isinstance(result_two, str) and result_two.endswith("result")
- assert "Cannot send another message" not in result_one
- assert "Cannot send another message" not in result_two
- @pytest.mark.asyncio
- async def test_pending_guard_is_isolated_between_agencies_that_share_agents():
- """A pending send in one agency should not block the same recipient in another agency."""
- gate = asyncio.Event()
- first_call_pending = {"value": False}
- sender = Agent(name="Coordinator", instructions="Coordinate tasks", model="gpt-5.4-mini")
- recipient = Agent(name="Worker", instructions="Handle tasks", model="gpt-5.4-mini")
- async def waiting_response(self, **_kwargs):
- if not first_call_pending["value"]:
- first_call_pending["value"] = True
- await gate.wait()
- return SimpleNamespace(final_output=f"{self.name} result")
- recipient.get_response = MethodType(waiting_response, recipient)
- agency_one = Agency(sender, recipient, communication_flows=[sender > recipient])
- agency_two = Agency(sender, recipient, communication_flows=[sender > recipient])
- runtime_one = agency_one.get_agent_runtime_state(sender.name)
- runtime_two = agency_two.get_agent_runtime_state(sender.name)
- send_tool_one = next(iter(runtime_one.send_message_tools.values()))
- send_tool_two = next(iter(runtime_two.send_message_tools.values()))
- wrapper_one = SimpleNamespace(
- context=MasterContext(
- thread_manager=agency_one.thread_manager,
- agents=agency_one.agents,
- shared_instructions=agency_one.shared_instructions,
- )
- )
- wrapper_two = SimpleNamespace(
- context=MasterContext(
- thread_manager=agency_two.thread_manager,
- agents=agency_two.agents,
- shared_instructions=agency_two.shared_instructions,
- )
- )
- payload = json.dumps(
- {
- "recipient_agent": recipient.name,
- "message": "Task",
- "additional_instructions": "",
- }
- )
- first_task = asyncio.create_task(send_tool_one.on_invoke_tool(wrapper_one, payload))
- await asyncio.sleep(0)
- second_result = await send_tool_two.on_invoke_tool(wrapper_two, payload)
- gate.set()
- first_result = await first_task
- assert isinstance(first_result, str) and first_result.endswith("result")
- assert isinstance(second_result, str) and second_result.endswith("result")
- assert "Cannot send another message" not in second_result
|