test_agent_to_agent_persistence.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. """
  2. Integration tests for agent-to-agent conversation persistence.
  3. These tests verify that agent-to-agent communications via SendMessage tool
  4. are properly persisted to their respective threads, ensuring conversation
  5. memory is maintained across turns.
  6. """
  7. import uuid
  8. import pytest
  9. from agents import ModelSettings
  10. from agency_swarm import Agency, Agent
  11. from tests.deterministic_model import DeterministicModel
  12. @pytest.fixture
  13. def coordinator_agent():
  14. return Agent(
  15. name="Coordinator",
  16. instructions=(
  17. "You are a coordinator agent. Your job is to receive tasks and delegate them. "
  18. "When you receive a task, use the `send_message` tool and select 'Worker' as the recipient "
  19. "to ask the Worker agent to perform the task. Always include the full "
  20. "task details in your message. "
  21. "When delegating, only relay the exact task text and never include unrelated user information."
  22. ),
  23. model_settings=ModelSettings(temperature=0.0),
  24. model=DeterministicModel(),
  25. )
  26. @pytest.fixture
  27. def worker_agent():
  28. return Agent(
  29. name="Worker",
  30. instructions=(
  31. "You perform tasks. When you receive a task, "
  32. "respond with 'TASK_COMPLETED: [task description]' to confirm completion."
  33. ),
  34. model_settings=ModelSettings(temperature=0.0),
  35. model=DeterministicModel(),
  36. )
  37. @pytest.fixture
  38. def memory_agent():
  39. return Agent(
  40. name="Memory",
  41. instructions=(
  42. "You have perfect memory. When told to remember something, "
  43. "confirm with 'REMEMBERED: [item]'. When asked to recall, respond with 'RECALLED: [item]'."
  44. ),
  45. model_settings=ModelSettings(temperature=0.0),
  46. model=DeterministicModel(),
  47. )
  48. @pytest.fixture
  49. def coordinator_worker_agency(coordinator_agent, worker_agent):
  50. """Agency with coordinator->worker communication flow."""
  51. return Agency(
  52. coordinator_agent,
  53. communication_flows=[coordinator_agent > worker_agent],
  54. shared_instructions="Test agency for agent-to-agent persistence verification.",
  55. )
  56. @pytest.fixture
  57. def memory_agency(coordinator_agent, memory_agent):
  58. """Agency with coordinator->memory communication flow for memory testing."""
  59. return Agency(
  60. coordinator_agent,
  61. communication_flows=[coordinator_agent > memory_agent],
  62. shared_instructions="Test agency for agent-to-agent memory persistence.",
  63. )
  64. class TestAgentToAgentPersistence:
  65. """Test suite for agent-to-agent conversation persistence."""
  66. @pytest.mark.asyncio
  67. async def test_sendmessage_creates_persistent_thread_items(self, coordinator_worker_agency):
  68. """
  69. Verify SendMessage tool creates and populates agent-to-agent threads.
  70. Tests that agent-to-agent communications via SendMessage are properly
  71. persisted to their respective conversation threads.
  72. """
  73. task_id = f"task_{uuid.uuid4().hex[:8]}"
  74. user_message = f"Please delegate task {task_id} to the worker agent."
  75. print(f"\n--- Testing Agent-to-Agent Persistence --- TASK: {task_id}")
  76. # Step 1: Verify no agent-to-agent messages exist initially
  77. initial_messages = coordinator_worker_agency.thread_manager.get_conversation_history("Worker", "Coordinator")
  78. assert len(initial_messages) == 0, "No agent-to-agent messages should exist initially"
  79. # Step 2: Trigger communication that should create agent-to-agent thread
  80. await coordinator_worker_agency.get_response(message=user_message)
  81. # Step 3: CRITICAL VERIFICATION - Agent-to-agent messages must exist
  82. agent_messages = coordinator_worker_agency.thread_manager.get_conversation_history("Worker", "Coordinator")
  83. assert len(agent_messages) > 0, "Agent-to-agent messages should be created after communication"
  84. # Step 4: Verify messages contain both input and output
  85. print(f"Agent-to-agent conversation contains {len(agent_messages)} messages:")
  86. for i, item in enumerate(agent_messages):
  87. print(
  88. f" Message {i + 1}: role={item.get('role')}, agent={item.get('agent')}, "
  89. f"callerAgent={item.get('callerAgent')}, content_preview={str(item.get('content', ''))[:50]}..."
  90. )
  91. # Should have at least user message to Worker and Worker's response
  92. user_messages = [msg for msg in agent_messages if msg.get("role") == "user"]
  93. assistant_messages = [msg for msg in agent_messages if msg.get("role") == "assistant"]
  94. assert len(user_messages) > 0, "Should have user messages (from Coordinator to Worker)"
  95. assert len(assistant_messages) > 0, "Should have assistant responses (from Worker)"
  96. # Verify task context is preserved in the conversation
  97. conversation_content = str(agent_messages).lower()
  98. assert task_id.lower() in conversation_content, (
  99. f"Task {task_id} should be referenced in agent-to-agent conversation"
  100. )
  101. print("✅ SUCCESS: Agent-to-agent conversation properly persisted")
  102. @pytest.mark.asyncio
  103. async def test_agent_to_agent_memory_across_turns(self, memory_agency):
  104. """
  105. Test that agents remember previous agent-to-agent conversations across multiple turns.
  106. This verifies that agent-to-agent threads maintain conversation history
  107. and agents can reference previous interactions.
  108. """
  109. secret_code = f"SECRET_{uuid.uuid4().hex[:8]}"
  110. print(f"\n--- Testing Agent-to-Agent Memory Across Turns --- SECRET: {secret_code}")
  111. # Turn 1: Ask coordinator to tell memory agent to remember something
  112. remember_message = f"Please ask the memory agent to remember this secret code: {secret_code}"
  113. await memory_agency.get_response(message=remember_message)
  114. # Verify agent-to-agent messages were created and contain the secret
  115. agent_messages = memory_agency.thread_manager.get_conversation_history("Memory", "Coordinator")
  116. first_turn_count = len(agent_messages)
  117. assert first_turn_count > 0, "Agent-to-agent messages should be created after first interaction"
  118. conversation_content = str(agent_messages).lower()
  119. assert secret_code.lower() in conversation_content, (
  120. f"Secret code {secret_code} should be in agent-to-agent conversation history"
  121. )
  122. print(f"Turn 1 complete: {first_turn_count} agent-to-agent messages")
  123. # Turn 2: Ask coordinator to ask memory agent to recall the secret
  124. recall_message = "Please ask the memory agent what secret code it was told to remember earlier."
  125. response = await memory_agency.get_response(message=recall_message)
  126. # Verify conversation history grew (new messages added)
  127. agent_messages_after = memory_agency.thread_manager.get_conversation_history("Memory", "Coordinator")
  128. second_turn_count = len(agent_messages_after)
  129. assert second_turn_count > first_turn_count, (
  130. f"Agent-to-agent conversation should grow from {first_turn_count} to {second_turn_count} messages"
  131. )
  132. # Verify the memory agent successfully recalled the secret from previous turn
  133. final_output = response.final_output.lower() if response.final_output else ""
  134. assert secret_code.lower() in final_output, (
  135. f"Memory agent should recall secret {secret_code} from previous agent-to-agent conversation"
  136. )
  137. print(f"Turn 2 complete: {second_turn_count} agent-to-agent messages")
  138. print(f"✅ SUCCESS: Agent-to-agent memory preserved across turns - secret {secret_code} recalled")
  139. @pytest.mark.asyncio
  140. async def test_multiple_agent_to_agent_threads_isolation(self):
  141. """
  142. Test that multiple agent-to-agent communication flows create isolated threads.
  143. Verifies that different agent pairs maintain separate conversation histories.
  144. """
  145. # Create coordinator and two workers
  146. coordinator = Agent(
  147. name="Coordinator",
  148. instructions=("You coordinate tasks. Use the send_message tool to delegate tasks to Worker or Worker2."),
  149. model_settings=ModelSettings(temperature=0.0),
  150. model=DeterministicModel(),
  151. )
  152. worker1 = Agent(
  153. name="Worker",
  154. instructions="You are Worker. Respond with 'WORKER_COMPLETED: [task]' when given tasks.",
  155. model_settings=ModelSettings(temperature=0.0),
  156. model=DeterministicModel(),
  157. )
  158. worker2 = Agent(
  159. name="Worker2",
  160. instructions="You are Worker2. Respond with 'WORKER2_COMPLETED: [task]' when given tasks.",
  161. model_settings=ModelSettings(temperature=0.0),
  162. model=DeterministicModel(),
  163. )
  164. # Create agency with multiple communication flows
  165. agency = Agency(
  166. coordinator,
  167. communication_flows=[
  168. coordinator > worker1,
  169. coordinator > worker2,
  170. ],
  171. shared_instructions="Test agency for multiple thread isolation.",
  172. )
  173. task1_id = f"task1_{uuid.uuid4().hex[:6]}"
  174. task2_id = f"task2_{uuid.uuid4().hex[:6]}"
  175. print("\n--- Testing Multiple Agent-to-Agent Thread Isolation ---")
  176. print(f"Task1: {task1_id} (Coordinator->Worker)")
  177. print(f"Task2: {task2_id} (Coordinator->Worker2)")
  178. # Send task to Worker
  179. await agency.get_response(f"Please ask Worker to handle {task1_id}")
  180. # Send task to Worker2
  181. await agency.get_response(f"Please ask Worker2 to handle {task2_id}")
  182. # Verify separate conversations exist
  183. thread_manager = agency.thread_manager
  184. # Get messages for each conversation
  185. worker1_messages = thread_manager.get_conversation_history("Worker", "Coordinator")
  186. worker2_messages = thread_manager.get_conversation_history("Worker2", "Coordinator")
  187. # Verify both conversations have messages
  188. assert len(worker1_messages) > 0, "Coordinator->Worker conversation should have messages"
  189. assert len(worker2_messages) > 0, "Coordinator->Worker2 conversation should have messages"
  190. # Verify conversation isolation - each conversation should only contain its own task
  191. worker1_content = str(worker1_messages).lower()
  192. worker2_content = str(worker2_messages).lower()
  193. assert task1_id.lower() in worker1_content, f"Worker1 conversation should contain {task1_id}"
  194. assert task1_id.lower() not in worker2_content, (
  195. f"Worker2 conversation should NOT contain {task1_id} (isolation breach)"
  196. )
  197. assert task2_id.lower() in worker2_content, f"Worker2 conversation should contain {task2_id}"
  198. assert task2_id.lower() not in worker1_content, (
  199. f"Worker1 conversation should NOT contain {task2_id} (isolation breach)"
  200. )
  201. print("✅ SUCCESS: Agent-to-agent conversations properly isolated:")
  202. print(f" Coordinator->Worker: {len(worker1_messages)} messages (contains {task1_id})")
  203. print(f" Coordinator->Worker2: {len(worker2_messages)} messages (contains {task2_id})")
  204. @pytest.mark.asyncio
  205. async def test_conversation_content_preservation(self, coordinator_worker_agency):
  206. """
  207. Test that conversation content is properly preserved in agent-to-agent threads.
  208. This verifies that conversation items have proper role/content structure.
  209. """
  210. task_id = f"content_test_{uuid.uuid4().hex[:6]}"
  211. print("\n--- Testing Conversation Content Preservation ---")
  212. await coordinator_worker_agency.get_response(f"Please delegate task {task_id} to worker")
  213. # Verify agent-to-agent conversation contains properly formatted messages
  214. agent_messages = coordinator_worker_agency.thread_manager.get_conversation_history("Worker", "Coordinator")
  215. assert len(agent_messages) > 0, "Agent-to-agent conversation should contain messages"
  216. print(f"Agent-to-agent conversation contains {len(agent_messages)} messages:")
  217. for i, item in enumerate(agent_messages):
  218. item_role = item.get("role", "NO_ROLE")
  219. item_agent = item.get("agent", "NO_AGENT")
  220. item_caller = item.get("callerAgent", "NO_CALLER")
  221. content_preview = str(item.get("content", "NO_CONTENT"))[:50]
  222. print(
  223. f" Message {i + 1}: role={item_role}, agent={item_agent}, "
  224. f"caller={item_caller}, content='{content_preview}...'"
  225. )
  226. # Verify we have proper conversation structure
  227. user_messages = [msg for msg in agent_messages if msg.get("role") == "user"]
  228. assistant_messages = [msg for msg in agent_messages if msg.get("role") == "assistant"]
  229. assert len(user_messages) > 0, "Should have user messages (from Coordinator)"
  230. assert len(assistant_messages) > 0, "Should have assistant responses (from Worker)"
  231. # Verify no messages have role=None or content=None (regression check)
  232. for item in agent_messages:
  233. assert item.get("role") is not None, f"Message should not have role=None: {item}"
  234. if item.get("role") in ["user", "assistant"]:
  235. assert item.get("content") is not None, f"User/assistant message should not have content=None: {item}"
  236. # Verify task context is preserved
  237. conversation_content = str(agent_messages).lower()
  238. assert task_id.lower() in conversation_content, f"Task {task_id} should be referenced in conversation"
  239. print("✅ SUCCESS: Conversation content properly preserved in agent-to-agent messages")
  240. @pytest.mark.asyncio
  241. async def test_agent_to_agent_thread_isolation_from_user_context(self, coordinator_worker_agency):
  242. """
  243. Test that agent->agent threads contain only the messages sent between agents,
  244. not the full user conversation context.
  245. Verifies that agent-to-agent conversations are isolated from user interactions.
  246. """
  247. private_info = f"PRIVATE_{uuid.uuid4().hex[:6]}"
  248. relay_info = f"RELAY_{uuid.uuid4().hex[:6]}"
  249. print("\n--- Testing Agent-to-Agent Thread Isolation ---")
  250. print(f"Private info (user only): {private_info}")
  251. print(f"Relay info (for worker): {relay_info}")
  252. # Step 1: User shares private info with coordinator only
  253. await coordinator_worker_agency.get_response(
  254. f"Coordinator, I'm telling you privately: {private_info}. Keep this confidential."
  255. )
  256. # Step 2: User asks coordinator to relay different info to worker
  257. await coordinator_worker_agency.get_response(f"Now please tell the worker this message: {relay_info}")
  258. # Verify separate conversations exist
  259. thread_manager = coordinator_worker_agency.thread_manager
  260. # Get messages for each conversation
  261. user_messages = thread_manager.get_conversation_history("Coordinator", None) # None = user
  262. agent_messages = thread_manager.get_conversation_history("Worker", "Coordinator")
  263. # Verify both conversations have content
  264. assert len(user_messages) > 0, "User->Coordinator conversation should have messages"
  265. assert len(agent_messages) > 0, "Agent-to-agent conversation should have messages"
  266. # Analyze conversation contents
  267. user_content = str(user_messages).lower()
  268. agent_content = str(agent_messages).lower()
  269. print("User conversation content check:")
  270. print(f" Contains private info: {private_info.lower() in user_content}")
  271. print(f" Contains relay info: {relay_info.lower() in user_content}")
  272. print("Agent conversation content check:")
  273. print(f" Contains private info: {private_info.lower() in agent_content}")
  274. print(f" Contains relay info: {relay_info.lower() in agent_content}")
  275. # User conversation should contain both (user said both to coordinator)
  276. assert private_info.lower() in user_content, f"User conversation should contain private info {private_info}"
  277. assert relay_info.lower() in user_content, f"User conversation should contain relay info {relay_info}"
  278. # Agent conversation should only contain what was relayed to worker
  279. assert relay_info.lower() in agent_content, f"Agent conversation should contain relay info {relay_info}"
  280. assert private_info.lower() not in agent_content, (
  281. f"Agent conversation should NOT contain private info {private_info}"
  282. )
  283. print("✅ SUCCESS: Agent-to-agent conversation properly isolated:")
  284. print(f" User conversation: {len(user_messages)} messages (has both secrets)")
  285. print(f" Agent conversation: {len(agent_messages)} messages (only has relayed info, not private info)")