| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import asyncio
- import threading
- from unittest.mock import MagicMock
- import pytest
- from agency_swarm import Agent
- def _build_simple_agent(name: str = "TestAgent") -> Agent:
- return Agent(
- name=name,
- instructions="Return deterministic streaming output for test validation.",
- description="Minimal agent used to exercise streaming wrappers.",
- )
- def test_wait_final_result_without_event_loop(monkeypatch):
- """Streaming wrapper must resolve to None when created before an event loop starts."""
- from agency_swarm.agent.execution_streaming import StreamingRunResponse
- async def _empty_stream():
- if False: # pragma: no cover
- yield
- def _stubbed_run_stream(**_kwargs):
- wrapper = StreamingRunResponse(_empty_stream())
- wrapper._resolve_final_result(None)
- return wrapper
- monkeypatch.setattr("agency_swarm.agent.execution.run_stream_with_guardrails", _stubbed_run_stream)
- agent = _build_simple_agent()
- stream = agent.get_response_stream("Trigger stream")
- async def _drive_stream() -> None:
- async for _ in stream:
- pass
- async def _await_result() -> None:
- result = await asyncio.wait_for(stream.wait_final_result(), timeout=0.5)
- assert result is None
- async def _run() -> None:
- await asyncio.gather(_drive_stream(), _await_result())
- asyncio.run(_run())
- @pytest.mark.asyncio
- async def test_wait_final_result_before_adoption(monkeypatch):
- """Awaiting wait_final_result before iterating events must resolve once the inner stream finishes."""
- from agency_swarm.agent.execution_streaming import StreamingRunResponse
- final_result = MagicMock()
- async def _single_event_stream():
- yield {"type": "test_event"}
- def _stubbed_run_stream(**_kwargs):
- wrapper = StreamingRunResponse(_single_event_stream())
- wrapper._resolve_final_result(final_result)
- return wrapper
- monkeypatch.setattr("agency_swarm.agent.execution.run_stream_with_guardrails", _stubbed_run_stream)
- agent = _build_simple_agent("PreAdoptionAgent")
- stream = agent.get_response_stream("Trigger stream")
- wait_task = asyncio.create_task(asyncio.wait_for(stream.wait_final_result(), timeout=0.5))
- events = []
- async for event in stream:
- events.append(event)
- result = await wait_task
- assert events == [{"type": "test_event"}]
- assert result is final_result
- @pytest.mark.asyncio
- async def test_adopt_stream_syncs_futures_across_event_loops():
- """Adopting a stream must safely synchronize final futures from another loop."""
- from agency_swarm.agent.execution_streaming import StreamingRunResponse
- async def _empty_stream():
- if False: # pragma: no cover
- yield
- external_loop_ready = threading.Event()
- external_loop = asyncio.new_event_loop()
- def _loop_runner() -> None:
- asyncio.set_event_loop(external_loop)
- external_loop_ready.set()
- external_loop.run_forever()
- runner_thread = threading.Thread(target=_loop_runner, daemon=True)
- runner_thread.start()
- external_loop_ready.wait()
- async def _create_future() -> asyncio.Future[object | None]:
- loop = asyncio.get_running_loop()
- return loop.create_future()
- try:
- outer_future = asyncio.run_coroutine_threadsafe(_create_future(), external_loop).result(timeout=1)
- outer_wrapper = StreamingRunResponse(_empty_stream())
- outer_wrapper._final_future = outer_future
- inner_wrapper = StreamingRunResponse(_empty_stream())
- inner_wrapper._final_future = asyncio.get_running_loop().create_future()
- outer_wrapper._adopt_stream(inner_wrapper)
- completion = threading.Event()
- external_loop.call_soon_threadsafe(outer_future.add_done_callback, lambda _fut: completion.set())
- sentinel = object()
- inner_wrapper._final_future.set_result(sentinel)
- assert await asyncio.to_thread(completion.wait, 1.0)
- async def _await_external() -> object | None:
- return await outer_future
- result = asyncio.run_coroutine_threadsafe(_await_external(), external_loop).result(timeout=1)
- assert result is sentinel
- finally:
- external_loop.call_soon_threadsafe(external_loop.stop)
- runner_thread.join(timeout=1)
- external_loop.close()
|