test_vault_integration.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. """Integration tests for enrichers using vault secrets."""
  2. import pytest
  3. import uuid
  4. from unittest.mock import Mock
  5. from flowsint_enrichers.domain.to_history import DomainToHistoryEnricher
  6. from flowsint_enrichers.individual.to_domains import IndividualToDomainsEnricher
  7. from flowsint_enrichers.email.to_domains import EmailToDomainsEnricher
  8. from flowsint_enrichers.crypto.to_nfts import CryptoWalletAddressToNFTs
  9. from flowsint_enrichers.crypto.to_transactions import CryptoWalletAddressToTransactions
  10. from flowsint_enrichers.email.to_leaks import EmailToBreachesEnricher
  11. @pytest.fixture
  12. def mock_vault():
  13. """Create a mock vault instance."""
  14. vault = Mock()
  15. vault.get_secret = Mock()
  16. return vault
  17. @pytest.fixture
  18. def sketch_id():
  19. """Create a test sketch ID."""
  20. return str(uuid.uuid4())
  21. class TestWhoxyEnrichersVaultIntegration:
  22. """Tests for WHOXY enrichers vault integration."""
  23. @pytest.mark.asyncio
  24. async def test_domain_to_history_gets_api_key_from_vault(
  25. self, mock_vault, sketch_id
  26. ):
  27. """Test that domain_to_history retrieves WHOXY_API_KEY from vault."""
  28. api_key = "whoxy-api-key-12345"
  29. mock_vault.get_secret.return_value = api_key
  30. enricher = DomainToHistoryEnricher(
  31. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  32. )
  33. await enricher.async_init()
  34. # Verify vault was queried for the API key
  35. mock_vault.get_secret.assert_called()
  36. calls = [call[0][0] for call in mock_vault.get_secret.call_args_list]
  37. assert "WHOXY_API_KEY" in calls
  38. # Verify the key is accessible via get_secret
  39. assert enricher.get_secret("WHOXY_API_KEY") == api_key
  40. @pytest.mark.asyncio
  41. async def test_individual_to_domains_gets_api_key_from_vault(
  42. self, mock_vault, sketch_id
  43. ):
  44. """Test that individual_to_domains retrieves WHOXY_API_KEY from vault."""
  45. api_key = "whoxy-api-key-12345"
  46. mock_vault.get_secret.return_value = api_key
  47. enricher = IndividualToDomainsEnricher(
  48. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  49. )
  50. await enricher.async_init()
  51. # Verify vault was queried for the API key
  52. mock_vault.get_secret.assert_called()
  53. calls = [call[0][0] for call in mock_vault.get_secret.call_args_list]
  54. assert "WHOXY_API_KEY" in calls
  55. # Verify the key is accessible via get_secret
  56. assert enricher.get_secret("WHOXY_API_KEY") == api_key
  57. @pytest.mark.asyncio
  58. async def test_email_to_domains_gets_api_key_from_vault(
  59. self, mock_vault, sketch_id
  60. ):
  61. """Test that email_to_domains retrieves WHOXY_API_KEY from vault."""
  62. api_key = "whoxy-api-key-12345"
  63. mock_vault.get_secret.return_value = api_key
  64. enricher = EmailToDomainsEnricher(
  65. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  66. )
  67. await enricher.async_init()
  68. # Verify vault was queried for the API key
  69. mock_vault.get_secret.assert_called()
  70. calls = [call[0][0] for call in mock_vault.get_secret.call_args_list]
  71. assert "WHOXY_API_KEY" in calls
  72. # Verify the key is accessible via get_secret
  73. assert enricher.get_secret("WHOXY_API_KEY") == api_key
  74. class TestEtherscanEnrichersVaultIntegration:
  75. """Tests for Etherscan enrichers vault integration."""
  76. @pytest.mark.asyncio
  77. async def test_wallet_to_nfts_gets_api_key_from_vault(self, mock_vault, sketch_id):
  78. """Test that wallet_to_nfts retrieves ETHERSCAN_API_KEY from vault."""
  79. api_key = "etherscan-api-key-12345"
  80. mock_vault.get_secret.return_value = api_key
  81. enricher = CryptoWalletAddressToNFTs(
  82. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  83. )
  84. await enricher.async_init()
  85. # Verify vault was queried for the API key
  86. mock_vault.get_secret.assert_called()
  87. calls = [call[0][0] for call in mock_vault.get_secret.call_args_list]
  88. assert "ETHERSCAN_API_KEY" in calls
  89. # Verify the key is accessible via get_secret
  90. assert enricher.get_secret("ETHERSCAN_API_KEY") == api_key
  91. @pytest.mark.asyncio
  92. async def test_wallet_to_transactions_gets_api_key_from_vault(
  93. self, mock_vault, sketch_id
  94. ):
  95. """Test that wallet_to_transactions retrieves ETHERSCAN_API_KEY from vault."""
  96. api_key = "etherscan-api-key-12345"
  97. mock_vault.get_secret.return_value = api_key
  98. enricher = CryptoWalletAddressToTransactions(
  99. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  100. )
  101. await enricher.async_init()
  102. # Verify vault was queried for the API key
  103. mock_vault.get_secret.assert_called()
  104. calls = [call[0][0] for call in mock_vault.get_secret.call_args_list]
  105. assert "ETHERSCAN_API_KEY" in calls
  106. # Verify the key is accessible via get_secret
  107. assert enricher.get_secret("ETHERSCAN_API_KEY") == api_key
  108. class TestHIBPEnrichersVaultIntegration:
  109. """Tests for HIBP enrichers vault integration."""
  110. @pytest.mark.asyncio
  111. async def test_email_to_breaches_gets_api_key_from_vault(
  112. self, mock_vault, sketch_id
  113. ):
  114. """Test that email_to_breaches retrieves HIBP_API_KEY from vault."""
  115. api_key = "hibp-api-key-12345"
  116. mock_vault.get_secret.return_value = api_key
  117. enricher = EmailToBreachesEnricher(
  118. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  119. )
  120. await enricher.async_init()
  121. # Verify vault was queried for the API key
  122. mock_vault.get_secret.assert_called()
  123. calls = [call[0][0] for call in mock_vault.get_secret.call_args_list]
  124. assert "HIBP_API_KEY" in calls
  125. # Verify the key is accessible via get_secret
  126. assert enricher.get_secret("HIBP_API_KEY") == api_key
  127. class TestVaultSecretWithUserProvidedID:
  128. """Tests for vault secrets when user provides a specific vault ID."""
  129. @pytest.mark.asyncio
  130. async def test_enricher_uses_user_provided_vault_id(self, mock_vault, sketch_id):
  131. """Test that enricher uses user-provided vault ID if specified."""
  132. vault_id = str(uuid.uuid4())
  133. api_key = "whoxy-api-key-12345"
  134. # First call (by ID) returns the secret
  135. mock_vault.get_secret.return_value = api_key
  136. enricher = DomainToHistoryEnricher(
  137. sketch_id=sketch_id,
  138. scan_id="scan_123",
  139. vault=mock_vault,
  140. params={"WHOXY_API_KEY": vault_id}, # User selected a specific vault entry
  141. )
  142. await enricher.async_init()
  143. # Verify vault was queried with the provided ID first
  144. first_call = mock_vault.get_secret.call_args_list[0][0][0]
  145. assert first_call == vault_id
  146. # Verify the key is accessible
  147. assert enricher.get_secret("WHOXY_API_KEY") == api_key
  148. @pytest.mark.asyncio
  149. async def test_enricher_fallback_to_name_if_id_not_found(
  150. self, mock_vault, sketch_id
  151. ):
  152. """Test that enricher falls back to name lookup if vault ID not found."""
  153. vault_id = str(uuid.uuid4())
  154. api_key = "whoxy-api-key-12345"
  155. # First call (by ID) returns None, second call (by name) returns the secret
  156. mock_vault.get_secret.side_effect = [None, api_key]
  157. enricher = DomainToHistoryEnricher(
  158. sketch_id=sketch_id,
  159. scan_id="scan_123",
  160. vault=mock_vault,
  161. params={"WHOXY_API_KEY": vault_id},
  162. )
  163. await enricher.async_init()
  164. # Verify vault was queried twice: first by ID, then by name
  165. assert mock_vault.get_secret.call_count == 2
  166. assert mock_vault.get_secret.call_args_list[0][0][0] == vault_id
  167. assert mock_vault.get_secret.call_args_list[1][0][0] == "WHOXY_API_KEY"
  168. # Verify the key is accessible
  169. assert enricher.get_secret("WHOXY_API_KEY") == api_key
  170. class TestMissingRequiredVaultSecret:
  171. """Tests for error handling when required vault secrets are missing."""
  172. @pytest.mark.asyncio
  173. async def test_missing_required_secret_raises_exception(self, mock_vault, sketch_id):
  174. """Test that missing required secret raises an exception."""
  175. # Vault returns None (secret not found)
  176. mock_vault.get_secret.return_value = None
  177. enricher = DomainToHistoryEnricher(
  178. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  179. )
  180. # async_init should raise an exception when required secret is missing
  181. with pytest.raises(Exception) as exc_info:
  182. await enricher.async_init()
  183. # Verify the exception message contains the secret name
  184. assert "WHOXY_API_KEY" in str(exc_info.value)
  185. assert "missing" in str(exc_info.value).lower()
  186. assert "Vault settings" in str(exc_info.value)
  187. @pytest.mark.asyncio
  188. async def test_enricher_with_provided_secret_succeeds(self, mock_vault, sketch_id):
  189. """Test that enricher succeeds when required secret is provided."""
  190. # Vault returns the secret
  191. api_key = "whoxy-api-key-12345"
  192. mock_vault.get_secret.return_value = api_key
  193. enricher = DomainToHistoryEnricher(
  194. sketch_id=sketch_id, scan_id="scan_123", vault=mock_vault, params={}
  195. )
  196. # async_init should succeed
  197. await enricher.async_init()
  198. # Secret should be accessible
  199. assert enricher.get_secret("WHOXY_API_KEY") == api_key