| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- import dataclasses
- import json
- from types import SimpleNamespace
- from unittest.mock import MagicMock
- import pytest
- pytest.importorskip("ag_ui")
- from ag_ui.core import (
- AssistantMessage,
- CustomEvent,
- DeveloperMessage,
- EventType,
- FunctionCall,
- MessagesSnapshotEvent,
- RawEvent,
- TextMessageContentEvent,
- TextMessageEndEvent,
- TextMessageStartEvent,
- ToolCall,
- ToolCallArgsEvent,
- ToolCallEndEvent,
- ToolMessage,
- UserMessage,
- )
- from openai.types.responses import (
- ResponseCodeInterpreterToolCall,
- ResponseFileSearchToolCall,
- ResponseFunctionToolCall,
- ResponseOutputMessage,
- ResponseOutputText,
- )
- from openai.types.responses.response_file_search_tool_call import Result as FileSearchResult
- from openai.types.responses.response_function_call_arguments_delta_event import ResponseFunctionCallArgumentsDeltaEvent
- from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
- from openai.types.responses.response_output_item_done_event import ResponseOutputItemDoneEvent
- from openai.types.responses.response_output_text import AnnotationFileCitation
- from openai.types.responses.response_text_delta_event import Logprob, ResponseTextDeltaEvent
- from agency_swarm.ui.core.agui_adapter import AguiAdapter
def make_raw_event(data):
    """Wrap ``data`` in a stand-in object typed as ``raw_response_event``."""
    fields = {"type": "raw_response_event", "data": data}
    return SimpleNamespace(**fields)
def make_stream_event(name, item):
    """Build a stand-in ``run_item_stream_event`` carrying ``name`` and ``item``."""
    fields = {"type": "run_item_stream_event", "name": name, "item": item}
    return SimpleNamespace(**fields)
def test_agui_messages_to_chat_history_converts_roles_and_tool_calls():
    """Check how user, assistant tool-call, tool and developer messages are mapped.

    The expected history is: a plain user entry, a ``function_call`` entry
    named ``Weather``, a ``function_call_output`` entry, and a ``system``
    entry for the developer message.
    """
    weather_call = ToolCall(
        id="call-1",
        type="function",
        function=FunctionCall(name="Weather", arguments="{}"),
    )
    conversation = [
        UserMessage(id="u1", role="user", content="Hello"),
        AssistantMessage(id="a1", role="assistant", content="Hi", tool_calls=[weather_call]),
        ToolMessage(id="t1", role="tool", content="Done", tool_call_id="call-1"),
        DeveloperMessage(id="d1", role="developer", content="Dev note"),
    ]

    history = AguiAdapter().agui_messages_to_chat_history(conversation)

    assert history[0] == {"role": "user", "content": "Hello"}

    call_entry = history[1]
    assert call_entry["type"] == "function_call"
    assert call_entry["name"] == "Weather"

    assert history[2] == {"call_id": "call-1", "output": "Done", "type": "function_call_output"}
    assert history[3] == {"role": "system", "content": "Dev note"}
def test_agui_messages_to_chat_history_handles_file_search_call():
    """A ``FileSearchTool`` call should surface as a ``file_search_call`` entry.

    The queries and results encoded in the call's JSON arguments are expected
    on the resulting history entry.
    """
    search_function = FunctionCall(
        name="FileSearchTool",
        arguments='{"queries": ["foo"], "results": ["bar"]}',
    )
    search_call = ToolCall(id="call-99", type="function", function=search_function)
    assistant_msg = AssistantMessage(id="a2", role="assistant", content="", tool_calls=[search_call])

    history = AguiAdapter().agui_messages_to_chat_history([assistant_msg])

    entry = history[0]
    assert entry["type"] == "file_search_call"
    assert entry["queries"] == ["foo"]
    assert entry["results"] == ["bar"]
