| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- """Tests for YamlLoader and related utilities."""
- import json
- from pathlib import Path
- import pytest
- from flowsint_core.templates.loader.yaml_loader import (
- BLOCKED_HOSTNAMES,
- SSRFError,
- TemplateRenderError,
- YamlLoader,
- is_ip_blocked,
- sanitize_url_component,
- validate_url_safe,
- )
- from flowsint_core.templates.types import Template
- TEST_DIR = Path(__file__).parent
- class TestYamlLoader:
- """Tests for YAML loading and template parsing."""
- def test_yaml_loader_valid_template(self):
- """Load a valid template and verify its properties."""
- file = YamlLoader.get_template_from_file(str(TEST_DIR / "example.yaml"))
- assert isinstance(file, Template)
- assert file.name == "ip-api-lookup"
- assert file.category == "Ip"
- assert file.request.params == {"fields": "query,status,country,city,lat,lon,isp"}
- assert file.request.method == "GET"
- def test_yaml_loader_invalid_method(self):
- """Invalid HTTP method should raise ValueError."""
- with pytest.raises(ValueError) as exc_info:
- YamlLoader.get_template_from_file(str(TEST_DIR / "example-invalid.yaml"))
- assert "not present in" in str(exc_info.value).lower()
- def test_yaml_loader_post_method(self):
- """POST method should be allowed."""
- file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-post.yaml"))
- assert file.request.method == "POST"
- def test_yaml_loader_with_secrets(self):
- """Template with secrets should parse correctly."""
- file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-secrets.yaml"))
- assert len(file.secrets) == 1
- assert file.secrets[0].name == "API_KEY"
- assert file.secrets[0].required is True
- def test_yaml_loader_with_retry_config(self):
- """Template with retry config should parse correctly."""
- file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-retry.yaml"))
- assert file.retry is not None
- assert file.retry.max_retries == 5
- assert file.retry.backoff_factor == 1.0
- def test_yaml_loader_array_output(self):
- """Template with array output should parse correctly."""
- file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-array.yaml"))
- assert file.output.is_array is True
- assert file.output.array_path == "data.results"
- class TestRenderTemplate:
- """Tests for template string rendering."""
- def test_render_simple(self):
- """Simple variable substitution."""
- url_template = "http://ip-api.com/json/{{address}}"
- url = YamlLoader.render_template(url_template, {"address": "8.8.8.8"})
- assert url == "http://ip-api.com/json/8.8.8.8"
- def test_render_multiple_variables(self):
- """Multiple variables in one template."""
- url_template = "http://api.example.com?ip={{address}}&domain={{domain}}"
- url = YamlLoader.render_template(
- url_template, {"address": "8.8.8.8", "domain": "example.com"}
- )
- assert url == "http://api.example.com?ip=8.8.8.8&domain=example.com"
- def test_render_with_spaces(self):
- """Variables with spaces around them."""
- url_template = "http://api.example.com/{{ username }}"
- url = YamlLoader.render_template(url_template, {"username": "testuser"})
- assert url == "http://api.example.com/testuser"
- def test_render_missing_variable(self):
- """Missing variable should raise TemplateRenderError."""
- url_template = "http://api.example.com/{{username}}"
- with pytest.raises(TemplateRenderError) as exc_info:
- YamlLoader.render_template(url_template, {})
- assert "Missing template variable: username" in str(exc_info.value)
- def test_render_sanitizes_special_chars(self):
- """Special characters should be URL-encoded by default."""
- url_template = "http://api.example.com/{{query}}"
- url = YamlLoader.render_template(url_template, {"query": "foo bar&baz=1"})
- assert "foo%20bar%26baz%3D1" in url
- def test_render_no_sanitize(self):
- """Sanitization can be disabled."""
- url_template = "http://api.example.com/{{query}}"
- url = YamlLoader.render_template(
- url_template, {"query": "foo bar"}, sanitize=False
- )
- assert url == "http://api.example.com/foo bar"
- def test_render_secrets_variable(self):
- """secrets.NAME variables should work."""
- template = "Bearer {{secrets.API_KEY}}"
- result = YamlLoader.render_template(
- template, {"secrets.API_KEY": "secret123"}, sanitize=False
- )
- assert result == "Bearer secret123"
- class TestRenderDict:
- """Tests for recursive dictionary rendering."""
- def test_render_dict_simple(self):
- """Simple dict with string values."""
- data = {"key": "{{value}}"}
- result = YamlLoader.render_dict(data, {"value": "test"})
- assert result == {"key": "test"}
- def test_render_dict_nested(self):
- """Nested dict rendering."""
- data = {"outer": {"inner": "{{value}}"}}
- result = YamlLoader.render_dict(data, {"value": "test"})
- assert result == {"outer": {"inner": "test"}}
- def test_render_dict_with_list(self):
- """Dict with list values."""
- data = {"items": ["{{a}}", "{{b}}"]}
- result = YamlLoader.render_dict(data, {"a": "1", "b": "2"})
- assert result == {"items": ["1", "2"]}
- def test_render_dict_preserves_non_string(self):
- """Non-string values should be preserved."""
- data = {"count": 42, "active": True, "name": "{{name}}"}
- result = YamlLoader.render_dict(data, {"name": "test"})
- assert result == {"count": 42, "active": True, "name": "test"}
- class TestExtractNestedValue:
- """Tests for dot-notation value extraction."""
- def test_extract_simple(self):
- """Simple key extraction."""
- data = {"name": "John"}
- assert YamlLoader.extract_nested_value(data, "name") == "John"
- def test_extract_nested(self):
- """Nested key extraction."""
- data = {"user": {"name": "John", "address": {"city": "NYC"}}}
- assert YamlLoader.extract_nested_value(data, "user.name") == "John"
- assert YamlLoader.extract_nested_value(data, "user.address.city") == "NYC"
- def test_extract_array_index(self):
- """Array index extraction."""
- data = {"items": ["a", "b", "c"]}
- assert YamlLoader.extract_nested_value(data, "items.0") == "a"
- assert YamlLoader.extract_nested_value(data, "items.2") == "c"
- def test_extract_array_of_objects(self):
- """Extract from array of objects."""
- data = {"users": [{"name": "Alice"}, {"name": "Bob"}]}
- assert YamlLoader.extract_nested_value(data, "users.0.name") == "Alice"
- assert YamlLoader.extract_nested_value(data, "users.1.name") == "Bob"
- def test_extract_missing_key(self):
- """Missing key returns None."""
- data = {"name": "John"}
- assert YamlLoader.extract_nested_value(data, "missing") is None
- assert YamlLoader.extract_nested_value(data, "name.missing") is None
- def test_extract_empty_path(self):
- """Empty path returns the data itself."""
- data = {"name": "John"}
- assert YamlLoader.extract_nested_value(data, "") == data
- def test_extract_out_of_bounds(self):
- """Out of bounds array index returns None."""
- data = {"items": ["a", "b"]}
- assert YamlLoader.extract_nested_value(data, "items.5") is None
- class TestSSRFProtection:
- """Tests for SSRF protection utilities."""
- def test_is_ip_blocked_loopback(self):
- """Loopback addresses should be blocked."""
- assert is_ip_blocked("127.0.0.1") is True
- assert is_ip_blocked("127.0.0.2") is True
- assert is_ip_blocked("127.255.255.255") is True
- def test_is_ip_blocked_private_ranges(self):
- """Private IP ranges should be blocked."""
- assert is_ip_blocked("10.0.0.1") is True
- assert is_ip_blocked("10.255.255.255") is True
- assert is_ip_blocked("172.16.0.1") is True
- assert is_ip_blocked("172.31.255.255") is True
- assert is_ip_blocked("192.168.0.1") is True
- assert is_ip_blocked("192.168.255.255") is True
- def test_is_ip_blocked_link_local(self):
- """Link-local (metadata) addresses should be blocked."""
- assert is_ip_blocked("169.254.169.254") is True
- assert is_ip_blocked("169.254.0.1") is True
- def test_is_ip_blocked_public(self):
- """Public IPs should not be blocked."""
- assert is_ip_blocked("8.8.8.8") is False
- assert is_ip_blocked("1.1.1.1") is False
- assert is_ip_blocked("93.184.216.34") is False
- def test_is_ip_blocked_invalid(self):
- """Invalid IP strings return False (not blocked)."""
- assert is_ip_blocked("not-an-ip") is False
- assert is_ip_blocked("") is False
- def test_validate_url_safe_public(self):
- """Public URLs should pass validation."""
- validate_url_safe("https://api.example.com/endpoint")
- validate_url_safe("http://8.8.8.8/test")
- def test_validate_url_safe_localhost(self):
- """Localhost should be blocked."""
- with pytest.raises(SSRFError) as exc_info:
- validate_url_safe("http://localhost/admin")
- assert "Blocked hostname" in str(exc_info.value)
- def test_validate_url_safe_private_ip(self):
- """Private IPs should be blocked."""
- with pytest.raises(SSRFError):
- validate_url_safe("http://192.168.1.1/admin")
- with pytest.raises(SSRFError):
- validate_url_safe("http://10.0.0.1/internal")
- with pytest.raises(SSRFError):
- validate_url_safe("http://172.16.0.1/secret")
- def test_validate_url_safe_metadata(self):
- """Cloud metadata endpoints should be blocked."""
- with pytest.raises(SSRFError):
- validate_url_safe("http://169.254.169.254/latest/meta-data/")
- with pytest.raises(SSRFError):
- validate_url_safe("http://metadata.google.internal/")
- def test_validate_url_safe_file_scheme(self):
- """File scheme should be blocked."""
- with pytest.raises(SSRFError):
- validate_url_safe("file:///etc/passwd")
- # Either "no hostname" or "Blocked URL scheme" is acceptable
- def test_validate_url_safe_no_hostname(self):
- """URL without hostname should be blocked."""
- with pytest.raises(SSRFError) as exc_info:
- validate_url_safe("/just/a/path")
- assert "no hostname" in str(exc_info.value)
- class TestSanitizeUrlComponent:
- """Tests for URL component sanitization."""
- def test_sanitize_alphanumeric(self):
- """Alphanumeric strings pass through."""
- assert sanitize_url_component("hello123") == "hello123"
- def test_sanitize_spaces(self):
- """Spaces are encoded."""
- assert sanitize_url_component("hello world") == "hello%20world"
- def test_sanitize_special_chars(self):
- """Special characters are encoded."""
- result = sanitize_url_component("a&b=c?d#e")
- assert "&" not in result
- assert "=" not in result
- assert "?" not in result
- assert "#" not in result
- def test_sanitize_safe_chars(self):
- """Safe characters are preserved."""
- assert sanitize_url_component("a-b_c.d~e") == "a-b_c.d~e"
- def test_sanitize_path_traversal(self):
- """Path traversal attempts are neutralized."""
- result = sanitize_url_component("../../../etc/passwd")
- assert ".." not in result or "%2F" in result # Either .. is encoded or / is
|