test_openclaw_tool_mode.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. from __future__ import annotations
  2. import json
  3. import os
  4. from dataclasses import replace
  5. from pathlib import Path
  6. import pytest
  7. from agency_swarm.integrations import openclaw as openclaw_mod
  8. from agency_swarm.integrations.openclaw import OpenClawRuntime
  9. from tests.integration.fastapi._openclaw_test_support import _build_openclaw_config
  10. def test_openclaw_worker_tool_mode_disables_competing_native_messaging(tmp_path: Path) -> None:
  11. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  12. runtime = OpenClawRuntime(config)
  13. runtime.ensure_layout()
  14. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  15. assert payload["tools"]["agentToAgent"]["enabled"] is False
  16. assert payload["tools"]["deny"] == ["message", "sessions_send", "sessions_spawn"]
  17. def test_openclaw_full_tool_mode_restores_previous_settings(tmp_path: Path) -> None:
  18. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  19. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  20. config.config_path.write_text(
  21. json.dumps(
  22. {
  23. "tools": {
  24. "agentToAgent": {"enabled": True, "mode": "custom"},
  25. "deny": ["browser"],
  26. }
  27. }
  28. ),
  29. encoding="utf-8",
  30. )
  31. OpenClawRuntime(config).ensure_layout()
  32. restore_config = replace(config, tool_mode="full")
  33. OpenClawRuntime(restore_config).ensure_layout()
  34. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  35. assert payload["tools"]["agentToAgent"] == {"enabled": True, "mode": "custom"}
  36. assert payload["tools"]["deny"] == ["browser"]
  37. backup_path = openclaw_mod._tool_mode_backup_path(config.config_path)
  38. assert not backup_path.exists()
  39. def test_openclaw_full_tool_mode_keeps_backup_when_config_write_fails(
  40. monkeypatch: pytest.MonkeyPatch,
  41. tmp_path: Path,
  42. ) -> None:
  43. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  44. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  45. config.config_path.write_text(
  46. json.dumps(
  47. {
  48. "tools": {
  49. "agentToAgent": {"enabled": True, "mode": "custom"},
  50. "deny": ["browser"],
  51. }
  52. }
  53. ),
  54. encoding="utf-8",
  55. )
  56. OpenClawRuntime(config).ensure_layout()
  57. backup_path = openclaw_mod._tool_mode_backup_path(config.config_path)
  58. assert backup_path.exists()
  59. original_open = openclaw_mod.os.open
  60. def _failing_open(path: str | os.PathLike[str], flags: int, mode: int = 0o777) -> int:
  61. if Path(path) == config.config_path:
  62. raise OSError("disk full")
  63. return original_open(path, flags, mode)
  64. monkeypatch.setattr(openclaw_mod.os, "open", _failing_open)
  65. with pytest.raises(OSError, match="disk full"):
  66. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  67. assert backup_path.exists()
  68. def test_openclaw_full_tool_mode_keeps_backup_when_tool_mode_backup_is_unreadable(tmp_path: Path) -> None:
  69. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  70. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  71. config.config_path.write_text(
  72. json.dumps(
  73. {
  74. "tools": {
  75. "agentToAgent": {"enabled": True, "mode": "custom"},
  76. "deny": ["browser"],
  77. }
  78. }
  79. ),
  80. encoding="utf-8",
  81. )
  82. OpenClawRuntime(config).ensure_layout()
  83. backup_path = openclaw_mod._tool_mode_backup_path(config.config_path)
  84. backup_path.write_text("{invalid", encoding="utf-8")
  85. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  86. assert backup_path.exists()
  87. def test_openclaw_full_tool_mode_preserves_deleted_agent_to_agent_block(tmp_path: Path) -> None:
  88. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  89. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  90. config.config_path.write_text(
  91. json.dumps(
  92. {
  93. "tools": {
  94. "agentToAgent": {"enabled": True, "mode": "custom"},
  95. "deny": ["browser"],
  96. }
  97. }
  98. ),
  99. encoding="utf-8",
  100. )
  101. OpenClawRuntime(config).ensure_layout()
  102. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  103. payload["tools"].pop("agentToAgent")
  104. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  105. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  106. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  107. assert "agentToAgent" not in restored["tools"]
  108. assert restored["tools"]["deny"] == ["browser"]
  109. def test_openclaw_full_tool_mode_preserves_user_changes_made_while_worker_mode_is_active(tmp_path: Path) -> None:
  110. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  111. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  112. config.config_path.write_text(
  113. json.dumps(
  114. {
  115. "tools": {
  116. "agentToAgent": {"enabled": True, "mode": "custom"},
  117. "deny": ["browser"],
  118. }
  119. }
  120. ),
  121. encoding="utf-8",
  122. )
  123. OpenClawRuntime(config).ensure_layout()
  124. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  125. payload["tools"]["deny"].append("shell")
  126. payload["tools"]["agentToAgent"]["notes"] = "keep-me"
  127. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  128. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  129. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  130. assert restored["tools"]["agentToAgent"] == {
  131. "enabled": True,
  132. "mode": "custom",
  133. "notes": "keep-me",
  134. }
  135. assert restored["tools"]["deny"] == ["browser", "shell"]
  136. def test_openclaw_full_tool_mode_preserves_user_edits_to_existing_agent_to_agent_keys(tmp_path: Path) -> None:
  137. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  138. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  139. config.config_path.write_text(
  140. json.dumps(
  141. {
  142. "tools": {
  143. "agentToAgent": {"enabled": True, "mode": "custom"},
  144. "deny": ["browser"],
  145. }
  146. }
  147. ),
  148. encoding="utf-8",
  149. )
  150. OpenClawRuntime(config).ensure_layout()
  151. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  152. payload["tools"]["agentToAgent"]["mode"] = "strict"
  153. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  154. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  155. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  156. assert restored["tools"]["agentToAgent"] == {"enabled": True, "mode": "strict"}
  157. def test_openclaw_full_tool_mode_preserves_deleted_agent_to_agent_keys(tmp_path: Path) -> None:
  158. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  159. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  160. config.config_path.write_text(
  161. json.dumps(
  162. {
  163. "tools": {
  164. "agentToAgent": {"enabled": True, "mode": "custom", "scope": "full"},
  165. "deny": ["browser"],
  166. }
  167. }
  168. ),
  169. encoding="utf-8",
  170. )
  171. OpenClawRuntime(config).ensure_layout()
  172. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  173. payload["tools"]["agentToAgent"].pop("mode")
  174. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  175. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  176. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  177. assert restored["tools"]["agentToAgent"] == {"enabled": True, "scope": "full"}
  178. def test_openclaw_full_tool_mode_preserves_deleted_deny_entries(tmp_path: Path) -> None:
  179. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  180. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  181. config.config_path.write_text(
  182. json.dumps(
  183. {
  184. "tools": {
  185. "agentToAgent": {"enabled": True, "mode": "custom"},
  186. "deny": ["browser", "shell"],
  187. }
  188. }
  189. ),
  190. encoding="utf-8",
  191. )
  192. OpenClawRuntime(config).ensure_layout()
  193. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  194. payload["tools"]["deny"] = ["shell"]
  195. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  196. stat_result = config.config_path.stat()
  197. os.utime(config.config_path, ns=(stat_result.st_atime_ns, stat_result.st_mtime_ns + 1))
  198. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  199. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  200. assert restored["tools"]["deny"] == ["shell"]
  201. def test_openclaw_full_tool_mode_preserves_explicit_worker_style_deny_entries(tmp_path: Path) -> None:
  202. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  203. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  204. config.config_path.write_text(
  205. json.dumps(
  206. {
  207. "tools": {
  208. "agentToAgent": {"enabled": True, "mode": "custom"},
  209. "deny": ["browser"],
  210. }
  211. }
  212. ),
  213. encoding="utf-8",
  214. )
  215. OpenClawRuntime(config).ensure_layout()
  216. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  217. payload["tools"]["deny"] = ["browser", "message"]
  218. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  219. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  220. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  221. assert restored["tools"]["deny"] == ["browser", "message"]
  222. def test_openclaw_full_tool_mode_restores_agent_to_agent_when_only_deny_changes(tmp_path: Path) -> None:
  223. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  224. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  225. config.config_path.write_text(
  226. json.dumps(
  227. {
  228. "tools": {
  229. "agentToAgent": {"enabled": True, "mode": "custom"},
  230. "deny": ["browser"],
  231. }
  232. }
  233. ),
  234. encoding="utf-8",
  235. )
  236. OpenClawRuntime(config).ensure_layout()
  237. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  238. payload["tools"]["deny"].append("shell")
  239. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  240. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  241. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  242. assert restored["tools"]["agentToAgent"] == {"enabled": True, "mode": "custom"}
  243. assert restored["tools"]["deny"] == ["browser", "shell"]
  244. def test_openclaw_full_tool_mode_restores_original_agent_to_agent_after_format_only_rewrite(tmp_path: Path) -> None:
  245. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  246. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  247. config.config_path.write_text(
  248. json.dumps(
  249. {
  250. "tools": {
  251. "agentToAgent": {"enabled": True, "mode": "custom"},
  252. "deny": ["browser"],
  253. }
  254. },
  255. indent=2,
  256. ),
  257. encoding="utf-8",
  258. )
  259. OpenClawRuntime(config).ensure_layout()
  260. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  261. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  262. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  263. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  264. assert restored["tools"]["agentToAgent"] == {"enabled": True, "mode": "custom"}
  265. assert restored["tools"]["deny"] == ["browser"]
  266. def test_openclaw_full_tool_mode_preserves_user_changes_across_worker_restart(tmp_path: Path) -> None:
  267. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  268. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  269. config.config_path.write_text(
  270. json.dumps(
  271. {
  272. "tools": {
  273. "agentToAgent": {"enabled": True, "mode": "custom"},
  274. "deny": ["browser"],
  275. }
  276. }
  277. ),
  278. encoding="utf-8",
  279. )
  280. worker_runtime = OpenClawRuntime(config)
  281. worker_runtime.ensure_layout()
  282. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  283. payload["tools"]["agentToAgent"]["notes"] = "keep-me"
  284. payload["tools"]["deny"].append("shell")
  285. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  286. worker_runtime.ensure_layout()
  287. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  288. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  289. assert restored["tools"]["agentToAgent"] == {
  290. "enabled": True,
  291. "mode": "custom",
  292. "notes": "keep-me",
  293. }
  294. assert restored["tools"]["deny"] == ["browser", "shell"]
  295. def test_openclaw_full_tool_mode_drops_worker_enabled_flag_when_backup_never_had_it(tmp_path: Path) -> None:
  296. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  297. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  298. config.config_path.write_text(
  299. json.dumps(
  300. {
  301. "tools": {
  302. "agentToAgent": {"mode": "custom"},
  303. "deny": ["browser"],
  304. }
  305. }
  306. ),
  307. encoding="utf-8",
  308. )
  309. worker_runtime = OpenClawRuntime(config)
  310. worker_runtime.ensure_layout()
  311. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  312. payload["tools"]["agentToAgent"]["notes"] = "keep-me"
  313. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  314. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  315. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  316. assert restored["tools"]["agentToAgent"] == {
  317. "mode": "custom",
  318. "notes": "keep-me",
  319. }
  320. def test_openclaw_full_tool_mode_restores_original_tools_after_unrelated_worker_edits(tmp_path: Path) -> None:
  321. config = replace(_build_openclaw_config(tmp_path), tool_mode="worker")
  322. config.config_path.parent.mkdir(parents=True, exist_ok=True)
  323. config.config_path.write_text(
  324. json.dumps(
  325. {
  326. "tools": {
  327. "agentToAgent": {"enabled": True, "mode": "custom"},
  328. "deny": ["browser"],
  329. },
  330. "agents": {"main": {"description": "before"}},
  331. }
  332. ),
  333. encoding="utf-8",
  334. )
  335. worker_runtime = OpenClawRuntime(config)
  336. worker_runtime.ensure_layout()
  337. payload = json.loads(config.config_path.read_text(encoding="utf-8"))
  338. payload["agents"]["main"]["description"] = "after"
  339. config.config_path.write_text(json.dumps(payload), encoding="utf-8")
  340. OpenClawRuntime(replace(config, tool_mode="full")).ensure_layout()
  341. restored = json.loads(config.config_path.read_text(encoding="utf-8"))
  342. assert restored["tools"]["agentToAgent"] == {"enabled": True, "mode": "custom"}
  343. assert restored["tools"]["deny"] == ["browser"]
  344. assert restored["agents"]["main"]["description"] == "after"