def test_agui_messages_to_chat_history_handles_code_interpreter_call():
    """A ``CodeInterpreterTool`` call should surface as a ``code_interpreter_call`` entry.

    The code and outputs encoded in the call's JSON arguments are expected on
    the resulting history entry.
    """
    interpreter_function = FunctionCall(
        name="CodeInterpreterTool",
        arguments='{"code": "print(1)", "container_id": "cid", "outputs": ["1"]}',
    )
    interpreter_call = ToolCall(id="ci-1", type="function", function=interpreter_function)
    assistant_msg = AssistantMessage(id="a3", role="assistant", content="", tool_calls=[interpreter_call])

    history = AguiAdapter().agui_messages_to_chat_history([assistant_msg])

    entry = history[0]
    assert entry["type"] == "code_interpreter_call"
    assert entry["code"] == "print(1)"
    assert entry["outputs"] == ["1"]
def test_agui_messages_to_chat_history_handles_plain_assistant_message():
    """An assistant message with no tool calls maps to a single role/content entry."""
    plain_reply = AssistantMessage(id="a4", role="assistant", content="Result ready", tool_calls=[])

    history = AguiAdapter().agui_messages_to_chat_history([plain_reply])

    expected = [{"role": "assistant", "content": "Result ready"}]
    assert history == expected
def test_openai_events_emit_message_lifecycle():
    """Added / text-delta / done events for one message map to start / content / end."""
    adapter = AguiAdapter()
    run_id = "run-1"

    message = ResponseOutputMessage(
        id="m-1",
        content=[ResponseOutputText(annotations=[], text="Hi", type="output_text")],
        role="assistant",
        status="completed",
        type="message",
    )
    added_payload = ResponseOutputItemAddedEvent(
        item=message,
        output_index=0,
        sequence_number=1,
        type="response.output_item.added",
    )
    delta_payload = ResponseTextDeltaEvent(
        content_index=0,
        delta="Hi",
        item_id="m-1",
        logprobs=[],
        output_index=0,
        sequence_number=2,
        type="response.output_text.delta",
    )
    done_payload = ResponseOutputItemDoneEvent(
        item=message,
        output_index=0,
        sequence_number=3,
        type="response.output_item.done",
    )

    # All raw events are wrapped up front; the adapter is then fed in stream order.
    wrapped = [make_raw_event(payload) for payload in (added_payload, delta_payload, done_payload)]
    start, delta, done = [adapter.openai_to_agui_events(event, run_id=run_id) for event in wrapped]

    assert isinstance(start, TextMessageStartEvent)
    assert isinstance(delta, TextMessageContentEvent)
    assert isinstance(done, TextMessageEndEvent)
    assert delta.message_id == "m-1"
def test_openai_events_track_tool_calls_and_arguments():
    """Function-call added / args-delta / done events yield args, end and snapshot events."""
    adapter = AguiAdapter()
    run_id = "run-2"

    pending_call = ResponseFunctionToolCall(
        arguments="{}",
        call_id="call-1",
        name="search",
        type="function_call",
        id="item-1",
        status="in_progress",
    )
    added_payload = ResponseOutputItemAddedEvent(
        item=pending_call,
        output_index=0,
        sequence_number=1,
        type="response.output_item.added",
    )
    # The return value is not inspected. The delta below only carries
    # item_id="item-1" yet is asserted to resolve to call-1, so the adapter
    # presumably records the item-to-call mapping from this event.
    adapter.openai_to_agui_events(make_raw_event(added_payload), run_id=run_id)

    delta_payload = ResponseFunctionCallArgumentsDeltaEvent(
        item_id="item-1",
        delta='{"q": "',
        output_index=0,
        sequence_number=2,
        type="response.function_call_arguments.delta",
    )
    args_event = adapter.openai_to_agui_events(make_raw_event(delta_payload), run_id=run_id)

    finished_call = ResponseFunctionToolCall(
        arguments='{"q": "weather"}',
        call_id="call-1",
        name="search",
        type="function_call",
        id="item-1",
        status="completed",
    )
    done_payload = ResponseOutputItemDoneEvent(
        type="response.output_item.done",
        item=finished_call,
        output_index=0,
        sequence_number=3,
    )
    done_events = adapter.openai_to_agui_events(make_raw_event(done_payload), run_id=run_id)

    assert isinstance(args_event, ToolCallArgsEvent)
    assert args_event.tool_call_id == "call-1"

    assert isinstance(done_events, list)
    assert isinstance(done_events[0], ToolCallEndEvent)
    assert isinstance(done_events[1], MessagesSnapshotEvent)
