| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789 |
- from __future__ import annotations
- import copy
- import json
- import time
- from collections.abc import AsyncIterator
- from typing import Any
- import pytest
- from agents import Tool
- from agents.agent_output import AgentOutputSchemaBase
- from agents.handoffs import Handoff
- from agents.items import ModelResponse, TResponseInputItem, TResponseStreamEvent
- from agents.model_settings import ModelSettings
- from agents.models.interface import Model, ModelTracing
- from agents.usage import Usage
- from openai import OpenAI
- from openai.types.responses import (
- Response,
- ResponseCompletedEvent,
- ResponseContentPartAddedEvent,
- ResponseContentPartDoneEvent,
- ResponseCreatedEvent,
- ResponseOutputItemAddedEvent,
- ResponseOutputItemDoneEvent,
- ResponseOutputMessage,
- ResponseOutputText,
- ResponseTextDeltaEvent,
- )
- from openai.types.responses.response_prompt_param import ResponsePromptParam
- from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
- from agency_swarm import Agency, Agent
- from agency_swarm.integrations.fastapi_utils.endpoint_handlers import (
- ActiveRunRegistry,
- generate_chat_name,
- make_response_endpoint,
- make_stream_endpoint,
- )
- from agency_swarm.integrations.fastapi_utils.request_models import BaseRequest
- from agency_swarm.messages.response_input_sanitizer import (
- REASONING_ENCRYPTED_CONTENT_INCLUDE,
- sanitize_store_false_responses_input,
- )
- class _TrackingResponsesModel(Model):
- def __init__(self, model: str = "test-openclaw-threading") -> None:
- self.model = model
- self.issued_response_ids: list[str] = []
- self.seen_previous_response_ids: list[str | None] = []
- self.seen_inputs: list[str | list[TResponseInputItem]] = []
- self.seen_model_settings: list[ModelSettings] = []
- async def get_response(
- self,
- system_instructions: str | None,
- input: str | list[TResponseInputItem],
- model_settings: ModelSettings,
- tools: list[Tool],
- output_schema: AgentOutputSchemaBase | None,
- handoffs: list[Handoff],
- tracing: ModelTracing,
- *,
- previous_response_id: str | None,
- conversation_id: str | None,
- prompt: ResponsePromptParam | None,
- ) -> ModelResponse:
- self.seen_inputs.append(copy.deepcopy(input) if isinstance(input, list) else input)
- self.seen_model_settings.append(copy.deepcopy(model_settings))
- self.seen_previous_response_ids.append(previous_response_id)
- response_id = self._issue_response_id()
- return _build_model_response(text="OK", response_id=response_id)
- def stream_response(
- self,
- system_instructions: str | None,
- input: str | list[TResponseInputItem],
- model_settings: ModelSettings,
- tools: list[Tool],
- output_schema: AgentOutputSchemaBase | None,
- handoffs: list[Handoff],
- tracing: ModelTracing,
- *,
- previous_response_id: str | None,
- conversation_id: str | None,
- prompt: ResponsePromptParam | None,
- ) -> AsyncIterator[TResponseStreamEvent]:
- self.seen_inputs.append(copy.deepcopy(input) if isinstance(input, list) else input)
- self.seen_model_settings.append(copy.deepcopy(model_settings))
- self.seen_previous_response_ids.append(previous_response_id)
- response_id = self._issue_response_id()
- return _stream_text_events(text="OK", model_name=self.model, response_id=response_id)
- def _issue_response_id(self) -> str:
- response_id = f"resp_test_{len(self.issued_response_ids) + 1}"
- self.issued_response_ids.append(response_id)
- return response_id
- def _build_model_response(*, text: str, response_id: str) -> ModelResponse:
- message = ResponseOutputMessage(
- id=f"msg_{response_id}",
- content=[ResponseOutputText(text=text, type="output_text", annotations=[], logprobs=[])],
- role="assistant",
- status="completed",
- type="message",
- )
- usage = Usage(
- requests=1,
- input_tokens=0,
- output_tokens=1,
- total_tokens=1,
- input_tokens_details=InputTokensDetails(cached_tokens=0),
- output_tokens_details=OutputTokensDetails(reasoning_tokens=0),
- )
- return ModelResponse(output=[message], usage=usage, response_id=response_id)
- async def _stream_text_events(*, text: str, model_name: str, response_id: str) -> AsyncIterator[TResponseStreamEvent]:
- created_at = int(time.time())
- message_id = f"msg_{response_id}"
- completed_message = ResponseOutputMessage(
- id=message_id,
- content=[ResponseOutputText(text=text, type="output_text", annotations=[], logprobs=[])],
- role="assistant",
- status="completed",
- type="message",
- )
- yield ResponseCreatedEvent(
- response=Response(
- id=response_id,
- created_at=created_at,
- model=model_name,
- object="response",
- output=[],
- tool_choice="none",
- tools=[],
- parallel_tool_calls=False,
- usage=None,
- ),
- sequence_number=0,
- type="response.created",
- )
- yield ResponseOutputItemAddedEvent(
- item=ResponseOutputMessage(
- id=message_id,
- content=[],
- role="assistant",
- status="in_progress",
- type="message",
- ),
- output_index=0,
- sequence_number=1,
- type="response.output_item.added",
- )
- yield ResponseContentPartAddedEvent(
- content_index=0,
- item_id=message_id,
- output_index=0,
- part=ResponseOutputText(text="", type="output_text", annotations=[], logprobs=[]),
- sequence_number=2,
- type="response.content_part.added",
- )
- yield ResponseTextDeltaEvent(
- content_index=0,
- delta=text,
- item_id=message_id,
- logprobs=[],
- output_index=0,
- sequence_number=3,
- type="response.output_text.delta",
- )
- yield ResponseContentPartDoneEvent(
- content_index=0,
- item_id=message_id,
- output_index=0,
- part=ResponseOutputText(text=text, type="output_text", annotations=[], logprobs=[]),
- sequence_number=4,
- type="response.content_part.done",
- )
- yield ResponseOutputItemDoneEvent(
- item=completed_message,
- output_index=0,
- sequence_number=5,
- type="response.output_item.done",
- )
- yield ResponseCompletedEvent(
- response=Response(
- id=response_id,
- created_at=created_at,
- model=model_name,
- object="response",
- output=[completed_message],
- tool_choice="none",
- tools=[],
- parallel_tool_calls=False,
- usage=None,
- ),
- sequence_number=6,
- type="response.completed",
- )
- def _parse_sse_messages_payload(chunks: list[str]) -> dict[str, Any]:
- current_event: str | None = None
- for chunk in chunks:
- for line in chunk.splitlines():
- if line.startswith("event: "):
- current_event = line.split("event: ", 1)[1].strip()
- continue
- if current_event == "messages" and line.startswith("data: "):
- return json.loads(line.split("data: ", 1)[1])
- raise AssertionError("messages payload not found in SSE stream")
- class _StubRequest:
- async def is_disconnected(self) -> bool:
- return False
- def _persist_messages(store: list[dict[str, Any]], messages: list[dict[str, Any]]) -> None:
- store[:] = copy.deepcopy(messages)
- def _agency_factory_with_store(model: _TrackingResponsesModel, store: list[dict[str, Any]]) -> Agency:
- agent = Agent(
- name="TestAgent",
- instructions="Base instructions",
- model=model,
- model_settings=ModelSettings(temperature=0.0),
- )
- return Agency(
- agent,
- load_threads_callback=lambda: copy.deepcopy(store),
- save_threads_callback=lambda messages: _persist_messages(store, messages),
- )
- def _build_agency_factory(model: _TrackingResponsesModel):
- def create_agency(load_threads_callback=None, save_threads_callback=None):
- agent = Agent(
- name="TestAgent",
- instructions="Base instructions",
- model=model,
- model_settings=ModelSettings(temperature=0.0),
- )
- return Agency(
- agent,
- load_threads_callback=load_threads_callback,
- save_threads_callback=save_threads_callback,
- )
- return create_agency
- def _build_store_false_agency_factory(model: _TrackingResponsesModel):
- def create_agency(load_threads_callback=None, save_threads_callback=None):
- agent = Agent(
- name="TestAgent",
- instructions="Base instructions",
- model=model,
- model_settings=ModelSettings(store=False, temperature=0.0),
- )
- return Agency(
- agent,
- load_threads_callback=load_threads_callback,
- save_threads_callback=save_threads_callback,
- )
- return create_agency
- def _history_with_encrypted_reasoning() -> list[dict[str, Any]]:
- return [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "looked up the answer", "id": "rs_summary_123"}],
- "content": [{"type": "reasoning_text", "text": "private", "id": "rs_content_123"}],
- "encrypted_content": "encrypted_reasoning",
- "previous_response_id": "resp_previous_123",
- "status": "completed",
- "agent": "TestAgent",
- "callerAgent": None,
- "timestamp": 1,
- },
- {
- "type": "message",
- "role": "assistant",
- "id": "msg_answer_123",
- "content": [{"type": "output_text", "text": "The answer is 42.", "annotations": [], "id": "msg_text_123"}],
- "conversation_id": "conv_previous_123",
- "status": "completed",
- "agent": "TestAgent",
- "callerAgent": None,
- "timestamp": 2,
- },
- {
- "type": "function_call",
- "id": "fc_lookup_123",
- "call_id": "call_lookup_123",
- "name": "lookup",
- "arguments": "{}",
- "status": "completed",
- "agent": "TestAgent",
- "callerAgent": None,
- "timestamp": 3,
- },
- {
- "type": "function_call_output",
- "id": "fc_output_123",
- "call_id": "call_lookup_123",
- "output": "42",
- "status": "completed",
- "agent": "TestAgent",
- "callerAgent": None,
- "timestamp": 4,
- },
- {"type": "item_reference", "id": "msg_answer_123", "agent": "TestAgent", "callerAgent": None, "timestamp": 5},
- ]
- def _history_with_unencrypted_reasoning() -> list[dict[str, Any]]:
- history = _history_with_encrypted_reasoning()
- reasoning = next(item for item in history if item.get("type") == "reasoning")
- reasoning.pop("encrypted_content")
- return history
- def _history_with_unencrypted_reasoning_before_tool_pair() -> list[dict[str, Any]]:
- return [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "looked up the answer"}],
- "status": "completed",
- },
- {
- "type": "function_call",
- "id": "fc_lookup_123",
- "call_id": "call_lookup_123",
- "name": "lookup",
- "arguments": "{}",
- "status": "completed",
- },
- {
- "type": "function_call_output",
- "id": "fc_output_123",
- "call_id": "call_lookup_123",
- "output": "42",
- "status": "completed",
- },
- {"role": "user", "content": "again"},
- ]
- def _history_with_unencrypted_reasoning_before_current_user_message() -> list[dict[str, Any]]:
- return [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "looked up the answer"}],
- "status": "completed",
- },
- {"role": "user", "content": "again"},
- ]
- def _history_with_unencrypted_reasoning_before_builtin_tool_call() -> list[dict[str, Any]]:
- return [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "searched the web"}],
- "status": "completed",
- },
- {
- "type": "web_search_call",
- "id": "ws_lookup_123",
- "status": "completed",
- },
- {"role": "user", "content": "again"},
- ]
- def _history_with_unencrypted_reasoning_before_tool_search_pair() -> list[dict[str, Any]]:
- return [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "searched local tools"}],
- "status": "completed",
- },
- {
- "type": "tool_search_call",
- "id": "ts_lookup_123",
- "call_id": "call_lookup_123",
- "arguments": {},
- "execution": "client",
- },
- {
- "type": "tool_search_output",
- "id": "ts_output_123",
- "call_id": "call_lookup_123",
- "tools": [],
- "execution": "client",
- },
- {"role": "user", "content": "again"},
- ]
- def _history_with_user_and_legacy_unencrypted_reasoning_turn() -> list[dict[str, Any]]:
- return [
- {"role": "user", "content": "what is 2+2?"},
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "calculated"}],
- "status": "completed",
- },
- {
- "type": "message",
- "role": "assistant",
- "id": "msg_answer_123",
- "content": [{"type": "output_text", "text": "4", "annotations": []}],
- "status": "completed",
- },
- {"role": "user", "content": "thanks"},
- ]
- def _assert_store_false_input_preserves_stateless_reasoning(model_input: str | list[TResponseInputItem]) -> None:
- assert isinstance(model_input, list)
- reasoning = next(item for item in model_input if isinstance(item, dict) and item.get("type") == "reasoning")
- assert reasoning["id"] == "rs_reasoning_123"
- assert reasoning["encrypted_content"] == "encrypted_reasoning"
- assert "previous_response_id" not in reasoning
- assert all("id" not in item for item in reasoning["summary"])
- assert all("id" not in item for item in reasoning["content"])
- assistant_message = next(item for item in model_input if isinstance(item, dict) and item.get("type") == "message")
- assert "conversation_id" not in assistant_message
- assert all("id" not in item for item in assistant_message["content"])
- function_call = next(item for item in model_input if isinstance(item, dict) and item.get("type") == "function_call")
- tool_output = next(
- item for item in model_input if isinstance(item, dict) and item.get("type") == "function_call_output"
- )
- assert function_call["call_id"] == "call_lookup_123"
- assert tool_output["call_id"] == "call_lookup_123"
- def _assert_unencrypted_reasoning_is_dropped(model_input: str | list[TResponseInputItem]) -> None:
- assert isinstance(model_input, list)
- assert all(not (isinstance(item, dict) and item.get("type") == "reasoning") for item in model_input)
- assert all(not (isinstance(item, dict) and item.get("id") == "msg_answer_123") for item in model_input)
- assert model_input == [{"role": "user", "content": "again", "type": "message"}]
- def _assert_store_false_requests_encrypted_reasoning(model_settings: ModelSettings) -> None:
- assert model_settings.store is False
- assert model_settings.response_include is not None
- assert REASONING_ENCRYPTED_CONTENT_INCLUDE in model_settings.response_include
- def _assert_history_input_has_no_response_ids(model_input: str | list[TResponseInputItem]) -> None:
- assert isinstance(model_input, list)
- leaked_response_ids = [item for item in model_input if isinstance(item, dict) and "response_id" in item]
- assert leaked_response_ids == []
- def _assert_messages_have_no_response_ids(messages: list[dict[str, Any]]) -> None:
- leaked_response_ids = [item for item in messages if "response_id" in item]
- assert leaked_response_ids == []
- @pytest.mark.asyncio
- async def test_response_endpoint_replays_returned_history_without_hidden_response_ids() -> None:
- model = _TrackingResponsesModel()
- handler = make_response_endpoint(BaseRequest, _build_agency_factory(model), lambda: None)
- first = await handler(BaseRequest(message="hi"), token=None)
- history = copy.deepcopy(first["new_messages"])
- _assert_messages_have_no_response_ids(history)
- await handler(BaseRequest(message="again", chat_history=history), token=None)
- assert model.seen_previous_response_ids == [None, None]
- _assert_history_input_has_no_response_ids(model.seen_inputs[1])
- @pytest.mark.asyncio
- async def test_stream_endpoint_replays_returned_history_without_hidden_response_ids() -> None:
- model = _TrackingResponsesModel()
- handler = make_stream_endpoint(BaseRequest, _build_agency_factory(model), lambda: None, ActiveRunRegistry())
- http_request = _StubRequest()
- first_response = await handler(http_request=http_request, request=BaseRequest(message="hi"), token=None)
- first_chunks = [chunk async for chunk in first_response.body_iterator]
- first_payload = _parse_sse_messages_payload(first_chunks)
- history = copy.deepcopy(first_payload["new_messages"])
- _assert_messages_have_no_response_ids(history)
- second_response = await handler(
- http_request=http_request,
- request=BaseRequest(message="again", chat_history=history),
- token=None,
- )
- _second_chunks = [chunk async for chunk in second_response.body_iterator]
- assert model.seen_previous_response_ids == [None, None]
- _assert_history_input_has_no_response_ids(model.seen_inputs[1])
- @pytest.mark.asyncio
- async def test_response_endpoint_store_false_requests_and_preserves_encrypted_reasoning() -> None:
- model = _TrackingResponsesModel()
- handler = make_response_endpoint(BaseRequest, _build_store_false_agency_factory(model), lambda: None)
- await handler(BaseRequest(message="again", chat_history=_history_with_encrypted_reasoning()), token=None)
- _assert_store_false_requests_encrypted_reasoning(model.seen_model_settings[0])
- _assert_store_false_input_preserves_stateless_reasoning(model.seen_inputs[0])
- @pytest.mark.asyncio
- async def test_stream_endpoint_store_false_drops_only_unencrypted_reasoning() -> None:
- model = _TrackingResponsesModel()
- handler = make_stream_endpoint(
- BaseRequest,
- _build_store_false_agency_factory(model),
- lambda: None,
- ActiveRunRegistry(),
- )
- response = await handler(
- http_request=_StubRequest(),
- request=BaseRequest(message="again", chat_history=_history_with_unencrypted_reasoning()),
- token=None,
- )
- _chunks = [chunk async for chunk in response.body_iterator]
- _assert_store_false_requests_encrypted_reasoning(model.seen_model_settings[0])
- _assert_unencrypted_reasoning_is_dropped(model.seen_inputs[0])
- @pytest.mark.asyncio
- async def test_stream_endpoint_store_false_drops_legacy_reasoning_span_and_keeps_current_user() -> None:
- model = _TrackingResponsesModel()
- handler = make_stream_endpoint(
- BaseRequest,
- _build_store_false_agency_factory(model),
- lambda: None,
- ActiveRunRegistry(),
- )
- legacy_history = _history_with_unencrypted_reasoning_before_tool_pair()[:-1]
- response = await handler(
- http_request=_StubRequest(),
- request=BaseRequest(message="again", chat_history=legacy_history),
- token=None,
- )
- _chunks = [chunk async for chunk in response.body_iterator]
- _assert_store_false_requests_encrypted_reasoning(model.seen_model_settings[0])
- assert model.seen_inputs[0] == [{"role": "user", "content": "again", "type": "message"}]
- def test_store_false_sanitizer_drops_dependent_followers_after_unencrypted_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(_history_with_unencrypted_reasoning_before_tool_pair())
- assert sanitized == [{"role": "user", "content": "again"}]
- def test_store_false_sanitizer_preserves_current_user_after_unencrypted_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(_history_with_unencrypted_reasoning_before_current_user_message())
- assert sanitized == [{"role": "user", "content": "again"}]
- def test_store_false_sanitizer_drops_builtin_tool_follower_after_unencrypted_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(_history_with_unencrypted_reasoning_before_builtin_tool_call())
- assert sanitized == [{"role": "user", "content": "again"}]
- def test_store_false_sanitizer_drops_tool_search_pair_after_unencrypted_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(_history_with_unencrypted_reasoning_before_tool_search_pair())
- assert sanitized == [{"role": "user", "content": "again"}]
- def test_store_false_sanitizer_drops_full_legacy_reasoning_turn() -> None:
- sanitized = sanitize_store_false_responses_input(_history_with_user_and_legacy_unencrypted_reasoning_turn())
- assert sanitized == [{"role": "user", "content": "thanks"}]
- def test_store_false_sanitizer_drops_late_reference_to_removed_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(
- [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "legacy"}],
- "status": "completed",
- },
- {"role": "user", "content": "again"},
- {"type": "item_reference", "id": "rs_reasoning_123"},
- ]
- )
- assert sanitized == [{"role": "user", "content": "again"}]
- def test_store_false_sanitizer_skips_non_messages_and_nested_unencrypted_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(
- [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "legacy"}],
- "status": "completed",
- },
- "ignored legacy output",
- {
- "role": "user",
- "content": [
- {"type": "reasoning", "summary": [{"type": "summary_text", "text": "nested"}]},
- {"type": "input_text", "text": "again"},
- ],
- },
- ]
- )
- assert sanitized == [{"role": "user", "content": [{"type": "input_text", "text": "again"}]}]
- def test_store_false_sanitizer_drops_prior_provider_outputs_before_legacy_reasoning() -> None:
- sanitized = sanitize_store_false_responses_input(
- [
- {
- "type": "message",
- "role": "assistant",
- "content": [{"type": "output_text", "text": "stale"}],
- },
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "legacy"}],
- "status": "completed",
- },
- {"role": "user", "content": "again"},
- ]
- )
- assert sanitized == [{"role": "user", "content": "again"}]
- def test_store_false_sanitizer_keeps_prior_encrypted_reasoning_boundary() -> None:
- encrypted_reasoning = _history_with_encrypted_reasoning()[0]
- sanitized = sanitize_store_false_responses_input(
- [
- encrypted_reasoning,
- {
- "type": "reasoning",
- "id": "rs_legacy_456",
- "summary": [{"type": "summary_text", "text": "legacy"}],
- "status": "completed",
- },
- {"role": "user", "content": "again"},
- ]
- )
- assert sanitized == [
- {
- "type": "reasoning",
- "id": "rs_reasoning_123",
- "summary": [{"type": "summary_text", "text": "looked up the answer"}],
- "content": [{"type": "reasoning_text", "text": "private"}],
- "encrypted_content": "encrypted_reasoning",
- "status": "completed",
- "agent": "TestAgent",
- "timestamp": 1,
- },
- {"role": "user", "content": "again"},
- ]
- def test_live_openai_store_false_replays_encrypted_reasoning() -> None:
- """Live OpenAI proof for stateless Responses reasoning replay."""
- client = OpenAI()
- first = client.responses.create(
- model="gpt-5.4-nano",
- input="Compute 37*41. Return only the number.",
- store=False,
- include=[REASONING_ENCRYPTED_CONTENT_INCLUDE],
- reasoning={"effort": "high"},
- max_output_tokens=64,
- )
- first_items = [item.model_dump(exclude_none=True) for item in first.output]
- reasoning_items = [item for item in first_items if item.get("type") == "reasoning"]
- output_types = [item.get("type") for item in first_items]
- reasoning_tokens = first.usage.output_tokens_details.reasoning_tokens if first.usage else None
- assert first.output_text.strip() == "1517"
- assert reasoning_items, f"Expected encrypted reasoning output item; got {output_types=} {reasoning_tokens=}"
- assert all(item.get("encrypted_content") for item in reasoning_items)
- replay_input = sanitize_store_false_responses_input(
- [
- *first_items,
- {
- "role": "user",
- "content": "What exact number did you just return? Return only that same number.",
- },
- ]
- )
- second = client.responses.create(
- model="gpt-5.4-nano",
- input=replay_input,
- store=False,
- include=[REASONING_ENCRYPTED_CONTENT_INCLUDE],
- reasoning={"effort": "high"},
- max_output_tokens=64,
- )
- assert second.output_text.strip() == "1517"
- @pytest.mark.asyncio
- async def test_codex_chat_name_store_false_uses_encrypted_reasoning_include() -> None:
- captured_inputs: list[list[TResponseInputItem]] = []
- captured_includes: list[list[str]] = []
- class _TitleStream:
- def __aiter__(self):
- return self
- async def __anext__(self):
- raise StopAsyncIteration
- class _Responses:
- async def create(self, **kwargs: Any) -> _TitleStream:
- captured_inputs.append(copy.deepcopy(kwargs["input"]))
- captured_includes.append(copy.deepcopy(kwargs["include"]))
- return _TitleStream()
- class _Client:
- base_url = "https://chatgpt.com/backend-api/codex"
- responses = _Responses()
- with pytest.raises(ValueError, match="Generated chat name"):
- await generate_chat_name(_history_with_encrypted_reasoning(), openai_client=_Client()) # type: ignore[arg-type]
- assert captured_inputs
- assert captured_includes
- assert all(include == [REASONING_ENCRYPTED_CONTENT_INCLUDE] for include in captured_includes)
- _assert_store_false_input_preserves_stateless_reasoning(captured_inputs[0])
- @pytest.mark.asyncio
- async def test_agency_get_response_persists_history_without_hidden_response_ids() -> None:
- model = _TrackingResponsesModel()
- persisted_history: list[dict[str, Any]] = []
- agency = _agency_factory_with_store(model, persisted_history)
- await agency.get_response(message="hi")
- _assert_messages_have_no_response_ids(persisted_history)
- await agency.get_response(message="again")
- assert model.seen_previous_response_ids == [None, None]
- _assert_history_input_has_no_response_ids(model.seen_inputs[1])
- @pytest.mark.asyncio
- async def test_agency_stream_persists_history_without_hidden_response_ids() -> None:
- model = _TrackingResponsesModel()
- persisted_history: list[dict[str, Any]] = []
- agency = _agency_factory_with_store(model, persisted_history)
- first_stream = agency.get_response_stream(message="hi")
- _first_events = [event async for event in first_stream]
- _assert_messages_have_no_response_ids(persisted_history)
- second_stream = agency.get_response_stream(message="again")
- _second_events = [event async for event in second_stream]
- assert model.seen_previous_response_ids == [None, None]
- _assert_history_input_has_no_response_ids(model.seen_inputs[1])
|