| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734 |
- import asyncio
- import base64
- import logging
- import os
- import re
- import shutil
- import time
- from pathlib import Path
- import pytest
- from agents import ModelSettings, ToolCallItem
- from openai import AsyncOpenAI, NotFoundError
- from agency_swarm import Agency, Agent
# API key for the live OpenAI client, read from the environment; None when the variable is unset.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
@pytest.fixture(scope="module")
async def real_openai_client():
    """Return an AsyncOpenAI client constructed with the module-level OPENAI_API_KEY."""
    client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    return client
@pytest.mark.asyncio
async def test_agent_processes_message_files_attachment(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Tests that an agent can receive a file ID via `file_ids` parameter,
    and OpenAI automatically makes the file content available to the LLM.
    No custom tools are needed - OpenAI handles file processing automatically.
    Uses a REAL OpenAI client for file upload and REAL agent execution.

    The uploaded file is deleted in a ``finally`` block so that a failing
    assertion does not leave the upload behind.
    """
    # 1. Upload the existing rich test PDF from v0.X tests
    test_pdf_path = Path("tests/data/files/test-pdf-2.pdf")
    assert test_pdf_path.exists(), f"Test PDF not found at {test_pdf_path}"
    # The context manager closes the local handle as soon as the upload call returns.
    with test_pdf_path.open("rb") as pdf_file:
        uploaded_real_file = await real_openai_client.files.create(file=pdf_file, purpose="assistants")
    attached_file_id = uploaded_real_file.id
    print(f"TEST: Uploaded file {test_pdf_path.name} for attachment, ID: {attached_file_id}")

    try:
        # 2. Initialize Agent WITHOUT any custom file processing tools
        # OpenAI will automatically process the attached file and make content available to the LLM
        attachment_tester_agent = Agent(
            name="AttachmentTesterAgentReal",
            instructions=(
                "You are a helpful assistant. When files are attached, you can read their content directly. "
                "Answer questions about the file content accurately."
            ),
            model_settings=ModelSettings(temperature=0.0),
        )
        attachment_tester_agent._openai_client = real_openai_client

        # 3. Setup a real Agency for proper testing
        agency = Agency(attachment_tester_agent, user_context=None)

        # 4. Call get_response with file_ids - OpenAI will automatically process the file
        message_to_agent = "What is my favorite food?"
        print(
            f"TEST: Calling get_response for agent '{attachment_tester_agent.name}' with file_ids: [{attached_file_id}]"
        )
        response_result = await agency.get_response(message_to_agent, file_ids=[attached_file_id])
        assert response_result is not None
        assert response_result.final_output is not None
        print(f"TEST: Agent final output: {response_result.final_output}")

        # 5. Verify the agent could access the file content automatically
        # The LLM should be able to read the PDF content without any custom tools
        response_lower = response_result.final_output.lower()
        assert len(response_result.final_output) > 20, (
            f"Response too short, suggests file content was not processed. Response: {response_result.final_output}"
        )
        # Look for the secret phrase that should be in the PDF
        assert "strawberry" in response_lower, (
            f"Expected word 'strawberry' not found in response. "
            f"This suggests the PDF content was not made available to the LLM. "
            f"Response: {response_result.final_output}"
        )

        # 6. Verify NO custom tool calls were made (since OpenAI handles file processing automatically)
        unexpected_tool_calls = [item for item in response_result.new_items if isinstance(item, ToolCallItem)]
        for item in unexpected_tool_calls:
            print(f"Unexpected tool call found: {item.raw_item}")
        assert not unexpected_tool_calls, (
            "No tool calls should be found since OpenAI automatically processes file attachments. "
            "The presence of tool calls suggests the implementation is incorrectly trying to use custom tools."
        )
    finally:
        # 7. Cleanup the test file (best effort: a cleanup failure must not mask the test result)
        try:
            await real_openai_client.files.delete(attached_file_id)
            print(f"Cleaned up file {attached_file_id}")
        except Exception as e:
            print(f"Warning: Failed to clean up file {attached_file_id}: {e}")
@pytest.mark.asyncio
async def test_multi_file_type_processing(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Checks that a PDF passed through `file_ids` is answered from without any tool call.

    The test uploads tests/data/files/test-pdf-2.pdf, asks the agent a question whose
    expected answer ("strawberry") should come from that PDF, and asserts that no
    ToolCallItem appears among the run's new items. The upload is deleted in `finally`.

    NOTE (from the original author): the OpenAI Responses API with input_file type only
    supports PDF files for direct attachment. Other file types (TXT, CSV, images) are
    supported through different mechanisms:
    - Vector Stores/File Search (for RAG functionality)
    - Code Interpreter (for code execution with files)
    This test focuses on the direct file attachment capability, which is PDF-only.
    """
    test_pdf_path = Path("tests/data/files/test-pdf-2.pdf")
    assert test_pdf_path.exists(), f"Test PDF not found at {test_pdf_path}"

    # Push the PDF to OpenAI and remember its ID for the request and for cleanup.
    with test_pdf_path.open("rb") as pdf_handle:
        upload = await real_openai_client.files.create(file=pdf_handle, purpose="assistants")
    file_id = upload.id
    print(f"Uploaded {test_pdf_path.name}, got ID: {file_id}")

    try:
        # The agent is deliberately built with no custom file-processing tools.
        file_processor_agent = Agent(
            name="FileProcessorAgent",
            instructions="""You are an agent that can read and analyze PDF files automatically.
            When PDF files are attached, you can access their content directly.
            Extract and summarize key information from the PDF content accurately.""",
            model_settings=ModelSettings(temperature=0.0),
        )
        file_processor_agent._openai_client = real_openai_client
        Agency(file_processor_agent, user_context=None)

        expected_content = "strawberry"
        response_result = await file_processor_agent.get_response("What is my favorite food?", file_ids=[file_id])

        assert response_result is not None
        print(f"Response for {test_pdf_path.name}: {response_result.final_output}")

        # Case-insensitive containment check on the final answer.
        answer_text = response_result.final_output.lower()
        assert expected_content.lower() in answer_text, (
            f"Expected content '{expected_content}' not found in response for {test_pdf_path.name}. "
            f"This suggests OpenAI did not make the PDF file content available to the LLM. "
            f"Response: {response_result.final_output}"
        )

        # Any ToolCallItem in the run output counts as a failure; report each one first.
        stray_tool_calls = [entry for entry in response_result.new_items if isinstance(entry, ToolCallItem)]
        for item in stray_tool_calls:
            print(f"Unexpected tool call found for {test_pdf_path.name}: {item.raw_item}")
        assert not stray_tool_calls, (
            f"No tool calls should be found for {test_pdf_path.name} since OpenAI automatically processes "
            f"PDF file attachments. The presence of tool calls suggests the implementation is incorrectly "
            f"trying to use custom tools."
        )
    finally:
        # Best-effort removal of the uploaded file; failures are only reported.
        try:
            await real_openai_client.files.delete(file_id=file_id)
            print(f"Cleaned up file {file_id}")
        except Exception as e:
            print(f"Error cleaning up file {file_id}: {e}")
async def _setup_file_search_agent(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Build the FileSearch test agent on top of a private copy of favorite_books.txt.

    The fixture file is copied into ``tmp_path / "file_search_test"``, an Agent is
    created with that directory as ``files_folder``, and the sibling directory whose
    name matches ``file_search_test_vs_*`` is looked up afterwards (the helper asserts
    that one exists once the Agent has been constructed).

    Returns:
        Tuple of (agent, vector store folder path, copied file path, source fixture path).
    """
    test_txt_path = Path("tests/data/files/favorite_books.txt")
    assert test_txt_path.exists(), f"Test file not found at {test_txt_path}"

    # Work on a copy under pytest's tmp_path so the fixture itself is never touched.
    tmp_dir = tmp_path / "file_search_test"
    tmp_dir.mkdir(exist_ok=True)
    tmp_file_path = tmp_dir / "favorite_books.txt"
    shutil.copy(test_txt_path, tmp_file_path)

    agent_options = {
        "name": "FileSearchAgent",
        "instructions": """You are an agent that can read and analyze text files using FileSearch.
        When asked questions about files, always use your FileSearch tool to search through the uploaded documents.
        Be direct and specific in your answers based on what you find in the files.""",
        "model": "gpt-5.4-mini",
        "model_settings": ModelSettings(tool_choice="required"),
        "include_search_results": True,
        "tool_use_behavior": "stop_on_first_tool",
        "files_folder": tmp_dir,
    }
    file_search_agent = Agent(**agent_options)
    file_search_agent._openai_client = real_openai_client

    # The vector store folder sits next to tmp_dir and carries a "_vs_" suffix.
    vs_folders = list(tmp_dir.parent.glob(f"{tmp_dir.name}_vs_*"))
    assert vs_folders, "No vector store folder found"
    folder_path = vs_folders[0]

    return file_search_agent, folder_path, tmp_file_path, test_txt_path
async def _wait_for_vector_store(real_openai_client: AsyncOpenAI, agent):
    """Poll the agent's vector store once per second until it reports "completed".

    Returns immediately when the agent has no ``_associated_vector_store_id``.
    Gives up after 30 polls with a printed warning instead of failing.

    Raises:
        Exception: if the vector store reports status "failed".
    """
    vector_store_id = agent._associated_vector_store_id
    if vector_store_id:
        print(f"Waiting for vector store {vector_store_id} to complete processing...")
        attempt = 0
        while attempt < 30:  # Wait up to 30 seconds
            attempt += 1
            vs = await real_openai_client.vector_stores.retrieve(vector_store_id)
            if vs.status == "failed":
                raise Exception(f"Vector store processing failed: {vs}")
            if vs.status == "completed":
                print(f"Vector store processing completed after {attempt} seconds")
                return
            await asyncio.sleep(1)
        # Only reached when every poll came back neither completed nor failed.
        print(f"Warning: Vector store still processing after 30 seconds, status: {vs.status}")
async def _cleanup_file_search_resources(real_openai_client: AsyncOpenAI, folder_path: Path, agent):
    """Best-effort removal of everything a file-search test created.

    For each entry in ``folder_path`` the OpenAI file ID is looked up through
    ``agent.file_manager.get_id_from_file`` and, when present, the remote file is
    deleted; the local entry is then removed. Afterwards the vector store whose ID is
    encoded after ``_vs_`` in the folder name is deleted, and finally the local folder
    is removed.

    The three steps are independent: a failure in one is printed and does not prevent
    the next (in particular, a failed remote vector store delete no longer leaves the
    local folder behind). The helper never raises.

    Args:
        real_openai_client: client used for the remote delete calls.
        folder_path: local ``<name>_vs_<id>`` folder; ``None`` means nothing to clean.
        agent: object exposing ``file_manager.get_id_from_file(path)``.
    """
    if not folder_path:
        # Setup never got far enough to discover a folder, so there is nothing to do.
        return
    try:
        if folder_path.exists():
            for file in folder_path.glob("*"):
                try:
                    file_id = agent.file_manager.get_id_from_file(file)
                    if file_id:
                        await real_openai_client.files.delete(file_id=file_id)
                        print(f"Cleaned up file {file.name}")
                    os.remove(file)
                except Exception as e:
                    print(f"Error cleaning up file {file.name}: {e}")

        # Clean up vector store; only attempted when the folder name actually encodes an ID.
        _, separator, vector_store_id = folder_path.name.rpartition("_vs_")
        if separator:
            try:
                await real_openai_client.vector_stores.delete(vector_store_id=f"vs_{vector_store_id}")
                print(f"Cleaned up vector store {folder_path.name}")
            except Exception as e:
                print(f"Error cleaning up vector store: {e}")

        # Remove the local folder regardless of how the remote delete went.
        try:
            if folder_path.exists():
                os.rmdir(folder_path)
        except OSError as e:
            print(f"Error removing folder {folder_path.name}: {e}")
    except Exception as e:
        print(f"Error during cleanup: {e}")
async def _assert_openai_file_absent(real_openai_client: AsyncOpenAI, file_id: str, timeout_seconds: int = 120) -> None:
    """Wait until retrieving ``file_id`` from OpenAI raises NotFoundError.

    The file is retrieved once per second. The first NotFoundError ends the wait
    successfully; if the file is still retrievable once ``timeout_seconds`` have
    elapsed on the event loop clock, the test is failed through ``pytest.fail``.
    """
    now = asyncio.get_event_loop().time
    deadline = now() + timeout_seconds
    file_still_present = True
    while file_still_present:
        try:
            await real_openai_client.files.retrieve(file_id=file_id)
        except NotFoundError:
            file_still_present = False
        else:
            # The deadline is checked only after a successful retrieve, so at least one attempt is always made.
            if now() >= deadline:
                pytest.fail(f"OpenAI file {file_id} still exists after waiting {timeout_seconds} seconds")
            await asyncio.sleep(1)
@pytest.mark.asyncio
async def test_files_folder_reuse_without_missing_directory_warning(
    real_openai_client: AsyncOpenAI, tmp_path: Path, caplog
):
    """A second agent built with the same `files_folder` reuses the first agent's folder.

    Two agents are created from identical kwargs. The test asserts that both report the
    same `files_folder_path` (named `files_vs_*`), that neither run logs a message
    containing "not a directory" on the file manager logger, and that the original
    `files` directory no longer exists afterwards.
    """
    file_manager_logger = "agency_swarm.agent.file_manager"
    caplog.set_level(logging.WARNING, logger=file_manager_logger)

    source_file = Path("tests/data/files/favorite_books.txt")
    assert source_file.exists(), f"Test file not found at {source_file}"

    files_dir = tmp_path / "files"
    files_dir.mkdir()
    shutil.copy2(source_file, files_dir / source_file.name)

    agent_kwargs = dict(
        name="FileReuseAgent",
        instructions="You are a document assistant who relies on FileSearch for answers.",
        files_folder=str(files_dir),
        include_search_results=True,
        model="gpt-5.4-mini",
        model_settings=ModelSettings(tool_choice="file_search"),
        tool_use_behavior="stop_on_first_tool",
    )

    async def _start_agent():
        """Create an agent from the shared kwargs, wire it into an Agency and wait for its vector store."""
        agent = Agent(**agent_kwargs)
        agent._openai_client = real_openai_client
        Agency(agent, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent)
        return agent

    def _captured_file_manager_messages() -> str:
        """Join the messages captured so far on the file manager logger."""
        messages = [record.message for record in caplog.records if record.name == file_manager_logger]
        return "\n".join(messages)

    # First construction.
    first_agent = await _start_agent()
    initial_folder = first_agent.files_folder_path
    assert initial_folder is not None
    assert initial_folder.name.startswith("files_vs_"), f"Unexpected folder name: {initial_folder}"
    first_run_logs = _captured_file_manager_messages()
    assert "not a directory" not in first_run_logs

    # Second construction with identical kwargs must land on the same folder.
    caplog.clear()
    reuse_agent = await _start_agent()
    assert reuse_agent.files_folder_path == initial_folder
    reuse_logs = _captured_file_manager_messages()
    assert "not a directory" not in reuse_logs

    try:
        await _cleanup_file_search_resources(real_openai_client, initial_folder, reuse_agent)
    finally:
        # Ensure the original `files` path no longer exists, confirming reuse
        assert not files_dir.exists()
@pytest.mark.asyncio
async def test_file_search_tool(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Asks a question about favorite_books.txt and expects a Hobbit-related answer.

    The first attempt forces `tool_choice="file_search"` through a RunConfig. If it fails
    with an error whose text contains both "404" and "Files", the file is uploaded again
    and the question is asked a second time; any other error is re-raised unchanged.
    Created resources are cleaned up in `finally`.
    """
    file_search_agent, folder_path, tmp_file_path, test_txt_path = await _setup_file_search_agent(
        real_openai_client, tmp_path
    )

    def _mentions_hobbit(result) -> bool:
        """True when the lower-cased final output contains one of the accepted terms."""
        lowered = result.final_output.lower()
        return any(term in lowered for term in ["hobbit", "the hobbit", "j.r.r. tolkien"])

    try:
        await _wait_for_vector_store(real_openai_client, file_search_agent)

        agency = Agency(file_search_agent, user_context=None)
        question = (
            "Use FileSearch with the query 'hobbit' to find the answer: What is the title of the 4th book in the list?"
        )

        try:
            from agents import RunConfig

            # Single-turn: enforce FileSearch tool usage deterministically
            forced_file_search = RunConfig(model_settings=ModelSettings(tool_choice="file_search"))
            response_result = await agency.get_response(question, run_config=forced_file_search)

            assert response_result is not None
            print(f"Response for {test_txt_path.name}: {response_result.final_output}")

            hobbit_found = _mentions_hobbit(response_result)
            if not hobbit_found:
                # Diagnostics only: report whether a file_search call shows up in the run items.
                print("Expected content not found, checking if FileSearch was used")
                file_search_used = False
                for item in response_result.new_items:
                    if not (hasattr(item, "tool_calls") and item.tool_calls):
                        continue
                    if any(call.type == "file_search" for call in item.tool_calls if hasattr(call, "type")):
                        file_search_used = True
                        break
                if not file_search_used:
                    print("FileSearch tool was not used, this may explain why the answer wasn't found")

            assert hobbit_found, f"Expected 'hobbit' or related terms not found in: {response_result.final_output}"

        except Exception as e:
            # Only a "Files ... 404" style error triggers the re-upload path.
            if "404" not in str(e) or "Files" not in str(e):
                raise

            print(f"Files not found error, re-uploading and retrying: {e}")
            uploaded_file_id = file_search_agent.upload_file(str(tmp_file_path), include_in_vector_store=True)
            print(f"Re-uploaded file {tmp_file_path.name} with ID: {uploaded_file_id}")

            response_result = await agency.get_response(question)
            assert response_result is not None
            print(f"Response (retry): {response_result.final_output}")

            hobbit_found = _mentions_hobbit(response_result)
            assert hobbit_found, f"Expected 'hobbit' terms not found in retry: {response_result.final_output}"

    finally:
        await _cleanup_file_search_resources(real_openai_client, folder_path, file_search_agent)
@pytest.mark.asyncio
async def test_code_interpreter_tool(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Tests that an agent can read and execute code using CodeInterpreter tool.

    Two requests are made: an inline one-line script whose output must contain "37",
    and a request to run the uploaded test-python.py whose output must contain 14910
    (allowing for separators inside the number).

    Cleanup goes through the shared `_cleanup_file_search_resources` helper so that a
    single failing delete does not abort the remaining cleanup steps.
    """
    test_py_path = Path("tests/data/files/test-python.py")
    assert test_py_path.exists(), f"Test file not found at {test_py_path}"

    # Use pytest tmp_path for isolation
    tmp_dir = tmp_path / "code_interpreter_test"
    tmp_dir.mkdir(exist_ok=True)
    tmp_file_path = tmp_dir / "test-python.py"
    shutil.copy(test_py_path, tmp_file_path)

    # Bound before `try` so the `finally` block can tell how far setup got.
    code_interpreter_agent = None
    folder_path = None
    try:
        code_interpreter_agent = Agent(
            name="CodeInterpreterAgent",
            instructions="""You are an agent that can read and execute code using CodeInterpreter tool.""",
            model_settings=ModelSettings(temperature=0.0, tool_choice="required"),
            tool_use_behavior="stop_on_first_tool",
            files_folder=tmp_dir,
        )
        code_interpreter_agent._openai_client = real_openai_client

        # Find vector store folder
        candidates = list(tmp_dir.parent.glob(f"{tmp_dir.name}_vs_*"))
        folder_path = candidates[0] if candidates else None
        assert folder_path, "No vector store folder found"

        # Initialize agency for the agent
        agency = Agency(code_interpreter_agent, user_context=None)

        # Test the simple usage of the code interpreter tool (answer is always 37)
        # Use a deterministic script to avoid RNG differences across environments
        question = """
        Use CodeInterpreter tool to execute this script and tell me the results:
        ```print(sum([10, 20, 7]))```
        """
        response_result = await agency.get_response(question)

        # Verify response
        assert response_result is not None
        assert "37" in response_result.final_output.lower()

        # Execute python script (answer is always 14910)
        query = "Run test-python script, return me its results and tell me exactly what you did to get them."
        response_result = await agency.get_response(query)
        assert response_result is not None

        # Handle various number formatting (with/without commas, LaTeX formatting, etc.)
        response_text = response_result.final_output.lower()
        # The pattern tolerates commas, whitespace, backslashes and parentheses between "14" and "910".
        numbers_in_response = re.findall(r"14[,\s\\()]*910", response_text)
        assert len(numbers_in_response) > 0, (
            f"Expected to find '14910' (possibly formatted) in response. Response: {response_result.final_output}"
        )
    finally:
        # Cleanup: delete uploaded files, the vector store and its folder (best effort, never raises).
        if folder_path is not None:
            await _cleanup_file_search_resources(real_openai_client, folder_path, code_interpreter_agent)
        # Clean up the tmp directory if it still exists and is empty
        try:
            if tmp_dir.exists() and not any(tmp_dir.iterdir()):
                os.rmdir(tmp_dir)
                print(f"Cleaned up tmp directory {tmp_dir}")
        except OSError as e:
            # Materialise the glob so the message lists the leftover entries instead of a generator repr.
            print(f"Error cleaning up: {e}, dir: {list(tmp_dir.glob('*'))}")
@pytest.mark.asyncio
async def test_agent_vision_capabilities(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Sends a base64-encoded PNG as an `input_image` item and checks the agent's answer.

    Each case pairs an image with a question and either one expected substring or a
    tuple of acceptable alternatives; matching is case-insensitive. The run must not
    contain any ToolCallItem. Images come from the pre-generated files under
    examples/data.
    """
    # Resolve paths relative to the project root (three levels above this file's directory).
    project_root = Path(__file__).resolve().parents[3]
    shapes_image = project_root / "examples/data/shapes_and_text.png"
    test_images = [
        (shapes_image, "How many shapes do you see in this image?", ("three", "3")),
        (shapes_image, "What text do you see in this image?", "VISION TEST 2024"),
    ]

    # Fail early if a referenced image is missing.
    for image_path, _question, _expected in test_images:
        assert image_path.exists(), f"Test image not found at {image_path}"

    # Create a vision-capable agent with temperature=0 for deterministic responses
    vision_agent = Agent(
        name="VisionAgent",
        instructions="""You are an expert vision AI that can analyze images accurately.
        When images are provided, examine them carefully and answer questions about their content.
        Be precise and specific in your descriptions.""",
        model_settings=ModelSettings(temperature=0.0),
    )
    vision_agent._openai_client = real_openai_client
    Agency(vision_agent, user_context=None)

    for image_path, question, expected_content in test_images:
        print(f"\nTesting vision processing of {image_path.name}")

        # Encode the raw image bytes as a base64 data URL payload.
        b64_image = base64.b64encode(image_path.read_bytes()).decode("utf-8")

        image_item = {
            "type": "input_image",
            "detail": "auto",
            "image_url": f"data:image/png;base64,{b64_image}",
        }
        message_with_image = [
            {"role": "user", "content": [image_item]},
            {"role": "user", "content": question},
        ]

        response_result = await vision_agent.get_response(message_with_image)

        assert response_result is not None
        assert response_result.final_output is not None
        print(f"Vision response for {image_path.name}: {response_result.final_output}")

        # A bare string is treated as a single acceptable answer.
        if isinstance(expected_content, str):
            alternatives = (expected_content,)
        else:
            alternatives = expected_content
        response_lower = response_result.final_output.lower()
        content_found = any(alt.lower() in response_lower for alt in alternatives)
        assert content_found, (
            f"Expected content '{expected_content}' not found in vision response for {image_path.name}. "
            f"This suggests the vision processing failed or the model couldn't see the image content. "
            f"Response: {response_result.final_output}"
        )

        # No tool call is expected for plain vision input; report each offender before failing.
        stray_tool_calls = [entry for entry in response_result.new_items if isinstance(entry, ToolCallItem)]
        for item in stray_tool_calls:
            print(f"Unexpected tool call found for {image_path.name}: {item.raw_item}")
        assert not stray_tool_calls, (
            f"No tool calls should be found for {image_path.name} since OpenAI automatically processes vision. "
            f"The presence of tool calls suggests the implementation is incorrectly trying to use custom tools."
        )
@pytest.mark.asyncio
@pytest.mark.skipif(
    os.getenv("CI") == "true",
    reason="Requires live OpenAI API; skipped on CI to avoid upstream flake.",
)
async def test_vector_store_cleanup_on_init(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Agent initialization synchronizes vector store with local files, removing orphaned files from VS and OpenAI.

    Flow: create an agent over a folder with two files, delete one local file from the
    vector store folder, create a second agent with the same kwargs, then wait until the
    OpenAI file that belonged to the deleted local file can no longer be retrieved.

    Remote and local resources are cleaned up in `finally`, so a failed assertion or a
    polling timeout does not leave files or the vector store behind.
    """
    source_file = Path("tests/data/files/favorite_books.txt")
    assert source_file.exists(), f"Test file not found at {source_file}"

    # Create temp folder with two files
    files_dir = tmp_path / "cleanup_files"
    files_dir.mkdir(exist_ok=True)
    file_a = files_dir / "books_a.txt"
    file_b = files_dir / "books_b.txt"
    source_text = source_file.read_text(encoding="utf-8")
    file_a.write_text(source_text, encoding="utf-8")
    file_b.write_text(source_text, encoding="utf-8")

    agent_kwargs = {
        "name": "CleanupAgent",
        "instructions": "Use FileSearch to answer from documents.",
        "files_folder": str(files_dir),
        "include_search_results": True,
        "model": "gpt-5.4-mini",
        "model_settings": ModelSettings(tool_choice="file_search"),
        "tool_use_behavior": "stop_on_first_tool",
    }

    # First init: uploads both files and creates VS
    agent1 = Agent(**agent_kwargs)
    agent1._openai_client = real_openai_client
    Agency(agent1, user_context=None)
    await _wait_for_vector_store(real_openai_client, agent1)

    # Find VS folder
    candidates = list(files_dir.parent.glob(f"{files_dir.name}_vs_*"))
    folder_path = candidates[0] if candidates else None
    assert folder_path and folder_path.exists(), "No vector store folder found"

    # Cleanup uses the most recently created agent; it falls back to agent1 if the test fails before agent2 exists.
    cleanup_agent = agent1
    try:
        # Collect uploaded ids
        uploaded_ids = []
        for f in folder_path.glob("*"):
            if f.is_file():
                fid = agent1.file_manager.get_id_from_file(f)
                if fid:
                    uploaded_ids.append(fid)
        assert len(uploaded_ids) == 2, f"Expected 2 uploaded files, got {len(uploaded_ids)}"

        # Remove one local file
        local_files = [p for p in folder_path.glob("*") if p.is_file()]
        assert len(local_files) >= 2
        removed_local = local_files[0]
        removed_id = agent1.file_manager.get_id_from_file(removed_local)
        # Without an ID there is nothing meaningful to poll for below.
        assert removed_id, f"No OpenAI file id found for {removed_local.name}"
        os.remove(removed_local)

        # Re-init: should detach removed from VS and delete OpenAI file object
        agent2 = Agent(**agent_kwargs)
        agent2._openai_client = real_openai_client
        cleanup_agent = agent2
        Agency(agent2, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent2)

        vs_id = agent2._associated_vector_store_id
        assert isinstance(vs_id, str) and vs_id

        # Vector Store file listings are eventually consistent; do not assert immediate absence here.
        await _assert_openai_file_absent(real_openai_client, removed_id)
    finally:
        # Cleanup (best effort; a cleanup failure must not mask the test outcome)
        try:
            await _cleanup_file_search_resources(real_openai_client, folder_path, cleanup_agent)
        except Exception as e:
            print(f"Cleanup failed: {e}")
@pytest.mark.asyncio
@pytest.mark.skipif(
    os.getenv("CI") == "true",
    reason="Requires live OpenAI API; skipped on CI to avoid upstream flake.",
)
async def test_file_reupload_on_mtime_update(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Modifying local file triggers re-upload with a new file_id and VS update.

    Flow: create an agent over a folder with one file, append to that file and push its
    mtime forward, create a second agent with the same kwargs, then check that the
    completed files listed for the vector store are not just the original file ID.

    The pause before modifying the file uses `asyncio.sleep` so the event loop is not
    blocked, and resources are cleaned up in `finally` even when an assertion fails.
    """
    source_file = Path("tests/data/files/favorite_books.txt")
    assert source_file.exists(), f"Test file not found at {source_file}"

    # Create temp folder and copy file
    files_dir = tmp_path / "reupload_files"
    files_dir.mkdir(exist_ok=True)
    local_file = files_dir / "favorite_books.txt"
    shutil.copy2(source_file, local_file)

    agent_kwargs = {
        "name": "ReuploadAgent",
        "instructions": "Use FileSearch to answer from documents.",
        "files_folder": str(files_dir),
        "include_search_results": True,
        "model": "gpt-5.4-mini",
        "model_settings": ModelSettings(tool_choice="file_search"),
        "tool_use_behavior": "stop_on_first_tool",
    }

    # First init: upload original file
    agent1 = Agent(**agent_kwargs)
    agent1._openai_client = real_openai_client
    Agency(agent1, user_context=None)
    await _wait_for_vector_store(real_openai_client, agent1)

    # Locate vector store folder
    candidates = list(files_dir.parent.glob(f"{files_dir.name}_vs_*"))
    folder_path = candidates[0] if candidates else None
    assert folder_path and folder_path.exists(), "No vector store folder found"

    # Cleanup uses the most recently created agent; it falls back to agent1 if the test fails before agent2 exists.
    cleanup_agent = agent1
    try:
        # Uploaded file id
        vs_files_local = [p for p in folder_path.glob("*") if p.is_file()]
        assert len(vs_files_local) == 1
        uploaded_path = vs_files_local[0]
        old_id = agent1.file_manager.get_id_from_file(uploaded_path)
        assert isinstance(old_id, str) and old_id

        # Ensure mtime > created_at by waiting (without blocking the event loop) and modifying the file
        await asyncio.sleep(2)
        with open(uploaded_path, "a", encoding="utf-8") as f:
            f.write("\nReupload test line.")
        # Bump mtime explicitly to avoid rounding issues; the append above already changed it,
        # so a failure here is reported rather than treated as fatal.
        try:
            st = uploaded_path.stat()
            os.utime(uploaded_path, (st.st_atime, st.st_mtime + 2))
        except OSError as e:
            print(f"Warning: could not bump mtime for {uploaded_path.name}: {e}")

        # Re-init agent: should detect newer mtime and re-upload
        agent2 = Agent(**agent_kwargs)
        agent2._openai_client = real_openai_client
        cleanup_agent = agent2
        Agency(agent2, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent2)

        vs_id = agent2._associated_vector_store_id
        assert isinstance(vs_id, str) and vs_id

        # Verify that reupload occurred (new file was uploaded)
        # Note: We don't test that the old file was removed from the vector store,
        # as vector store cleanup may be eventually consistent
        vs_files = await real_openai_client.vector_stores.files.list(vector_store_id=vs_id, filter="completed")
        # Entries may expose the ID as `file_id` or `id`; accept either.
        new_ids = {getattr(f, "file_id", None) or getattr(f, "id", None) for f in vs_files.data}
        assert len(new_ids) >= 1, f"Expected at least 1 file in vector store, got {len(new_ids)}"

        # Verify that a new file ID exists (reupload occurred) - either old file is gone or we have multiple files
        assert old_id not in new_ids or len(new_ids) > 1, (
            f"Reupload should create new file, but only found old_id {old_id} in {new_ids}"
        )
    finally:
        # Cleanup (best effort; a cleanup failure must not mask the test outcome)
        try:
            await _cleanup_file_search_resources(real_openai_client, folder_path, cleanup_agent)
        except Exception as e:
            print(f"Cleanup failed: {e}")
|