test_ipython_interpreter.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. """Integration tests for IPythonInterpreter tool with agent isolation."""
  2. import asyncio
  3. import pytest
  4. from agents.run_context import RunContextWrapper
  5. from agency_swarm import Agent
  6. from agency_swarm.context import MasterContext
  7. from agency_swarm.tools.built_in import IPythonInterpreter
  8. from agency_swarm.utils.thread import ThreadManager
  9. @pytest.fixture
  10. def shared_context():
  11. """Create a shared context wrapped for tools to persist state."""
  12. thread_manager = ThreadManager()
  13. master_context = MasterContext(
  14. thread_manager=thread_manager,
  15. agents={},
  16. user_context={},
  17. )
  18. return RunContextWrapper(context=master_context)
  19. @pytest.fixture
  20. def agent_with_ipython():
  21. """Create an agent with IPython interpreter tool."""
  22. return Agent(
  23. name="TestAgent",
  24. description="Test agent with IPython interpreter",
  25. instructions="Execute Python code when requested",
  26. tools=[IPythonInterpreter],
  27. )
  28. class TestIPythonInterpreterPersistence:
  29. """Test that state persists within same agent across multiple executions."""
  30. @pytest.mark.asyncio
  31. async def test_variable_persistence(self, agent_with_ipython, shared_context):
  32. """Test that variables persist across tool executions."""
  33. tool1 = IPythonInterpreter(code="my_var = 100")
  34. tool1._caller_agent = agent_with_ipython
  35. tool1._context = shared_context
  36. result1 = await tool1.run()
  37. assert "Error:" not in result1
  38. tool2 = IPythonInterpreter(code="my_var * 2")
  39. tool2._caller_agent = agent_with_ipython
  40. tool2._context = shared_context
  41. result2 = await tool2.run()
  42. assert "200" in result2
  43. @pytest.mark.asyncio
  44. async def test_import_persistence(self, agent_with_ipython, shared_context):
  45. """Test that imports persist and can be reused."""
  46. tool1 = IPythonInterpreter(code="import math")
  47. tool1._caller_agent = agent_with_ipython
  48. tool1._context = shared_context
  49. await tool1.run()
  50. tool2 = IPythonInterpreter(code="math.sqrt(16)")
  51. tool2._caller_agent = agent_with_ipython
  52. tool2._context = shared_context
  53. result2 = await tool2.run()
  54. assert "4" in result2
  55. @pytest.mark.asyncio
  56. async def test_function_definition_persistence(self, agent_with_ipython, shared_context):
  57. """Test that function definitions persist across executions."""
  58. code_def = """
  59. def fibonacci(n):
  60. if n <= 1:
  61. return n
  62. return fibonacci(n-1) + fibonacci(n-2)
  63. """
  64. tool1 = IPythonInterpreter(code=code_def)
  65. tool1._caller_agent = agent_with_ipython
  66. tool1._context = shared_context
  67. await tool1.run()
  68. tool2 = IPythonInterpreter(code="fibonacci(10)")
  69. tool2._caller_agent = agent_with_ipython
  70. tool2._context = shared_context
  71. result2 = await tool2.run()
  72. assert "55" in result2
  73. class TestIPythonInterpreterAgentIsolation:
  74. """Test that agents have fully isolated execution environments (core feature)."""
  75. @pytest.mark.asyncio
  76. async def test_module_mutation_isolation(self):
  77. """Test that module mutations in one agent don't leak to another."""
  78. agent_a = Agent(name="AgentA", description="First", instructions="", tools=[IPythonInterpreter])
  79. agent_b = Agent(name="AgentB", description="Second", instructions="", tools=[IPythonInterpreter])
  80. # Agent A mutates math module by adding custom attribute
  81. tool_a = IPythonInterpreter(code="import math; math.CUSTOM_X = 999; math.CUSTOM_X")
  82. tool_a._caller_agent = agent_a
  83. result_a = await tool_a.run()
  84. assert "999" in result_a
  85. # Agent B checks if mutation is visible - it should NOT be
  86. tool_b = IPythonInterpreter(code="import math; hasattr(math, 'CUSTOM_X')")
  87. tool_b._caller_agent = agent_b
  88. result_b = await tool_b.run()
  89. assert "False" in result_b
  90. @pytest.mark.asyncio
  91. async def test_variable_isolation_between_agents(self):
  92. """Test that variables are completely isolated between agents."""
  93. agent_a = Agent(name="AgentA", description="", instructions="", tools=[IPythonInterpreter])
  94. agent_b = Agent(name="AgentB", description="", instructions="", tools=[IPythonInterpreter])
  95. # Agent A defines secret variable
  96. tool_a = IPythonInterpreter(code="secret_value = 12345; len(dir())")
  97. tool_a._caller_agent = agent_a
  98. result_a = await tool_a.run()
  99. assert "Error:" not in result_a
  100. # Agent B tries to access it - should not exist
  101. tool_b = IPythonInterpreter(code="'secret_value' in dir()")
  102. tool_b._caller_agent = agent_b
  103. result_b = await tool_b.run()
  104. assert "False" in result_b
  105. @pytest.mark.asyncio
  106. async def test_concurrent_execution_isolation(self):
  107. """Test that concurrent executions on different agents maintain isolation."""
  108. agent_a = Agent(name="AgentA", description="", instructions="", tools=[IPythonInterpreter])
  109. agent_b = Agent(name="AgentB", description="", instructions="", tools=[IPythonInterpreter])
  110. # Execute code concurrently - each sets different value for same variable name
  111. tool_a = IPythonInterpreter(code="x = 1; import time; time.sleep(0.05); x")
  112. tool_a._caller_agent = agent_a
  113. tool_b = IPythonInterpreter(code="x = 100; import time; time.sleep(0.05); x")
  114. tool_b._caller_agent = agent_b
  115. results = await asyncio.gather(tool_a.run(), tool_b.run())
  116. # Each agent should see only its own value
  117. assert "1" in results[0]
  118. assert "100" in results[1]
  119. @pytest.mark.asyncio
  120. async def test_sys_path_isolation(self):
  121. """Test that sys.path modifications don't leak between agents."""
  122. agent_a = Agent(name="AgentA", description="", instructions="", tools=[IPythonInterpreter])
  123. agent_b = Agent(name="AgentB", description="", instructions="", tools=[IPythonInterpreter])
  124. code_a = "import sys; sys.path.insert(0, '/unique/test/path'); '/unique/test/path' in sys.path"
  125. tool_a = IPythonInterpreter(code=code_a)
  126. tool_a._caller_agent = agent_a
  127. result_a = await tool_a.run()
  128. assert "True" in result_a
  129. tool_b = IPythonInterpreter(code="import sys; '/unique/test/path' in sys.path")
  130. tool_b._caller_agent = agent_b
  131. result_b = await tool_b.run()
  132. assert "False" in result_b
  133. @pytest.mark.asyncio
  134. async def test_global_module_attribute_isolation(self):
  135. """Test that adding attributes to built-in modules doesn't leak."""
  136. agent_a = Agent(name="AgentA", description="", instructions="", tools=[IPythonInterpreter])
  137. agent_b = Agent(name="AgentB", description="", instructions="", tools=[IPythonInterpreter])
  138. # Agent A adds attribute to sys module
  139. tool_a = IPythonInterpreter(code="import sys; sys._test_attr = 'agent_a_data'; hasattr(sys, '_test_attr')")
  140. tool_a._caller_agent = agent_a
  141. result_a = await tool_a.run()
  142. assert "True" in result_a
  143. # Agent B should not see this attribute
  144. tool_b = IPythonInterpreter(code="import sys; hasattr(sys, '_test_attr')")
  145. tool_b._caller_agent = agent_b
  146. result_b = await tool_b.run()
  147. assert "False" in result_b
  148. class TestIPythonInterpreterEdgeCases:
  149. """Test edge cases, error handling, and special scenarios."""
  150. @pytest.mark.asyncio
  151. async def test_error_handling_with_traceback(self, agent_with_ipython, shared_context):
  152. """Test that errors return proper traceback information."""
  153. code = """
  154. def buggy_function():
  155. return 1 / 0
  156. buggy_function()
  157. """
  158. tool = IPythonInterpreter(code=code)
  159. tool._caller_agent = agent_with_ipython
  160. tool._context = shared_context
  161. result = await tool.run()
  162. assert "Error:" in result
  163. assert "ZeroDivisionError" in result
  164. assert "buggy_function" in result
  165. @pytest.mark.asyncio
  166. async def test_multiline_output_capture(self, agent_with_ipython, shared_context):
  167. """Test that both print output and return values are captured."""
  168. code = """
  169. print('Step 1: Starting calculation')
  170. result = 42 * 2
  171. print(f'Step 2: Result is {result}')
  172. result
  173. """
  174. tool = IPythonInterpreter(code=code)
  175. tool._caller_agent = agent_with_ipython
  176. tool._context = shared_context
  177. result = await tool.run()
  178. assert "Step 1" in result
  179. assert "Step 2" in result
  180. assert "84" in result
  181. @pytest.mark.asyncio
  182. async def test_no_agent_context_ephemeral_kernel(self):
  183. """Test that tool creates ephemeral kernel when no agent context."""
  184. tool = IPythonInterpreter(code="import os; os.getpid()")
  185. # Deliberately don't set _caller_agent or context
  186. result = await tool.run()
  187. # Should work and return a process ID
  188. assert "Error:" not in result
  189. assert result.strip() # Non-empty result
  190. @pytest.mark.asyncio
  191. async def test_recovery_after_error(self, agent_with_ipython, shared_context):
  192. """Test that kernel recovers and continues working after an error."""
  193. # Cause an error
  194. tool1 = IPythonInterpreter(code="undefined_variable")
  195. tool1._caller_agent = agent_with_ipython
  196. tool1._context = shared_context
  197. result1 = await tool1.run()
  198. assert "Error:" in result1
  199. # Should still work after error
  200. tool2 = IPythonInterpreter(code="x = 100; x * 2")
  201. tool2._caller_agent = agent_with_ipython
  202. tool2._context = shared_context
  203. result2 = await tool2.run()
  204. assert "200" in result2
  205. assert "Error:" not in result2
  206. @pytest.mark.asyncio
  207. async def test_timeout_on_infinite_loop(self, shared_context):
  208. """Test that infinite loops are properly timed out."""
  209. # Create a custom tool class with a short timeout
  210. class ShortTimeoutInterpreter(IPythonInterpreter):
  211. class ToolConfig:
  212. kernel_timeout_seconds = 1.0
  213. agent = Agent(name="Test", description="", instructions="", tools=[ShortTimeoutInterpreter])
  214. tool = ShortTimeoutInterpreter(code="while True: pass")
  215. tool._caller_agent = agent
  216. tool._context = shared_context
  217. # Fail fast if ToolConfig override stops being respected
  218. result = await asyncio.wait_for(tool.run(), timeout=5)
  219. assert "Error:" in result
  220. assert "TimeoutError" in result or "timeout" in result.lower()
  221. @pytest.mark.asyncio
  222. async def test_nest_asyncio_reapplied_after_restart(self, shared_context):
  223. """Ensure kernel restart keeps asyncio.run usable by reapplying nest_asyncio."""
  224. class ShortTimeoutInterpreter(IPythonInterpreter):
  225. class ToolConfig:
  226. kernel_timeout_seconds = 0.5
  227. agent = Agent(name="RestartAgent", description="", instructions="", tools=[ShortTimeoutInterpreter])
  228. # Trigger a timeout to force a kernel restart
  229. timed_out = ShortTimeoutInterpreter(code="while True: pass")
  230. timed_out._caller_agent = agent
  231. timed_out._context = shared_context
  232. timeout_result = await asyncio.wait_for(timed_out.run(), timeout=20)
  233. assert "Error:" in timeout_result
  234. assert "Timeout" in timeout_result or "timeout" in timeout_result.lower()
  235. # After restart, asyncio.run should work if nest_asyncio was re-applied
  236. post_restart = ShortTimeoutInterpreter(code="import asyncio; asyncio.run(asyncio.sleep(0)); 'ok'")
  237. post_restart._caller_agent = agent
  238. post_restart._context = shared_context
  239. success_result = await asyncio.wait_for(post_restart.run(), timeout=20)
  240. assert "Error:" not in success_result
  241. assert "ok" in success_result
  242. @pytest.mark.asyncio
  243. async def test_large_output_handling(self, agent_with_ipython, shared_context):
  244. """Test that large outputs are properly captured."""
  245. # Generate large output
  246. code = "data = 'x' * 50000; print(f'Generated {len(data)} chars'); len(data)"
  247. tool = IPythonInterpreter(code=code)
  248. tool._caller_agent = agent_with_ipython
  249. tool._context = shared_context
  250. result = await tool.run()
  251. assert "50000" in result
  252. assert "Generated" in result
  253. @pytest.mark.asyncio
  254. async def test_stderr_capture(self, agent_with_ipython, shared_context):
  255. """Test that stderr output is captured alongside stdout."""
  256. code = "import sys; sys.stderr.write('Warning message\\n'); sys.stdout.write('Normal output\\n'); 'done'"
  257. tool = IPythonInterpreter(code=code)
  258. tool._caller_agent = agent_with_ipython
  259. tool._context = shared_context
  260. result = await tool.run()
  261. # Both stderr and stdout should be captured
  262. assert "Warning message" in result or "Normal output" in result
  263. assert "done" in result
  264. class TestIPythonInterpreterWorkingDirectory:
  265. """Test working_dir parameter functionality."""
  266. @pytest.mark.asyncio
  267. async def test_working_dir_changes_and_restores(self, agent_with_ipython, shared_context, tmp_path):
  268. """Test that working_dir changes directory, executes code, and restores directory."""
  269. # Create a test file in temp directory
  270. test_file = tmp_path / "test.txt"
  271. test_file.write_text("hello from temp")
  272. # Get initial directory
  273. tool1 = IPythonInterpreter(code="import os; os.getcwd()")
  274. tool1._caller_agent = agent_with_ipython
  275. tool1._context = shared_context
  276. initial_result = await tool1.run()
  277. initial_dir = initial_result.split("Result:")[-1].strip().strip("'\"")
  278. # Execute code in different directory with expression result
  279. tool2 = IPythonInterpreter(code="open('test.txt').read()", working_dir=str(tmp_path))
  280. tool2._caller_agent = agent_with_ipython
  281. tool2._context = shared_context
  282. result2 = await tool2.run()
  283. # Verify file was read (proves we were in the right directory)
  284. assert "hello from temp" in result2
  285. # Verify directory was restored
  286. tool3 = IPythonInterpreter(code="import os; os.getcwd()")
  287. tool3._caller_agent = agent_with_ipython
  288. tool3._context = shared_context
  289. restored_result = await tool3.run()
  290. restored_dir = restored_result.split("Result:")[-1].strip().strip("'\"")
  291. assert initial_dir == restored_dir
  292. @pytest.mark.asyncio
  293. async def test_working_dir_restores_after_error(self, agent_with_ipython, shared_context, tmp_path):
  294. """Test that directory is restored even when code raises an error."""
  295. # Get initial directory
  296. tool1 = IPythonInterpreter(code="import os; os.getcwd()")
  297. tool1._caller_agent = agent_with_ipython
  298. tool1._context = shared_context
  299. initial_result = await tool1.run()
  300. initial_dir = initial_result.split("Result:")[-1].strip().strip("'\"")
  301. # Execute code that will fail in different directory
  302. tool2 = IPythonInterpreter(
  303. code="1 / 0", # This will raise ZeroDivisionError
  304. working_dir=str(tmp_path),
  305. )
  306. tool2._caller_agent = agent_with_ipython
  307. tool2._context = shared_context
  308. error_result = await tool2.run()
  309. assert "Error:" in error_result
  310. assert "ZeroDivisionError" in error_result
  311. # Verify directory was still restored
  312. tool3 = IPythonInterpreter(code="import os; os.getcwd()")
  313. tool3._caller_agent = agent_with_ipython
  314. tool3._context = shared_context
  315. restored_result = await tool3.run()
  316. restored_dir = restored_result.split("Result:")[-1].strip().strip("'\"")
  317. assert initial_dir == restored_dir