# test_loader.py (11 KB)
  1. """Tests for YamlLoader and related utilities."""
  2. import json
  3. from pathlib import Path
  4. import pytest
  5. from flowsint_core.templates.loader.yaml_loader import (
  6. BLOCKED_HOSTNAMES,
  7. SSRFError,
  8. TemplateRenderError,
  9. YamlLoader,
  10. is_ip_blocked,
  11. sanitize_url_component,
  12. validate_url_safe,
  13. )
  14. from flowsint_core.templates.types import Template
  15. TEST_DIR = Path(__file__).parent
  16. class TestYamlLoader:
  17. """Tests for YAML loading and template parsing."""
  18. def test_yaml_loader_valid_template(self):
  19. """Load a valid template and verify its properties."""
  20. file = YamlLoader.get_template_from_file(str(TEST_DIR / "example.yaml"))
  21. assert isinstance(file, Template)
  22. assert file.name == "ip-api-lookup"
  23. assert file.category == "Ip"
  24. assert file.request.params == {"fields": "query,status,country,city,lat,lon,isp"}
  25. assert file.request.method == "GET"
  26. def test_yaml_loader_invalid_method(self):
  27. """Invalid HTTP method should raise ValueError."""
  28. with pytest.raises(ValueError) as exc_info:
  29. YamlLoader.get_template_from_file(str(TEST_DIR / "example-invalid.yaml"))
  30. assert "not present in" in str(exc_info.value).lower()
  31. def test_yaml_loader_post_method(self):
  32. """POST method should be allowed."""
  33. file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-post.yaml"))
  34. assert file.request.method == "POST"
  35. def test_yaml_loader_with_secrets(self):
  36. """Template with secrets should parse correctly."""
  37. file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-secrets.yaml"))
  38. assert len(file.secrets) == 1
  39. assert file.secrets[0].name == "API_KEY"
  40. assert file.secrets[0].required is True
  41. def test_yaml_loader_with_retry_config(self):
  42. """Template with retry config should parse correctly."""
  43. file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-retry.yaml"))
  44. assert file.retry is not None
  45. assert file.retry.max_retries == 5
  46. assert file.retry.backoff_factor == 1.0
  47. def test_yaml_loader_array_output(self):
  48. """Template with array output should parse correctly."""
  49. file = YamlLoader.get_template_from_file(str(TEST_DIR / "example-array.yaml"))
  50. assert file.output.is_array is True
  51. assert file.output.array_path == "data.results"
  52. class TestRenderTemplate:
  53. """Tests for template string rendering."""
  54. def test_render_simple(self):
  55. """Simple variable substitution."""
  56. url_template = "http://ip-api.com/json/{{address}}"
  57. url = YamlLoader.render_template(url_template, {"address": "8.8.8.8"})
  58. assert url == "http://ip-api.com/json/8.8.8.8"
  59. def test_render_multiple_variables(self):
  60. """Multiple variables in one template."""
  61. url_template = "http://api.example.com?ip={{address}}&domain={{domain}}"
  62. url = YamlLoader.render_template(
  63. url_template, {"address": "8.8.8.8", "domain": "example.com"}
  64. )
  65. assert url == "http://api.example.com?ip=8.8.8.8&domain=example.com"
  66. def test_render_with_spaces(self):
  67. """Variables with spaces around them."""
  68. url_template = "http://api.example.com/{{ username }}"
  69. url = YamlLoader.render_template(url_template, {"username": "testuser"})
  70. assert url == "http://api.example.com/testuser"
  71. def test_render_missing_variable(self):
  72. """Missing variable should raise TemplateRenderError."""
  73. url_template = "http://api.example.com/{{username}}"
  74. with pytest.raises(TemplateRenderError) as exc_info:
  75. YamlLoader.render_template(url_template, {})
  76. assert "Missing template variable: username" in str(exc_info.value)
  77. def test_render_sanitizes_special_chars(self):
  78. """Special characters should be URL-encoded by default."""
  79. url_template = "http://api.example.com/{{query}}"
  80. url = YamlLoader.render_template(url_template, {"query": "foo bar&baz=1"})
  81. assert "foo%20bar%26baz%3D1" in url
  82. def test_render_no_sanitize(self):
  83. """Sanitization can be disabled."""
  84. url_template = "http://api.example.com/{{query}}"
  85. url = YamlLoader.render_template(
  86. url_template, {"query": "foo bar"}, sanitize=False
  87. )
  88. assert url == "http://api.example.com/foo bar"
  89. def test_render_secrets_variable(self):
  90. """secrets.NAME variables should work."""
  91. template = "Bearer {{secrets.API_KEY}}"
  92. result = YamlLoader.render_template(
  93. template, {"secrets.API_KEY": "secret123"}, sanitize=False
  94. )
  95. assert result == "Bearer secret123"
  96. class TestRenderDict:
  97. """Tests for recursive dictionary rendering."""
  98. def test_render_dict_simple(self):
  99. """Simple dict with string values."""
  100. data = {"key": "{{value}}"}
  101. result = YamlLoader.render_dict(data, {"value": "test"})
  102. assert result == {"key": "test"}
  103. def test_render_dict_nested(self):
  104. """Nested dict rendering."""
  105. data = {"outer": {"inner": "{{value}}"}}
  106. result = YamlLoader.render_dict(data, {"value": "test"})
  107. assert result == {"outer": {"inner": "test"}}
  108. def test_render_dict_with_list(self):
  109. """Dict with list values."""
  110. data = {"items": ["{{a}}", "{{b}}"]}
  111. result = YamlLoader.render_dict(data, {"a": "1", "b": "2"})
  112. assert result == {"items": ["1", "2"]}
  113. def test_render_dict_preserves_non_string(self):
  114. """Non-string values should be preserved."""
  115. data = {"count": 42, "active": True, "name": "{{name}}"}
  116. result = YamlLoader.render_dict(data, {"name": "test"})
  117. assert result == {"count": 42, "active": True, "name": "test"}
  118. class TestExtractNestedValue:
  119. """Tests for dot-notation value extraction."""
  120. def test_extract_simple(self):
  121. """Simple key extraction."""
  122. data = {"name": "John"}
  123. assert YamlLoader.extract_nested_value(data, "name") == "John"
  124. def test_extract_nested(self):
  125. """Nested key extraction."""
  126. data = {"user": {"name": "John", "address": {"city": "NYC"}}}
  127. assert YamlLoader.extract_nested_value(data, "user.name") == "John"
  128. assert YamlLoader.extract_nested_value(data, "user.address.city") == "NYC"
  129. def test_extract_array_index(self):
  130. """Array index extraction."""
  131. data = {"items": ["a", "b", "c"]}
  132. assert YamlLoader.extract_nested_value(data, "items.0") == "a"
  133. assert YamlLoader.extract_nested_value(data, "items.2") == "c"
  134. def test_extract_array_of_objects(self):
  135. """Extract from array of objects."""
  136. data = {"users": [{"name": "Alice"}, {"name": "Bob"}]}
  137. assert YamlLoader.extract_nested_value(data, "users.0.name") == "Alice"
  138. assert YamlLoader.extract_nested_value(data, "users.1.name") == "Bob"
  139. def test_extract_missing_key(self):
  140. """Missing key returns None."""
  141. data = {"name": "John"}
  142. assert YamlLoader.extract_nested_value(data, "missing") is None
  143. assert YamlLoader.extract_nested_value(data, "name.missing") is None
  144. def test_extract_empty_path(self):
  145. """Empty path returns the data itself."""
  146. data = {"name": "John"}
  147. assert YamlLoader.extract_nested_value(data, "") == data
  148. def test_extract_out_of_bounds(self):
  149. """Out of bounds array index returns None."""
  150. data = {"items": ["a", "b"]}
  151. assert YamlLoader.extract_nested_value(data, "items.5") is None
  152. class TestSSRFProtection:
  153. """Tests for SSRF protection utilities."""
  154. def test_is_ip_blocked_loopback(self):
  155. """Loopback addresses should be blocked."""
  156. assert is_ip_blocked("127.0.0.1") is True
  157. assert is_ip_blocked("127.0.0.2") is True
  158. assert is_ip_blocked("127.255.255.255") is True
  159. def test_is_ip_blocked_private_ranges(self):
  160. """Private IP ranges should be blocked."""
  161. assert is_ip_blocked("10.0.0.1") is True
  162. assert is_ip_blocked("10.255.255.255") is True
  163. assert is_ip_blocked("172.16.0.1") is True
  164. assert is_ip_blocked("172.31.255.255") is True
  165. assert is_ip_blocked("192.168.0.1") is True
  166. assert is_ip_blocked("192.168.255.255") is True
  167. def test_is_ip_blocked_link_local(self):
  168. """Link-local (metadata) addresses should be blocked."""
  169. assert is_ip_blocked("169.254.169.254") is True
  170. assert is_ip_blocked("169.254.0.1") is True
  171. def test_is_ip_blocked_public(self):
  172. """Public IPs should not be blocked."""
  173. assert is_ip_blocked("8.8.8.8") is False
  174. assert is_ip_blocked("1.1.1.1") is False
  175. assert is_ip_blocked("93.184.216.34") is False
  176. def test_is_ip_blocked_invalid(self):
  177. """Invalid IP strings return False (not blocked)."""
  178. assert is_ip_blocked("not-an-ip") is False
  179. assert is_ip_blocked("") is False
  180. def test_validate_url_safe_public(self):
  181. """Public URLs should pass validation."""
  182. validate_url_safe("https://api.example.com/endpoint")
  183. validate_url_safe("http://8.8.8.8/test")
  184. def test_validate_url_safe_localhost(self):
  185. """Localhost should be blocked."""
  186. with pytest.raises(SSRFError) as exc_info:
  187. validate_url_safe("http://localhost/admin")
  188. assert "Blocked hostname" in str(exc_info.value)
  189. def test_validate_url_safe_private_ip(self):
  190. """Private IPs should be blocked."""
  191. with pytest.raises(SSRFError):
  192. validate_url_safe("http://192.168.1.1/admin")
  193. with pytest.raises(SSRFError):
  194. validate_url_safe("http://10.0.0.1/internal")
  195. with pytest.raises(SSRFError):
  196. validate_url_safe("http://172.16.0.1/secret")
  197. def test_validate_url_safe_metadata(self):
  198. """Cloud metadata endpoints should be blocked."""
  199. with pytest.raises(SSRFError):
  200. validate_url_safe("http://169.254.169.254/latest/meta-data/")
  201. with pytest.raises(SSRFError):
  202. validate_url_safe("http://metadata.google.internal/")
  203. def test_validate_url_safe_file_scheme(self):
  204. """File scheme should be blocked."""
  205. with pytest.raises(SSRFError):
  206. validate_url_safe("file:///etc/passwd")
  207. # Either "no hostname" or "Blocked URL scheme" is acceptable
  208. def test_validate_url_safe_no_hostname(self):
  209. """URL without hostname should be blocked."""
  210. with pytest.raises(SSRFError) as exc_info:
  211. validate_url_safe("/just/a/path")
  212. assert "no hostname" in str(exc_info.value)
  213. class TestSanitizeUrlComponent:
  214. """Tests for URL component sanitization."""
  215. def test_sanitize_alphanumeric(self):
  216. """Alphanumeric strings pass through."""
  217. assert sanitize_url_component("hello123") == "hello123"
  218. def test_sanitize_spaces(self):
  219. """Spaces are encoded."""
  220. assert sanitize_url_component("hello world") == "hello%20world"
  221. def test_sanitize_special_chars(self):
  222. """Special characters are encoded."""
  223. result = sanitize_url_component("a&b=c?d#e")
  224. assert "&" not in result
  225. assert "=" not in result
  226. assert "?" not in result
  227. assert "#" not in result
  228. def test_sanitize_safe_chars(self):
  229. """Safe characters are preserved."""
  230. assert sanitize_url_component("a-b_c.d~e") == "a-b_c.d~e"
  231. def test_sanitize_path_traversal(self):
  232. """Path traversal attempts are neutralized."""
  233. result = sanitize_url_component("../../../etc/passwd")
  234. assert ".." not in result or "%2F" in result # Either .. is encoded or / is