def test_openai_typed_events_emit_message_lifecycle():
    """Message lifecycle mapping, with a text delta that carries a ``Logprob`` entry."""
    adapter = AguiAdapter()
    run_id = "typed-run"

    def convert(payload):
        # Wrap the typed payload as a raw response event and run it through the adapter.
        return adapter.openai_to_agui_events(make_raw_event(payload), run_id=run_id)

    message = ResponseOutputMessage(
        id="msg-typed",
        content=[ResponseOutputText(annotations=[], text="Hello world", type="output_text")],
        role="assistant",
        status="completed",
        type="message",
    )
    added_payload = ResponseOutputItemAddedEvent(
        item=message,
        output_index=0,
        sequence_number=1,
        type="response.output_item.added",
    )
    delta_payload = ResponseTextDeltaEvent(
        content_index=0,
        delta="!",
        item_id="msg-typed",
        logprobs=[Logprob(token="!", logprob=0.0, top_logprobs=[])],
        output_index=0,
        sequence_number=2,
        type="response.output_text.delta",
    )
    done_payload = ResponseOutputItemDoneEvent(
        item=message,
        output_index=0,
        sequence_number=3,
        type="response.output_item.done",
    )

    start = convert(added_payload)
    delta = convert(delta_payload)
    done = convert(done_payload)

    assert isinstance(start, TextMessageStartEvent)
    assert isinstance(delta, TextMessageContentEvent)
    assert isinstance(done, TextMessageEndEvent)
    assert delta.message_id == "msg-typed"
def test_openai_events_handles_exceptions_with_run_error():
    """An event whose ``data`` access raises should be reported as a RUN_ERROR event."""

    def _exploding_data(self):
        raise RuntimeError("boom")

    event = MagicMock()
    event.type = "raw_response_event"
    # Install the raising property on the mock's own class so that reading
    # ``event.data`` triggers the RuntimeError.
    type(event).data = property(_exploding_data)

    adapter = AguiAdapter()
    result = adapter.openai_to_agui_events(event, run_id="oops")

    assert result.type == EventType.RUN_ERROR
    assert "boom" in result.message
def test_openai_events_ignore_message_without_id():
    """An added message item with ``id=None`` should come back as a RawEvent."""
    adapter = AguiAdapter()
    nameless_message = SimpleNamespace(type="message", role="assistant", id=None)
    payload = SimpleNamespace(type="response.output_item.added", item=nameless_message)

    result = adapter.openai_to_agui_events(make_raw_event(payload), run_id="missing-message")

    assert isinstance(result, RawEvent)
    assert result.type == EventType.RAW
    # The raw event is expected to echo the original payload type.
    assert result.event["data"]["type"] == "response.output_item.added"
def test_openai_events_ignore_tool_call_without_call_id():
    """An arguments delta for a tool item added with ``call_id=None`` yields a RawEvent."""
    adapter = AguiAdapter()
    run_id = "missing-tool"

    tool = SimpleNamespace(type="function_call", id="item-99", call_id=None, name="search", arguments="{}")
    added_payload = SimpleNamespace(type="response.output_item.added", item=tool)
    # Feed the added event first; its return value is not part of this check.
    adapter.openai_to_agui_events(make_raw_event(added_payload), run_id=run_id)

    delta_payload = SimpleNamespace(
        type="response.function_call_arguments.delta",
        item_id="item-99",
        delta="{}",
    )
    args_event = adapter.openai_to_agui_events(make_raw_event(delta_payload), run_id=run_id)

    assert isinstance(args_event, RawEvent)
    assert args_event.type == EventType.RAW
    assert args_event.event["data"]["type"] == "response.function_call_arguments.delta"
