| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641 |
- """Tests for TemplateEnricher."""
- import json
- from pathlib import Path
- from typing import Optional
- from unittest.mock import MagicMock
- import httpx
- import pytest
- from flowsint_core.core.template_enricher import (
- TemplateEnricher,
- TemplateEnricherError,
- )
- from flowsint_core.templates.loader.yaml_loader import SSRFError, YamlLoader
- from flowsint_core.templates.types import (
- Template,
- TemplateHttpRequest,
- TemplateHttpResponse,
- TemplateInput,
- TemplateOutput,
- TemplateRetryConfig,
- TemplateSecret,
- )
- TEST_DIR = Path(__file__).parent  # Directory of this test module; the example*.yaml fixtures are loaded from here.
def create_test_template(
    name: str = "test-template",
    input_type: str = "Ip",
    input_key: str = "address",
    output_type: str = "Ip",
    url: str = "https://api.example.com/{{address}}",
    method: str = "GET",
    headers: Optional[dict] = None,
    params: Optional[dict] = None,
    body: Optional[str] = None,
    response_map: Optional[dict] = None,
    response_expect: str = "json",
    secrets: Optional[list] = None,
    retry: Optional[TemplateRetryConfig] = None,
    is_array: bool = False,
    array_path: Optional[str] = None,
    timeout: float = 30.0,
) -> Template:
    """Build a ``Template`` for tests, with overridable defaults.

    Every argument is optional; the defaults describe a GET lookup that takes
    an ``Ip`` keyed by ``address`` and maps the response field ``ip`` back to
    ``address``.

    Args:
        name: Template name.
        input_type: Name of the input type.
        input_key: Key of the input used when rendering the request.
        output_type: Name of the output type.
        url: Request URL, possibly containing ``{{...}}`` placeholders.
        method: HTTP method.
        headers: Request headers; ``None`` means no headers.
        params: Query parameters; ``None`` means no parameters.
        body: Optional request body.
        response_map: Output-field to response-path mapping. ``None`` selects
            the default ``{"address": "ip"}``; an explicitly empty dict is
            passed through unchanged.
        response_expect: Expected response format (e.g. ``"json"``).
        secrets: List of dicts, each expanded into a ``TemplateSecret``.
        retry: Optional retry configuration.
        is_array: Whether the output is an array of items.
        array_path: Path of the array inside the response, if any.
        timeout: Request timeout passed to the request definition.

    Returns:
        The assembled ``Template``.
    """
    # Use ``is None`` rather than ``or``: an explicitly passed empty mapping
    # (e.g. ``response_map={}``) must not be replaced by the default.
    if headers is None:
        headers = {}
    if params is None:
        params = {}
    if response_map is None:
        response_map = {"address": "ip"}
    if secrets is None:
        secrets = []

    return Template(
        name=name,
        category="Test",
        version=1.0,
        input=TemplateInput(type=input_type, key=input_key),
        output=TemplateOutput(
            type=output_type, is_array=is_array, array_path=array_path
        ),
        request=TemplateHttpRequest(
            method=method,
            url=url,
            headers=headers,
            params=params,
            body=body,
            timeout=timeout,
        ),
        response=TemplateHttpResponse(
            expect=response_expect,
            map=response_map,
        ),
        secrets=[TemplateSecret(**spec) for spec in secrets],
        retry=retry,
    )
class MockVault:
    """In-memory stand-in for a vault, used to test secret resolution."""

    def __init__(self, secrets: dict = None):
        # Nothing supplied (or an empty mapping) means an empty store.
        self._secrets = secrets if secrets else {}

    def get_secret(self, name: str) -> Optional[str]:
        """Return the secret stored under *name*, or ``None`` when absent."""
        try:
            return self._secrets[name]
        except KeyError:
            return None
class TestTemplateEnricherInit:
    """Construction-time behaviour of TemplateEnricher."""

    def test_init_basic(self):
        """A valid template yields an enricher exposing name, category and key."""
        enricher = TemplateEnricher(
            template=create_test_template(), sketch_id="test"
        )

        assert enricher.name() == "test-template"
        assert enricher.category() == "Test"
        assert enricher.key() == "address"

    def test_init_invalid_input_type(self):
        """An unknown input type is rejected with a TypeError."""
        bad_template = create_test_template(input_type="InvalidType")

        # ``match`` searches the stringified exception for the given text.
        with pytest.raises(TypeError, match="not present in registry"):
            TemplateEnricher(template=bad_template)

    def test_init_with_secrets(self):
        """A declared secret shows up as one vaultSecret entry in params_schema."""
        secret_spec = {"name": "API_KEY", "required": True, "description": "Test key"}
        enricher = TemplateEnricher(
            template=create_test_template(secrets=[secret_spec])
        )

        schema = enricher.params_schema
        assert len(schema) == 1
        assert schema[0]["name"] == "API_KEY"
        assert schema[0]["type"] == "vaultSecret"
