test_agency_helpers.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import importlib.abc
  2. import importlib.util
  3. import subprocess
  4. import sys
  5. import textwrap
  6. from pathlib import Path
  7. import agency_swarm
  8. from agency_swarm import Agency, Agent
  9. from agency_swarm.agency.helpers import run_fastapi as helpers_run_fastapi
  10. from agency_swarm.tools import SendMessage
  11. class _BlockOptionalDepsFinder(importlib.abc.MetaPathFinder):
  12. def find_spec(self, fullname: str, path, target=None): # noqa: ANN001, ANN201
  13. if fullname in {"fastapi", "uvicorn"}:
  14. raise ModuleNotFoundError(fullname)
  15. if fullname.startswith("fastapi.") or fullname.startswith("uvicorn."):
  16. raise ModuleNotFoundError(fullname)
  17. return None
  18. def test_integrations_fastapi_imports_without_optional_dependencies(caplog):
  19. """`agency_swarm.integrations.fastapi` must import without the fastapi extra installed."""
  20. fastapi_module_path = Path(agency_swarm.__file__).resolve().parent / "integrations" / "fastapi.py"
  21. spec = importlib.util.spec_from_file_location("agency_swarm_test_fastapi_no_deps", fastapi_module_path)
  22. assert spec is not None
  23. assert spec.loader is not None
  24. module = importlib.util.module_from_spec(spec)
  25. blocker = _BlockOptionalDepsFinder()
  26. saved_fastapi = sys.modules.pop("fastapi", None)
  27. saved_uvicorn = sys.modules.pop("uvicorn", None)
  28. sys.meta_path.insert(0, blocker)
  29. try:
  30. spec.loader.exec_module(module)
  31. caplog.set_level("ERROR")
  32. module.run_fastapi(agencies={"test": lambda **_: None})
  33. finally:
  34. sys.meta_path.remove(blocker)
  35. if saved_fastapi is not None:
  36. sys.modules["fastapi"] = saved_fastapi
  37. if saved_uvicorn is not None:
  38. sys.modules["uvicorn"] = saved_uvicorn
  39. assert "FastAPI deployment dependencies are missing" in caplog.text
  40. def test_run_fastapi_creates_new_agency_instance(mocker):
  41. agent = Agent(name="HelperAgent", instructions="test", model="gpt-5.4-mini")
  42. agency = Agency(agent)
  43. captured = {}
  44. def fake_run_fastapi(*, agencies=None, **kwargs):
  45. captured["factory"] = agencies["agency"]
  46. return None
  47. mocker.patch("agency_swarm.integrations.fastapi.run_fastapi", side_effect=fake_run_fastapi)
  48. helpers_run_fastapi(agency)
  49. factory = captured["factory"]
  50. load_called = False
  51. def load_cb():
  52. nonlocal load_called
  53. load_called = True
  54. return []
  55. new_agency = factory(load_threads_callback=load_cb)
  56. assert load_called, "load_threads_callback was not invoked"
  57. assert new_agency is not agency, "Factory should create a new Agency instance"
  58. class CustomSendMessage(SendMessage):
  59. """Test-specific send_message tool."""
  60. def test_run_fastapi_preserves_custom_tool_mappings(mocker):
  61. sender = Agent(name="A", instructions="test", model="gpt-5.4-mini")
  62. recipient = Agent(name="B", instructions="test", model="gpt-5.4-mini")
  63. agency = Agency(sender, recipient, communication_flows=[(sender, recipient, CustomSendMessage)])
  64. captured = {}
  65. def fake_run_fastapi(*, agencies=None, **kwargs):
  66. captured["factory"] = agencies["agency"]
  67. return None
  68. mocker.patch("agency_swarm.integrations.fastapi.run_fastapi", side_effect=fake_run_fastapi)
  69. helpers_run_fastapi(agency)
  70. factory = captured["factory"]
  71. new_agency = factory()
  72. pair = ("A", "B")
  73. assert new_agency._communication_tool_classes.get(pair) is CustomSendMessage, (
  74. "Custom tool mapping was not preserved"
  75. )
  76. def test_run_fastapi_normalizes_relative_shared_folders_for_factory_calls(mocker, tmp_path: Path):
  77. """Relative shared_*_folder must be stable across agency_factory call stacks.
  78. The FastAPI integration calls agency_factory from within the server stack (uvicorn/fastapi),
  79. which changes get_external_caller_directory(). We normalize relative shared folders to
  80. absolute once when run_fastapi is called, so the rebuilt Agency can still load shared resources.
  81. """
  82. creator_dir = tmp_path / "creator"
  83. creator_dir.mkdir()
  84. shared_tools_dir = creator_dir / "shared_tools"
  85. shared_tools_dir.mkdir()
  86. (shared_tools_dir / "SampleTool.py").write_text(
  87. textwrap.dedent(
  88. """
  89. from agency_swarm.tools import BaseTool
  90. from pydantic import Field
  91. class SampleTool(BaseTool):
  92. \"\"\"A sample tool.\"\"\"
  93. message: str = Field(..., description="Message to echo")
  94. def run(self) -> str:
  95. return f"Echo: {self.message}"
  96. """
  97. ).strip()
  98. + "\n"
  99. )
  100. captured: dict[str, object] = {}
  101. def fake_run_fastapi(*, agencies=None, **_kwargs):
  102. captured["factory"] = agencies["agency"]
  103. return None
  104. mocker.patch("agency_swarm.integrations.fastapi.run_fastapi", side_effect=fake_run_fastapi)
  105. creator_code = textwrap.dedent(
  106. """
  107. from agency_swarm import Agency, Agent
  108. from agency_swarm.agency.helpers import run_fastapi as helpers_run_fastapi
  109. a = Agent(name="A", instructions="test", model="gpt-5.4-mini")
  110. agency = Agency(a, shared_tools_folder="shared_tools")
  111. helpers_run_fastapi(agency)
  112. """
  113. ).strip()
  114. exec(compile(creator_code, str(creator_dir / "create_agency.py"), "exec"), {})
  115. factory = captured["factory"]
  116. assert callable(factory)
  117. other_dir = tmp_path / "other"
  118. other_dir.mkdir()
  119. call_code = textwrap.dedent(
  120. """
  121. agency2 = factory()
  122. agent = agency2.agents["A"]
  123. tool_names = [getattr(t, "name", None) for t in agent.tools]
  124. """
  125. ).strip()
  126. ns = {"factory": factory}
  127. exec(compile(call_code, str(other_dir / "call_factory.py"), "exec"), ns)
  128. assert "SampleTool" in ns["tool_names"]
  129. def test_package_star_import_succeeds_without_jupyter_dependencies() -> None:
  130. """`from agency_swarm import *` should not fail when jupyter extras are missing."""
  131. script = textwrap.dedent(
  132. """
  133. import builtins
  134. import importlib.util
  135. original_find_spec = importlib.util.find_spec
  136. original_import = builtins.__import__
  137. def blocked_find_spec(name, package=None):
  138. if name == "jupyter_client":
  139. return None
  140. return original_find_spec(name, package)
  141. def blocked_import(name, globals=None, locals=None, fromlist=(), level=0):
  142. if name == "jupyter_client" or name.startswith("jupyter_client."):
  143. raise ModuleNotFoundError(name)
  144. return original_import(name, globals, locals, fromlist, level)
  145. importlib.util.find_spec = blocked_find_spec
  146. builtins.__import__ = blocked_import
  147. namespace = {}
  148. exec("from agency_swarm import *", namespace)
  149. assert "IPythonInterpreter" not in namespace
  150. """
  151. )
  152. result = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, check=False)
  153. assert result.returncode == 0, result.stderr or result.stdout