test_console_event_adapter.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. from types import SimpleNamespace
  2. from unittest.mock import MagicMock, patch
  3. from agency_swarm.ui.core.console_event_adapter import ConsoleEventAdapter
  4. def raw_event(data_type: str, **kwargs):
  5. return SimpleNamespace(type="raw_response_event", data=SimpleNamespace(type=data_type, **kwargs))
  6. def test_reasoning_disabled_emits_single_header():
  7. adapter = ConsoleEventAdapter(show_reasoning=False)
  8. with (
  9. patch.object(adapter.console, "print") as mock_print,
  10. patch("agency_swarm.ui.core.console_event_adapter.Live") as live,
  11. ):
  12. event = raw_event("response.reasoning_summary_text.delta", delta="thinking")
  13. adapter.openai_to_message_output(event, recipient_agent="AgentX")
  14. outputs = [str(call.args[0]) for call in mock_print.call_args_list]
  15. assert any("🧠 AgentX Reasoning" in text for text in outputs)
  16. live.assert_not_called()
  17. def test_reasoning_stream_followed_by_output_inserts_blank_line():
  18. adapter = ConsoleEventAdapter(show_reasoning=True)
  19. with (
  20. patch("agency_swarm.ui.core.console_event_adapter.Live") as live_cls,
  21. patch("agency_swarm.ui.core.console_event_adapter.Markdown") as markdown,
  22. patch.object(adapter.console, "print") as mock_print,
  23. patch.object(adapter.console, "rule"),
  24. ):
  25. live = MagicMock()
  26. live_cls.return_value = live
  27. markdown.side_effect = lambda *a, **k: MagicMock()
  28. adapter.openai_to_message_output(raw_event("response.reasoning_summary_text.delta", delta="First"), "Agent")
  29. adapter.openai_to_message_output(raw_event("response.reasoning_summary_part.done"), "Agent")
  30. adapter.openai_to_message_output(raw_event("response.output_text.delta", delta="Reply"), "Agent")
  31. printed_lines = [call.args[0] for call in mock_print.call_args_list]
  32. assert "" in printed_lines
  33. def test_send_message_events_update_console_and_registry():
  34. adapter = ConsoleEventAdapter(show_reasoning=True)
  35. send_item = SimpleNamespace(
  36. type="function_call",
  37. name="send_message",
  38. arguments='{"recipient_agent": "Coach", "message": "Hi"}',
  39. call_id="call-22",
  40. )
  41. event = raw_event("response.output_item.done", item=send_item)
  42. with patch.object(adapter.console, "print") as mock_print, patch.object(adapter.console, "rule"):
  43. adapter.openai_to_message_output(event, recipient_agent="AgentA")
  44. assert adapter.agent_to_agent_communication["call-22"]["receiver"] == "Coach"
  45. printed_text = "\n".join(str(call.args[0]) for call in mock_print.call_args_list)
  46. assert "AgentA" in printed_text and "Coach" in printed_text and "Hi" in printed_text
  47. def test_output_text_delta_updates_live_and_done_closes_region():
  48. adapter = ConsoleEventAdapter(show_reasoning=True)
  49. delta = raw_event("response.output_text.delta", delta="Hello")
  50. done = raw_event("response.output_text.done")
  51. with (
  52. patch("agency_swarm.ui.core.console_event_adapter.Live") as live_cls,
  53. patch("agency_swarm.ui.core.console_event_adapter.Markdown") as markdown,
  54. patch.object(adapter.console, "print"),
  55. patch.object(adapter.console, "rule"),
  56. ):
  57. live = MagicMock()
  58. live_cls.return_value = live
  59. markdown.side_effect = lambda text, **_: text
  60. adapter.openai_to_message_output(delta, recipient_agent="AgentA")
  61. adapter.openai_to_message_output(done, recipient_agent="AgentA")
  62. live.update.assert_called()
  63. assert live.__exit__.called
  64. def test_cleanup_live_display_resets_state():
  65. adapter = ConsoleEventAdapter(show_reasoning=True)
  66. adapter.message_output = MagicMock()
  67. adapter.message_output.__exit__ = MagicMock()
  68. adapter.reasoning_output = MagicMock()
  69. adapter.reasoning_output.__exit__ = MagicMock(side_effect=Exception("boom"))
  70. adapter.response_buffer = "something"
  71. adapter.reasoning_buffer = "more"
  72. adapter._final_rendered = True
  73. adapter._reasoning_final_rendered = True
  74. adapter._cleanup_live_display()
  75. assert adapter.message_output is None
  76. assert adapter.reasoning_output is None
  77. assert adapter.response_buffer == ""
  78. assert adapter.reasoning_buffer == ""
  79. assert adapter._final_rendered is False
  80. assert adapter._reasoning_final_rendered is False
  81. def test_update_console_formats_headers():
  82. adapter = ConsoleEventAdapter()
  83. cases = [
  84. ("function", "Builder", "User", "🛠️ Executing Function"), # Two spaces after emoji
  85. ("function_output", "Builder", "User", "⚙️ Function Output"),
  86. ("text", "AgentA", "AgentB", "AgentA → 🤖 AgentB"),
  87. ("text", "user", "AgentB", "👤 user → 🤖 AgentB"),
  88. ]
  89. for msg_type, sender, receiver, snippet in cases:
  90. with patch.object(adapter.console, "print") as mock_print, patch.object(adapter.console, "rule") as mock_rule:
  91. adapter._update_console(msg_type, sender, receiver, "Body")
  92. printed = mock_print.call_args[0][0]
  93. assert snippet in printed
  94. mock_rule.assert_called_once()
  95. def test_update_console_separator_and_event_dispatch_behavior():
  96. """Console update should honor separator flags and route/ignore events correctly."""
  97. adapter = ConsoleEventAdapter()
  98. with patch.object(adapter.console, "print"), patch.object(adapter.console, "rule") as mock_rule:
  99. adapter._update_console("text", "Agent", "user", "Body", add_separator=False)
  100. mock_rule.assert_not_called()
  101. with patch.object(adapter.console, "print") as mock_print, patch.object(adapter.console, "rule") as mock_rule:
  102. adapter.openai_to_message_output(SimpleNamespace(type="unexpected"), recipient_agent="Agent")
  103. mock_print.assert_not_called()
  104. mock_rule.assert_not_called()
  105. with patch.object(adapter, "_update_console") as mock_update:
  106. adapter.openai_to_message_output(
  107. raw_event("response.output_text.delta", delta="Hello"), recipient_agent="Agent"
  108. )
  109. mock_update.assert_not_called()
  110. with patch.object(ConsoleEventAdapter, "_handle_raw_response_event") as mock_handler:
  111. event = raw_event("response.output_text.delta", delta="Hi")
  112. adapter.openai_to_message_output(event, recipient_agent="Agent")
  113. mock_handler.assert_called_once_with(event.data, "Agent", "user")
  114. # --- Tests for apply_patch_call formatting ---
  115. def test_apply_patch_call_displays_header_and_path():
  116. """Test that apply_patch_call events display the tool header and file path."""
  117. adapter = ConsoleEventAdapter()
  118. operation = SimpleNamespace(type="update_file", path="src/main.py", diff="+new line")
  119. item = SimpleNamespace(type="apply_patch_call", operation=operation)
  120. event = raw_event("response.output_item.done", item=item)
  121. with patch.object(adapter.console, "print") as mock_print, patch.object(adapter.console, "rule"):
  122. adapter.openai_to_message_output(event, recipient_agent="Agent")
  123. printed_texts = [str(call.args[0]) for call in mock_print.call_args_list]
  124. assert any("Apply Patch" in text for text in printed_texts)
  125. assert any("src/main.py" in str(text) for text in printed_texts)
  126. def test_apply_patch_call_shows_correct_operation_label():
  127. """apply_patch_call should map operation types to their display labels."""
  128. adapter = ConsoleEventAdapter()
  129. cases = [
  130. ("create_file", "Creating"),
  131. ("update_file", "Updating"),
  132. ("delete_file", "Deleting"),
  133. ]
  134. for op_type, expected_label in cases:
  135. operation = SimpleNamespace(
  136. type=op_type, path="file.txt", diff="+content" if op_type != "delete_file" else None
  137. )
  138. item = SimpleNamespace(type="apply_patch_call", operation=operation)
  139. event = raw_event("response.output_item.done", item=item)
  140. with patch.object(adapter.console, "print") as mock_print, patch.object(adapter.console, "rule"):
  141. adapter.openai_to_message_output(event, recipient_agent="Agent")
  142. printed_texts = [str(call.args[0]) for call in mock_print.call_args_list]
  143. assert any(expected_label in text for text in printed_texts)
  144. def test_apply_patch_call_renders_diff_and_non_diff_paths() -> None:
  145. """apply_patch_call should render diff panels only when operation+diff content exists."""
  146. adapter = ConsoleEventAdapter()
  147. with (
  148. patch.object(adapter.console, "print"),
  149. patch.object(adapter.console, "rule"),
  150. patch("agency_swarm.ui.core.console_event_adapter.Panel") as mock_panel,
  151. patch("agency_swarm.ui.core.console_event_adapter.Syntax") as mock_syntax,
  152. ):
  153. update_event = raw_event(
  154. "response.output_item.done",
  155. item=SimpleNamespace(
  156. type="apply_patch_call",
  157. operation=SimpleNamespace(type="update_file", path="file.py", diff="+new line\n-old line"),
  158. ),
  159. )
  160. adapter.openai_to_message_output(update_event, recipient_agent="Agent")
  161. mock_syntax.assert_called_once()
  162. mock_panel.assert_called_once()
  163. with (
  164. patch.object(adapter.console, "print"),
  165. patch.object(adapter.console, "rule") as mock_rule,
  166. patch("agency_swarm.ui.core.console_event_adapter.Panel") as mock_panel,
  167. ):
  168. delete_event = raw_event(
  169. "response.output_item.done",
  170. item=SimpleNamespace(
  171. type="apply_patch_call", operation=SimpleNamespace(type="delete_file", path="file.txt")
  172. ),
  173. )
  174. adapter.openai_to_message_output(delete_event, recipient_agent="Agent")
  175. mock_panel.assert_not_called()
  176. mock_rule.assert_called_once()
  177. with (
  178. patch.object(adapter.console, "print") as mock_print,
  179. patch.object(adapter.console, "rule") as mock_rule,
  180. ):
  181. no_op_event = raw_event(
  182. "response.output_item.done", item=SimpleNamespace(type="apply_patch_call", operation=None)
  183. )
  184. adapter.openai_to_message_output(no_op_event, recipient_agent="Agent")
  185. mock_print.assert_not_called()
  186. mock_rule.assert_not_called()
  187. # --- Tests for shell_call formatting ---
  188. def test_shell_call_rendering_variants() -> None:
  189. """Shell and local shell events should render commands, while empty calls stay silent."""
  190. adapter = ConsoleEventAdapter()
  191. with (
  192. patch.object(adapter.console, "print") as mock_print,
  193. patch.object(adapter.console, "rule"),
  194. patch("agency_swarm.ui.core.console_event_adapter.Panel"),
  195. patch("agency_swarm.ui.core.console_event_adapter.Syntax") as mock_syntax,
  196. ):
  197. shell_event = raw_event(
  198. "response.output_item.done",
  199. item=SimpleNamespace(type="shell_call", action=SimpleNamespace(commands=["pwd", "ls -la"])),
  200. )
  201. adapter.openai_to_message_output(shell_event, recipient_agent="Agent")
  202. printed_texts = [str(call.args[0]) for call in mock_print.call_args_list]
  203. assert any("Shell" in text for text in printed_texts)
  204. syntax_payload = mock_syntax.call_args[0][0]
  205. assert "$ pwd" in syntax_payload and "$ ls -la" in syntax_payload
  206. with (
  207. patch.object(adapter.console, "print") as mock_print,
  208. patch.object(adapter.console, "rule"),
  209. patch("agency_swarm.ui.core.console_event_adapter.Panel"),
  210. patch("agency_swarm.ui.core.console_event_adapter.Syntax") as mock_syntax,
  211. ):
  212. local_shell_event = raw_event(
  213. "response.output_item.done",
  214. item=SimpleNamespace(
  215. type="local_shell_call",
  216. action=SimpleNamespace(command=["ls", "-la", "/tmp"], working_directory="/home/user"),
  217. ),
  218. )
  219. adapter.openai_to_message_output(local_shell_event, recipient_agent="Agent")
  220. printed_texts = [str(call.args[0]) for call in mock_print.call_args_list]
  221. assert any("Local Shell" in text for text in printed_texts)
  222. assert any("/home/user" in str(text) for text in printed_texts)
  223. assert "$ ls -la /tmp" in mock_syntax.call_args[0][0]
  224. with (
  225. patch.object(adapter.console, "print") as mock_print,
  226. patch.object(adapter.console, "rule") as mock_rule,
  227. patch("agency_swarm.ui.core.console_event_adapter.Panel") as mock_panel,
  228. ):
  229. empty_event = raw_event(
  230. "response.output_item.done",
  231. item=SimpleNamespace(type="shell_call", action=SimpleNamespace(commands=[])),
  232. )
  233. adapter.openai_to_message_output(empty_event, recipient_agent="Agent")
  234. mock_print.assert_not_called()
  235. mock_panel.assert_not_called()
  236. mock_rule.assert_not_called()
  237. # --- Tests for Rich escape functionality ---
  238. def test_rich_markup_escaping_rules_by_message_type() -> None:
  239. """Function/function_output should escape rich markup while text should preserve markdown content."""
  240. adapter = ConsoleEventAdapter()
  241. cases = [
  242. ("function_output", "[build-system]", "\\[build-system]"),
  243. ("function", '{"section": "[red]"}', "\\[red]"),
  244. ("text", "[bold]test[/bold]", "[bold]test[/bold]"),
  245. ]
  246. for msg_type, body, expected_token in cases:
  247. with patch.object(adapter.console, "print") as mock_print, patch.object(adapter.console, "rule"):
  248. adapter._update_console(msg_type, "Agent", "user", body)
  249. printed = str(mock_print.call_args[0][0])
  250. assert expected_token in printed