def test_openai_events_ignore_text_delta_without_item_id():
    """A text delta whose ``item_id`` is ``None`` should come back as a RawEvent."""
    adapter = AguiAdapter()
    orphan_delta = SimpleNamespace(type="response.output_text.delta", item_id=None, delta="Hi")

    result = adapter.openai_to_agui_events(make_raw_event(orphan_delta), run_id="missing-delta-id")

    assert isinstance(result, RawEvent)
    assert result.type == EventType.RAW
    assert result.event["data"]["type"] == "response.output_text.delta"
def test_openai_events_ignore_tool_done_without_call_id():
    """A done event for a function call with ``call_id=None`` should come back as a RawEvent."""
    adapter = AguiAdapter()
    raw_item = SimpleNamespace(type="function_call", id="item-9", call_id=None, name="search", arguments="{}")
    done_payload = SimpleNamespace(type="response.output_item.done", item=raw_item)

    result = adapter.openai_to_agui_events(make_raw_event(done_payload), run_id="tool-done-missing")

    assert isinstance(result, RawEvent)
    assert result.type == EventType.RAW
    assert result.event["data"]["type"] == "response.output_item.done"
def test_run_item_stream_events_emit_snapshots():
    """``message_output_created`` should yield a list containing a messages snapshot.

    Every element must be a ``MessagesSnapshotEvent`` or a ``CustomEvent``, and
    at least one must be a ``MessagesSnapshotEvent``.
    """
    adapter = AguiAdapter()
    run_id = "run-3"

    raw_item = ResponseOutputMessage(
        id="msg-1",
        content=[ResponseOutputText(annotations=[], text="Answer", type="output_text")],
        role="assistant",
        status="completed",
        type="message",
    )
    stream_event = make_stream_event("message_output_created", SimpleNamespace(raw_item=raw_item))

    events = adapter.openai_to_agui_events(stream_event, run_id=run_id)

    assert isinstance(events, list)
    allowed_types = (MessagesSnapshotEvent, CustomEvent)
    assert all(isinstance(emitted, allowed_types) for emitted in events)
    assert any(isinstance(emitted, MessagesSnapshotEvent) for emitted in events)
def test_run_item_stream_with_annotations_returns_custom_event():
    """An annotated output message should produce a ``CustomEvent`` carrying the annotations."""
    adapter = AguiAdapter()
    run_id = "annotated"

    annotation = AnnotationFileCitation(
        file_id="file-annot",
        filename="doc.pdf",
        index=1,
        type="file_citation",
    )
    raw_item = ResponseOutputMessage(
        id="msg-annot",
        content=[ResponseOutputText(annotations=[annotation], text="Answer", type="output_text")],
        role="assistant",
        status="completed",
        type="message",
    )
    stream_event = make_stream_event("message_output_created", SimpleNamespace(raw_item=raw_item))

    events = adapter.openai_to_agui_events(stream_event, run_id=run_id)

    assert isinstance(events, list)
    custom_events = [emitted for emitted in events if isinstance(emitted, CustomEvent)]
    assert custom_events
    # Only the first custom event is inspected.
    custom = custom_events[0]
    assert custom.value["annotations"] == [annotation.model_dump()]
def test_run_item_stream_ignores_message_without_text():
    """A message whose only content part has ``text=None`` should come back as a RawEvent."""
    adapter = AguiAdapter()
    run_id = "missing-text"

    empty_part = SimpleNamespace(text=None, annotations=None)
    raw_item = SimpleNamespace(id="msg-empty", content=[empty_part])
    stream_event = make_stream_event("message_output_created", SimpleNamespace(raw_item=raw_item))

    result = adapter.openai_to_agui_events(stream_event, run_id=run_id)

    assert isinstance(result, RawEvent)
    assert result.type == EventType.RAW
    assert result.event["name"] == "message_output_created"