class TestTemplateEnricherSSRF:
    """Tests for SSRF protection in TemplateEnricher.

    Each test points the template URL at a forbidden destination and checks
    two things: the scan yields no results, and no HTTP request was issued.
    The second check matters because an empty result on its own would also be
    produced by a request that was sent and simply failed (for example a
    refused connection), which would not prove the URL was blocked.
    """

    @pytest.mark.asyncio
    async def test_blocks_localhost(self, mock_logger, httpx_mock):
        """Requests to localhost should be blocked."""
        template = create_test_template(url="http://localhost/{{address}}")
        enricher = TemplateEnricher(template=template, sketch_id="test")

        # Create mock input
        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]
        results = await enricher.scan(inputs)

        assert len(results) == 0  # Should be blocked
        # httpx_mock records httpx traffic; nothing may have been attempted.
        assert httpx_mock.get_requests() == []

    @pytest.mark.asyncio
    async def test_blocks_private_ip(self, mock_logger, httpx_mock):
        """Requests to private IPs should be blocked."""
        template = create_test_template(url="http://192.168.1.1/{{address}}")
        enricher = TemplateEnricher(template=template, sketch_id="test")

        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]
        results = await enricher.scan(inputs)

        assert len(results) == 0
        assert httpx_mock.get_requests() == []

    @pytest.mark.asyncio
    async def test_blocks_metadata_endpoint(self, mock_logger, httpx_mock):
        """Requests to cloud metadata endpoints should be blocked."""
        # URL with metadata IP hardcoded (not from input)
        template = create_test_template(
            url="http://169.254.169.254/latest/meta-data/{{address}}"
        )
        enricher = TemplateEnricher(template=template, sketch_id="test")

        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]  # Valid IP, but URL is blocked
        results = await enricher.scan(inputs)

        assert len(results) == 0
        assert httpx_mock.get_requests() == []
class TestTemplateEnricherRequests:
    """Checks how TemplateEnricher builds and sends HTTP requests."""

    @pytest.mark.asyncio
    async def test_get_request(self, mock_logger, httpx_mock):
        """A templated GET URL is rendered and its JSON reply is mapped."""
        from flowsint_types import Ip

        httpx_mock.add_response(
            url="https://api.example.com/8.8.8.8",
            json={"ip": "8.8.8.8", "country": "US"},
        )
        enricher = TemplateEnricher(
            template=create_test_template(
                url="https://api.example.com/{{address}}",
                response_map={"address": "ip", "country": "country"},
            ),
            sketch_id="test",
        )

        found = await enricher.scan([Ip(address="8.8.8.8")])

        assert len(found) == 1
        assert found[0].address == "8.8.8.8"

    @pytest.mark.asyncio
    async def test_post_request(self, mock_logger, httpx_mock):
        """A template declaring POST with a body is sent using the POST method."""
        from flowsint_types import Ip

        httpx_mock.add_response(
            url="https://api.example.com/lookup",
            method="POST",
            json={"ip": "8.8.8.8", "country": "US"},
        )
        enricher = TemplateEnricher(
            template=create_test_template(
                url="https://api.example.com/lookup",
                method="POST",
                body='{"ip": "{{address}}"}',
                response_map={"address": "ip"},
            ),
            sketch_id="test",
        )

        found = await enricher.scan([Ip(address="8.8.8.8")])
        assert len(found) == 1

        # Inspect the captured request to confirm the HTTP method.
        sent = httpx_mock.get_request()
        assert sent.method == "POST"

    @pytest.mark.asyncio
    async def test_request_with_headers(self, mock_logger, httpx_mock):
        """Headers declared on the template appear on the outgoing request."""
        from flowsint_types import Ip

        httpx_mock.add_response(
            url="https://api.example.com/8.8.8.8",
            json={"ip": "8.8.8.8"},
        )
        enricher = TemplateEnricher(
            template=create_test_template(
                url="https://api.example.com/{{address}}",
                headers={"X-Custom-Header": "test-value"},
            ),
            sketch_id="test",
        )

        await enricher.scan([Ip(address="8.8.8.8")])

        sent = httpx_mock.get_request()
        assert sent.headers.get("X-Custom-Header") == "test-value"

    @pytest.mark.asyncio
    async def test_request_with_params(self, mock_logger, httpx_mock):
        """Query params are rendered from the input and included in the URL."""
        from flowsint_types import Ip

        httpx_mock.add_response(
            json={"ip": "8.8.8.8"},
        )
        enricher = TemplateEnricher(
            template=create_test_template(
                url="https://api.example.com/lookup",
                params={"ip": "{{address}}", "format": "json"},
            ),
            sketch_id="test",
        )

        await enricher.scan([Ip(address="8.8.8.8")])

        sent_url = str(httpx_mock.get_request().url)
        assert "ip=8.8.8.8" in sent_url
        assert "format=json" in sent_url
