"""Integration tests for the PresentFiles tool."""
- import importlib
- import os
- from pathlib import Path
- import pytest
- from agency_swarm import Agent
- from agency_swarm.tools.built_in import PresentFiles
def _expected_mnt_path(source_path: Path, mnt_dir: Path) -> Path:
    """Compute where a file at ``source_path`` is expected to end up below ``mnt_dir``.

    The source is made absolute with ``os.path.abspath`` after ``~`` expansion, so a
    symlink at the source path itself is not followed. A source located under the
    resolved current working directory keeps its cwd-relative layout below
    ``mnt_dir``. Any other source is placed in a folder named after its sanitized
    anchor (``"abs"`` when the anchor reduces to an empty string), followed by the
    remaining path components.
    """
    absolute = Path(os.path.abspath(source_path.expanduser()))
    working_dir = Path.cwd().resolve()

    try:
        inside_cwd = absolute.relative_to(working_dir)
    except ValueError:
        # ``relative_to`` raises when the source lives outside the working directory.
        inside_cwd = None
    if inside_cwd is not None:
        return mnt_dir / inside_cwd

    # Turn the anchor (drive, UNC share or root) into a single safe folder name.
    root_label = absolute.anchor.strip("\\/").replace(":", "") or "abs"
    for separator in ("\\", "/"):
        root_label = root_label.replace(separator, "_")
    return mnt_dir.joinpath(root_label, *absolute.parts[1:])
@pytest.fixture
def agent_with_present_files():
    """Provide an ``Agent`` whose only tool is ``PresentFiles``."""
    agent_config = {
        "name": "PresentFilesAgent",
        "description": "Test agent with file preview capability",
        "instructions": "Present files when requested",
        "tools": [PresentFiles],
    }
    return Agent(**agent_config)
