test_thread_isolation_basic.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
"""
Basic Thread Isolation Tests

Tests the fundamental thread isolation between different communication flows
using direct structural verification of thread state.
"""
  6. import pytest
  7. from agents import ModelSettings
  8. from agency_swarm import Agency, Agent
  9. from tests.deterministic_model import DeterministicModel
  10. @pytest.fixture
  11. def ceo_agent_instance():
  12. return Agent(
  13. name="CEO",
  14. description="Chief Executive Officer",
  15. instructions=(
  16. "You are the CEO. Remember information and delegate tasks. NEVER share user information with other agents."
  17. ),
  18. model=DeterministicModel(default_response="OK"),
  19. model_settings=ModelSettings(temperature=0.0),
  20. )
  21. @pytest.fixture
  22. def developer_agent_instance():
  23. return Agent(
  24. name="Developer",
  25. description="Software Developer",
  26. instructions="You are a Developer. Remember technical details. NEVER share user information with other agents.",
  27. model=DeterministicModel(default_response="OK"),
  28. model_settings=ModelSettings(temperature=0.0),
  29. )
  30. @pytest.fixture
  31. def basic_agency(ceo_agent_instance, developer_agent_instance):
  32. """Agency with CEO and Developer for basic isolation testing."""
  33. return Agency(
  34. ceo_agent_instance,
  35. developer_agent_instance,
  36. communication_flows=[ceo_agent_instance > developer_agent_instance],
  37. shared_instructions="Basic thread isolation test agency.",
  38. )
  39. @pytest.mark.asyncio
  40. async def test_user_thread_shared(basic_agency: Agency):
  41. """Verify that user messages to different agents share a single thread."""
  42. unique_ceo_info = "USERCEOa89a4324"
  43. unique_dev_info = "USERDEVni193vsd"
  44. print("\n--- User Thread Sharing Test ---")
  45. # Step 1: Send unique info to CEO
  46. await basic_agency.get_response(message=f"User info: {unique_ceo_info}", recipient_agent="CEO")
  47. # Step 2: Send different info to Developer
  48. await basic_agency.get_response(message=f"User info: {unique_dev_info}", recipient_agent="Developer")
  49. # Step 3: Verify both agents see the same conversation history
  50. thread_manager = basic_agency.thread_manager
  51. ceo_messages = thread_manager.get_conversation_history("CEO", None)
  52. dev_messages = thread_manager.get_conversation_history("Developer", None)
  53. assert ceo_messages == dev_messages, "Entry-point agents should share user thread"
  54. thread_content = str(ceo_messages).lower()
  55. assert unique_ceo_info.lower() in thread_content, "User thread missing CEO info"
  56. assert unique_dev_info.lower() in thread_content, "User thread missing Developer info"
  57. print("✓ All entry-point agents share user thread with combined history")
  58. @pytest.mark.asyncio
  59. async def test_agent_to_agent_thread_isolation(basic_agency: Agency):
  60. """Agent-to-agent conversations should remain separate from user thread."""
  61. user_ceo_info = "USERCEOa89a4324"
  62. user_dev_info = "USERDEVni193vsd"
  63. print("\n--- Agent-to-Agent Thread Isolation Test ---")
  64. # Flow 1: user->CEO
  65. await basic_agency.get_response(message=f"User info: {user_ceo_info}", recipient_agent="CEO")
  66. # Flow 2: user->Developer
  67. await basic_agency.get_response(message=f"User info: {user_dev_info}", recipient_agent="Developer")
  68. # Flow 3: CEO->Developer (agent-to-agent) - just trigger thread creation
  69. await basic_agency.get_response(message="Say hi to developer")
  70. # Direct verification of thread separation
  71. thread_manager = basic_agency.thread_manager
  72. # All entry-point agents share user thread
  73. user_ceo_messages = thread_manager.get_conversation_history("CEO", None)
  74. user_dev_messages = thread_manager.get_conversation_history("Developer", None)
  75. assert user_ceo_messages == user_dev_messages, "User thread should be shared"
  76. user_thread_content = str(user_ceo_messages).lower()
  77. assert user_ceo_info.lower() in user_thread_content
  78. assert user_dev_info.lower() in user_thread_content
  79. # Agent-to-agent conversation should remain isolated
  80. ceo_dev_messages = thread_manager.get_conversation_history("Developer", "CEO")
  81. assert len(ceo_dev_messages) > 0, "CEO->Developer conversation should have messages"
  82. ceo_dev_content = str(ceo_dev_messages).lower()
  83. assert user_ceo_info.lower() not in ceo_dev_content
  84. assert user_dev_info.lower() not in ceo_dev_content
  85. print("✓ user->CEO, user->Developer, CEO->Developer conversations are properly isolated")
  86. print("✓ User interaction threads properly isolated")
  87. print("✓ Agent-to-agent creates separate thread structure")
  88. @pytest.mark.asyncio
  89. async def test_thread_identifier_format(basic_agency: Agency):
  90. """
  91. Test that thread identifiers follow correct "sender->recipient" format.
  92. This test uses direct verification of thread manager state instead of mocking.
  93. """
  94. print("\n--- Thread Identifier Format Test ---")
  95. # Execute various communication flows
  96. await basic_agency.get_response(message="Test message to CEO", recipient_agent="CEO")
  97. await basic_agency.get_response(message="Test message to Developer", recipient_agent="Developer")
  98. # CEO to Developer
  99. await basic_agency.get_response(message="Say hi to developer")
  100. # Direct verification - check actual conversation flows
  101. thread_manager = basic_agency.thread_manager
  102. all_messages = thread_manager.get_all_messages()
  103. # Extract unique conversation flows from messages
  104. conversation_flows = set()
  105. for msg in all_messages:
  106. agent = msg.get("agent", "")
  107. caller = msg.get("callerAgent")
  108. if agent:
  109. # Convert None to "user" for display
  110. caller_name = "user" if caller is None else caller
  111. conversation_flows.add(f"{caller_name}->{agent}")
  112. actual_flows = list(conversation_flows)
  113. print(f"--- Actual conversation flows created: {actual_flows}")
  114. # Verify expected conversation patterns exist
  115. expected_thread_patterns = [
  116. {"thread_id": "user->CEO", "sender": "user", "recipient": "CEO"},
  117. {"thread_id": "user->Developer", "sender": "user", "recipient": "Developer"},
  118. {"thread_id": "CEO->Developer", "sender": "CEO", "recipient": "Developer"},
  119. ]
  120. for expected in expected_thread_patterns:
  121. thread_id = expected["thread_id"]
  122. sender = expected["sender"]
  123. recipient = expected["recipient"]
  124. # Verify conversation flow exists
  125. assert thread_id in actual_flows, f"Conversation flow '{thread_id}' not found in {actual_flows}"
  126. # Verify format structure
  127. assert "->" in thread_id, f"Thread ID should contain '->': {thread_id}"
  128. # Verify sender and recipient parts
  129. actual_sender, actual_recipient = thread_id.split("->")
  130. assert actual_sender == sender, f"Wrong sender for {thread_id}: expected {sender}, got {actual_sender}"
  131. assert actual_recipient == recipient, (
  132. f"Wrong recipient for {thread_id}: expected {recipient}, got {actual_recipient}"
  133. )
  134. print("✓ All thread identifiers follow 'sender->recipient' format")
  135. print("✓ User interactions use 'user->agent_name'")
  136. print("✓ Agent interactions use 'sender_agent->recipient_agent'")
  137. print("✓ Conversation flow format verification completed through message inspection")