import asyncio
import base64
import logging
import os
import re
import shutil
import time
from pathlib import Path

import pytest
from agents import ModelSettings, RunConfig, ToolCallItem
from openai import AsyncOpenAI, NotFoundError

from agency_swarm import Agency, Agent

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")


@pytest.fixture(scope="module")
async def real_openai_client():
    """Provide an AsyncOpenAI client configured with the OPENAI_API_KEY environment variable."""
    return AsyncOpenAI(api_key=OPENAI_API_KEY)


def _find_tool_calls(new_items):
    """Return every ToolCallItem contained in a run result's ``new_items``."""
    return [item for item in new_items if isinstance(item, ToolCallItem)]


@pytest.mark.asyncio
async def test_agent_processes_message_files_attachment(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Tests that an agent can receive a file ID via `file_ids` parameter,
    and OpenAI automatically makes the file content available to the LLM.
    No custom tools are needed - OpenAI handles file processing automatically.
    Uses a REAL OpenAI client for file upload and REAL agent execution.
    """
    # 1. Upload the existing rich test PDF from v0.X tests
    test_pdf_path = Path("tests/data/files/test-pdf-2.pdf")
    assert test_pdf_path.exists(), f"Test PDF not found at {test_pdf_path}"

    # Context manager so the local file handle is closed once the upload finishes.
    with test_pdf_path.open("rb") as pdf_file:
        uploaded_real_file = await real_openai_client.files.create(file=pdf_file, purpose="assistants")
    attached_file_id = uploaded_real_file.id
    print(f"TEST: Uploaded file {test_pdf_path.name} for attachment, ID: {attached_file_id}")

    try:
        # 2. Initialize Agent WITHOUT any custom file processing tools
        # OpenAI will automatically process the attached file and make content available to the LLM
        attachment_tester_agent = Agent(
            name="AttachmentTesterAgentReal",
            instructions=(
                "You are a helpful assistant. When files are attached, you can read their content directly. "
                "Answer questions about the file content accurately."
            ),
            model_settings=ModelSettings(temperature=0.0),
        )
        attachment_tester_agent._openai_client = real_openai_client

        # 3. Setup a real Agency for proper testing
        agency = Agency(attachment_tester_agent, user_context=None)

        # 4. Call get_response with file_ids - OpenAI will automatically process the file
        message_to_agent = "What is my favorite food?"
        print(
            f"TEST: Calling get_response for agent '{attachment_tester_agent.name}' "
            f"with file_ids: [{attached_file_id}]"
        )
        response_result = await agency.get_response(message_to_agent, file_ids=[attached_file_id])

        assert response_result is not None
        assert response_result.final_output is not None
        print(f"TEST: Agent final output: {response_result.final_output}")

        # 5. Verify the agent could access the file content automatically
        # The LLM should be able to read the PDF content without any custom tools
        # The test PDF contains a secret phrase we can verify
        response_lower = response_result.final_output.lower()
        assert len(response_result.final_output) > 20, (
            f"Response too short, suggests file content was not processed. Response: {response_result.final_output}"
        )

        # Look for the secret phrase that should be in the PDF
        secret_phrase_found = "strawberry" in response_lower
        assert secret_phrase_found, (
            "Expected word 'strawberry' not found in response. "
            "This suggests the PDF content was not made available to the LLM. "
            f"Response: {response_result.final_output}"
        )

        # 6. Verify NO custom tool calls were made (since OpenAI handles file processing automatically)
        tool_calls = _find_tool_calls(response_result.new_items)
        for item in tool_calls:
            print(f"Unexpected tool call found: {item.raw_item}")

        # We expect NO tool calls since OpenAI processes files automatically
        assert not tool_calls, (
            "No tool calls should be found since OpenAI automatically processes file attachments. "
            "The presence of tool calls suggests the implementation is incorrectly trying to use custom tools."
        )
    finally:
        # 7. Cleanup the test file, even when an assertion above failed (best effort)
        try:
            await real_openai_client.files.delete(attached_file_id)
            print(f"Cleaned up file {attached_file_id}")
        except Exception as e:
            print(f"Warning: Failed to clean up file {attached_file_id}: {e}")


@pytest.mark.asyncio
async def test_multi_file_type_processing(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Tests that an agent can process PDF files automatically via OpenAI's Responses API file processing.

    NOTE: The OpenAI Responses API with input_file type only supports PDF files for direct attachment.
    Other file types (TXT, CSV, images) are supported through different mechanisms:
    - Vector Stores/File Search (for RAG functionality)
    - Code Interpreter (for code execution with files)

    This test focuses on the direct file attachment capability which is PDF-only.
    Uses the existing rich test PDF from the v0.X test suite.
    """
    # Use the existing rich test PDF with secret phrase
    test_pdf_path = Path("tests/data/files/test-pdf-2.pdf")
    assert test_pdf_path.exists(), f"Test PDF not found at {test_pdf_path}"

    # Upload PDF file to OpenAI
    with open(test_pdf_path, "rb") as f:
        uploaded_file = await real_openai_client.files.create(file=f, purpose="assistants")
    file_id = uploaded_file.id
    print(f"Uploaded {test_pdf_path.name}, got ID: {file_id}")

    try:
        # Create an agent WITHOUT custom file processing tools
        # OpenAI will automatically process PDF files and make content available
        file_processor_agent = Agent(
            name="FileProcessorAgent",
            instructions=(
                "You are an agent that can read and analyze PDF files automatically. "
                "When PDF files are attached, you can access their content directly. "
                "Extract and summarize key information from the PDF content accurately."
            ),
            model_settings=ModelSettings(temperature=0.0),
        )
        file_processor_agent._openai_client = real_openai_client

        # Initialize agency for the agent
        Agency(file_processor_agent, user_context=None)

        # Test processing the PDF file
        question = "What is my favorite food?"
        expected_content = "strawberry"

        # Process the PDF file - OpenAI will automatically make file content available
        response_result = await file_processor_agent.get_response(question, file_ids=[file_id])

        # Verify response
        assert response_result is not None
        print(f"Response for {test_pdf_path.name}: {response_result.final_output}")

        # Use case-insensitive search for matching
        response_lower = response_result.final_output.lower()
        expected_lower = expected_content.lower()

        # With temperature=0, responses should be deterministic
        content_found = expected_lower in response_lower
        assert content_found, (
            f"Expected content '{expected_content}' not found in response for {test_pdf_path.name}. "
            f"This suggests OpenAI did not make the PDF file content available to the LLM. "
            f"Response: {response_result.final_output}"
        )

        # Verify NO custom tool calls were made (OpenAI processes PDFs automatically)
        tool_calls = _find_tool_calls(response_result.new_items)
        for item in tool_calls:
            print(f"Unexpected tool call found for {test_pdf_path.name}: {item.raw_item}")

        assert not tool_calls, (
            f"No tool calls should be found for {test_pdf_path.name} since OpenAI automatically processes "
            f"PDF file attachments. The presence of tool calls suggests the implementation is incorrectly "
            f"trying to use custom tools."
        )
    finally:
        # Cleanup: Delete uploaded file from OpenAI
        try:
            await real_openai_client.files.delete(file_id=file_id)
            print(f"Cleaned up file {file_id}")
        except Exception as e:
            print(f"Error cleaning up file {file_id}: {e}")


async def _setup_file_search_agent(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Helper to set up file search agent with test file.

    Copies the sample text file into a fresh directory under ``tmp_path``, builds an
    Agent with that directory as ``files_folder``, and locates the sibling
    ``<dir>_vs_*`` folder that is expected to exist afterwards.

    Returns:
        Tuple of (agent, vector store folder path, copied file path, original file path).
    """
    test_txt_path = Path("tests/data/files/favorite_books.txt")
    assert test_txt_path.exists(), f"Test file not found at {test_txt_path}"

    # Use pytest tmp_path for isolation
    tmp_dir = tmp_path / "file_search_test"
    tmp_dir.mkdir(exist_ok=True)
    tmp_file_path = tmp_dir / "favorite_books.txt"
    shutil.copy(test_txt_path, tmp_file_path)

    file_search_agent = Agent(
        name="FileSearchAgent",
        instructions=(
            "You are an agent that can read and analyze text files using FileSearch. "
            "When asked questions about files, always use your FileSearch tool to search through "
            "the uploaded documents. "
            "Be direct and specific in your answers based on what you find in the files."
        ),
        model="gpt-5.4-mini",
        model_settings=ModelSettings(tool_choice="required"),
        include_search_results=True,
        tool_use_behavior="stop_on_first_tool",
        files_folder=tmp_dir,
    )
    file_search_agent._openai_client = real_openai_client

    # Find vector store folder
    candidates = list(tmp_dir.parent.glob(f"{tmp_dir.name}_vs_*"))
    folder_path = candidates[0] if candidates else None
    assert folder_path, "No vector store folder found"

    return file_search_agent, folder_path, tmp_file_path, test_txt_path


async def _wait_for_vector_store(real_openai_client: AsyncOpenAI, agent, timeout_seconds: int = 30):
    """Helper to wait for vector store processing to complete.

    Polls the agent's associated vector store once per second. Returns immediately
    when the agent has no associated vector store, and only prints a warning (does not
    fail) when the store is still processing after ``timeout_seconds`` polls.

    Raises:
        RuntimeError: If the vector store reports the ``failed`` status.
    """
    vector_store_id = agent._associated_vector_store_id
    if not vector_store_id:
        return

    print(f"Waiting for vector store {vector_store_id} to complete processing...")
    status = None
    for attempt in range(timeout_seconds):
        vs = await real_openai_client.vector_stores.retrieve(vector_store_id)
        status = vs.status
        if status == "completed":
            print(f"Vector store processing completed after {attempt + 1} seconds")
            return
        if status == "failed":
            raise RuntimeError(f"Vector store processing failed: {vs}")
        await asyncio.sleep(1)

    print(f"Warning: Vector store still processing after {timeout_seconds} seconds, status: {status}")


async def _cleanup_file_search_resources(real_openai_client: AsyncOpenAI, folder_path: Path, agent):
    """Helper to clean up file search test resources.

    Best effort: deletes each local file in ``folder_path`` together with its OpenAI
    file object, then deletes the vector store whose id is encoded in the folder name
    (``<name>_vs_<id>``) and removes the folder. Errors are printed, never raised, so
    this is safe to call from ``finally`` blocks. ``folder_path`` may be ``None`` when
    setup failed before a folder was located; in that case nothing is done.
    """
    if folder_path is None:
        return

    try:
        if folder_path.exists():
            # Materialize the listing first: files are removed while we iterate.
            for file in list(folder_path.glob("*")):
                try:
                    file_id = agent.file_manager.get_id_from_file(file)
                    if file_id:
                        await real_openai_client.files.delete(file_id=file_id)
                        print(f"Cleaned up file {file.name}")
                    os.remove(file)
                except Exception as e:
                    print(f"Error cleaning up file {file.name}: {e}")

        # Clean up vector store
        try:
            vector_store_id = folder_path.name.split("_vs_")[-1]
            await real_openai_client.vector_stores.delete(vector_store_id=f"vs_{vector_store_id}")
            print(f"Cleaned up vector store {folder_path.name}")
            os.rmdir(folder_path)
        except Exception as e:
            print(f"Error cleaning up vector store: {e}")
    except Exception as e:
        print(f"Error during cleanup: {e}")


async def _assert_openai_file_absent(real_openai_client: AsyncOpenAI, file_id: str, timeout_seconds: int = 120) -> None:
    """Polls until the given OpenAI file_id is confirmed deleted, mirroring FileSync waits.

    Retrieves the file once per second; returns as soon as the API answers with
    ``NotFoundError`` and fails the test via ``pytest.fail`` once ``timeout_seconds``
    have elapsed while the file is still retrievable.
    """
    # Monotonic clock: immune to wall-clock adjustments and independent of the event loop.
    deadline = time.monotonic() + timeout_seconds
    while True:
        try:
            await real_openai_client.files.retrieve(file_id=file_id)
        except NotFoundError:
            return
        if time.monotonic() >= deadline:
            pytest.fail(f"OpenAI file {file_id} still exists after waiting {timeout_seconds} seconds")
        await asyncio.sleep(1)


@pytest.mark.asyncio
async def test_files_folder_reuse_without_missing_directory_warning(
    real_openai_client: AsyncOpenAI, tmp_path: Path, caplog
):
    """Agents reuse existing vector store directories without logging directory warnings."""
    caplog.set_level(logging.WARNING, logger="agency_swarm.agent.file_manager")

    source_file = Path("tests/data/files/favorite_books.txt")
    assert source_file.exists(), f"Test file not found at {source_file}"

    files_dir = tmp_path / "files"
    files_dir.mkdir()
    shutil.copy2(source_file, files_dir / source_file.name)

    agent_kwargs = {
        "name": "FileReuseAgent",
        "instructions": "You are a document assistant who relies on FileSearch for answers.",
        "files_folder": str(files_dir),
        "include_search_results": True,
        "model": "gpt-5.4-mini",
        "model_settings": ModelSettings(tool_choice="file_search"),
        "tool_use_behavior": "stop_on_first_tool",
    }

    def file_manager_logs() -> str:
        """Join the messages captured so far from the file manager logger."""
        return "\n".join(
            record.message for record in caplog.records if record.name == "agency_swarm.agent.file_manager"
        )

    first_agent = Agent(**agent_kwargs)
    first_agent._openai_client = real_openai_client

    # Track what must be cleaned up so remote resources are released even when an assertion fails.
    initial_folder = None
    cleanup_agent = first_agent
    try:
        Agency(first_agent, user_context=None)
        await _wait_for_vector_store(real_openai_client, first_agent)

        initial_folder = first_agent.files_folder_path
        assert initial_folder is not None
        assert initial_folder.name.startswith("files_vs_"), f"Unexpected folder name: {initial_folder}"

        assert "not a directory" not in file_manager_logs()

        caplog.clear()

        reuse_agent = Agent(**agent_kwargs)
        reuse_agent._openai_client = real_openai_client
        cleanup_agent = reuse_agent
        Agency(reuse_agent, user_context=None)
        await _wait_for_vector_store(real_openai_client, reuse_agent)

        assert reuse_agent.files_folder_path == initial_folder

        assert "not a directory" not in file_manager_logs()

        # Ensure the original `files` path no longer exists, confirming reuse
        assert not files_dir.exists()
    finally:
        await _cleanup_file_search_resources(real_openai_client, initial_folder, cleanup_agent)


@pytest.mark.asyncio
async def test_file_search_tool(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Tests that an agent can use FileSearch tool to process files."""
    file_search_agent, folder_path, tmp_file_path, test_txt_path = await _setup_file_search_agent(
        real_openai_client, tmp_path
    )
    hobbit_terms = ["hobbit", "the hobbit", "j.r.r. tolkien"]

    try:
        await _wait_for_vector_store(real_openai_client, file_search_agent)

        # Initialize agency and run test
        agency = Agency(file_search_agent, user_context=None)
        question = (
            "Use FileSearch with the query 'hobbit' to find the answer: What is the title of the 4th book in the list?"
        )

        # Only the API call is guarded: assertion failures must never be mistaken for a
        # "files not found" error and trigger the retry path.
        retried = False
        try:
            # Single-turn: enforce FileSearch tool usage deterministically
            response_result = await agency.get_response(
                question,
                run_config=RunConfig(model_settings=ModelSettings(tool_choice="file_search")),
            )
        except Exception as e:
            # Handle 404 errors with retry
            if not ("404" in str(e) and "Files" in str(e)):
                raise
            print(f"Files not found error, re-uploading and retrying: {e}")
            uploaded_file_id = file_search_agent.upload_file(str(tmp_file_path), include_in_vector_store=True)
            print(f"Re-uploaded file {tmp_file_path.name} with ID: {uploaded_file_id}")
            response_result = await agency.get_response(question)
            retried = True

        assert response_result is not None
        if retried:
            print(f"Response (retry): {response_result.final_output}")
        else:
            print(f"Response for {test_txt_path.name}: {response_result.final_output}")

        # Verify FileSearch was used and expected content found
        final_output_lower = response_result.final_output.lower()
        hobbit_found = any(term in final_output_lower for term in hobbit_terms)

        if retried:
            assert hobbit_found, f"Expected 'hobbit' terms not found in retry: {response_result.final_output}"
        else:
            if not hobbit_found:
                # Diagnostics only: report whether a file_search call shows up in the run items.
                print("Expected content not found, checking if FileSearch was used")
                tool_calls_made = [
                    item for item in response_result.new_items if hasattr(item, "tool_calls") and item.tool_calls
                ]
                file_search_used = any(
                    any(call.type == "file_search" for call in item.tool_calls if hasattr(call, "type"))
                    for item in tool_calls_made
                )
                if not file_search_used:
                    print("FileSearch tool was not used, this may explain why the answer wasn't found")

            assert hobbit_found, f"Expected 'hobbit' or related terms not found in: {response_result.final_output}"
    finally:
        await _cleanup_file_search_resources(real_openai_client, folder_path, file_search_agent)


@pytest.mark.asyncio
async def test_code_interpreter_tool(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Tests that an agent can read and execute code using CodeInterpreter tool.
    """
    test_py_path = Path("tests/data/files/test-python.py")
    assert test_py_path.exists(), f"Test file not found at {test_py_path}"

    # Use pytest tmp_path for isolation
    tmp_dir = tmp_path / "code_interpreter_test"
    tmp_dir.mkdir(exist_ok=True)
    tmp_file_path = tmp_dir / "test-python.py"
    shutil.copy(test_py_path, tmp_file_path)

    # Pre-bind so the finally block never touches unbound names when setup fails early.
    code_interpreter_agent = None
    folder_path = None
    try:
        code_interpreter_agent = Agent(
            name="CodeInterpreterAgent",
            instructions="You are an agent that can read and execute code using CodeInterpreter tool.",
            model_settings=ModelSettings(temperature=0.0, tool_choice="required"),
            tool_use_behavior="stop_on_first_tool",
            files_folder=tmp_dir,
        )
        code_interpreter_agent._openai_client = real_openai_client

        # Find vector store folder
        candidates = list(tmp_dir.parent.glob(f"{tmp_dir.name}_vs_*"))
        folder_path = candidates[0] if candidates else None
        assert folder_path, "No vector store folder found"

        # Initialize agency for the agent
        agency = Agency(code_interpreter_agent, user_context=None)

        # Test the simple usage of the code interpreter tool (answer is always 37)
        # Use a deterministic script to avoid RNG differences across environments
        question = (
            " Use CodeInterpreter tool to execute this script and tell me the results: "
            "```print(sum([10, 20, 7]))``` "
        )

        response_result = await agency.get_response(question)

        # Verify response
        assert response_result is not None
        assert "37" in response_result.final_output.lower()

        # Execute python script (answer is always 14910)
        query = "Run test-python script, return me its results and tell me exactly what you did to get them."
        response_result = await agency.get_response(query)
        assert response_result is not None

        # Handle various number formatting (with/without commas, LaTeX formatting, etc.)
        response_text = response_result.final_output.lower()
        # Tolerate LaTeX formatting and common separators between "14" and "910"
        numbers_in_response = re.findall(r"14[,\s\\()]*910", response_text)
        assert len(numbers_in_response) > 0, (
            f"Expected to find '14910' (possibly formatted) in response. Response: {response_result.final_output}"
        )
    finally:
        # Cleanup: Delete uploaded files and vector store from OpenAI, then the temp directory
        if code_interpreter_agent is not None:
            await _cleanup_file_search_resources(real_openai_client, folder_path, code_interpreter_agent)
        try:
            # Clean up the tmp directory if it's empty
            if tmp_dir.exists() and not any(tmp_dir.iterdir()):
                os.rmdir(tmp_dir)
                print(f"Cleaned up tmp directory {tmp_dir}")
        except Exception as e:
            print(f"Error cleaning up: {e}, dir: {list(tmp_dir.glob('*'))}")


@pytest.mark.asyncio
async def test_agent_vision_capabilities(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """
    Tests that an agent can process images using OpenAI's vision capabilities.
    Uses the input_image format with base64 encoded images.
    Uses the pre-generated example images since vision requires actual image files.
    """

    def image_to_base64(image_path: Path) -> str:
        """Convert image file to base64 string."""
        return base64.b64encode(image_path.read_bytes()).decode("utf-8")

    # Use the example images since they're actual image files (not text files)
    # Resolve paths relative to the project root
    project_root = Path(__file__).resolve().parents[3]  # Go up to project root

    # Each entry: (image path, question, expected text or tuple of acceptable alternatives)
    test_images = [
        (
            project_root / "examples/data/shapes_and_text.png",
            "How many shapes do you see in this image?",
            ("three", "3"),
        ),
        (project_root / "examples/data/shapes_and_text.png", "What text do you see in this image?", "VISION TEST 2024"),
    ]

    # Verify test images exist
    for image_path, _, _ in test_images:
        assert image_path.exists(), f"Test image not found at {image_path}"

    # Create a vision-capable agent with temperature=0 for deterministic responses
    vision_agent = Agent(
        name="VisionAgent",
        instructions=(
            "You are an expert vision AI that can analyze images accurately. "
            "When images are provided, examine them carefully and answer questions about their content. "
            "Be precise and specific in your descriptions."
        ),
        model_settings=ModelSettings(temperature=0.0),
    )
    vision_agent._openai_client = real_openai_client

    # Initialize agency for the agent
    Agency(vision_agent, user_context=None)

    # Test processing each image
    for image_path, question, expected_content in test_images:
        print(f"\nTesting vision processing of {image_path.name}")

        # Convert image to base64
        b64_image = image_to_base64(image_path)

        # Create message with input_image format
        message_with_image = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_image",
                        "detail": "auto",
                        "image_url": f"data:image/png;base64,{b64_image}",
                    }
                ],
            },
            {"role": "user", "content": question},
        ]

        # Process the image - OpenAI will automatically handle vision processing
        response_result = await vision_agent.get_response(message_with_image)

        # Verify response
        assert response_result is not None
        assert response_result.final_output is not None
        print(f"Vision response for {image_path.name}: {response_result.final_output}")

        # Use case-insensitive search for matching (accept any alternative)
        response_lower = response_result.final_output.lower()
        alternatives = (expected_content,) if isinstance(expected_content, str) else expected_content
        content_found = any(alt.lower() in response_lower for alt in alternatives)
        assert content_found, (
            f"Expected content '{expected_content}' not found in vision response for {image_path.name}. "
            f"This suggests the vision processing failed or the model couldn't see the image content. "
            f"Response: {response_result.final_output}"
        )

        # Verify NO custom tool calls were made (OpenAI processes vision automatically)
        tool_calls = _find_tool_calls(response_result.new_items)
        for item in tool_calls:
            print(f"Unexpected tool call found for {image_path.name}: {item.raw_item}")

        # We expect NO tool calls since OpenAI processes vision automatically
        assert not tool_calls, (
            f"No tool calls should be found for {image_path.name} since OpenAI automatically processes vision. "
            f"The presence of tool calls suggests the implementation is incorrectly trying to use custom tools."
        )


@pytest.mark.asyncio
@pytest.mark.skipif(
    os.getenv("CI") == "true",
    reason="Requires live OpenAI API; skipped on CI to avoid upstream flake.",
)
async def test_vector_store_cleanup_on_init(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Agent initialization synchronizes vector store with local files, removing orphaned files from VS and OpenAI."""
    source_file = Path("tests/data/files/favorite_books.txt")
    assert source_file.exists(), f"Test file not found at {source_file}"

    # Create temp folder with two files
    files_dir = tmp_path / "cleanup_files"
    files_dir.mkdir(exist_ok=True)
    file_a = files_dir / "books_a.txt"
    file_b = files_dir / "books_b.txt"
    file_a.write_text(source_file.read_text(encoding="utf-8"), encoding="utf-8")
    file_b.write_text(source_file.read_text(encoding="utf-8"), encoding="utf-8")

    agent_kwargs = {
        "name": "CleanupAgent",
        "instructions": "Use FileSearch to answer from documents.",
        "files_folder": str(files_dir),
        "include_search_results": True,
        "model": "gpt-5.4-mini",
        "model_settings": ModelSettings(tool_choice="file_search"),
        "tool_use_behavior": "stop_on_first_tool",
    }

    # First init: uploads both files and creates VS
    agent1 = Agent(**agent_kwargs)
    agent1._openai_client = real_openai_client

    # Track what must be cleaned up so remote resources are released even when an assertion fails.
    folder_path = None
    cleanup_agent = agent1
    try:
        Agency(agent1, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent1)

        # Find VS folder and collect uploaded ids
        candidates = list(files_dir.parent.glob(f"{files_dir.name}_vs_*"))
        folder_path = candidates[0] if candidates else None
        assert folder_path and folder_path.exists(), "No vector store folder found"

        uploaded_ids = []
        for f in folder_path.glob("*"):
            if f.is_file():
                fid = agent1.file_manager.get_id_from_file(f)
                if fid:
                    uploaded_ids.append(fid)
        assert len(uploaded_ids) == 2, f"Expected 2 uploaded files, got {len(uploaded_ids)}"

        # Remove one local file
        local_files = [p for p in folder_path.glob("*") if p.is_file()]
        assert len(local_files) >= 2
        removed_local = local_files[0]
        removed_id = agent1.file_manager.get_id_from_file(removed_local)
        os.remove(removed_local)

        # Re-init: should detach removed from VS and delete OpenAI file object
        agent2 = Agent(**agent_kwargs)
        agent2._openai_client = real_openai_client
        cleanup_agent = agent2
        Agency(agent2, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent2)

        vs_id = agent2._associated_vector_store_id
        assert isinstance(vs_id, str) and vs_id

        # Vector Store file listings are eventually consistent; do not assert immediate absence here.
        await _assert_openai_file_absent(real_openai_client, removed_id)
    finally:
        # Cleanup (the helper is best effort and reports its own errors)
        await _cleanup_file_search_resources(real_openai_client, folder_path, cleanup_agent)


@pytest.mark.asyncio
@pytest.mark.skipif(
    os.getenv("CI") == "true",
    reason="Requires live OpenAI API; skipped on CI to avoid upstream flake.",
)
async def test_file_reupload_on_mtime_update(real_openai_client: AsyncOpenAI, tmp_path: Path):
    """Modifying local file triggers re-upload with a new file_id and VS update."""
    source_file = Path("tests/data/files/favorite_books.txt")
    assert source_file.exists(), f"Test file not found at {source_file}"

    # Create temp folder and copy file
    files_dir = tmp_path / "reupload_files"
    files_dir.mkdir(exist_ok=True)
    local_file = files_dir / "favorite_books.txt"
    shutil.copy2(source_file, local_file)

    agent_kwargs = {
        "name": "ReuploadAgent",
        "instructions": "Use FileSearch to answer from documents.",
        "files_folder": str(files_dir),
        "include_search_results": True,
        "model": "gpt-5.4-mini",
        "model_settings": ModelSettings(tool_choice="file_search"),
        "tool_use_behavior": "stop_on_first_tool",
    }

    # First init: upload original file
    agent1 = Agent(**agent_kwargs)
    agent1._openai_client = real_openai_client

    # Track what must be cleaned up so remote resources are released even when an assertion fails.
    folder_path = None
    cleanup_agent = agent1
    try:
        Agency(agent1, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent1)

        # Locate vector store folder and uploaded file id
        candidates = list(files_dir.parent.glob(f"{files_dir.name}_vs_*"))
        folder_path = candidates[0] if candidates else None
        assert folder_path and folder_path.exists(), "No vector store folder found"

        vs_files_local = [p for p in folder_path.glob("*") if p.is_file()]
        assert len(vs_files_local) == 1
        uploaded_path = vs_files_local[0]
        old_id = agent1.file_manager.get_id_from_file(uploaded_path)
        assert isinstance(old_id, str) and old_id

        # Ensure mtime > created_at by waiting and modifying the file.
        # asyncio.sleep (not time.sleep) so the event loop is not blocked while we wait.
        await asyncio.sleep(2)
        with open(uploaded_path, "a", encoding="utf-8") as f:
            f.write("\nReupload test line.")
        # Bump mtime explicitly to avoid rounding issues (best effort)
        try:
            st = uploaded_path.stat()
            os.utime(uploaded_path, (st.st_atime, st.st_mtime + 2))
        except OSError as e:
            print(f"Warning: could not bump mtime for {uploaded_path}: {e}")

        # Re-init agent: should detect newer mtime and re-upload
        agent2 = Agent(**agent_kwargs)
        agent2._openai_client = real_openai_client
        cleanup_agent = agent2
        Agency(agent2, user_context=None)
        await _wait_for_vector_store(real_openai_client, agent2)

        vs_id = agent2._associated_vector_store_id
        assert isinstance(vs_id, str) and vs_id

        # Verify that reupload occurred (new file was uploaded)
        # Note: We don't test that the old file was removed from the vector store,
        # as vector store cleanup may be eventually consistent
        vs_files = await real_openai_client.vector_stores.files.list(vector_store_id=vs_id, filter="completed")
        new_ids = {getattr(f, "file_id", None) or getattr(f, "id", None) for f in vs_files.data}
        assert len(new_ids) >= 1, f"Expected at least 1 file in vector store, got {len(new_ids)}"

        # Verify that a new file ID exists (reupload occurred) - either old file is gone or we have multiple files
        assert old_id not in new_ids or len(new_ids) > 1, (
            f"Reupload should create new file, but only found old_id {old_id} in {new_ids}"
        )
    finally:
        # Cleanup (the helper is best effort and reports its own errors)
        await _cleanup_file_search_resources(real_openai_client, folder_path, cleanup_agent)