| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- from unittest.mock import AsyncMock, MagicMock
- import pytest
- from dotenv import load_dotenv
- from agency_swarm import Agency, AgencyContext, Agent
- from agency_swarm.utils.thread import ThreadManager
- load_dotenv()
- @pytest.fixture
- def mock_thread_manager():
- """Provides a mocked ThreadManager instance with flat message storage."""
- manager = MagicMock(spec=ThreadManager)
- messages = []
- def add_message_side_effect(message):
- """Side effect for add_message to append to message list."""
- messages.append(message)
- def add_messages_side_effect(msgs):
- """Side effect for add_messages to extend message list."""
- messages.extend(msgs)
- def get_conversation_history_side_effect(agent, caller_agent=None):
- """Side effect for get_conversation_history to filter messages."""
- filtered = []
- for msg in messages:
- if (msg.get("agent") == agent and msg.get("callerAgent") == caller_agent) or (
- msg.get("callerAgent") == agent and msg.get("agent") == caller_agent
- ):
- filtered.append(msg)
- return filtered
- def get_all_messages_side_effect():
- """Side effect for get_all_messages to return all messages."""
- return messages.copy()
- def replace_messages_side_effect(new_messages):
- """Replace stored messages with the provided collection."""
- messages.clear()
- messages.extend(new_messages)
- manager.add_message.side_effect = add_message_side_effect
- manager.add_messages.side_effect = add_messages_side_effect
- manager.get_conversation_history.side_effect = get_conversation_history_side_effect
- manager.get_all_messages.side_effect = get_all_messages_side_effect
- manager.replace_messages.side_effect = replace_messages_side_effect
- # Legacy compatibility - these should not be used but may be called
- manager.get_thread = MagicMock()
- manager.add_item_and_save = MagicMock()
- manager.add_items_and_save = MagicMock()
- return manager
- @pytest.fixture
- def mock_agency_instance(mock_thread_manager):
- agency = MagicMock()
- agency.agents = {}
- agency.user_context = {}
- agency.thread_manager = mock_thread_manager
- return agency
- @pytest.fixture
- def minimal_agent(mock_thread_manager, mock_agency_instance):
- """Provides a minimal Agent instance for basic tests."""
- agent = Agent(name="TestAgent", instructions="Test instructions")
- # Create an agency and replace its thread manager with our mock
- agency = Agency(agent)
- agency.thread_manager = mock_thread_manager
- # Mock the agent's context creation to always return a context with our mock thread manager
- def mock_create_minimal_context():
- return AgencyContext(
- agency_instance=None,
- thread_manager=mock_thread_manager,
- subagents={},
- load_threads_callback=None,
- save_threads_callback=None,
- shared_instructions=None,
- )
- agent._create_minimal_context = mock_create_minimal_context
- return agent
- @pytest.fixture
- def mock_agent():
- """Provides a mocked Agent instance for testing."""
- agent = MagicMock(spec=Agent)
- agent.name = "MockAgent"
- agent.get_response = AsyncMock()
- # Create a proper async generator mock for get_response_stream
- async def default_stream(*args, **kwargs):
- yield {"event": "text", "data": "Mock response"}
- yield {"event": "done"}
- agent.get_response_stream = default_stream
- return agent
- @pytest.fixture
- def mock_agent2():
- """Provides a second mocked Agent instance for testing."""
- agent = MagicMock(spec=Agent)
- agent.name = "MockAgent2"
- agent.get_response = AsyncMock()
- # Create a proper async generator mock for get_response_stream
- async def default_stream(*args, **kwargs):
- yield {"event": "text", "data": "Mock response 2"}
- yield {"event": "done"}
- agent.get_response_stream = default_stream
- return agent
|