| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- import asyncio
- import importlib
- import json
- import logging
- import subprocess
- import sys
- import tarfile
- import threading
- from io import BytesIO
- from pathlib import Path
- import pytest
- from agency_swarm import Agency, Agent
- agentswarm_cli_demo = importlib.import_module("agency_swarm.ui.demos.agentswarm_cli")
- class DummyServer:
- def __init__(self, port: int = 43121) -> None:
- self.port = port
- self.stopped = False
- def stop(self) -> None:
- self.stopped = True
- def build_agency() -> Agency:
- return Agency(Agent(name="CEO", instructions="test"), name="My Agency")
- def test_agentswarm_cli_tui_launches_agent_swarm_cli(monkeypatch):
- agency = build_agency()
- server = DummyServer()
- calls: dict[str, object] = {}
- monkeypatch.delenv(agentswarm_cli_demo._RELOAD_CHILD_ENV, raising=False)
- monkeypatch.setattr(agentswarm_cli_demo.os, "getcwd", lambda: "/tmp/project")
- monkeypatch.setattr(agentswarm_cli_demo, "_start_server", lambda value, capture=None: server)
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/usr/local/bin/agentswarm"))
- def fake_run(cmd, cwd, env, check):
- calls["cmd"] = cmd
- calls["cwd"] = cwd
- calls["env"] = env
- calls["check"] = check
- return subprocess.CompletedProcess(cmd, 0)
- monkeypatch.setattr(agentswarm_cli_demo.subprocess, "run", fake_run)
- agentswarm_cli_demo.start_tui(agency, reload=False)
- assert calls["cmd"] == ["/usr/local/bin/agentswarm", "--model", agentswarm_cli_demo._MODEL]
- assert calls["cwd"] == "/tmp/project"
- assert calls["check"] is False
- config = json.loads(calls["env"]["OPENCODE_CONFIG_CONTENT"])
- assert config["model"] == agentswarm_cli_demo._MODEL
- assert config["provider"]["agency-swarm"]["options"]["baseURL"] == "http://127.0.0.1:43121"
- assert config["provider"]["agency-swarm"]["options"]["agency"] == "My_Agency"
- assert server.stopped is True
- def test_agentswarm_cli_tui_continues_after_reload(monkeypatch):
- agency = build_agency()
- server = DummyServer()
- calls: list[str] = []
- monkeypatch.setenv(agentswarm_cli_demo._RELOAD_CHILD_ENV, "1")
- monkeypatch.setattr(agentswarm_cli_demo.os, "getcwd", lambda: "/tmp/project")
- monkeypatch.setattr(agentswarm_cli_demo, "_start_server", lambda value, capture=None: server)
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/usr/local/bin/agentswarm"))
- monkeypatch.setattr(
- agentswarm_cli_demo.subprocess,
- "run",
- lambda cmd, cwd, env, check: calls.extend(cmd) or subprocess.CompletedProcess(cmd, 0),
- )
- agentswarm_cli_demo.start_tui(agency, reload=False)
- assert "--continue" in calls
- assert server.stopped is True
- def test_agentswarm_cli_tui_installs_agent_swarm_cli(monkeypatch):
- monkeypatch.delenv(agentswarm_cli_demo._BIN_ENV, raising=False)
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/tmp/cache/agentswarm"))
- assert agentswarm_cli_demo._command() == ["/tmp/cache/agentswarm"]
- def test_agentswarm_cli_tui_prefers_explicit_cli(monkeypatch):
- monkeypatch.setenv(agentswarm_cli_demo._BIN_ENV, "/tmp/agentswarm")
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: (_ for _ in ()).throw(AssertionError("unused")))
- assert agentswarm_cli_demo._command() == ["/tmp/agentswarm"]
- def test_agentswarm_cli_tui_rejects_hidden_reasoning():
- agency = build_agency()
- with pytest.raises(NotImplementedError, match="show_reasoning=False"):
- agentswarm_cli_demo.start_tui(agency, show_reasoning=False, reload=False)
- def test_agentswarm_cli_tui_raises_when_bridge_fails(monkeypatch):
- agency = build_agency()
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/usr/local/bin/agentswarm"))
- monkeypatch.setattr(
- agentswarm_cli_demo,
- "_start_server",
- lambda value, capture=None: (_ for _ in ()).throw(RuntimeError("boom")),
- )
- with pytest.raises(RuntimeError, match="bridge failed to start"):
- agentswarm_cli_demo.start_tui(agency, reload=False)
- def test_agentswarm_cli_tui_raises_when_cli_launch_fails(monkeypatch):
- agency = build_agency()
- server = DummyServer()
- monkeypatch.setattr(agentswarm_cli_demo.os, "getcwd", lambda: "/tmp/project")
- monkeypatch.setattr(agentswarm_cli_demo, "_start_server", lambda value, capture=None: server)
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/usr/local/bin/agentswarm"))
- monkeypatch.setattr(
- agentswarm_cli_demo.subprocess,
- "run",
- lambda *args, **kwargs: (_ for _ in ()).throw(OSError("boom")),
- )
- with pytest.raises(RuntimeError, match="could not be launched"):
- agentswarm_cli_demo.start_tui(agency, reload=False)
- assert server.stopped is True
- def test_agentswarm_cli_tui_contains_python_prints_while_cli_runs(monkeypatch, capsys, tmp_path):
- agency = build_agency()
- log = tmp_path / "bridge.log"
- worker: threading.Thread | None = None
- state: dict[str, DummyServer] = {}
- logger = logging.getLogger("test.agentswarm_cli.bridge")
- handler = logging.StreamHandler(sys.stderr)
- logger.handlers = [handler]
- logger.setLevel(logging.WARNING)
- logger.propagate = False
- monkeypatch.setattr(agentswarm_cli_demo.os, "getcwd", lambda: "/tmp/project")
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/usr/local/bin/agentswarm"))
- monkeypatch.setattr(agentswarm_cli_demo, "_bridge_log", lambda: log)
- monkeypatch.setattr(agentswarm_cli_demo, "_should_contain_bridge_output", lambda: True)
- def fake_start_server(value, capture):
- nonlocal worker
- def target():
- with agentswarm_cli_demo._contain_bridge_output(capture):
- sys.stdout.write("bridge stdout noise\n")
- sys.stderr.write("bridge stderr noise\n")
- logger.warning("bridge logger noise")
- worker = threading.Thread(target=target)
- worker.start()
- class LoggingServer(DummyServer):
- def stop(self) -> None:
- if worker is not None:
- worker.join()
- super().stop()
- state["server"] = LoggingServer()
- return state["server"]
- monkeypatch.setattr(agentswarm_cli_demo, "_start_server", fake_start_server)
- def fake_run(cmd, cwd, env, check):
- if worker is not None:
- worker.join()
- return subprocess.CompletedProcess(cmd, 0)
- monkeypatch.setattr(agentswarm_cli_demo.subprocess, "run", fake_run)
- log.unlink(missing_ok=True)
- try:
- agentswarm_cli_demo.start_tui(agency, reload=False)
- finally:
- logger.handlers = []
- captured = capsys.readouterr()
- assert captured.out == ""
- assert "bridge stdout noise" not in captured.err
- assert "bridge stderr noise" not in captured.err
- assert "bridge logger noise" not in captured.err
- assert str(log) in captured.err
- text = log.read_text()
- assert "bridge stdout noise" in text
- assert "bridge stderr noise" in text
- assert "bridge logger noise" in text
- assert state["server"].stopped is True
- def test_agentswarm_cli_tui_reports_bridge_output_on_failure(monkeypatch, capsys, tmp_path):
- agency = build_agency()
- log = tmp_path / "bridge.log"
- worker: threading.Thread | None = None
- state: dict[str, DummyServer] = {}
- monkeypatch.setattr(agentswarm_cli_demo.os, "getcwd", lambda: "/tmp/project")
- monkeypatch.setattr(agentswarm_cli_demo, "_ensure_cli", lambda: Path("/usr/local/bin/agentswarm"))
- monkeypatch.setattr(agentswarm_cli_demo, "_bridge_log", lambda: log)
- monkeypatch.setattr(agentswarm_cli_demo, "_should_contain_bridge_output", lambda: True)
- def fake_start_server(value, capture):
- nonlocal worker
- def target():
- with agentswarm_cli_demo._contain_bridge_output(capture):
- print("bridge stdout noise")
- print("bridge stderr noise", file=sys.stderr)
- worker = threading.Thread(target=target)
- worker.start()
- class LoggingServer(DummyServer):
- def stop(self) -> None:
- if worker is not None:
- worker.join()
- super().stop()
- state["server"] = LoggingServer()
- return state["server"]
- monkeypatch.setattr(agentswarm_cli_demo, "_start_server", fake_start_server)
- def fake_run(cmd, cwd, env, check):
- if worker is not None:
- worker.join()
- return subprocess.CompletedProcess(cmd, 1)
- monkeypatch.setattr(agentswarm_cli_demo.subprocess, "run", fake_run)
- log.unlink(missing_ok=True)
- with pytest.raises(subprocess.CalledProcessError):
- agentswarm_cli_demo.start_tui(agency, reload=False)
- captured = capsys.readouterr()
- assert captured.out == ""
- assert str(log) in captured.err
- assert "bridge stdout noise" in log.read_text()
- assert "bridge stderr noise" in log.read_text()
- assert state["server"].stopped is True
- def test_agentswarm_cli_tui_capture_includes_bridge_worker_threads(capsys, tmp_path):
- log = tmp_path / "bridge.log"
- logger = logging.getLogger("test.agentswarm_cli.capture.worker")
- handler = logging.StreamHandler(sys.stderr)
- logger.handlers = [handler]
- logger.setLevel(logging.WARNING)
- logger.propagate = False
- def worker() -> None:
- sys.stdout.write("bridge worker stdout\n")
- sys.stderr.write("bridge worker stderr\n")
- logger.warning("bridge worker logger")
- try:
- with agentswarm_cli_demo._contain_bridge_output(log):
- asyncio.run(asyncio.to_thread(worker))
- finally:
- logger.handlers = []
- captured = capsys.readouterr()
- assert captured.out == ""
- assert captured.err == ""
- assert log.read_text() == "bridge worker stdout\nbridge worker stderr\nbridge worker logger\n"
- def test_agentswarm_cli_tui_capture_keeps_other_threads_on_real_streams(capsys, tmp_path):
- log = tmp_path / "bridge.log"
- logger = logging.getLogger("test.agentswarm_cli.capture")
- handler = logging.StreamHandler(sys.stderr)
- logger.handlers = [handler]
- logger.setLevel(logging.WARNING)
- logger.propagate = False
- def other():
- sys.stdout.write("other thread stdout\n")
- sys.stderr.write("other thread stderr\n")
- logger.warning("other thread logger")
- try:
- with agentswarm_cli_demo._contain_bridge_output(log):
- worker = threading.Thread(target=other)
- worker.start()
- worker.join()
- sys.stdout.write("server thread stdout\n")
- sys.stderr.write("server thread stderr\n")
- logger.warning("server thread logger")
- finally:
- logger.handlers = []
- captured = capsys.readouterr()
- assert captured.out == "other thread stdout\n"
- assert captured.err == "other thread stderr\nother thread logger\n"
- assert log.read_text() == "server thread stdout\nserver thread stderr\nserver thread logger\n"
- def test_agentswarm_cli_tui_downloads_platform_cli(monkeypatch, tmp_path):
- root = tmp_path / "cache"
- blob = BytesIO()
- with tarfile.open(fileobj=blob, mode="w:gz") as tar:
- data = b"#!/bin/sh\nexit 0\n"
- info = tarfile.TarInfo("package/bin/agentswarm")
- info.size = len(data)
- info.mode = 0o755
- tar.addfile(info, BytesIO(data))
- data = blob.getvalue()
- sha = agentswarm_cli_demo.hashlib.sha1(data).hexdigest()
- calls: list[str] = []
- class Response:
- def __init__(self, payload=None, chunks=None):
- self.payload = payload
- self.chunks = chunks or []
- def raise_for_status(self):
- return None
- def json(self):
- return self.payload
- def iter_content(self, chunk_size=0):
- return iter(self.chunks)
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc, tb):
- return False
- def fake_get(url, timeout, stream=False):
- calls.append(url)
- if stream:
- return Response(chunks=[data])
- return Response(
- payload={
- "dist": {
- "tarball": "https://registry.npmjs.org/@vrsen/agentswarm-cli-darwin-arm64/-/pkg.tgz",
- "shasum": sha,
- }
- }
- )
- monkeypatch.setattr(agentswarm_cli_demo.requests, "get", fake_get)
- monkeypatch.setattr(agentswarm_cli_demo, "_cache", lambda: root)
- monkeypatch.setattr(
- agentswarm_cli_demo,
- "_package",
- lambda: agentswarm_cli_demo._Package(
- "agentswarm-cli-darwin-arm64",
- "agentswarm",
- "@vrsen/agentswarm-cli-darwin-arm64",
- ),
- )
- monkeypatch.setattr(agentswarm_cli_demo, "_CLI_VERSION", "1.2.27-test")
- path = agentswarm_cli_demo._ensure_cli()
- assert path == root / "1.2.27-test" / "agentswarm-cli-darwin-arm64" / "agentswarm"
- assert path.read_text() == "#!/bin/sh\nexit 0\n"
- assert calls == [
- "https://registry.npmjs.org/%40vrsen%2Fagentswarm-cli-darwin-arm64/1.2.27-test",
- "https://registry.npmjs.org/@vrsen/agentswarm-cli-darwin-arm64/-/pkg.tgz",
- ]
- def test_agentswarm_cli_tui_notifies_on_first_run(monkeypatch, tmp_path):
- root = tmp_path / "cache"
- notices: list[str] = []
- monkeypatch.setattr(agentswarm_cli_demo, "_cache", lambda: root)
- monkeypatch.setattr(
- agentswarm_cli_demo,
- "_package",
- lambda: agentswarm_cli_demo._Package(
- "agentswarm-cli-darwin-arm64",
- "agentswarm",
- "@vrsen/agentswarm-cli-darwin-arm64",
- ),
- )
- monkeypatch.setattr(agentswarm_cli_demo, "_install", lambda pkg, install_root, path: path.write_text("ok"))
- monkeypatch.setattr(agentswarm_cli_demo, "_notify_setup", notices.append)
- path = agentswarm_cli_demo._ensure_cli()
- assert path.read_text() == "ok"
- assert notices == [
- agentswarm_cli_demo._SETUP_MESSAGE,
- agentswarm_cli_demo._SETUP_COMPLETE_MESSAGE,
- ]
|