test_openclaw_proxy_streaming.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from __future__ import annotations
  2. import json
  3. import socket
  4. import threading
  5. from dataclasses import replace
  6. from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
  7. from pathlib import Path
  8. import pytest
  9. pytest.importorskip("fastapi.testclient")
  10. from fastapi import FastAPI
  11. from fastapi.testclient import TestClient
  12. from agency_swarm.integrations.openclaw import attach_openclaw_to_fastapi
  13. from tests.integration.fastapi._openclaw_test_support import _build_openclaw_config
  14. def _reserve_free_port() -> int:
  15. try:
  16. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
  17. sock.bind(("127.0.0.1", 0))
  18. return int(sock.getsockname()[1])
  19. except PermissionError as exc:
  20. pytest.skip(f"loopback bind unavailable in this environment: {exc}")
  21. class _OpenClawResponsesStubHandler(BaseHTTPRequestHandler):
  22. mode = "success"
  23. requests_seen: list[dict[str, object]] = []
  24. def log_message(self, *_args, **_kwargs) -> None: # noqa: D401, N802
  25. """Silence stub server logs."""
  26. def do_POST(self) -> None: # noqa: N802
  27. if self.path != "/v1/responses":
  28. self.send_response(404)
  29. self.end_headers()
  30. return
  31. content_length = int(self.headers.get("Content-Length", "0"))
  32. raw_body = self.rfile.read(content_length)
  33. payload = json.loads(raw_body.decode("utf-8"))
  34. self.__class__.requests_seen.append(
  35. {
  36. "path": self.path,
  37. "authorization": self.headers.get("Authorization"),
  38. "payload": payload,
  39. }
  40. )
  41. if self.__class__.mode == "stream-error":
  42. error_body = b'{"error":"upstream"}'
  43. self.send_response(500)
  44. self.send_header("Content-Type", "application/json")
  45. self.send_header("Content-Length", str(len(error_body)))
  46. self.send_header("Retry-After", "5")
  47. self.end_headers()
  48. self.wfile.write(error_body)
  49. return
  50. if payload.get("stream"):
  51. stream_body = b'event: data\ndata: {"chunk": 1}\n\nevent: end\ndata: [DONE]\n\n'
  52. self.send_response(200)
  53. self.send_header("Content-Type", "text/event-stream")
  54. self.send_header("Content-Length", str(len(stream_body)))
  55. self.send_header("X-Request-Id", "req-stream")
  56. self.end_headers()
  57. self.wfile.write(stream_body)
  58. return
  59. response = {
  60. "id": "resp_1",
  61. "object": "response",
  62. "model": payload["model"],
  63. "output": [{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "ok"}]}],
  64. }
  65. body = json.dumps(response).encode("utf-8")
  66. self.send_response(200)
  67. self.send_header("Content-Type", "application/json")
  68. self.send_header("Content-Length", str(len(body)))
  69. self.send_header("X-Request-Id", "req-json")
  70. self.end_headers()
  71. self.wfile.write(body)
  72. @pytest.fixture
  73. def openclaw_upstream_server() -> str:
  74. handler = _OpenClawResponsesStubHandler
  75. handler.mode = "success"
  76. handler.requests_seen = []
  77. try:
  78. server = ThreadingHTTPServer(("127.0.0.1", 0), handler)
  79. except PermissionError as exc:
  80. pytest.skip(f"loopback bind unavailable in this environment: {exc}")
  81. thread = threading.Thread(target=server.serve_forever, daemon=True)
  82. thread.start()
  83. host, port = server.server_address
  84. try:
  85. yield f"http://{host}:{port}"
  86. finally:
  87. server.shutdown()
  88. server.server_close()
  89. thread.join(timeout=2)
  90. def test_openclaw_proxy_nonstream_passthrough_rewrites_default_model(
  91. openclaw_upstream_server: str, tmp_path: Path
  92. ) -> None:
  93. host, port_text = openclaw_upstream_server.removeprefix("http://").split(":")
  94. app = FastAPI()
  95. attach_openclaw_to_fastapi(
  96. app,
  97. replace(_build_openclaw_config(tmp_path), host=host, port=int(port_text)),
  98. )
  99. client = TestClient(app)
  100. response = client.post("/openclaw/v1/responses", json={"model": "openclaw:main", "input": "hello"})
  101. assert response.status_code == 200
  102. assert response.headers["x-request-id"] == "req-json"
  103. assert response.json()["model"] == "openai/gpt-5.4-mini"
  104. assert _OpenClawResponsesStubHandler.requests_seen[0]["authorization"] == "Bearer gateway-token"
  105. assert _OpenClawResponsesStubHandler.requests_seen[0]["payload"]["model"] == "openai/gpt-5.4-mini"
  106. def test_openclaw_proxy_stream_passthrough_uses_upstream_event_stream(
  107. openclaw_upstream_server: str, tmp_path: Path
  108. ) -> None:
  109. host, port_text = openclaw_upstream_server.removeprefix("http://").split(":")
  110. app = FastAPI()
  111. attach_openclaw_to_fastapi(
  112. app,
  113. replace(_build_openclaw_config(tmp_path), host=host, port=int(port_text)),
  114. )
  115. client = TestClient(app)
  116. response = client.post("/openclaw/v1/responses", json={"model": "openclaw:main", "input": "hello", "stream": True})
  117. assert response.status_code == 200
  118. assert "text/event-stream" in response.headers["content-type"]
  119. assert response.headers["x-request-id"] == "req-stream"
  120. assert 'data: {"chunk": 1}' in response.text
  121. assert _OpenClawResponsesStubHandler.requests_seen[0]["payload"]["model"] == "openai/gpt-5.4-mini"
  122. def test_openclaw_proxy_stream_error_passthrough_preserves_upstream_payload(
  123. openclaw_upstream_server: str, tmp_path: Path
  124. ) -> None:
  125. host, port_text = openclaw_upstream_server.removeprefix("http://").split(":")
  126. _OpenClawResponsesStubHandler.mode = "stream-error"
  127. app = FastAPI()
  128. attach_openclaw_to_fastapi(
  129. app,
  130. replace(_build_openclaw_config(tmp_path), host=host, port=int(port_text)),
  131. )
  132. client = TestClient(app)
  133. response = client.post("/openclaw/v1/responses", json={"model": "openclaw:main", "input": "hello", "stream": True})
  134. assert response.status_code == 500
  135. assert response.text == '{"error":"upstream"}'
  136. assert response.headers["retry-after"] == "5"
  137. def test_openclaw_proxy_reports_health_from_upstream_port(openclaw_upstream_server: str, tmp_path: Path) -> None:
  138. host, port_text = openclaw_upstream_server.removeprefix("http://").split(":")
  139. app = FastAPI()
  140. attach_openclaw_to_fastapi(
  141. app,
  142. replace(_build_openclaw_config(tmp_path), host=host, port=int(port_text)),
  143. )
  144. client = TestClient(app)
  145. healthy = client.get("/openclaw/health")
  146. assert healthy.status_code == 200
  147. assert healthy.json()["ok"] is True
  148. def test_openclaw_proxy_health_returns_503_when_upstream_port_is_closed(tmp_path: Path) -> None:
  149. closed_port = _reserve_free_port()
  150. app = FastAPI()
  151. attach_openclaw_to_fastapi(app, replace(_build_openclaw_config(tmp_path), port=closed_port))
  152. client = TestClient(app)
  153. unhealthy = client.get("/openclaw/health")
  154. assert unhealthy.status_code == 503
  155. assert unhealthy.json()["ok"] is False
  156. def test_openclaw_proxy_rejects_invalid_json_body(tmp_path: Path) -> None:
  157. app = FastAPI()
  158. attach_openclaw_to_fastapi(app, _build_openclaw_config(tmp_path))
  159. client = TestClient(app)
  160. response = client.post("/openclaw/v1/responses", data="{bad", headers={"content-type": "application/json"})
  161. assert response.status_code == 400
  162. assert "Invalid JSON body" in response.json()["detail"]