# base.py (2.0 KB)

  1. from typing import List
  2. from flowsint_core.core.enricher_base import Enricher
  3. from flowsint_types import Phone
  4. from flowsint_types.domain import Domain
  5. from flowsint_types.ip import Ip
  6. class ResolveEnricher(Enricher):
  7. InputType = List[Domain]
  8. OutputType = List[Ip]
  9. @classmethod
  10. def name(cls) -> str:
  11. return "domain_to_ip"
  12. @classmethod
  13. def category(cls) -> str:
  14. return "Domain"
  15. @classmethod
  16. def key(cls) -> str:
  17. return "domain"
  18. async def scan(self, data: InputType) -> OutputType:
  19. return []
  20. def postprocess(self, results: OutputType, original_input: InputType) -> OutputType:
  21. return []
  22. # Make types available at module level for easy access
  23. InputType = ResolveEnricher.InputType
  24. OutputType = ResolveEnricher.OutputType
  25. enricher = ResolveEnricher("sketch_123", "scan_123")
  26. def test_correct_preprocess():
  27. inputs = [
  28. Domain(domain="mydomain.com"),
  29. {"domain": "blog.mydomain2.com"},
  30. "mydomain3.com",
  31. "notADomaiN",
  32. ]
  33. preprocessed = enricher.preprocess(inputs)
  34. assert len(preprocessed) == 3 # 3 valid domains
  35. assert preprocessed[0].domain == "mydomain.com"
  36. assert preprocessed[0].label == "mydomain.com"
  37. assert preprocessed[0].root == True
  38. assert preprocessed[1].domain == "blog.mydomain2.com"
  39. assert preprocessed[1].label == "blog.mydomain2.com"
  40. assert preprocessed[1].root == False
  41. assert preprocessed[2].domain == "mydomain3.com"
  42. assert preprocessed[2].label == "mydomain3.com"
  43. assert preprocessed[2].root == True
  44. def test_incorrect_preprocess():
  45. inputs = [
  46. Phone(number="+33634565423"),
  47. {"name": "JohnDoe"},
  48. "mydomain.com",
  49. "notADomaiN",
  50. ]
  51. preprocessed = enricher.preprocess(inputs)
  52. assert len(preprocessed) == 1 # 1 valid domain
  53. assert preprocessed[0].domain == "mydomain.com"
  54. assert preprocessed[0].root == True
  55. assert preprocessed[0].label == "mydomain.com"