class TestPresentFilesBasics:
    """Happy-path checks: moving files into MNT_DIR, overwriting, symlinks and MIME types."""

    def test_moves_common_file_types_to_mnt(self, agent_with_present_files, tmp_path, monkeypatch):
        """Each common file type is returned from MNT_DIR with a MIME type and its size."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        # Payloads shared by several extensions are named once to keep the table readable.
        zip_magic = b"PK\x03\x04"
        jpeg_stub = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xd9"
        console_log = b"console.log('hello');\n"
        sample_files = {
            "example.txt": b"sample",
            "example.csv": b"col1,col2\n1,2\n",
            "example.md": b"# Example\n",
            "example.pdf": b"%PDF-1.4\n%%EOF",
            "example.docx": zip_magic,
            "example.png": b"\x89PNG\r\n\x1a\n",
            "example.jpg": jpeg_stub,
            "example.jpeg": jpeg_stub,
            "example.gif": b"GIF89a",
            "example.svg": b"<?xml version='1.0' encoding='UTF-8'?><svg xmlns='http://www.w3.org/2000/svg'/>",
            "example.mp3": b"ID3",
            "example.mp4": b"\x00\x00\x00\x18ftypmp42",
            "example.wav": b"RIFF",
            "example.zip": zip_magic,
            "example.tar": b"ustar",
            "example.pptx": zip_magic,
            "example.py": b"print('hello')\n",
            "example.js": console_log,
            "example.ts": console_log,
        }

        source_paths = []
        size_by_name = {}
        for filename, content in sample_files.items():
            source = tmp_path / filename
            source.write_bytes(content)
            size_by_name[filename] = source.stat().st_size
            source_paths.append(str(source))

        presenter = PresentFiles(files=source_paths)
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        presented = outcome.get("files", [])
        assert len(presented) == len(sample_files)
        assert {item["name"] for item in presented} == set(sample_files)

        for item in presented:
            mime_type = item["mime_type"]
            assert isinstance(mime_type, str)
            assert mime_type
            assert item["size_bytes"] == size_by_name[item["name"]]
            presented_path = Path(item["path"])
            assert presented_path.is_relative_to(mnt_dir)
            assert presented_path.name == item["name"]

    def test_moves_file_to_mnt(self, agent_with_present_files, tmp_path, monkeypatch):
        """A single file is moved under MNT_DIR and no longer exists at its source."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))
        source = tmp_path / "report.pdf"
        source.write_text("%PDF-1.4\n%%EOF")
        size_before_move = source.stat().st_size

        presenter = PresentFiles(files=[str(source)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert isinstance(outcome, dict)
        assert outcome.get("errors") == []
        assert len(outcome.get("files", [])) == 1

        entry = outcome["files"][0]
        assert entry["name"] == "report.pdf"
        assert entry["mime_type"] == "application/pdf"
        assert entry["size_bytes"] == size_before_move

        moved_to = Path(entry["path"])
        assert moved_to.exists()
        assert moved_to.is_relative_to(mnt_dir)
        assert not source.exists()

    def test_keeps_file_already_in_mnt(self, agent_with_present_files, tmp_path, monkeypatch):
        """A file that already lives in MNT_DIR is reported at its current location."""
        mnt_dir = tmp_path / "mnt"
        mnt_dir.mkdir()
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))
        in_place = mnt_dir / "chart.png"
        in_place.write_bytes(b"\x89PNG\r\n\x1a\n")

        presenter = PresentFiles(files=[str(in_place)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        assert len(outcome.get("files", [])) == 1
        entry = outcome["files"][0]
        assert Path(entry["path"]).resolve() == in_place.resolve()
        assert in_place.exists()

    def test_overwrites_existing_file_in_mnt(self, agent_with_present_files, tmp_path, monkeypatch):
        """A file already present at the destination is replaced by the moved source."""
        mnt_dir = tmp_path / "mnt"
        mnt_dir.mkdir()
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        source = tmp_path / "report.pdf"
        source.write_text("new")

        # Pre-populate the exact destination so the move has something to replace.
        occupied = _expected_mnt_path(source, mnt_dir)
        occupied.parent.mkdir(parents=True, exist_ok=True)
        occupied.write_text("old")

        presenter = PresentFiles(files=[str(source)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        assert len(outcome.get("files", [])) == 1
        entry = outcome["files"][0]
        assert Path(entry["path"]).resolve() == occupied.resolve()
        assert occupied.read_text() == "new"
        assert not source.exists()

    def test_preserves_structure_for_same_basename(self, agent_with_present_files, tmp_path, monkeypatch):
        """Two files sharing a basename in different folders get distinct destinations."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        sources = []
        for folder_name, text in (("a", "alpha"), ("b", "beta")):
            folder = tmp_path / folder_name
            folder.mkdir()
            report = folder / "report.pdf"
            report.write_text(text)
            sources.append(report)
        first, second = sources

        presenter = PresentFiles(files=[str(first), str(second)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        presented = outcome.get("files", [])
        assert len(presented) == 2
        destinations = {Path(item["path"]).resolve() for item in presented}
        assert len(destinations) == 2
        assert _expected_mnt_path(first, mnt_dir) in destinations
        assert _expected_mnt_path(second, mnt_dir) in destinations

    def test_moves_symlink_without_moving_target(self, agent_with_present_files, tmp_path, monkeypatch):
        """Presenting a symlink relocates the link itself and leaves its target in place."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        targets = tmp_path / "targets"
        targets.mkdir()
        real_file = targets / "report.pdf"
        real_file.write_text("linked content")

        links = tmp_path / "links"
        links.mkdir()
        link = links / "report-link.pdf"
        try:
            link.symlink_to(real_file)
        except OSError as exc:
            pytest.skip(f"Symlink creation is not supported in this environment: {exc}")

        presenter = PresentFiles(files=[str(link)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        assert len(outcome.get("files", [])) == 1
        moved_link = Path(outcome["files"][0]["path"])
        assert moved_link == _expected_mnt_path(link, mnt_dir)
        assert moved_link.is_symlink()
        assert moved_link.resolve() == real_file.resolve()
        assert real_file.exists()
        assert not link.exists()

    def test_rewrites_relative_symlink_target_after_move(self, agent_with_present_files, tmp_path, monkeypatch):
        """A symlink created with a relative target still reaches that target after the move."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        targets = tmp_path / "targets"
        targets.mkdir()
        real_file = targets / "report.pdf"
        real_file.write_text("linked content")

        links = tmp_path / "links"
        links.mkdir()
        link = links / "report-link.pdf"
        # The link stores a path relative to its own directory rather than an absolute one.
        target_from_link_dir = os.path.relpath(real_file, start=links)
        try:
            link.symlink_to(target_from_link_dir)
        except OSError as exc:
            pytest.skip(f"Symlink creation is not supported in this environment: {exc}")

        presenter = PresentFiles(files=[str(link)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        assert len(outcome.get("files", [])) == 1
        moved_link = Path(outcome["files"][0]["path"])
        assert moved_link.is_symlink()
        assert moved_link.resolve() == real_file.resolve()
        assert moved_link.read_text() == "linked content"

    def test_unknown_extension_uses_octet_stream(self, agent_with_present_files, tmp_path, monkeypatch):
        """An unrecognised extension is reported as ``application/octet-stream``."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))
        source = tmp_path / "artifact.unknownext"
        source.write_text("payload")

        presenter = PresentFiles(files=[str(source)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("errors") == []
        assert len(outcome.get("files", [])) == 1
        entry = outcome["files"][0]
        assert entry["mime_type"] == "application/octet-stream"
class TestPresentFilesErrorHandling:
    """Failure-path checks: invalid inputs, size limits, move failures and overwrite protection."""

    def test_directory_path_reports_error(self, agent_with_present_files, tmp_path, monkeypatch):
        """Passing a directory yields no files and one error mentioning "directory"."""
        monkeypatch.setenv("MNT_DIR", str(tmp_path / "mnt"))

        presenter = PresentFiles(files=[str(tmp_path)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("files") == []
        assert len(outcome.get("errors", [])) == 1
        assert "directory" in outcome["errors"][0].lower()

    def test_missing_file_reports_error(self, agent_with_present_files, tmp_path, monkeypatch):
        """A path that does not exist yields no files and one "not found" error."""
        monkeypatch.setenv("MNT_DIR", str(tmp_path / "mnt"))
        absent = tmp_path / "missing.txt"

        presenter = PresentFiles(files=[str(absent)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("files") == []
        assert len(outcome.get("errors", [])) == 1
        assert "not found" in outcome["errors"][0].lower()

    def test_rejects_large_file(self, agent_with_present_files, tmp_path, monkeypatch):
        """A file over FILE_PREVIEW_MAX_BYTES is reported as an error but is still moved."""
        mnt_dir = tmp_path / "mnt"
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))
        monkeypatch.setenv("FILE_PREVIEW_MAX_BYTES", "10")
        oversized = tmp_path / "large.bin"
        oversized.write_bytes(b"x" * 20)

        presenter = PresentFiles(files=[str(oversized)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("files") == []
        assert len(outcome.get("errors", [])) == 1
        assert "exceeds" in outcome["errors"][0].lower()
        # The rejected file is expected at its destination, not at its source.
        assert _expected_mnt_path(oversized, mnt_dir).exists()
        assert not oversized.exists()

    def test_rejects_large_file_already_in_mnt(self, agent_with_present_files, tmp_path, monkeypatch):
        """An oversized file already inside MNT_DIR is rejected and left where it is."""
        mnt_dir = tmp_path / "mnt"
        mnt_dir.mkdir()
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))
        monkeypatch.setenv("FILE_PREVIEW_MAX_BYTES", "10")
        oversized = mnt_dir / "large.bin"
        oversized.write_bytes(b"x" * 20)

        presenter = PresentFiles(files=[str(oversized)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("files") == []
        assert len(outcome.get("errors", [])) == 1
        assert "exceeds" in outcome["errors"][0].lower()
        assert oversized.exists()

    def test_move_failure_does_not_delete_existing_destination(self, agent_with_present_files, tmp_path, monkeypatch):
        """When ``shutil.move`` raises, the existing destination and the source both survive."""
        mnt_dir = tmp_path / "mnt"
        mnt_dir.mkdir()
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        source = tmp_path / "report.pdf"
        source.write_text("new")
        occupied = _expected_mnt_path(source, mnt_dir)
        occupied.parent.mkdir(parents=True, exist_ok=True)
        occupied.write_text("old")

        def _always_fail_move(source: str, destination_path: str):
            raise OSError("simulated move failure")

        # Patch ``move`` on the ``shutil`` object referenced by the tool's own module.
        tool_module = importlib.import_module("agency_swarm.tools.built_in.PresentFiles")
        monkeypatch.setattr(tool_module.shutil, "move", _always_fail_move)

        presenter = PresentFiles(files=[str(source)])
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("files") == []
        assert len(outcome.get("errors", [])) == 1
        assert "unable to move file to mnt directory" in outcome["errors"][0].lower()
        assert occupied.exists()
        assert occupied.read_text() == "old"
        assert source.exists()

    def test_dangling_symlink_destination_respects_overwrite_false(
        self, agent_with_present_files, tmp_path, monkeypatch
    ):
        """With ``overwrite=False`` a dangling symlink at the destination blocks the move."""
        mnt_dir = tmp_path / "mnt"
        mnt_dir.mkdir()
        monkeypatch.setenv("MNT_DIR", str(mnt_dir))

        source = tmp_path / "report.pdf"
        source.write_text("new")
        occupied = _expected_mnt_path(source, mnt_dir)
        occupied.parent.mkdir(parents=True, exist_ok=True)
        try:
            # The link points at a file that is never created, so it dangles.
            occupied.symlink_to(occupied.parent / "missing-target.pdf")
        except OSError as exc:
            pytest.skip(f"Symlink creation is not supported in this environment: {exc}")

        presenter = PresentFiles(files=[str(source)], overwrite=False)
        presenter._caller_agent = agent_with_present_files
        outcome = presenter.run()

        assert outcome.get("files") == []
        assert len(outcome.get("errors", [])) == 1
        assert "destination already exists and overwrite is disabled" in outcome["errors"][0].lower()
        assert occupied.is_symlink()
        assert source.exists()
|