def test_tool_output_stream_event_converts_to_tool_message():
    """A ``tool_output`` stream event should become a snapshot holding a ``ToolMessage``."""
    adapter = AguiAdapter()
    run_id = "run-4"

    tool_result = SimpleNamespace(raw_item={"call_id": "call-7"}, call_id="call-7", output="done")
    stream_event = make_stream_event("tool_output", tool_result)

    event = adapter.openai_to_agui_events(stream_event, run_id=run_id)

    assert isinstance(event, MessagesSnapshotEvent)
    first_message = event.messages[0]
    assert isinstance(first_message, ToolMessage)
    assert first_message.tool_call_id == "call-7"
    assert first_message.content == "done"
def test_tool_output_without_call_id_is_ignored():
    """A ``tool_output`` item with no call id anywhere should come back as a RawEvent."""
    adapter = AguiAdapter()
    anonymous_output = SimpleNamespace(raw_item={}, call_id=None, output="done")
    stream_event = make_stream_event("tool_output", anonymous_output)

    result = adapter.openai_to_agui_events(stream_event, run_id="tool-missing")

    assert isinstance(result, RawEvent)
    assert result.type == EventType.RAW
    assert result.event["name"] == "tool_output"
def test_run_item_stream_unknown_event_is_returned_as_raw_event():
    """An unrecognised stream event name should come back as a RawEvent echoing name and type."""
    adapter = AguiAdapter()
    run_id = "unknown-stream"

    result = adapter.openai_to_agui_events(
        make_stream_event("unhandled_event", None),
        run_id=run_id,
    )

    assert isinstance(result, RawEvent)
    assert result.type == EventType.RAW

    echoed = result.event
    assert echoed["name"] == "unhandled_event"
    assert echoed["type"] == "run_item_stream_event"
def test_tool_meta_handles_non_function_tools():
    """``_tool_meta`` should describe file-search and code-interpreter calls.

    Plain dataclass stand-ins are built from the typed OpenAI models and passed
    to ``_tool_meta``. The result is indexed as (id, tool name, JSON argument
    string).
    """

    @dataclasses.dataclass
    class LegacyFileSearchCall:
        type: str
        id: str
        queries: list[str]
        results: list[dict]

    @dataclasses.dataclass
    class LegacyCodeInterpreterCall:
        type: str
        id: str
        code: str
        container_id: str
        outputs: list[dict]

    adapter = AguiAdapter()

    typed_file_search = ResponseFileSearchToolCall(
        id="file-1",
        queries=["foo"],
        status="completed",
        type="file_search_call",
        results=[FileSearchResult(file_id="doc", text="bar")],
    )
    typed_code_interpreter = ResponseCodeInterpreterToolCall(
        code="print(42)",
        container_id="cont",
        id="ci-7",
        outputs=[{"type": "logs", "logs": "42"}],
        type="code_interpreter_call",
        status="completed",
    )

    # Round-trip through JSON so the results are plain dicts, not model objects.
    serialized_search = json.loads(typed_file_search.model_dump_json())
    file_search = LegacyFileSearchCall(
        type="file_search_call",
        id=typed_file_search.id,
        queries=typed_file_search.queries or [],
        results=serialized_search["results"],
    )
    code_interpreter = LegacyCodeInterpreterCall(
        type="code_interpreter_call",
        id=typed_code_interpreter.id,
        code=typed_code_interpreter.code or "",
        container_id=typed_code_interpreter.container_id,
        outputs=[{"type": "logs", "logs": "42"}],
    )

    file_meta = adapter._tool_meta(file_search)
    code_meta = adapter._tool_meta(code_interpreter)

    assert file_meta[0] == "file-1"
    assert file_meta[1] == "FileSearchTool"
    file_arguments = json.loads(file_meta[2])
    assert file_arguments["queries"] == ["foo"]

    assert code_meta[0] == "ci-7"
    assert code_meta[1] == "CodeInterpreterTool"
    code_arguments = json.loads(code_meta[2])
    assert code_arguments["code"] == "print(42)"
|