class TestTemplateEnricherResponseParsing:
    """Tests for response parsing (JSON, nested JSON, XML and plain text)."""

    @pytest.mark.asyncio
    async def test_json_response(self, mock_logger, httpx_mock):
        """JSON response should be parsed correctly."""
        httpx_mock.add_response(
            json={"ip": "8.8.8.8", "country": "US"},
        )

        template = create_test_template(
            response_expect="json",
            response_map={"address": "ip"},
        )
        enricher = TemplateEnricher(template=template, sketch_id="test")

        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]
        results = await enricher.scan(inputs)

        assert len(results) == 1
        assert results[0].address == "8.8.8.8"

    @pytest.mark.asyncio
    async def test_nested_json_response(self, mock_logger, httpx_mock):
        """Nested JSON paths should work with dot notation."""
        httpx_mock.add_response(
            json={
                "data": {
                    "ip": "8.8.8.8",
                    "location": {"country": "US", "city": "Mountain View"},
                }
            },
        )

        template = create_test_template(
            response_map={
                "address": "data.ip",
                "country": "data.location.country",
                "city": "data.location.city",
            },
        )
        enricher = TemplateEnricher(template=template, sketch_id="test")

        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]
        results = await enricher.scan(inputs)

        assert len(results) == 1
        assert results[0].address == "8.8.8.8"
        assert results[0].country == "US"
        assert results[0].city == "Mountain View"

    @pytest.mark.asyncio
    async def test_xml_response(self, mock_logger, httpx_mock):
        """XML response should be parsed correctly."""
        xml_response = """<?xml version="1.0"?>
        <response>
            <ip>8.8.8.8</ip>
            <country>US</country>
        </response>
        """
        httpx_mock.add_response(
            text=xml_response,
            headers={"Content-Type": "application/xml"},
        )

        template = create_test_template(
            response_expect="xml",
            response_map={"address": "ip", "country": "country"},
        )
        enricher = TemplateEnricher(template=template, sketch_id="test")

        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]
        results = await enricher.scan(inputs)

        assert len(results) == 1
        assert results[0].address == "8.8.8.8"
        assert results[0].country == "US"

    @pytest.mark.asyncio
    async def test_text_response(self, mock_logger, httpx_mock):
        """Text response should be handled without raising.

        This is a smoke test: it passes as long as ``scan`` completes. The
        number of results is deliberately not asserted, because a plain-text
        body may not map onto the output type.
        """
        httpx_mock.add_response(text="8.8.8.8")

        template = create_test_template(
            response_expect="text",
            response_map={},  # No mapping for text
        )
        enricher = TemplateEnricher(template=template, sketch_id="test")

        from flowsint_types import Ip

        inputs = [Ip(address="8.8.8.8")]

        # Text response won't map well, but shouldn't crash.
        # May return empty due to mapping failure, that's OK, so the return
        # value is intentionally not bound or inspected.
        await enricher.scan(inputs)
class TestTemplateEnricherArrayResponse:
    """Checks that array-shaped responses fan out into several outputs."""

    @pytest.mark.asyncio
    async def test_array_response(self, mock_logger, httpx_mock):
        """Each element found under ``array_path`` becomes its own output."""
        from flowsint_types import Ip

        payload = {
            "data": {
                "results": [
                    {"ip": "8.8.8.8", "country": "US"},
                    {"ip": "8.8.4.4", "country": "US"},
                ]
            }
        }
        httpx_mock.add_response(json=payload)

        enricher = TemplateEnricher(
            template=create_test_template(
                is_array=True,
                array_path="data.results",
                response_map={"address": "ip", "country": "country"},
            ),
            sketch_id="test",
        )

        found = await enricher.scan([Ip(address="8.8.8.8")])

        assert len(found) == 2
        assert found[0].address == "8.8.8.8"
        assert found[1].address == "8.8.4.4"

    @pytest.mark.asyncio
    async def test_array_at_root(self, mock_logger, httpx_mock):
        """A top-level JSON array is handled when no ``array_path`` is given."""
        from flowsint_types import Ip

        httpx_mock.add_response(
            json=[
                {"ip": "8.8.8.8"},
                {"ip": "8.8.4.4"},
            ],
        )

        enricher = TemplateEnricher(
            template=create_test_template(
                is_array=True,
                array_path=None,  # the array is the response root
                response_map={"address": "ip"},
            ),
            sketch_id="test",
        )

        # Any valid IP address serves as the input here.
        found = await enricher.scan([Ip(address="1.2.3.4")])

        assert len(found) == 2
