test_send_message_blocking.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. """
  2. Integration test for SendMessage concurrent blocking behavior.
  3. Tests both same-agent (blocking) and different-agent (no blocking) scenarios.
  4. """
  5. import asyncio
  6. import json
  7. from types import MethodType, SimpleNamespace
  8. import pytest
  9. from agency_swarm import Agency, Agent
  10. from agency_swarm.context import MasterContext
  11. @pytest.mark.asyncio
  12. async def test_concurrent_messages_to_same_agent():
  13. """
  14. Concurrent sends to the same recipient should trigger the pending-recipient guard.
  15. """
  16. gate = asyncio.Event()
  17. sender = Agent(name="Coordinator", instructions="Coordinate tasks", model="gpt-5.4-mini")
  18. recipient = Agent(name="Worker", instructions="Handle tasks", model="gpt-5.4-mini")
  19. async def waiting_response(self, **_kwargs):
  20. await gate.wait()
  21. return SimpleNamespace(final_output=f"{self.name} result")
  22. recipient.get_response = MethodType(waiting_response, recipient)
  23. agency = Agency(sender, recipient, communication_flows=[sender > recipient])
  24. runtime_state = agency.get_agent_runtime_state(sender.name)
  25. send_tool = next(iter(runtime_state.send_message_tools.values()))
  26. wrapper = SimpleNamespace(
  27. context=MasterContext(
  28. thread_manager=agency.thread_manager,
  29. agents=agency.agents,
  30. shared_instructions=agency.shared_instructions,
  31. )
  32. )
  33. payload = json.dumps(
  34. {
  35. "recipient_agent": recipient.name,
  36. "message": "Task",
  37. "additional_instructions": "",
  38. }
  39. )
  40. first_task = asyncio.create_task(send_tool.on_invoke_tool(wrapper, payload))
  41. await asyncio.sleep(0)
  42. second_result = await send_tool.on_invoke_tool(wrapper, payload)
  43. gate.set()
  44. first_result = await first_task
  45. assert isinstance(first_result, str) and first_result.endswith("result")
  46. assert (
  47. "Cannot send another message to 'Worker' while the previous message is still being processed" in second_result
  48. )
  49. @pytest.mark.asyncio
  50. async def test_messages_to_different_agents():
  51. """
  52. Concurrent sends to different recipients should not trigger the pending-recipient guard.
  53. """
  54. gate = asyncio.Event()
  55. started = 0
  56. sender = Agent(name="Coordinator", instructions="Coordinate tasks", model="gpt-5.4-mini")
  57. recipient_one = Agent(name="Worker1", instructions="Handle tasks", model="gpt-5.4-mini")
  58. recipient_two = Agent(name="Worker2", instructions="Handle tasks", model="gpt-5.4-mini")
  59. async def waiting_response(self, **_kwargs):
  60. nonlocal started
  61. started += 1
  62. if started < 2:
  63. await gate.wait()
  64. else:
  65. gate.set()
  66. return SimpleNamespace(final_output=f"{self.name} result")
  67. recipient_one.get_response = MethodType(waiting_response, recipient_one)
  68. recipient_two.get_response = MethodType(waiting_response, recipient_two)
  69. agency = Agency(
  70. sender,
  71. recipient_one,
  72. recipient_two,
  73. communication_flows=[sender > recipient_one, sender > recipient_two],
  74. )
  75. runtime_state = agency.get_agent_runtime_state(sender.name)
  76. send_tool_one = next(
  77. tool for tool in runtime_state.send_message_tools.values() if recipient_one.name.lower() in tool.recipients
  78. )
  79. send_tool_two = next(
  80. tool for tool in runtime_state.send_message_tools.values() if recipient_two.name.lower() in tool.recipients
  81. )
  82. wrapper = SimpleNamespace(
  83. context=MasterContext(
  84. thread_manager=agency.thread_manager,
  85. agents=agency.agents,
  86. shared_instructions=agency.shared_instructions,
  87. )
  88. )
  89. payload_one = json.dumps(
  90. {
  91. "recipient_agent": recipient_one.name,
  92. "message": "Task one",
  93. "additional_instructions": "",
  94. }
  95. )
  96. payload_two = json.dumps(
  97. {
  98. "recipient_agent": recipient_two.name,
  99. "message": "Task two",
  100. "additional_instructions": "",
  101. }
  102. )
  103. result_one, result_two = await asyncio.gather(
  104. send_tool_one.on_invoke_tool(wrapper, payload_one),
  105. send_tool_two.on_invoke_tool(wrapper, payload_two),
  106. )
  107. assert isinstance(result_one, str) and result_one.endswith("result")
  108. assert isinstance(result_two, str) and result_two.endswith("result")
  109. assert "Cannot send another message" not in result_one
  110. assert "Cannot send another message" not in result_two
  111. @pytest.mark.asyncio
  112. async def test_pending_guard_is_isolated_between_agencies_that_share_agents():
  113. """A pending send in one agency should not block the same recipient in another agency."""
  114. gate = asyncio.Event()
  115. first_call_pending = {"value": False}
  116. sender = Agent(name="Coordinator", instructions="Coordinate tasks", model="gpt-5.4-mini")
  117. recipient = Agent(name="Worker", instructions="Handle tasks", model="gpt-5.4-mini")
  118. async def waiting_response(self, **_kwargs):
  119. if not first_call_pending["value"]:
  120. first_call_pending["value"] = True
  121. await gate.wait()
  122. return SimpleNamespace(final_output=f"{self.name} result")
  123. recipient.get_response = MethodType(waiting_response, recipient)
  124. agency_one = Agency(sender, recipient, communication_flows=[sender > recipient])
  125. agency_two = Agency(sender, recipient, communication_flows=[sender > recipient])
  126. runtime_one = agency_one.get_agent_runtime_state(sender.name)
  127. runtime_two = agency_two.get_agent_runtime_state(sender.name)
  128. send_tool_one = next(iter(runtime_one.send_message_tools.values()))
  129. send_tool_two = next(iter(runtime_two.send_message_tools.values()))
  130. wrapper_one = SimpleNamespace(
  131. context=MasterContext(
  132. thread_manager=agency_one.thread_manager,
  133. agents=agency_one.agents,
  134. shared_instructions=agency_one.shared_instructions,
  135. )
  136. )
  137. wrapper_two = SimpleNamespace(
  138. context=MasterContext(
  139. thread_manager=agency_two.thread_manager,
  140. agents=agency_two.agents,
  141. shared_instructions=agency_two.shared_instructions,
  142. )
  143. )
  144. payload = json.dumps(
  145. {
  146. "recipient_agent": recipient.name,
  147. "message": "Task",
  148. "additional_instructions": "",
  149. }
  150. )
  151. first_task = asyncio.create_task(send_tool_one.on_invoke_tool(wrapper_one, payload))
  152. await asyncio.sleep(0)
  153. second_result = await send_tool_two.on_invoke_tool(wrapper_two, payload)
  154. gate.set()
  155. first_result = await first_task
  156. assert isinstance(first_result, str) and first_result.endswith("result")
  157. assert isinstance(second_result, str) and second_result.endswith("result")
  158. assert "Cannot send another message" not in second_result