| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- """
- Integration test for FastAPI file processing functionality.
- This test verifies that the FastAPI endpoints can properly handle file_urls parameter,
- process various file types through HTTP requests, and return appropriate responses
- containing the expected file content.
- """
- import asyncio
- import json
- import re
- import socket
- import subprocess
- import sys
- import tempfile
- import threading
- import time
- from pathlib import Path
- import httpx
- import pytest
- import uvicorn
- from agency_swarm import Agency, Agent, run_fastapi
class TestFastAPIFileProcessing:
    """Integration tests for FastAPI file processing via the ``file_urls`` parameter.

    Tests talk to a real ``run_fastapi`` app served by uvicorn on a free local port and,
    where remote files are needed, to a ``python -m http.server`` subprocess serving
    ``data/files`` under ``Path(__file__).parents[2]``.
    """

    @staticmethod
    def _get_free_tcp_port() -> int:
        """Return a TCP port on 127.0.0.1 that was free at the time of the call.

        The probe socket is closed before returning, so another process could still grab
        the port before the caller binds it; acceptable for local tests.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("127.0.0.1", 0))
            return int(sock.getsockname()[1])

    @staticmethod
    def _wait_for_http_ok(url: str, is_alive, timeout_seconds: float = 30.0) -> str | None:
        """Poll ``url`` until it answers HTTP 200.

        Args:
            url: Endpoint to probe with ``GET``.
            is_alive: Zero-argument callable returning ``False`` once the server has exited,
                so a crashed server is reported immediately instead of after the timeout.
            timeout_seconds: Overall time budget for polling.

        Returns:
            ``None`` when the endpoint is ready, otherwise a human-readable failure reason.
        """
        deadline = time.monotonic() + timeout_seconds
        last_problem = "no response yet"
        while time.monotonic() < deadline:
            if not is_alive():
                return f"server exited before becoming ready ({last_problem})"
            try:
                response = httpx.get(url, timeout=5.0)
            except httpx.HTTPError as exc:  # connect refused / timeouts while still booting
                last_problem = f"{type(exc).__name__}: {exc}"
            else:
                if response.status_code == 200:
                    return None
                last_problem = f"HTTP {response.status_code}"
            time.sleep(0.25)
        return f"timed out after {timeout_seconds:.0f}s ({last_problem})"

    @classmethod
    def _start_fastapi_server(cls, app, base_url: str, port: int) -> uvicorn.Server:
        """Serve ``app`` on ``127.0.0.1:port`` in a daemon thread and wait until it is reachable.

        Readiness is detected by polling ``{base_url}/docs`` for HTTP 200. If the server does
        not become ready it is asked to stop and the calling test is skipped.

        Returns:
            The running ``uvicorn.Server``; pass it to ``_stop_fastapi_server`` when finished
            so the server does not keep running until interpreter exit.
        """
        # Driving uvicorn.Server directly (what uvicorn.run does internally) keeps a handle
        # that can be told to shut down; uvicorn.run offers no way to stop it from outside.
        server = uvicorn.Server(uvicorn.Config(app, host="127.0.0.1", port=port, log_level="error"))
        server_thread = threading.Thread(target=server.run, daemon=True)
        server_thread.start()
        problem = cls._wait_for_http_ok(f"{base_url}/docs", server_thread.is_alive)
        if problem is not None:
            cls._stop_fastapi_server(server)
            pytest.skip(f"Could not start FastAPI server: {problem}")
        return server

    @staticmethod
    def _stop_fastapi_server(server: uvicorn.Server) -> None:
        """Request a graceful shutdown of a server started by ``_start_fastapi_server``."""
        server.should_exit = True

    @staticmethod
    def get_http_client(timeout_seconds: int = 120) -> httpx.AsyncClient:
        """Create an async HTTP client whose total and read timeouts equal ``timeout_seconds``."""
        timeout_config = httpx.Timeout(
            timeout_seconds,  # Default for any timeout not set explicitly below
            connect=10.0,  # Connection timeout
            read=timeout_seconds,  # Read timeout; LLM-backed responses can be slow
            write=10.0,  # Write timeout for sending the request
            pool=5.0,  # Wait for a free pooled connection
        )
        return httpx.AsyncClient(timeout=timeout_config)

    @pytest.fixture(scope="class")
    def agency_factory(self):
        """Return a factory building a single-agent Agency with optional thread callbacks."""

        def create_agency(load_threads_callback=None, save_threads_callback=None):
            agent = Agent(
                name="FileProcessorAgent",
                instructions="""
                You are a file processing agent. When you receive files, read their content carefully
                and echo back the exact secret phrases found in the files verbatim—do not paraphrase or invent
                alternative text. If multiple phrases appear, include them all exactly as written.
                """,
                description="Agent that processes and analyzes file content",
                model="gpt-5.4-mini",
            )
            return Agency(
                agent,
                load_threads_callback=load_threads_callback,
                save_threads_callback=save_threads_callback,
            )

        return create_agency

    @pytest.fixture(scope="class")
    def file_server_base_url(self) -> str:
        """Serve the test fixture files over HTTP and yield the server's base URL.

        Skips when the fixture directory is missing or the server never answers.
        """
        test_files_dir = Path(__file__).parents[2] / "data" / "files"
        if not test_files_dir.exists():
            pytest.skip(f"Test files directory not found: {test_files_dir}")
        port = self._get_free_tcp_port()
        base_url = f"http://127.0.0.1:{port}"
        server_process = subprocess.Popen(
            [sys.executable, "-m", "http.server", str(port), "--bind", "127.0.0.1"],
            cwd=test_files_dir,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        try:
            problem = self._wait_for_http_ok(f"{base_url}/", lambda: server_process.poll() is None)
            if problem is not None:
                pytest.skip(f"Could not start file server: {problem}")
            yield base_url
        finally:
            # Always reap the subprocess, and never let a stuck server hang teardown.
            server_process.terminate()
            try:
                server_process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                server_process.kill()
                server_process.wait()

    @pytest.fixture(scope="class")
    def fastapi_base_url(self, agency_factory) -> str:
        """Serve the test agency with local file access limited to the system temp directory.

        ``tmp_path`` directories normally live under the system temp dir, which lets the
        local-file tests pass the allowlist. Yields the base URL; stops the server afterwards.
        """
        port = self._get_free_tcp_port()
        base_url = f"http://127.0.0.1:{port}"
        app = run_fastapi(
            agencies={"test_agency": agency_factory},
            port=port,
            app_token_env="",
            return_app=True,
            enable_agui=False,
            allowed_local_file_dirs=[tempfile.gettempdir()],
        )
        server = self._start_fastapi_server(app, base_url, port)
        try:
            yield base_url
        finally:
            self._stop_fastapi_server(server)

    @pytest.fixture(scope="class")
    def fastapi_server_no_local(self, agency_factory) -> str:
        """Serve the test agency with local file access disabled (no allowlist).

        Yields the base URL; stops the server afterwards.
        """
        port = self._get_free_tcp_port()
        base_url = f"http://127.0.0.1:{port}"
        app = run_fastapi(
            agencies={"test_agency": agency_factory},
            port=port,
            app_token_env="",
            return_app=True,
            enable_agui=False,
            allowed_local_file_dirs=None,
        )
        # Reuse the shared helper so this server also gets a real readiness check.
        server = self._start_fastapi_server(app, base_url, port)
        try:
            yield base_url
        finally:
            self._stop_fastapi_server(server)

    @pytest.mark.asyncio
    async def test_chat_name(self, file_server_base_url: str, fastapi_base_url: str):
        """A request with ``generate_chat_name`` returns a non-empty ``chat_name``."""
        url = f"{fastapi_base_url}/test_agency/get_response"
        payload = {
            "message": "I want to find a restaurant in New York.",
            "generate_chat_name": True,
        }
        async with self.get_http_client(timeout_seconds=20) as client:
            response = await client.post(url, json=payload)
        assert response.status_code == 200
        response_data = response.json()
        assert "chat_name" in response_data
        assert len(response_data["chat_name"]) > 0

    @pytest.mark.asyncio
    async def test_file_search_attachment(self, file_server_base_url: str, fastapi_base_url: str):
        """A text file attached via ``file_urls`` is readable by the agent."""
        url = f"{fastapi_base_url}/test_agency/get_response"
        message = "Please read the content of the uploaded file and tell me what secret phrase you find."
        expected_phrase = "first txt secret phrase"
        file_name = "test_file.txt"
        file_url = f"{file_server_base_url}/test-txt.txt"
        # OpenAI file availability can be eventually consistent even after upload reports "processed".
        # Mirror other tests' stabilization: retry by re-asking using the returned file_id (no re-upload).
        max_attempts = 3
        retry_delay_seconds = 2
        file_id: str | None = None
        last_response_text = ""
        last_response_data: dict[str, object] | None = None
        async with self.get_http_client(timeout_seconds=120) as client:
            for attempt in range(max_attempts):
                if attempt == 0:
                    payload = {"message": message, "file_urls": {file_name: file_url}}
                else:
                    assert file_id is not None
                    payload = {"message": message, "file_ids": [file_id]}
                response = await client.post(url, json=payload)
                assert response.status_code == 200
                response_data = response.json()
                last_response_data = response_data
                if "error" in response_data:
                    pytest.fail(f"Unexpected error response: {response_data['error']}")
                if attempt == 0:
                    file_ids_map = response_data.get("file_ids_map")
                    assert isinstance(file_ids_map, dict), f"Expected file_ids_map dict, got: {type(file_ids_map)}"
                    file_id_value = file_ids_map.get(file_name)
                    assert isinstance(file_id_value, str) and file_id_value, f"Missing file_id for {file_name}"
                    file_id = file_id_value
                assert "response" in response_data
                last_response_text = str(response_data["response"]).lower()
                if expected_phrase in last_response_text:
                    break
                if attempt < max_attempts - 1:
                    await asyncio.sleep(retry_delay_seconds)
        assert expected_phrase in last_response_text, f"Expected phrase not found. Last response: {last_response_data}"

    @pytest.mark.asyncio
    async def test_local_file_attachment(self, fastapi_base_url: str, tmp_path):
        """A local absolute path in ``file_urls`` is uploaded and its source is recorded in history."""
        file_path = tmp_path / "local-file.txt"
        file_path.write_text("local secret phrase", encoding="utf-8")
        url = f"{fastapi_base_url}/test_agency/get_response"
        payload = {
            "message": "Please read the content of the uploaded file and tell me what secret phrase you find.",
            "file_urls": {"local-file.txt": str(file_path)},
        }
        async with self.get_http_client(timeout_seconds=120) as client:
            response = await client.post(url, json=payload)
        assert response.status_code == 200
        response_data = response.json()
        # Verify the file was attached and processed
        assert "file_ids_map" in response_data
        assert "local-file.txt" in response_data["file_ids_map"]
        assert "response" in response_data
        response_text = response_data["response"].lower()
        assert "local secret phrase" in response_text
        new_messages = response_data["new_messages"]
        # The path appears JSON-escaped inside message content (matters for Windows backslashes).
        file_path_json_fragment = json.dumps(str(file_path))[1:-1]
        source_message = next(
            (
                msg
                for msg in new_messages
                if isinstance(msg, dict)
                and msg.get("role") == "system"
                and file_path_json_fragment in str(msg.get("content", ""))
            ),
            None,
        )
        # Fail with context rather than letting a bare StopIteration escape the coroutine,
        # which would surface as an opaque "coroutine raised StopIteration" RuntimeError.
        assert source_message is not None, f"No system message references {file_path}: {new_messages}"
        assert "The user has provided file attachments in their message." in str(source_message["content"])
        assert "upload provenance only" in str(source_message["content"])
        follow_up_payload = {
            "message": "What exact attachment source string was used for local-file.txt? Reply with only that string.",
            "chat_history": new_messages,
        }
        async with self.get_http_client(timeout_seconds=120) as client:
            follow_up_response = await client.post(url, json=follow_up_payload)
        assert follow_up_response.status_code == 200
        follow_up_data = follow_up_response.json()
        # Drop whitespace directly after a path separator (presumably from the model wrapping a
        # long path) and surrounding backticks/whitespace before comparing.
        normalized_follow_up_response = re.sub(
            r"(?<=[/\\\\])\s+",
            "",
            str(follow_up_data["response"]).strip("` \n"),
        )
        assert str(file_path) in normalized_follow_up_response

    @pytest.mark.asyncio
    async def test_local_allowlist_created_after_start(self, tmp_path, agency_factory):
        """Allowlist entries should activate when created after server start."""
        port = self._get_free_tcp_port()
        base_url = f"http://127.0.0.1:{port}"
        missing_dir = tmp_path / "late-uploads"
        app = run_fastapi(
            agencies={"test_agency": agency_factory},
            port=port,
            app_token_env="",
            return_app=True,
            enable_agui=False,
            allowed_local_file_dirs=[str(missing_dir)],
        )
        server = self._start_fastapi_server(app, base_url, port)
        try:
            # The allow-listed directory is created only after the server is already running.
            missing_dir.mkdir(parents=True, exist_ok=True)
            file_path = missing_dir / "late-file.txt"
            file_path.write_text("late secret phrase", encoding="utf-8")
            url = f"{base_url}/test_agency/get_response"
            payload = {
                "message": "Please read the content of the uploaded file and tell me what secret phrase you find.",
                "file_urls": {"late-file.txt": str(file_path)},
            }
            async with self.get_http_client(timeout_seconds=120) as client:
                response = await client.post(url, json=payload)
        finally:
            self._stop_fastapi_server(server)
        assert response.status_code == 200
        response_data = response.json()
        assert "file_ids_map" in response_data
        assert "late-file.txt" in response_data["file_ids_map"]
        assert "late secret phrase" in str(response_data.get("response", "")).lower()

    @pytest.mark.asyncio
    async def test_local_file_attachment_disallowed_without_allowlist(self, fastapi_server_no_local, tmp_path):
        """Local file access should be blocked when no allowlist is configured."""
        file_path = tmp_path / "local-file.txt"
        file_path.write_text("local secret phrase", encoding="utf-8")
        url = f"{fastapi_server_no_local}/test_agency/get_response"
        payload = {
            "message": "Please read the content of the uploaded file and tell me what secret phrase you find.",
            "file_urls": {"local-file.txt": str(file_path)},
        }
        async with self.get_http_client(timeout_seconds=60) as client:
            response = await client.post(url, json=payload)
        assert response.status_code == 200
        response_data = response.json()
        assert "error" in response_data
        assert "file access is disabled" in response_data["error"].lower()

    @pytest.mark.asyncio
    async def test_code_interpreter_attachment(self, file_server_base_url: str, fastapi_base_url: str):
        """An HTML file attached via ``file_urls`` can be searched with the code interpreter."""
        url = f"{fastapi_base_url}/test_agency/get_response"
        expected_marker = "html_body_2718"
        payload = {
            "message": (
                "Search only the HTML document body text with the code interpreter. "
                "Return exactly the marker token after BODY_TOKEN."
            ),
            "file_urls": {"webpage.html": f"{file_server_base_url}/test-html.html"},
        }
        async with self.get_http_client(timeout_seconds=120) as client:
            response = await client.post(url, json=payload)
        assert response.status_code == 200
        response_data = response.json()
        response_text = response_data["response"].lower()
        assert expected_marker in response_text, f"Expected HTML marker not found. Response: {response_text}"
        file_ids = response_data["file_ids_map"]
        assert "webpage.html" in file_ids

    @pytest.mark.asyncio
    async def test_image_and_pdf_attachments(self, file_server_base_url: str, fastapi_base_url: str):
        """An image and a PDF attached together via ``file_urls`` are both readable."""
        url = f"{fastapi_base_url}/test_agency/get_response"
        payload = {
            "message": (
                # Trailing space matters: adjacent literals are concatenated verbatim.
                "I'm uploading multiple files. Please tell me the function name presented in the image "
                "and tell me what my favorite food is."
            ),
            "file_urls": {
                "text_image": f"{file_server_base_url}/test-image.png",
                "pdf_file": f"{file_server_base_url}/test-pdf-2.pdf",
            },
        }
        async with self.get_http_client(timeout_seconds=120) as client:
            response = await client.post(url, json=payload)
        assert response.status_code == 200
        response_data = response.json()
        response_text = response_data["response"].lower()
        # Should find content from both files
        assert "strawberry" in response_text
        assert "sum_of_squares" in response_text or "sum of squares" in response_text

    @pytest.mark.asyncio
    async def test_streaming_response(self, file_server_base_url: str, fastapi_base_url: str):
        """The streaming endpoint surfaces content from a file attached via ``file_urls``."""
        url = f"{fastapi_base_url}/test_agency/get_response_stream"
        payload = {
            "message": "Please read the text file and describe its content in detail.",
            "file_urls": {"stream_test.txt": f"{file_server_base_url}/test-txt.txt"},
        }
        expected_phrase = "first txt secret phrase"
        max_attempts = 3
        retry_delay_seconds = 2
        last_response = ""
        async with self.get_http_client(timeout_seconds=120) as client:
            for attempt in range(max_attempts):
                collected_data = []
                async with client.stream("POST", url, json=payload) as response:
                    assert response.status_code == 200
                    async for line in response.aiter_lines():
                        if line.strip():
                            collected_data.append(line)
                # Verify we received streaming data
                assert len(collected_data) > 0
                # Join all collected data to check for content
                full_response = " ".join(collected_data).lower()
                last_response = full_response
                if expected_phrase in full_response:
                    break
                # OpenAI streaming can transiently fail with provider-side 5xx errors.
                # Retry bounded times to avoid flaking on transient backend errors.
                if attempt < max_attempts - 1 and "an error occurred while processing your request" in full_response:
                    await asyncio.sleep(retry_delay_seconds)
                    continue
                break
        assert expected_phrase in last_response, f"Expected '{expected_phrase}' in streamed output: {last_response}"

    @pytest.mark.asyncio
    async def test_invalid_file_url(self, file_server_base_url: str, fastapi_base_url: str):
        """An unreachable file URL yields an ``error`` field in an HTTP 200 response."""
        url = f"{fastapi_base_url}/test_agency/get_response"
        payload = {
            "message": "Please process this file.",
            "file_urls": {"nonexistent.txt": f"{file_server_base_url}/nonexistent-file.txt"},
        }
        async with self.get_http_client(timeout_seconds=60) as client:
            response = await client.post(url, json=payload)
        # The request should still return 200, but the response should indicate file issues
        assert response.status_code == 200
        response_data = response.json()
        assert "error" in response_data, f"Expected an error field, got: {response_data}"
        response_text = response_data["error"].lower()
        assert "error downloading file from provided urls" in response_text

    @pytest.mark.asyncio
    async def test_streaming_invalid_file_url(self, file_server_base_url: str, fastapi_base_url: str):
        """The streaming endpoint reports an unreachable file URL as an SSE error event.

        The HTTP response itself is 200; the failure is carried in a ``data:`` event whose
        JSON payload has an ``error`` key.
        """
        url = f"{fastapi_base_url}/test_agency/get_response_stream"
        payload = {
            "message": "Please process this file.",
            "file_urls": {"nonexistent.txt": f"{file_server_base_url}/nonexistent-file.txt"},
        }
        collected_data = []
        error_found = False
        async with self.get_http_client(timeout_seconds=60) as client:
            async with client.stream("POST", url, json=payload) as response:
                assert response.status_code == 200
                async for line in response.aiter_lines():
                    if not line.strip():
                        continue
                    collected_data.append(line)
                    if not line.startswith("data: "):
                        continue
                    try:
                        data = json.loads(line[len("data: ") :])
                    except json.JSONDecodeError:
                        continue  # Some lines might not be JSON
                    if isinstance(data, dict) and "error" in data:
                        error_found = True
                        assert "error downloading file from provided urls" in data["error"].lower()
        # Verify we received streaming data and found the error
        assert len(collected_data) > 0, "Should have received streaming data"
        assert error_found, "Should have received an error event for invalid file URL"
|