class TestTemplateEnricherVaultIntegration:
    """Checks how secrets from the vault are resolved and used."""

    @pytest.mark.asyncio
    async def test_secret_in_header(self, mock_logger, httpx_mock):
        """A ``{{secrets.NAME}}`` placeholder in a header is filled from the vault."""
        from flowsint_types import Ip

        httpx_mock.add_response(json={"ip": "8.8.8.8"})

        enricher = TemplateEnricher(
            template=create_test_template(
                secrets=[{"name": "API_KEY", "required": True}],
                headers={"Authorization": "Bearer {{secrets.API_KEY}}"},
            ),
            sketch_id="test",
            vault=MockVault(secrets={"API_KEY": "secret-token-123"}),
        )
        await enricher.async_init()

        await enricher.scan([Ip(address="8.8.8.8")])

        sent = httpx_mock.get_request()
        assert sent.headers.get("Authorization") == "Bearer secret-token-123"

    @pytest.mark.asyncio
    async def test_missing_required_secret(self, mock_logger):
        """``async_init`` fails, naming the secret, when a required one is absent."""
        enricher = TemplateEnricher(
            template=create_test_template(
                secrets=[{"name": "API_KEY", "required": True}],
            ),
            sketch_id="test",
            vault=MockVault(secrets={}),  # vault holds nothing
        )

        # ``match`` searches the stringified exception for the secret name.
        with pytest.raises(Exception, match="API_KEY"):
            await enricher.async_init()

    @pytest.mark.asyncio
    async def test_optional_secret_missing(self, mock_logger, httpx_mock):
        """An absent optional secret neither fails init nor prevents scanning."""
        from flowsint_types import Ip

        httpx_mock.add_response(json={"ip": "8.8.8.8"})

        enricher = TemplateEnricher(
            template=create_test_template(
                secrets=[{"name": "OPTIONAL_KEY", "required": False}],
            ),
            sketch_id="test",
            vault=MockVault(secrets={}),
        )
        await enricher.async_init()  # must complete without raising

        # The scan is expected to succeed even though the secret is unset.
        found = await enricher.scan([Ip(address="8.8.8.8")])

        assert len(found) == 1
class TestTemplateEnricherRetry:
    """Checks when failed requests are retried and when they are not."""

    @pytest.mark.asyncio
    @pytest.mark.httpx_mock(can_send_already_matched_responses=True)
    async def test_retry_on_500(self, mock_logger, httpx_mock):
        """A 500 listed in ``retry_on_status`` triggers one retry that succeeds."""
        from flowsint_types import Ip

        # One failure is queued ahead of the successful reply.
        httpx_mock.add_response(status_code=500)
        httpx_mock.add_response(json={"ip": "8.8.8.8"})

        retry_policy = TemplateRetryConfig(
            max_retries=3, backoff_factor=0.1, retry_on_status=[500]
        )
        enricher = TemplateEnricher(
            template=create_test_template(retry=retry_policy),
            sketch_id="test",
        )

        found = await enricher.scan([Ip(address="8.8.8.8")])

        assert len(found) == 1
        assert len(httpx_mock.get_requests()) == 2  # first attempt + one retry

    @pytest.mark.asyncio
    @pytest.mark.httpx_mock(can_send_already_matched_responses=True)
    async def test_retry_on_429(self, mock_logger, httpx_mock):
        """Two consecutive 429 replies are retried until the third call succeeds."""
        from flowsint_types import Ip

        httpx_mock.add_response(status_code=429)
        httpx_mock.add_response(status_code=429)
        httpx_mock.add_response(json={"ip": "8.8.8.8"})

        retry_policy = TemplateRetryConfig(
            max_retries=3, backoff_factor=0.1, retry_on_status=[429]
        )
        enricher = TemplateEnricher(
            template=create_test_template(retry=retry_policy),
            sketch_id="test",
        )

        found = await enricher.scan([Ip(address="8.8.8.8")])

        assert len(found) == 1
        assert len(httpx_mock.get_requests()) == 3

    @pytest.mark.asyncio
    async def test_no_retry_on_400(self, mock_logger, httpx_mock):
        """With no ``retry_on_status`` override, a 400 is not retried."""
        from flowsint_types import Ip

        httpx_mock.add_response(status_code=400)

        retry_policy = TemplateRetryConfig(max_retries=3, backoff_factor=0.1)
        enricher = TemplateEnricher(
            template=create_test_template(retry=retry_policy),
            sketch_id="test",
        )

        found = await enricher.scan([Ip(address="8.8.8.8")])

        assert len(found) == 0  # the single attempt failed
        assert len(httpx_mock.get_requests()) == 1  # and was not repeated
