"""
Integration test for FastAPI file processing functionality.

This test verifies that the FastAPI endpoints can properly handle the file_urls parameter,
process various file types through HTTP requests, and return appropriate responses
containing the expected file content.
"""
import asyncio
import json
import re
import socket
import subprocess
import sys
import tempfile
import threading
import time
from collections.abc import Iterator
from pathlib import Path

import httpx
import pytest
import uvicorn

from agency_swarm import Agency, Agent, run_fastapi
  21. class TestFastAPIFileProcessing:
  22. """Test suite for FastAPI file processing with file_urls parameter."""
  23. @staticmethod
  24. def _get_free_tcp_port() -> int:
  25. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
  26. sock.bind(("127.0.0.1", 0))
  27. return int(sock.getsockname()[1])
  28. @staticmethod
  29. def _start_fastapi_server(app, base_url: str, port: int) -> None:
  30. def run_server():
  31. uvicorn.run(app, host="127.0.0.1", port=port, log_level="error")
  32. server_thread = threading.Thread(target=run_server, daemon=True)
  33. server_thread.start()
  34. time.sleep(3)
  35. max_retries = 15
  36. for i in range(max_retries):
  37. try:
  38. response = httpx.get(f"{base_url}/docs", timeout=10.0)
  39. if response.status_code == 200:
  40. time.sleep(1)
  41. break
  42. except (httpx.ConnectTimeout, httpx.ReadTimeout):
  43. time.sleep(1.5)
  44. if i == max_retries - 1:
  45. pytest.skip("Could not start FastAPI server after multiple retries")
  46. except Exception as e:
  47. time.sleep(1)
  48. if i == max_retries - 1:
  49. pytest.skip(f"Could not start FastAPI server: {e}")
  50. @staticmethod
  51. def get_http_client(timeout_seconds: int = 120) -> httpx.AsyncClient:
  52. """Create an HTTP client with proper timeout configuration."""
  53. timeout_config = httpx.Timeout(
  54. timeout_seconds, # Total timeout (first positional arg)
  55. connect=10.0, # Connection timeout
  56. read=timeout_seconds, # Read timeout for the entire response
  57. write=10.0, # Write timeout for sending request
  58. pool=5.0, # Pool connection timeout
  59. )
  60. return httpx.AsyncClient(timeout=timeout_config)
  61. @pytest.fixture(scope="class")
  62. def agency_factory(self):
  63. """Create an agency factory for testing."""
  64. def create_agency(load_threads_callback=None, save_threads_callback=None):
  65. agent = Agent(
  66. name="FileProcessorAgent",
  67. instructions="""
  68. You are a file processing agent. When you receive files, read their content carefully
  69. and echo back the exact secret phrases found in the files verbatim—do not paraphrase or invent
  70. alternative text. If multiple phrases appear, include them all exactly as written.
  71. """,
  72. description="Agent that processes and analyzes file content",
  73. model="gpt-5.4-mini",
  74. )
  75. return Agency(
  76. agent,
  77. load_threads_callback=load_threads_callback,
  78. save_threads_callback=save_threads_callback,
  79. )
  80. return create_agency
  81. @pytest.fixture(scope="class")
  82. def file_server_base_url(self) -> str:
  83. """Start HTTP file server for serving test files."""
  84. test_files_dir = Path(__file__).parents[2] / "data" / "files"
  85. if not test_files_dir.exists():
  86. pytest.skip(f"Test files directory not found: {test_files_dir}")
  87. port = self._get_free_tcp_port()
  88. base_url = f"http://127.0.0.1:{port}"
  89. server_process = subprocess.Popen(
  90. [sys.executable, "-m", "http.server", str(port), "--bind", "127.0.0.1"],
  91. cwd=test_files_dir,
  92. stdout=subprocess.DEVNULL,
  93. stderr=subprocess.DEVNULL,
  94. )
  95. time.sleep(2)
  96. try:
  97. response = httpx.get(f"{base_url}/", timeout=5)
  98. assert response.status_code == 200
  99. except Exception as e:
  100. server_process.terminate()
  101. pytest.skip(f"Could not start file server: {e}")
  102. yield base_url
  103. server_process.terminate()
  104. server_process.wait()
  105. @pytest.fixture(scope="class")
  106. def fastapi_base_url(self, agency_factory) -> str:
  107. """Start FastAPI server on an available port."""
  108. port = self._get_free_tcp_port()
  109. base_url = f"http://127.0.0.1:{port}"
  110. app = run_fastapi(
  111. agencies={"test_agency": agency_factory},
  112. port=port,
  113. app_token_env="",
  114. return_app=True,
  115. enable_agui=False,
  116. allowed_local_file_dirs=[tempfile.gettempdir()],
  117. )
  118. self._start_fastapi_server(app, base_url, port)
  119. yield base_url
  120. @pytest.fixture(scope="class")
  121. def fastapi_server_no_local(self, agency_factory) -> str:
  122. """Start FastAPI server with local file access disabled."""
  123. port = self._get_free_tcp_port()
  124. base_url = f"http://127.0.0.1:{port}"
  125. app = run_fastapi(
  126. agencies={"test_agency": agency_factory},
  127. port=port,
  128. app_token_env="",
  129. return_app=True,
  130. enable_agui=False,
  131. allowed_local_file_dirs=None,
  132. )
  133. server_thread = threading.Thread(
  134. target=lambda: uvicorn.run(
  135. app,
  136. host="127.0.0.1",
  137. port=port,
  138. log_level="error",
  139. ),
  140. daemon=True,
  141. )
  142. server_thread.start()
  143. time.sleep(3)
  144. yield base_url
  145. @pytest.mark.asyncio
  146. async def test_chat_name(self, file_server_base_url: str, fastapi_base_url: str):
  147. """Test processing a single text file via file_urls with chat name generation."""
  148. url = f"{fastapi_base_url}/test_agency/get_response"
  149. payload = {
  150. "message": "I want to find a restaurant in New York.",
  151. "generate_chat_name": True,
  152. }
  153. async with self.get_http_client(timeout_seconds=20) as client:
  154. response = await client.post(url, json=payload)
  155. assert response.status_code == 200
  156. response_data = response.json()
  157. assert "chat_name" in response_data
  158. assert len(response_data["chat_name"]) > 0
  159. @pytest.mark.asyncio
  160. async def test_file_search_attachment(self, file_server_base_url: str, fastapi_base_url: str):
  161. """Test processing a single text file via file_urls."""
  162. url = f"{fastapi_base_url}/test_agency/get_response"
  163. message = "Please read the content of the uploaded file and tell me what secret phrase you find."
  164. expected_phrase = "first txt secret phrase"
  165. file_name = "test_file.txt"
  166. file_url = f"{file_server_base_url}/test-txt.txt"
  167. headers = {}
  168. # OpenAI file availability can be eventually consistent even after upload reports "processed".
  169. # Mirror other tests' stabilization: retry by re-asking using the returned file_id (no re-upload).
  170. max_attempts = 3
  171. retry_delay_seconds = 2
  172. file_id: str | None = None
  173. last_response_text = ""
  174. last_response_data: dict[str, object] | None = None
  175. async with self.get_http_client(timeout_seconds=120) as client:
  176. for attempt in range(max_attempts):
  177. if attempt == 0:
  178. payload = {"message": message, "file_urls": {file_name: file_url}}
  179. else:
  180. assert file_id is not None
  181. payload = {"message": message, "file_ids": [file_id]}
  182. response = await client.post(url, json=payload, headers=headers)
  183. assert response.status_code == 200
  184. response_data = response.json()
  185. last_response_data = response_data
  186. if "error" in response_data:
  187. pytest.fail(f"Unexpected error response: {response_data['error']}")
  188. if attempt == 0:
  189. file_ids_map = response_data.get("file_ids_map")
  190. assert isinstance(file_ids_map, dict), f"Expected file_ids_map dict, got: {type(file_ids_map)}"
  191. file_id_value = file_ids_map.get(file_name)
  192. assert isinstance(file_id_value, str) and file_id_value, f"Missing file_id for {file_name}"
  193. file_id = file_id_value
  194. assert "response" in response_data
  195. last_response_text = str(response_data["response"]).lower()
  196. if expected_phrase in last_response_text:
  197. break
  198. if attempt < max_attempts - 1:
  199. await asyncio.sleep(retry_delay_seconds)
  200. assert expected_phrase in last_response_text, f"Expected phrase not found. Last response: {last_response_data}"
  201. # Verify response contains expected content
  202. assert last_response_data is not None and "response" in last_response_data
  203. @pytest.mark.asyncio
  204. async def test_local_file_attachment(self, fastapi_base_url: str, tmp_path):
  205. """Test processing a local absolute file path via file_urls."""
  206. file_path = tmp_path / "local-file.txt"
  207. file_path.write_text("local secret phrase", encoding="utf-8")
  208. url = f"{fastapi_base_url}/test_agency/get_response"
  209. payload = {
  210. "message": "Please read the content of the uploaded file and tell me what secret phrase you find.",
  211. "file_urls": {"local-file.txt": str(file_path)},
  212. }
  213. headers = {}
  214. async with self.get_http_client(timeout_seconds=120) as client:
  215. response = await client.post(url, json=payload, headers=headers)
  216. assert response.status_code == 200
  217. response_data = response.json()
  218. # Verify the file was attached and processed
  219. assert "file_ids_map" in response_data
  220. assert "local-file.txt" in response_data["file_ids_map"]
  221. # Should return a response without error
  222. assert "response" in response_data
  223. response_text = response_data["response"].lower()
  224. assert "local secret phrase" in response_text
  225. new_messages = response_data["new_messages"]
  226. file_path_json_fragment = json.dumps(str(file_path))[1:-1]
  227. source_message = next(
  228. msg
  229. for msg in new_messages
  230. if isinstance(msg, dict)
  231. and msg.get("role") == "system"
  232. and file_path_json_fragment in str(msg.get("content", ""))
  233. )
  234. assert "The user has provided file attachments in their message." in str(source_message["content"])
  235. assert "upload provenance only" in str(source_message["content"])
  236. follow_up_payload = {
  237. "message": "What exact attachment source string was used for local-file.txt? Reply with only that string.",
  238. "chat_history": new_messages,
  239. }
  240. async with self.get_http_client(timeout_seconds=120) as client:
  241. follow_up_response = await client.post(url, json=follow_up_payload, headers=headers)
  242. assert follow_up_response.status_code == 200
  243. follow_up_data = follow_up_response.json()
  244. normalized_follow_up_response = re.sub(
  245. r"(?<=[/\\\\])\s+",
  246. "",
  247. str(follow_up_data["response"]).strip("` \n"),
  248. )
  249. assert str(file_path) in normalized_follow_up_response
  250. @pytest.mark.asyncio
  251. async def test_local_allowlist_created_after_start(self, tmp_path, agency_factory):
  252. """Allowlist entries should activate when created after server start."""
  253. port = self._get_free_tcp_port()
  254. base_url = f"http://127.0.0.1:{port}"
  255. missing_dir = tmp_path / "late-uploads"
  256. app = run_fastapi(
  257. agencies={"test_agency": agency_factory},
  258. port=port,
  259. app_token_env="",
  260. return_app=True,
  261. enable_agui=False,
  262. allowed_local_file_dirs=[str(missing_dir)],
  263. )
  264. self._start_fastapi_server(app, base_url, port)
  265. missing_dir.mkdir(parents=True, exist_ok=True)
  266. file_path = missing_dir / "late-file.txt"
  267. file_path.write_text("late secret phrase", encoding="utf-8")
  268. url = f"{base_url}/test_agency/get_response"
  269. payload = {
  270. "message": "Please read the content of the uploaded file and tell me what secret phrase you find.",
  271. "file_urls": {"late-file.txt": str(file_path)},
  272. }
  273. async with self.get_http_client(timeout_seconds=120) as client:
  274. response = await client.post(url, json=payload)
  275. assert response.status_code == 200
  276. response_data = response.json()
  277. assert "file_ids_map" in response_data
  278. assert "late-file.txt" in response_data["file_ids_map"]
  279. assert "late secret phrase" in str(response_data.get("response", "")).lower()
  280. @pytest.mark.asyncio
  281. async def test_local_file_attachment_disallowed_without_allowlist(self, fastapi_server_no_local, tmp_path):
  282. """Local file access should be blocked when no allowlist is configured."""
  283. file_path = tmp_path / "local-file.txt"
  284. file_path.write_text("local secret phrase", encoding="utf-8")
  285. url = f"{fastapi_server_no_local}/test_agency/get_response"
  286. payload = {
  287. "message": "Please read the content of the uploaded file and tell me what secret phrase you find.",
  288. "file_urls": {"local-file.txt": str(file_path)},
  289. }
  290. async with self.get_http_client(timeout_seconds=60) as client:
  291. response = await client.post(url, json=payload)
  292. assert response.status_code == 200
  293. response_data = response.json()
  294. assert "error" in response_data
  295. assert "file access is disabled" in response_data["error"].lower()
  296. @pytest.mark.asyncio
  297. async def test_code_interpreter_attachment(self, file_server_base_url: str, fastapi_base_url: str):
  298. """Test processing an HTML file via file_urls."""
  299. url = f"{fastapi_base_url}/test_agency/get_response"
  300. expected_marker = "html_body_2718"
  301. payload = {
  302. "message": (
  303. "Search only the HTML document body text with the code interpreter. "
  304. "Return exactly the marker token after BODY_TOKEN."
  305. ),
  306. "file_urls": {"webpage.html": f"{file_server_base_url}/test-html.html"},
  307. }
  308. headers = {}
  309. async with self.get_http_client(timeout_seconds=120) as client:
  310. response = await client.post(url, json=payload, headers=headers)
  311. assert response.status_code == 200
  312. response_data = response.json()
  313. response_text = response_data["response"].lower()
  314. assert expected_marker in response_text, f"Expected HTML marker not found. Response: {response_text}"
  315. file_ids = response_data["file_ids_map"]
  316. assert "webpage.html" in file_ids.keys()
  317. @pytest.mark.asyncio
  318. async def test_image_and_pdf_attachments(self, file_server_base_url: str, fastapi_base_url: str):
  319. """Test processing multiple files simultaneously via file_urls."""
  320. url = f"{fastapi_base_url}/test_agency/get_response"
  321. payload = {
  322. "message": (
  323. "I'm uploading multiple files. Please tell me the function name presented in the image"
  324. "and tell me what my favorite food is."
  325. ),
  326. "file_urls": {
  327. "text_image": f"{file_server_base_url}/test-image.png",
  328. "pdf_file": f"{file_server_base_url}/test-pdf-2.pdf",
  329. },
  330. }
  331. headers = {}
  332. async with self.get_http_client(timeout_seconds=120) as client:
  333. response = await client.post(url, json=payload, headers=headers)
  334. assert response.status_code == 200
  335. response_data = response.json()
  336. response_text = response_data["response"].lower()
  337. # Should find secret phrases from multiple files
  338. assert "strawberry" in response_text.lower()
  339. assert "sum_of_squares" in response_text or "sum of squares" in response_text
  340. @pytest.mark.asyncio
  341. async def test_streaming_response(self, file_server_base_url: str, fastapi_base_url: str):
  342. """Test streaming response with file processing."""
  343. url = f"{fastapi_base_url}/test_agency/get_response_stream"
  344. payload = {
  345. "message": "Please read the text file and describe its content in detail.",
  346. "file_urls": {"stream_test.txt": f"{file_server_base_url}/test-txt.txt"},
  347. }
  348. headers = {}
  349. expected_phrase = "first txt secret phrase"
  350. max_attempts = 3
  351. retry_delay_seconds = 2
  352. last_response = ""
  353. async with self.get_http_client(timeout_seconds=120) as client:
  354. for attempt in range(max_attempts):
  355. collected_data = []
  356. async with client.stream("POST", url, json=payload, headers=headers) as response:
  357. assert response.status_code == 200
  358. async for line in response.aiter_lines():
  359. if line.strip():
  360. collected_data.append(line)
  361. # Verify we received streaming data
  362. assert len(collected_data) > 0
  363. # Join all collected data to check for content
  364. full_response = " ".join(collected_data).lower()
  365. last_response = full_response
  366. if expected_phrase in full_response:
  367. break
  368. # OpenAI streaming can transiently fail with provider-side 5xx errors.
  369. # Retry bounded times to avoid flaking on transient backend errors.
  370. if attempt < max_attempts - 1 and "an error occurred while processing your request" in full_response:
  371. await asyncio.sleep(retry_delay_seconds)
  372. continue
  373. break
  374. assert expected_phrase in last_response, f"Expected '{expected_phrase}' in streamed output: {last_response}"
  375. @pytest.mark.asyncio
  376. async def test_invalid_file_url(self, file_server_base_url: str, fastapi_base_url: str):
  377. """Test handling of invalid file URLs."""
  378. url = f"{fastapi_base_url}/test_agency/get_response"
  379. payload = {
  380. "message": "Please process this file.",
  381. "file_urls": {"nonexistent.txt": f"{file_server_base_url}/nonexistent-file.txt"},
  382. }
  383. headers = {}
  384. async with self.get_http_client(timeout_seconds=60) as client:
  385. response = await client.post(url, json=payload, headers=headers)
  386. # The request should still return 200, but the response should indicate file issues
  387. assert response.status_code == 200
  388. response_data = response.json()
  389. response_text = response_data["error"].lower()
  390. assert "error downloading file from provided urls" in response_text
  391. @pytest.mark.asyncio
  392. async def test_streaming_invalid_file_url(self, file_server_base_url: str, fastapi_base_url: str):
  393. """Test that streaming endpoint properly handles invalid file URLs with HTTP 404."""
  394. url = f"{fastapi_base_url}/test_agency/get_response_stream"
  395. payload = {
  396. "message": "Please process this file.",
  397. "file_urls": {"nonexistent.txt": f"{file_server_base_url}/nonexistent-file.txt"},
  398. }
  399. headers = {}
  400. collected_data = []
  401. error_found = False
  402. async with self.get_http_client(timeout_seconds=60) as client:
  403. async with client.stream("POST", url, json=payload, headers=headers) as response:
  404. assert response.status_code == 200
  405. async for line in response.aiter_lines():
  406. if line.strip():
  407. collected_data.append(line)
  408. # Check if this is an error event
  409. if line.startswith("data: "):
  410. try:
  411. data = json.loads(line[6:]) # Remove "data: " prefix
  412. if "error" in data:
  413. error_found = True
  414. assert "error downloading file from provided urls" in data["error"].lower()
  415. except json.JSONDecodeError:
  416. pass # Some lines might not be JSON
  417. # Verify we received streaming data and found the error
  418. assert len(collected_data) > 0, "Should have received streaming data"
  419. assert error_found, "Should have received an error event for invalid file URL"