test_template_enricher.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. """Tests for TemplateEnricher."""
  2. import json
  3. from pathlib import Path
  4. from typing import Optional
  5. from unittest.mock import MagicMock
  6. import httpx
  7. import pytest
  8. from flowsint_core.core.template_enricher import (
  9. TemplateEnricher,
  10. TemplateEnricherError,
  11. )
  12. from flowsint_core.templates.loader.yaml_loader import SSRFError, YamlLoader
  13. from flowsint_core.templates.types import (
  14. Template,
  15. TemplateHttpRequest,
  16. TemplateHttpResponse,
  17. TemplateInput,
  18. TemplateOutput,
  19. TemplateRetryConfig,
  20. TemplateSecret,
  21. )
  22. TEST_DIR = Path(__file__).parent
  23. def create_test_template(
  24. name: str = "test-template",
  25. input_type: str = "Ip",
  26. input_key: str = "address",
  27. output_type: str = "Ip",
  28. url: str = "https://api.example.com/{{address}}",
  29. method: str = "GET",
  30. headers: dict = None,
  31. params: dict = None,
  32. body: Optional[str] = None,
  33. response_map: dict = None,
  34. response_expect: str = "json",
  35. secrets: list = None,
  36. retry: Optional[TemplateRetryConfig] = None,
  37. is_array: bool = False,
  38. array_path: Optional[str] = None,
  39. timeout: float = 30.0,
  40. ) -> Template:
  41. """Helper to create test templates."""
  42. return Template(
  43. name=name,
  44. category="Test",
  45. version=1.0,
  46. input=TemplateInput(type=input_type, key=input_key),
  47. output=TemplateOutput(type=output_type, is_array=is_array, array_path=array_path),
  48. request=TemplateHttpRequest(
  49. method=method,
  50. url=url,
  51. headers=headers or {},
  52. params=params or {},
  53. body=body,
  54. timeout=timeout,
  55. ),
  56. response=TemplateHttpResponse(
  57. expect=response_expect,
  58. map=response_map or {"address": "ip"},
  59. ),
  60. secrets=[TemplateSecret(**s) for s in (secrets or [])],
  61. retry=retry,
  62. )
  63. class MockVault:
  64. """Mock vault for testing secret resolution."""
  65. def __init__(self, secrets: dict = None):
  66. self._secrets = secrets or {}
  67. def get_secret(self, name: str) -> Optional[str]:
  68. return self._secrets.get(name)
  69. class TestTemplateEnricherInit:
  70. """Tests for TemplateEnricher initialization."""
  71. def test_init_basic(self):
  72. """Basic initialization with valid template."""
  73. template = create_test_template()
  74. enricher = TemplateEnricher(template=template, sketch_id="test")
  75. assert enricher.name() == "test-template"
  76. assert enricher.category() == "Test"
  77. assert enricher.key() == "address"
  78. def test_init_invalid_input_type(self):
  79. """Invalid input type should raise TypeError."""
  80. template = create_test_template(input_type="InvalidType")
  81. with pytest.raises(TypeError) as exc_info:
  82. TemplateEnricher(template=template)
  83. assert "not present in registry" in str(exc_info.value)
  84. def test_init_with_secrets(self):
  85. """Template with secrets should build params schema."""
  86. template = create_test_template(
  87. secrets=[{"name": "API_KEY", "required": True, "description": "Test key"}]
  88. )
  89. enricher = TemplateEnricher(template=template)
  90. assert len(enricher.params_schema) == 1
  91. assert enricher.params_schema[0]["name"] == "API_KEY"
  92. assert enricher.params_schema[0]["type"] == "vaultSecret"
  93. class TestTemplateEnricherSSRF:
  94. """Tests for SSRF protection in TemplateEnricher."""
  95. @pytest.mark.asyncio
  96. async def test_blocks_localhost(self, mock_logger):
  97. """Requests to localhost should be blocked."""
  98. template = create_test_template(url="http://localhost/{{address}}")
  99. enricher = TemplateEnricher(template=template, sketch_id="test")
  100. # Create mock input
  101. from flowsint_types import Ip
  102. inputs = [Ip(address="8.8.8.8")]
  103. results = await enricher.scan(inputs)
  104. assert len(results) == 0 # Should be blocked
  105. @pytest.mark.asyncio
  106. async def test_blocks_private_ip(self, mock_logger):
  107. """Requests to private IPs should be blocked."""
  108. template = create_test_template(url="http://192.168.1.1/{{address}}")
  109. enricher = TemplateEnricher(template=template, sketch_id="test")
  110. from flowsint_types import Ip
  111. inputs = [Ip(address="8.8.8.8")]
  112. results = await enricher.scan(inputs)
  113. assert len(results) == 0
  114. @pytest.mark.asyncio
  115. async def test_blocks_metadata_endpoint(self, mock_logger):
  116. """Requests to cloud metadata endpoints should be blocked."""
  117. # URL with metadata IP hardcoded (not from input)
  118. template = create_test_template(
  119. url="http://169.254.169.254/latest/meta-data/{{address}}"
  120. )
  121. enricher = TemplateEnricher(template=template, sketch_id="test")
  122. from flowsint_types import Ip
  123. inputs = [Ip(address="8.8.8.8")] # Valid IP, but URL is blocked
  124. results = await enricher.scan(inputs)
  125. assert len(results) == 0
  126. class TestTemplateEnricherRequests:
  127. """Tests for HTTP request handling."""
  128. @pytest.mark.asyncio
  129. async def test_get_request(self, mock_logger, httpx_mock):
  130. """GET request should work correctly."""
  131. httpx_mock.add_response(
  132. url="https://api.example.com/8.8.8.8",
  133. json={"ip": "8.8.8.8", "country": "US"},
  134. )
  135. template = create_test_template(
  136. url="https://api.example.com/{{address}}",
  137. response_map={"address": "ip", "country": "country"},
  138. )
  139. enricher = TemplateEnricher(template=template, sketch_id="test")
  140. from flowsint_types import Ip
  141. inputs = [Ip(address="8.8.8.8")]
  142. results = await enricher.scan(inputs)
  143. assert len(results) == 1
  144. assert results[0].address == "8.8.8.8"
  145. @pytest.mark.asyncio
  146. async def test_post_request(self, mock_logger, httpx_mock):
  147. """POST request with body should work correctly."""
  148. httpx_mock.add_response(
  149. url="https://api.example.com/lookup",
  150. method="POST",
  151. json={"ip": "8.8.8.8", "country": "US"},
  152. )
  153. template = create_test_template(
  154. url="https://api.example.com/lookup",
  155. method="POST",
  156. body='{"ip": "{{address}}"}',
  157. response_map={"address": "ip"},
  158. )
  159. enricher = TemplateEnricher(template=template, sketch_id="test")
  160. from flowsint_types import Ip
  161. inputs = [Ip(address="8.8.8.8")]
  162. results = await enricher.scan(inputs)
  163. assert len(results) == 1
  164. # Verify the request was made with POST
  165. request = httpx_mock.get_request()
  166. assert request.method == "POST"
  167. @pytest.mark.asyncio
  168. async def test_request_with_headers(self, mock_logger, httpx_mock):
  169. """Request headers should be rendered and sent."""
  170. httpx_mock.add_response(
  171. url="https://api.example.com/8.8.8.8",
  172. json={"ip": "8.8.8.8"},
  173. )
  174. template = create_test_template(
  175. url="https://api.example.com/{{address}}",
  176. headers={"X-Custom-Header": "test-value"},
  177. )
  178. enricher = TemplateEnricher(template=template, sketch_id="test")
  179. from flowsint_types import Ip
  180. inputs = [Ip(address="8.8.8.8")]
  181. await enricher.scan(inputs)
  182. request = httpx_mock.get_request()
  183. assert request.headers.get("X-Custom-Header") == "test-value"
  184. @pytest.mark.asyncio
  185. async def test_request_with_params(self, mock_logger, httpx_mock):
  186. """Request params should be rendered and sent."""
  187. httpx_mock.add_response(
  188. json={"ip": "8.8.8.8"},
  189. )
  190. template = create_test_template(
  191. url="https://api.example.com/lookup",
  192. params={"ip": "{{address}}", "format": "json"},
  193. )
  194. enricher = TemplateEnricher(template=template, sketch_id="test")
  195. from flowsint_types import Ip
  196. inputs = [Ip(address="8.8.8.8")]
  197. await enricher.scan(inputs)
  198. request = httpx_mock.get_request()
  199. assert "ip=8.8.8.8" in str(request.url)
  200. assert "format=json" in str(request.url)
  201. class TestTemplateEnricherResponseParsing:
  202. """Tests for response parsing."""
  203. @pytest.mark.asyncio
  204. async def test_json_response(self, mock_logger, httpx_mock):
  205. """JSON response should be parsed correctly."""
  206. httpx_mock.add_response(
  207. json={"ip": "8.8.8.8", "country": "US"},
  208. )
  209. template = create_test_template(
  210. response_expect="json",
  211. response_map={"address": "ip"},
  212. )
  213. enricher = TemplateEnricher(template=template, sketch_id="test")
  214. from flowsint_types import Ip
  215. inputs = [Ip(address="8.8.8.8")]
  216. results = await enricher.scan(inputs)
  217. assert len(results) == 1
  218. assert results[0].address == "8.8.8.8"
  219. @pytest.mark.asyncio
  220. async def test_nested_json_response(self, mock_logger, httpx_mock):
  221. """Nested JSON paths should work with dot notation."""
  222. httpx_mock.add_response(
  223. json={
  224. "data": {
  225. "ip": "8.8.8.8",
  226. "location": {"country": "US", "city": "Mountain View"},
  227. }
  228. },
  229. )
  230. template = create_test_template(
  231. response_map={
  232. "address": "data.ip",
  233. "country": "data.location.country",
  234. "city": "data.location.city",
  235. },
  236. )
  237. enricher = TemplateEnricher(template=template, sketch_id="test")
  238. from flowsint_types import Ip
  239. inputs = [Ip(address="8.8.8.8")]
  240. results = await enricher.scan(inputs)
  241. assert len(results) == 1
  242. assert results[0].address == "8.8.8.8"
  243. assert results[0].country == "US"
  244. assert results[0].city == "Mountain View"
  245. @pytest.mark.asyncio
  246. async def test_xml_response(self, mock_logger, httpx_mock):
  247. """XML response should be parsed correctly."""
  248. xml_response = """<?xml version="1.0"?>
  249. <response>
  250. <ip>8.8.8.8</ip>
  251. <country>US</country>
  252. </response>
  253. """
  254. httpx_mock.add_response(
  255. text=xml_response,
  256. headers={"Content-Type": "application/xml"},
  257. )
  258. template = create_test_template(
  259. response_expect="xml",
  260. response_map={"address": "ip", "country": "country"},
  261. )
  262. enricher = TemplateEnricher(template=template, sketch_id="test")
  263. from flowsint_types import Ip
  264. inputs = [Ip(address="8.8.8.8")]
  265. results = await enricher.scan(inputs)
  266. assert len(results) == 1
  267. assert results[0].address == "8.8.8.8"
  268. assert results[0].country == "US"
  269. @pytest.mark.asyncio
  270. async def test_text_response(self, mock_logger, httpx_mock):
  271. """Text response should be returned as-is."""
  272. httpx_mock.add_response(text="8.8.8.8")
  273. template = create_test_template(
  274. response_expect="text",
  275. response_map={}, # No mapping for text
  276. )
  277. enricher = TemplateEnricher(template=template, sketch_id="test")
  278. from flowsint_types import Ip
  279. inputs = [Ip(address="8.8.8.8")]
  280. # Text response won't map well, but shouldn't crash
  281. results = await enricher.scan(inputs)
  282. # May return empty due to mapping failure, that's OK
  283. class TestTemplateEnricherArrayResponse:
  284. """Tests for array response handling."""
  285. @pytest.mark.asyncio
  286. async def test_array_response(self, mock_logger, httpx_mock):
  287. """Array responses should produce multiple outputs."""
  288. httpx_mock.add_response(
  289. json={
  290. "data": {
  291. "results": [
  292. {"ip": "8.8.8.8", "country": "US"},
  293. {"ip": "8.8.4.4", "country": "US"},
  294. ]
  295. }
  296. },
  297. )
  298. template = create_test_template(
  299. is_array=True,
  300. array_path="data.results",
  301. response_map={"address": "ip", "country": "country"},
  302. )
  303. enricher = TemplateEnricher(template=template, sketch_id="test")
  304. from flowsint_types import Ip
  305. inputs = [Ip(address="8.8.8.8")]
  306. results = await enricher.scan(inputs)
  307. assert len(results) == 2
  308. assert results[0].address == "8.8.8.8"
  309. assert results[1].address == "8.8.4.4"
  310. @pytest.mark.asyncio
  311. async def test_array_at_root(self, mock_logger, httpx_mock):
  312. """Array at root level (no array_path) should work."""
  313. httpx_mock.add_response(
  314. json=[
  315. {"ip": "8.8.8.8"},
  316. {"ip": "8.8.4.4"},
  317. ],
  318. )
  319. template = create_test_template(
  320. is_array=True,
  321. array_path=None, # Array at root
  322. response_map={"address": "ip"},
  323. )
  324. enricher = TemplateEnricher(template=template, sketch_id="test")
  325. from flowsint_types import Ip
  326. inputs = [Ip(address="1.2.3.4")] # Valid IP address
  327. results = await enricher.scan(inputs)
  328. assert len(results) == 2
  329. class TestTemplateEnricherVaultIntegration:
  330. """Tests for vault/secrets integration."""
  331. @pytest.mark.asyncio
  332. async def test_secret_in_header(self, mock_logger, httpx_mock):
  333. """Secrets should be injected into headers."""
  334. httpx_mock.add_response(json={"ip": "8.8.8.8"})
  335. template = create_test_template(
  336. secrets=[{"name": "API_KEY", "required": True}],
  337. headers={"Authorization": "Bearer {{secrets.API_KEY}}"},
  338. )
  339. vault = MockVault(secrets={"API_KEY": "secret-token-123"})
  340. enricher = TemplateEnricher(template=template, sketch_id="test", vault=vault)
  341. await enricher.async_init()
  342. from flowsint_types import Ip
  343. inputs = [Ip(address="8.8.8.8")]
  344. await enricher.scan(inputs)
  345. request = httpx_mock.get_request()
  346. assert request.headers.get("Authorization") == "Bearer secret-token-123"
  347. @pytest.mark.asyncio
  348. async def test_missing_required_secret(self, mock_logger):
  349. """Missing required secret should raise error."""
  350. template = create_test_template(
  351. secrets=[{"name": "API_KEY", "required": True}],
  352. )
  353. vault = MockVault(secrets={}) # Empty vault
  354. enricher = TemplateEnricher(template=template, sketch_id="test", vault=vault)
  355. with pytest.raises(Exception) as exc_info:
  356. await enricher.async_init()
  357. assert "API_KEY" in str(exc_info.value)
  358. @pytest.mark.asyncio
  359. async def test_optional_secret_missing(self, mock_logger, httpx_mock):
  360. """Missing optional secret should not raise error."""
  361. httpx_mock.add_response(json={"ip": "8.8.8.8"})
  362. template = create_test_template(
  363. secrets=[{"name": "OPTIONAL_KEY", "required": False}],
  364. )
  365. vault = MockVault(secrets={})
  366. enricher = TemplateEnricher(template=template, sketch_id="test", vault=vault)
  367. await enricher.async_init() # Should not raise
  368. from flowsint_types import Ip
  369. inputs = [Ip(address="8.8.8.8")]
  370. # Should work without the optional secret
  371. results = await enricher.scan(inputs)
  372. assert len(results) == 1
  373. class TestTemplateEnricherRetry:
  374. """Tests for retry logic."""
  375. @pytest.mark.asyncio
  376. @pytest.mark.httpx_mock(can_send_already_matched_responses=True)
  377. async def test_retry_on_500(self, mock_logger, httpx_mock):
  378. """Should retry on 500 errors."""
  379. # First request fails, second succeeds
  380. httpx_mock.add_response(status_code=500)
  381. httpx_mock.add_response(json={"ip": "8.8.8.8"})
  382. template = create_test_template(
  383. retry=TemplateRetryConfig(
  384. max_retries=3, backoff_factor=0.1, retry_on_status=[500]
  385. ),
  386. )
  387. enricher = TemplateEnricher(template=template, sketch_id="test")
  388. from flowsint_types import Ip
  389. inputs = [Ip(address="8.8.8.8")]
  390. results = await enricher.scan(inputs)
  391. assert len(results) == 1
  392. assert len(httpx_mock.get_requests()) == 2 # Initial + 1 retry
  393. @pytest.mark.asyncio
  394. @pytest.mark.httpx_mock(can_send_already_matched_responses=True)
  395. async def test_retry_on_429(self, mock_logger, httpx_mock):
  396. """Should retry on rate limit (429) errors."""
  397. httpx_mock.add_response(status_code=429)
  398. httpx_mock.add_response(status_code=429)
  399. httpx_mock.add_response(json={"ip": "8.8.8.8"})
  400. template = create_test_template(
  401. retry=TemplateRetryConfig(
  402. max_retries=3, backoff_factor=0.1, retry_on_status=[429]
  403. ),
  404. )
  405. enricher = TemplateEnricher(template=template, sketch_id="test")
  406. from flowsint_types import Ip
  407. inputs = [Ip(address="8.8.8.8")]
  408. results = await enricher.scan(inputs)
  409. assert len(results) == 1
  410. assert len(httpx_mock.get_requests()) == 3
  411. @pytest.mark.asyncio
  412. async def test_no_retry_on_400(self, mock_logger, httpx_mock):
  413. """Should not retry on 400 errors by default."""
  414. httpx_mock.add_response(status_code=400)
  415. template = create_test_template(
  416. retry=TemplateRetryConfig(max_retries=3, backoff_factor=0.1),
  417. )
  418. enricher = TemplateEnricher(template=template, sketch_id="test")
  419. from flowsint_types import Ip
  420. inputs = [Ip(address="8.8.8.8")]
  421. results = await enricher.scan(inputs)
  422. assert len(results) == 0 # Failed without retry
  423. assert len(httpx_mock.get_requests()) == 1 # No retries
  424. class TestTemplateEnricherErrorHandling:
  425. """Tests for error handling."""
  426. @pytest.mark.asyncio
  427. async def test_http_error_continues(self, mock_logger, httpx_mock):
  428. """HTTP errors should be logged and processing should continue."""
  429. httpx_mock.add_response(status_code=404)
  430. httpx_mock.add_response(json={"ip": "1.1.1.1"})
  431. template = create_test_template()
  432. enricher = TemplateEnricher(template=template, sketch_id="test")
  433. from flowsint_types import Ip
  434. inputs = [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
  435. results = await enricher.scan(inputs)
  436. # First should fail, second should succeed
  437. assert len(results) == 1
  438. assert results[0].address == "1.1.1.1"
  439. @pytest.mark.asyncio
  440. async def test_invalid_json_continues(self, mock_logger, httpx_mock):
  441. """Invalid JSON should be logged and processing should continue."""
  442. httpx_mock.add_response(text="not json")
  443. httpx_mock.add_response(json={"ip": "1.1.1.1"})
  444. template = create_test_template()
  445. enricher = TemplateEnricher(template=template, sketch_id="test")
  446. from flowsint_types import Ip
  447. inputs = [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
  448. results = await enricher.scan(inputs)
  449. assert len(results) == 1
  450. @pytest.mark.asyncio
  451. async def test_timeout_continues(self, mock_logger, httpx_mock):
  452. """Timeout should be logged and processing should continue."""
  453. def raise_timeout(request):
  454. raise httpx.TimeoutException("timeout")
  455. httpx_mock.add_callback(raise_timeout)
  456. httpx_mock.add_response(json={"ip": "1.1.1.1"})
  457. template = create_test_template(
  458. retry=TemplateRetryConfig(max_retries=0) # No retries for this test
  459. )
  460. enricher = TemplateEnricher(template=template, sketch_id="test")
  461. from flowsint_types import Ip
  462. inputs = [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
  463. results = await enricher.scan(inputs)
  464. assert len(results) == 1
  465. class TestTemplateEnricherFromYaml:
  466. """Tests loading enrichers from YAML files."""
  467. def test_load_from_yaml(self):
  468. """Should load enricher from YAML file."""
  469. template = YamlLoader.get_template_from_file(str(TEST_DIR / "example.yaml"))
  470. enricher = TemplateEnricher(template=template, sketch_id="test")
  471. assert enricher.name() == "ip-api-lookup"
  472. def test_load_post_template(self):
  473. """Should load POST template from YAML."""
  474. template = YamlLoader.get_template_from_file(str(TEST_DIR / "example-post.yaml"))
  475. enricher = TemplateEnricher(template=template, sketch_id="test")
  476. assert enricher.request.method == "POST"
  477. def test_load_secrets_template(self):
  478. """Should load template with secrets from YAML."""
  479. template = YamlLoader.get_template_from_file(
  480. str(TEST_DIR / "example-secrets.yaml")
  481. )
  482. enricher = TemplateEnricher(template=template, sketch_id="test")
  483. assert len(enricher.params_schema) == 1
  484. def test_load_retry_template(self):
  485. """Should load template with retry config from YAML."""
  486. template = YamlLoader.get_template_from_file(
  487. str(TEST_DIR / "example-retry.yaml")
  488. )
  489. enricher = TemplateEnricher(template=template, sketch_id="test")
  490. assert enricher.template.retry.max_retries == 5
  491. def test_load_array_template(self):
  492. """Should load template with array output from YAML."""
  493. template = YamlLoader.get_template_from_file(
  494. str(TEST_DIR / "example-array.yaml")
  495. )
  496. enricher = TemplateEnricher(template=template, sketch_id="test")
  497. assert enricher.template.output.is_array is True
  498. assert enricher.template.output.array_path == "data.results"