class TestTemplateEnricherErrorHandling:
    """Checks that a failure on one input does not stop the remaining inputs."""

    @pytest.mark.asyncio
    async def test_http_error_continues(self, mock_logger, httpx_mock):
        """A 404 for one input is skipped and the other input still yields a result."""
        from flowsint_types import Ip

        httpx_mock.add_response(status_code=404)
        httpx_mock.add_response(json={"ip": "1.1.1.1"})

        enricher = TemplateEnricher(
            template=create_test_template(), sketch_id="test"
        )

        found = await enricher.scan(
            [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
        )

        # First should fail, second should succeed
        assert len(found) == 1
        assert found[0].address == "1.1.1.1"

    @pytest.mark.asyncio
    async def test_invalid_json_continues(self, mock_logger, httpx_mock):
        """A body that is not valid JSON is skipped and scanning carries on."""
        from flowsint_types import Ip

        httpx_mock.add_response(text="not json")
        httpx_mock.add_response(json={"ip": "1.1.1.1"})

        enricher = TemplateEnricher(
            template=create_test_template(), sketch_id="test"
        )

        found = await enricher.scan(
            [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
        )

        assert len(found) == 1

    @pytest.mark.asyncio
    async def test_timeout_continues(self, mock_logger, httpx_mock):
        """A timed-out request is skipped and scanning carries on."""
        from flowsint_types import Ip

        def _time_out(_request):
            raise httpx.TimeoutException("timeout")

        httpx_mock.add_callback(_time_out)
        httpx_mock.add_response(json={"ip": "1.1.1.1"})

        # Retries are disabled so the timeout is not attempted again.
        no_retry = TemplateRetryConfig(max_retries=0)
        enricher = TemplateEnricher(
            template=create_test_template(retry=no_retry),
            sketch_id="test",
        )

        found = await enricher.scan(
            [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
        )

        assert len(found) == 1
class TestTemplateEnricherFromYaml:
    """Checks that enrichers can be built from the example YAML templates."""

    def test_load_from_yaml(self):
        """The basic example file produces an enricher with the expected name."""
        yaml_path = TEST_DIR / "example.yaml"
        enricher = TemplateEnricher(
            template=YamlLoader.get_template_from_file(str(yaml_path)),
            sketch_id="test",
        )

        assert enricher.name() == "ip-api-lookup"

    def test_load_post_template(self):
        """The POST example file produces an enricher whose request uses POST."""
        yaml_path = TEST_DIR / "example-post.yaml"
        enricher = TemplateEnricher(
            template=YamlLoader.get_template_from_file(str(yaml_path)),
            sketch_id="test",
        )

        assert enricher.request.method == "POST"

    def test_load_secrets_template(self):
        """The secrets example file yields exactly one params_schema entry."""
        yaml_path = TEST_DIR / "example-secrets.yaml"
        enricher = TemplateEnricher(
            template=YamlLoader.get_template_from_file(str(yaml_path)),
            sketch_id="test",
        )

        assert len(enricher.params_schema) == 1

    def test_load_retry_template(self):
        """The retry example file carries ``max_retries`` of 5."""
        yaml_path = TEST_DIR / "example-retry.yaml"
        enricher = TemplateEnricher(
            template=YamlLoader.get_template_from_file(str(yaml_path)),
            sketch_id="test",
        )

        retry_cfg = enricher.template.retry
        assert retry_cfg.max_retries == 5

    def test_load_array_template(self):
        """The array example file declares array output at ``data.results``."""
        yaml_path = TEST_DIR / "example-array.yaml"
        enricher = TemplateEnricher(
            template=YamlLoader.get_template_from_file(str(yaml_path)),
            sketch_id="test",
        )

        output_spec = enricher.template.output
        assert output_spec.is_array is True
        assert output_spec.array_path == "data.results"
|