---
title: "Managing enrichers"
description: "Quick start guide to creating enrichers for your OSINT investigations."
category: "Developers"
order: 10
author: "Flowsint Team"
tags: ["tutorial", "developers", "creating-a-new-enricher"]
version: "1.2.8"
last_updated_at: "2026-05-15"
---

## Understanding enrichers

Enrichers are the high-level business logic layer in Flowsint. While types define data structures and tools wrap external utilities, enrichers orchestrate the entire intelligence gathering workflow. An enricher takes input data of one type, processes it through various tools or APIs, validates and enriches the results, creates graph database nodes and relationships, and returns structured output.

Every enricher in Flowsint follows a two-phase execution model. The scan phase contains the core enriching logic: tools are executed, APIs are called, and data is gathered. The postprocessing phase then creates Neo4j graph nodes and relationships and returns the processed results. Input validation and normalization happen automatically through Pydantic type validation.

Enrichers differ from tools in several fundamental ways. Tools are low-level wrappers that return raw data without knowledge of the Flowsint ecosystem. Enrichers are high-level workflows that understand types, create graph nodes, handle parameters, and orchestrate multiple tools. When you want to add a new data source, you create a tool. When you want to add a new intelligence workflow, you create an enricher.

## Enricher architecture

The enricher system is built around an abstract base class that defines the interface and execution flow. Every enricher you create inherits from this base class and implements specific methods.

### The Enricher base class

The base class lives at `flowsint-core/src/flowsint_core/core/enricher_base.py` and provides the framework for all enrichers. Here's what a minimal enricher looks like:

```python
from typing import List

from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_types import Domain, Ip


@flowsint_enricher
class MyEnricher(Enricher):
    """Description of what this enricher does."""

    # Define input and output types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        """Unique identifier for this enricher."""
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        """Category this enricher belongs to."""
        return "Domain"

    @classmethod
    def key(cls) -> str:
        """Primary key field name for this enricher."""
        return "domain"

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Core enriching logic."""
        pass

    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """Create graph nodes and relationships."""
        pass


# Export types for easy access
InputType = MyEnricher.InputType
OutputType = MyEnricher.OutputType
```

The `InputType` and `OutputType` class attributes define what data the enricher accepts and returns. These should be Pydantic types from the `flowsint-types` package, defined as base types (e.g., `Domain`, not `List[Domain]`). The base class uses these type definitions to generate JSON schemas for the API and to validate input.

At the end of the file, export the types so other modules can access them easily.

### The two phases

Understanding the two execution phases is crucial for writing effective enrichers.

**Scanning** is where the real work happens. This async method receives validated input data as a list of `InputType` instances and executes your intelligence gathering logic. You might instantiate tools, call external APIs, process results, and build up your output data. This phase should focus purely on gathering and processing data without worrying about the graph database. The base class validates the input through Pydantic before the scan phase begins.

**Postprocessing** creates the graph database structure. After scanning completes, this method receives both the results and the original input. It creates Neo4j nodes for each entity, establishes relationships between them, and returns the final results. This keeps graph logic apart from business logic.
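
To make the order of operations concrete, here is a toy sketch of how a runner might drive the two phases. It is not Flowsint's actual base class: `ToyEnricher`, its `run()` method, and the `graph` list are hypothetical stand-ins, and plain dataclasses replace the Pydantic types so the example runs on its own.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Domain:  # stand-in for flowsint_types.Domain
    domain: str


@dataclass(frozen=True)
class Ip:  # stand-in for flowsint_types.Ip
    address: str


class ToyEnricher:
    """Hypothetical two-phase enricher; not the real Enricher base class."""

    def __init__(self) -> None:
        self.graph: List[tuple] = []  # records what postprocess would write

    async def scan(self, data: List[Domain]) -> List[Ip]:
        # Phase 1: gather data only (a fixed lookup table here, no network).
        table = {"example.com": "192.0.2.10"}
        return [Ip(table[d.domain]) for d in data if d.domain in table]

    def postprocess(self, results: List[Ip], input_data: List[Domain]) -> List[Ip]:
        # Phase 2: graph writes only, no data gathering.
        for ip in results:
            self.graph.append(("node", ip.address))
        return results

    async def run(self, raw: List[str]) -> List[Ip]:
        # Normalization and validation happen before scan sees the data.
        validated = [Domain(domain=value.strip().lower()) for value in raw]
        results = await self.scan(validated)
        return self.postprocess(results, validated)


enricher = ToyEnricher()
output = asyncio.run(enricher.run([" Example.com ", "unknown.test"]))
```

The point of the split is that `scan()` can be tested with no graph available, and `postprocess()` can be tested with hand-built results.
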

## Creating a simple enricher

Let's walk through creating a complete enricher from scratch. We'll build an enricher that converts domains to IP addresses using DNS resolution.

### Setting up the file structure

Enrichers are organized by their input type. Create a new file in the appropriate directory under `flowsint-enrichers/src/flowsint_enrichers/`:

```bash
cd flowsint-enrichers/src/flowsint_enrichers/domain/
touch to_ip.py
```

If you're creating an enricher for a new input type, you may need to create a new directory first.

### Implementing the basic structure

Start with the imports and class definition:

```python
import socket
from typing import List

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain, Ip


@flowsint_enricher
class DomainToIpEnricher(Enricher):
    """Resolves domain names to their IP addresses using DNS."""

    # Define types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"

    @classmethod
    def documentation(cls) -> str:
        return """
        This enricher resolves domain names to their IP addresses using
        standard DNS queries. It accepts a list of domains and returns
        the corresponding IP addresses.
        """


# Export types at the end of the file
InputType = DomainToIpEnricher.InputType
OutputType = DomainToIpEnricher.OutputType
```

The `name()` method returns a unique identifier for this enricher. Use lowercase with underscores, following the pattern `inputtype_to_outputtype`. The `category()` method groups related enrichers together in the UI. The `key()` method specifies which field serves as the primary identifier, typically matching the input type.

### Implementing the scan logic

The scan method contains your core intelligence gathering logic. It receives a list of validated `InputType` instances and returns a list of `OutputType` instances.

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Resolve each domain to its IP address.

    Args:
        data: List of Domain objects to resolve

    Returns:
        List of Ip objects
    """
    results: List[OutputType] = []

    for domain in data:
        try:
            # Perform DNS resolution (gethostbyname is a blocking call)
            ip_address = socket.gethostbyname(domain.domain)

            # Create IP object
            ip = Ip(address=ip_address)
            results.append(ip)

            # Log successful resolution
            Logger.info(
                self.sketch_id,
                {"message": f"Resolved {domain.domain} to {ip_address}"}
            )
        except socket.gaierror as e:
            # DNS resolution failed
            Logger.info(
                self.sketch_id,
                {"message": f"Failed to resolve {domain.domain}: {e}"}
            )
            continue
        except Exception as e:
            # Unexpected error
            Logger.error(
                self.sketch_id,
                {"message": f"Error resolving {domain.domain}: {e}"}
            )
            continue

    return results
```

This implementation iterates through each domain, performs DNS resolution, creates an IP object for each successful resolution, and logs both successes and failures. The error handling ensures that one failed lookup doesn't abort the whole run, which matters when processing many domains.

The input data has already been validated by Pydantic before reaching the scan method, so you can trust that all items are proper `Domain` objects.
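
`socket.gethostbyname` is a blocking call, so inside an `async` scan it holds up the event loop while each lookup is in flight. One possible way around this, sketched below, is to push each lookup onto a worker thread with `asyncio.to_thread` (Python 3.9+) and run the lookups concurrently. The helper name `resolve_all` and its `resolver` parameter are illustrative and not part of Flowsint.

```python
import asyncio
import socket
from typing import Callable, Dict, List, Optional


async def resolve_all(
    hostnames: List[str],
    resolver: Callable[[str], str] = socket.gethostbyname,
) -> Dict[str, Optional[str]]:
    """Resolve hostnames concurrently without blocking the event loop."""

    async def resolve_one(hostname: str) -> Optional[str]:
        try:
            # Run the blocking lookup in a worker thread.
            return await asyncio.to_thread(resolver, hostname)
        except socket.gaierror:
            return None  # resolution failed; the caller decides what to log

    # gather() returns results in the same order as the input hostnames.
    addresses = await asyncio.gather(*(resolve_one(h) for h in hostnames))
    return dict(zip(hostnames, addresses))
```

Taking the resolver as a parameter also makes the helper easy to exercise in tests with a fake lookup function.
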

### Implementing postprocessing

The postprocess method creates graph database nodes and relationships using the simplified graph API:

```python
# Requires: from typing import List, Optional
def postprocess(self, results: List[OutputType], input_data: Optional[List[InputType]] = None) -> List[OutputType]:
    """
    Create graph nodes and relationships.

    Args:
        results: IP objects from scan phase
        input_data: Original Domain objects (preprocessed input)

    Returns:
        IP objects (unchanged)
    """
    if not input_data:
        return results

    # zip() pairs items by position, so this assumes scan() returned exactly
    # one Ip per input Domain, in the same order. Domains skipped after a
    # failed resolution break that assumption.
    for domain, ip in zip(input_data, results):
        # Create nodes by passing Pydantic objects directly
        self.create_node(domain)
        self.create_node(ip)

        # Create relationship by passing Pydantic objects directly
        self.create_relationship(domain, ip, "RESOLVES_TO")

        # Log the operation
        self.log_graph_message(
            f"IP found for domain {domain.domain} -> {ip.address}"
        )

    return results
```

You can pass Pydantic objects directly to `create_node()` and `create_relationship()`. The `create_node()` method accepts a Pydantic object and creates a Neo4j node with the correct label and properties. The `create_relationship()` method takes two Pydantic objects and a relationship type string. Both methods infer the node types, primary keys, and property values from the Pydantic models.
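
Because `zip()` pairs by position, a domain that fails to resolve in the scan phase shifts every later pair by one. A possible way to avoid this is to keep each result attached to the input that produced it. The sketch below shows both the problem and one fix on plain strings; `resolve_pairs`, `lookup`, and the lookup table are hypothetical and stand in for the real DNS call.

```python
from typing import Callable, List, Optional, Tuple


def resolve_pairs(
    domains: List[str],
    lookup: Callable[[str], Optional[str]],
) -> List[Tuple[str, str]]:
    """Return (domain, ip) pairs, skipping domains that did not resolve."""
    pairs: List[Tuple[str, str]] = []
    for domain in domains:
        ip = lookup(domain)
        if ip is not None:
            pairs.append((domain, ip))  # the source stays attached to its result
    return pairs


# Hypothetical lookup table standing in for DNS: b.test does not resolve.
table = {"a.test": "192.0.2.1", "c.test": "192.0.2.3"}
domains = ["a.test", "b.test", "c.test"]

results = [table[d] for d in domains if d in table]
misaligned = list(zip(domains, results))  # pairs b.test with c.test's address
aligned = resolve_pairs(domains, table.get)
```

In an enricher, the same idea could be applied by having the scan phase record which input each output came from, so that postprocessing never has to guess.
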

## Creating an enricher with tools

Most enrichers use external tools for data gathering. Let's create an enricher that uses the Subfinder tool for subdomain enumeration.

### Importing the tool

Start by importing the tool along with your other dependencies:

```python
from typing import List

from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain
from tools.network.subfinder import SubfinderTool


@flowsint_enricher
class SubdomainEnricher(Enricher):
    """Enumerates subdomains for given domains using Subfinder."""

    # Define types as base types
    InputType = Domain
    OutputType = Domain

    @classmethod
    def name(cls) -> str:
        return "domain_to_subdomains"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"


# Export types
InputType = SubdomainEnricher.InputType
OutputType = SubdomainEnricher.OutputType
```

### Using the tool in scan

The scan method instantiates and uses the tool:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Find subdomains using Subfinder tool.

    Args:
        data: List of Domain objects

    Returns:
        List of discovered subdomain Domain objects
    """
    results: List[OutputType] = []

    # Instantiate the tool
    subfinder = SubfinderTool()

    for domain in data:
        Logger.info(
            self.sketch_id,
            {"message": f"Enumerating subdomains for {domain.domain}"}
        )
        try:
            # Launch the tool
            subdomains = subfinder.launch(domain.domain)

            # Convert strings to Domain objects
            for subdomain in subdomains:
                results.append(Domain(domain=subdomain, root=False))

            Logger.info(
                self.sketch_id,
                {"message": f"Found {len(subdomains)} subdomains for {domain.domain}"}
            )
        except Exception as e:
            Logger.error(
                self.sketch_id,
                {"message": f"Error enumerating subdomains for {domain.domain}: {e}"}
            )
            continue

    return results
```

Notice how the tool returns raw strings, and the enricher converts them into proper `Domain` objects. This separation of concerns keeps tools simple while enrichers handle type conversion.

### Creating graph nodes and relationships

The postprocess phase creates parent-child relationships between domains and subdomains:

```python
def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
    """
    Create graph nodes and relationships for domains and subdomains.

    Args:
        results: Discovered subdomain Domain objects
        input_data: Original parent Domain objects

    Returns:
        Subdomain Domain objects
    """
    # Create nodes for parent domains
    for domain in input_data:
        self.create_node(domain)

    # Create nodes for subdomains and relationships
    for subdomain in results:
        self.create_node(subdomain)

        # Extract parent domain name and create relationship
        parent_domain_name = self._extract_parent_domain(subdomain.domain)
        parent_domain = Domain(domain=parent_domain_name)

        # Create relationship using Pydantic objects
        self.create_relationship(parent_domain, subdomain, "HAS_SUBDOMAIN")

        # Log the operation
        self.log_graph_message(
            f"Subdomain found: {parent_domain_name} -> {subdomain.domain}"
        )

    return results

def _extract_parent_domain(self, subdomain: str) -> str:
    """Extract parent domain from subdomain.

    Naive: keeps only the last two labels, so multi-label suffixes
    such as "co.uk" are not handled.
    """
    parts = subdomain.split('.')
    if len(parts) >= 2:
        return '.'.join(parts[-2:])
    return subdomain
```
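
Keeping the last two labels gives the wrong parent for names such as `api.example.co.uk`, and it can also point at a domain that was never part of the input. An alternative, sketched here under the assumption that every subdomain belongs to one of the input domains, is to match each subdomain against the inputs and pick the longest one it falls under. The function name `match_parent` is illustrative.

```python
from typing import Iterable, Optional


def match_parent(subdomain: str, parents: Iterable[str]) -> Optional[str]:
    """Return the longest input domain that `subdomain` falls under, if any."""
    name = subdomain.lower().rstrip(".")
    best: Optional[str] = None
    for parent in parents:
        candidate = parent.lower().rstrip(".")
        # Match on a label boundary so "badexample.com" does not match "example.com".
        if name != candidate and name.endswith("." + candidate):
            if best is None or len(candidate) > len(best):
                best = candidate
    return best
```

In `postprocess`, the candidates would be the `domain` values of `input_data`, and a `None` result signals a subdomain that should be logged instead of linked.
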

## Adding parameters to enrichers

Many enrichers need user-configurable parameters. Let's create an enricher that scans ports with configurable options.

### Defining the parameter schema

The `get_params_schema()` class method defines what parameters your enricher accepts:

```python
from typing import List, Dict, Any

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger  # used by the scan method
from flowsint_types import Ip, Port
from tools.network.naabu import NaabuTool


@flowsint_enricher
class IpToPortsEnricher(Enricher):
    """Scans IP addresses for open ports."""

    # Define types as base types
    InputType = Ip
    OutputType = Port

    @classmethod
    def name(cls) -> str:
        return "ip_to_ports"

    @classmethod
    def category(cls) -> str:
        return "IP"

    @classmethod
    def key(cls) -> str:
        return "address"

    @classmethod
    def get_params_schema(cls) -> List[Dict[str, Any]]:
        """Define configurable parameters for this enricher."""
        return [
            {
                "name": "mode",
                "type": "select",
                "description": "Scan mode: active scanning or passive enumeration",
                "required": True,
                "default": "passive",
                "options": [
                    {"label": "Passive", "value": "passive"},
                    {"label": "Active", "value": "active"},
                ],
            },
            {
                "name": "port_range",
                "type": "string",
                "description": "Port range to scan (e.g., '1-1000' or '80,443,8080')",
                "required": False,
            },
            {
                "name": "top_ports",
                "type": "select",
                "description": "Scan only the most common ports",
                "required": False,
                "options": [
                    {"label": "Top 100", "value": "100"},
                    {"label": "Top 1000", "value": "1000"},
                ],
            },
            {
                "name": "PDCP_API_KEY",
                "type": "vaultSecret",
                "description": "ProjectDiscovery Cloud Platform API key for passive mode",
                "required": False,
            },
        ]


# Export types
InputType = IpToPortsEnricher.InputType
OutputType = IpToPortsEnricher.OutputType
```

Each entry in the schema defines a parameter's name, type, description, whether it's required, an optional default value, and, for `select` parameters, the available options. The `vaultSecret` type integrates with Flowsint's encrypted credential storage.
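
This guide doesn't show how Flowsint itself applies the schema, so the following is only a sketch of what such a schema implies: fill in defaults, reject missing required values, and check `select` values against their options. The function `resolve_params` is hypothetical and is not part of the Flowsint API.

```python
from typing import Any, Dict, List


def resolve_params(schema: List[Dict[str, Any]], supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Apply defaults and check required/select constraints from a schema."""
    resolved: Dict[str, Any] = {}
    for spec in schema:
        name = spec["name"]
        value = supplied.get(name, spec.get("default"))
        if value is None:
            if spec.get("required"):
                raise ValueError(f"Missing required parameter: {name}")
            continue  # optional and unset: leave it out
        if spec["type"] == "select":
            allowed = [option["value"] for option in spec.get("options", [])]
            if value not in allowed:
                raise ValueError(f"Invalid value for {name}: {value!r}")
        resolved[name] = value
    return resolved


schema = [
    {"name": "mode", "type": "select", "required": True, "default": "passive",
     "options": [{"label": "Passive", "value": "passive"},
                 {"label": "Active", "value": "active"}]},
    {"name": "port_range", "type": "string", "required": False},
]
params = resolve_params(schema, {"port_range": "80,443"})
```

Here `mode` falls back to its default because the caller did not supply it, while an unknown value such as `"stealth"` would raise a `ValueError`.
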

### Using parameters in your enricher

Parameters are accessed through `self.params` in your scan method:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Scan IPs for open ports using configured parameters.

    Args:
        data: List of Ip objects to scan

    Returns:
        List of Port objects
    """
    results: List[OutputType] = []

    # Extract parameters
    mode = self.params.get("mode", "passive")
    port_range = self.params.get("port_range")
    top_ports = self.params.get("top_ports")
    api_key = self.get_secret("PDCP_API_KEY")

    # Instantiate tool
    naabu = NaabuTool()

    for ip in data:
        Logger.info(
            self.sketch_id,
            {"message": f"Scanning {ip.address} in {mode} mode"}
        )
        try:
            # Launch tool with parameters
            scan_results = naabu.launch(
                target=ip.address,
                mode=mode,
                port_range=port_range,
                top_ports=top_ports,
                api_key=api_key
            )

            # Convert tool results to Port objects
            for result in scan_results:
                port = Port(
                    number=result.get("port"),
                    protocol=result.get("protocol", "tcp").upper(),
                    state="open",
                    service=result.get("service"),
                    banner=result.get("version")
                )
                results.append(port)
        except Exception as e:
            Logger.error(
                self.sketch_id,
                {"message": f"Error scanning {ip.address}: {e}"}
            )
            continue

    return results
```
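
The `port_range` parameter arrives as free text, and this guide doesn't say whether the tool validates it. Checking it before launching a scan can give users a clearer error. The helper below is a hypothetical sketch that accepts the two formats named in the parameter description (`'1-1000'` and `'80,443,8080'`) and expands them into port numbers.

```python
from typing import List


def parse_port_range(spec: str) -> List[int]:
    """Expand a spec such as '1-1000' or '80,443,8080' into sorted port numbers."""
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        # Valid TCP/UDP ports are 1 to 65535, and a range must not be reversed.
        if not (1 <= start <= end <= 65535):
            raise ValueError(f"Invalid port range: {part!r}")
        ports.update(range(start, end + 1))
    return sorted(ports)
```

Non-numeric input raises `ValueError` from `int()`, so a caller can catch a single exception type for every malformed spec.
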

## Handling multiple output types

Some enrichers produce multiple types of results. You can define a custom return type using Pydantic:

```python
from typing import List

from pydantic import BaseModel

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Website, Email, Phone
from tools.network.reconcrawl import ReconCrawlTool


class CrawlerResults(BaseModel):
    """Results from web crawler including multiple entity types."""
    website: Website
    emails: List[Email] = []
    phones: List[Phone] = []


@flowsint_enricher
class WebsiteToCrawlerEnricher(Enricher):
    """Crawls websites to extract emails and phone numbers."""

    # Define types as base types
    InputType = Website
    OutputType = CrawlerResults

    # name(), category(), and key() are omitted here to keep the example short

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Crawl websites and extract contact information."""
        results: List[OutputType] = []
        crawler_tool = ReconCrawlTool()

        for website in data:
            try:
                # Launch crawler
                crawl_data = crawler_tool.launch(website.url)

                # Extract entities
                emails = [Email(email=e) for e in crawl_data.get("emails", [])]
                phones = [Phone(number=p) for p in crawl_data.get("phones", [])]

                # Create result object
                result = CrawlerResults(
                    website=website,
                    emails=emails,
                    phones=phones
                )
                results.append(result)
            except Exception as e:
                Logger.error(self.sketch_id, {"message": f"Crawl error: {e}"})

        return results

    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """Create nodes for all discovered entities."""
        for result in results:
            # Create website node using Pydantic object
            self.create_node(result.website)

            # Create email nodes and relationships
            for email in result.emails:
                self.create_node(email)
                self.create_relationship(result.website, email, "HAS_EMAIL")

            # Create phone nodes and relationships
            for phone in result.phones:
                self.create_node(phone)
                self.create_relationship(result.website, phone, "HAS_PHONE")

            self.log_graph_message(
                f"Processed {len(result.emails)} emails and {len(result.phones)} phones from {result.website.url}"
            )

        return results


# Export types
InputType = WebsiteToCrawlerEnricher.InputType
OutputType = WebsiteToCrawlerEnricher.OutputType
```

## Registering your enricher

You don't need to register your enricher anywhere. Adding the `@flowsint_enricher` decorator to your enricher class is enough for auto-discovery to pick it up.

```python
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher


@flowsint_enricher
class MyEnricher(Enricher):
    ...
```

## Testing your enricher

Creating tests helps ensure your enricher works correctly and makes debugging easier. Create a test file in `flowsint-enrichers/tests/`:

```python
# tests/test_domain_to_ip.py
import pytest

from flowsint_enrichers.domain.to_ip import DomainToIpEnricher
from flowsint_types import Domain, Ip


@pytest.mark.asyncio
async def test_enricher_metadata():
    """Test enricher metadata is correctly defined."""
    assert DomainToIpEnricher.name() == "domain_to_ip"
    assert DomainToIpEnricher.category() == "Domain"
    assert DomainToIpEnricher.key() == "domain"


@pytest.mark.asyncio
async def test_type_definitions():
    """Test InputType and OutputType are correctly defined."""
    assert DomainToIpEnricher.InputType == Domain
    assert DomainToIpEnricher.OutputType == Ip


@pytest.mark.asyncio
async def test_scan():
    """Test DNS resolution works (performs a real lookup, so it needs network access)."""
    enricher = DomainToIpEnricher(sketch_id="test", scan_id="test")
    input_data = [Domain(domain="example.com")]

    results = await enricher.scan(input_data)

    assert len(results) > 0
    assert isinstance(results[0], Ip)
    assert results[0].address  # Should have an IP address
```

These tests verify that your enricher's metadata is correct, type definitions are properly set, and the scan logic produces expected results. Input validation is handled automatically by Pydantic, so you don't need to test preprocessing separately.
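
A scan test that performs a real DNS lookup fails on machines without network access. One common approach is to replace the lookup with a deterministic fake using `unittest.mock.patch`. The sketch below uses a small stand-in function, `resolve`, in place of the enricher so that it runs on its own; in a real test you would call `enricher.scan(...)` inside the `with` block. The names `resolve` and `fake_gethostbyname` are illustrative.

```python
import socket
from typing import List
from unittest.mock import patch


def resolve(domains: List[str]) -> List[str]:
    """Stand-in for the scan logic: resolve each domain, skip failures."""
    addresses = []
    for domain in domains:
        try:
            addresses.append(socket.gethostbyname(domain))
        except socket.gaierror:
            continue
    return addresses


def fake_gethostbyname(hostname: str) -> str:
    """Deterministic replacement for the real DNS lookup."""
    if hostname == "example.com":
        return "192.0.2.10"
    raise socket.gaierror("name not known")


def test_resolve_without_network() -> None:
    # Patch the attribute on the socket module; resolve() looks it up at call time.
    with patch("socket.gethostbyname", side_effect=fake_gethostbyname):
        assert resolve(["example.com", "missing.invalid"]) == ["192.0.2.10"]
```

The fake covers both the success path and the failure path, so the test also checks that unresolved domains are skipped.
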

## Best practices

When creating enrichers, think carefully about error handling. Intelligence gathering involves many external systems that can fail in unpredictable ways. Your enricher should handle errors gracefully, log failures clearly, and continue processing remaining items rather than crashing entirely.

Always use the Logger utility for tracking progress and errors. The logger integrates with Flowsint's monitoring system and helps users understand what's happening during long-running enrichers. Log successful operations at the info level and errors at the error level.

Define `InputType` and `OutputType` as base types (e.g., `Domain`, not `List[Domain]`). The base class automatically handles the list wrapping and validation. This makes the type definitions cleaner and more intuitive.

Always export your types at the end of the file using:

```python
InputType = YourEnricher.InputType
OutputType = YourEnricher.OutputType
```

Use the simplified graph API by passing Pydantic objects directly to `create_node()` and `create_relationship()`. This eliminates boilerplate code and reduces errors. The methods automatically infer node types, primary keys, and properties from your Pydantic models.

Separate concerns between the two phases. Scanning should focus on gathering and processing data. Postprocessing should create graph structures. Input validation happens automatically through Pydantic, so you don't need to handle it manually.

Use type hints everywhere. They provide automatic validation, better IDE support, and serve as inline documentation. The `InputType` and `OutputType` class attributes should always be Pydantic types from the `flowsint-types` package.

When working with tools, remember that they return raw data structures. Your enricher is responsible for converting tool output into proper Flowsint types. This type conversion is typically done in the scan phase.

Document your enricher thoroughly. The class docstring, documentation method, and parameter descriptions all appear in the UI. Clear documentation helps users understand what your enricher does and how to configure it.

### Handling API rate limits

When working with rate-limited APIs, add delays between requests:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """Scan with rate limiting to respect API limits."""
    import asyncio

    results: List[OutputType] = []
    delay_seconds = 1  # Delay between requests

    for item in data:
        # _query_api is a helper method you implement for your data source
        result = await self._query_api(item)
        if result:
            results.append(result)

        # Respect rate limits
        await asyncio.sleep(delay_seconds)

    return results
```
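
A fixed delay slows every request, even when the API is not pushing back. A complementary pattern is to retry only when the API reports a rate limit, waiting longer after each failure (exponential backoff). The sketch below assumes your API client raises some exception when it is rate limited; `RateLimited` and `call_with_backoff` are hypothetical names used for illustration.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical error raised by a client when the API rejects a request for rate reasons."""


async def call_with_backoff(
    request: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 1.0,
) -> T:
    """Retry `request` with exponentially growing delays when rate limited."""
    for attempt in range(max_attempts):
        try:
            return await request()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError("max_attempts must be at least 1")
```

Inside a scan loop this could wrap each API call, for example `await call_with_backoff(lambda: self._query_api(item))`.
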

### Fallback data sources

Implement fallback logic when primary sources fail:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """Try multiple data sources with fallback logic."""
    results: List[OutputType] = []

    for domain in data:
        # Try primary source (_query_primary_source is a helper you implement)
        result = self._query_primary_source(domain)

        if not result:
            # Fall back to secondary source
            Logger.info(
                self.sketch_id,
                {"message": f"Primary source failed for {domain.domain}, trying fallback"}
            )
            result = self._query_fallback_source(domain)

        if result:
            results.append(result)

    return results
```
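
With more than two sources, the nested `if` checks get repetitive. One option is to keep the sources in an ordered list and return the first non-empty answer, along with the name of the source that produced it so it can be logged. This is a generic sketch; `first_result` and the source names in the usage note are illustrative.

```python
from typing import Callable, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def first_result(
    item: str,
    sources: Sequence[Tuple[str, Callable[[str], Optional[T]]]],
) -> Tuple[Optional[str], Optional[T]]:
    """Query sources in order; return (source_name, result) for the first hit."""
    for source_name, query in sources:
        try:
            result = query(item)
        except Exception:
            continue  # treat an error like an empty answer and try the next source
        if result:
            return source_name, result
    return None, None
```

A call such as `first_result(domain.domain, [("primary", query_primary), ("fallback", query_fallback)])` returns `(None, None)` when every source comes back empty, which the scan loop can log and skip.
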

## Troubleshooting

If your enricher doesn't appear in the API, verify that you've applied the `@flowsint_enricher` decorator to your class, that the file lives under `flowsint-enrichers/src/flowsint_enrichers/`, and that you've restarted the API server. Auto-discovery via `load_all_enrichers()` runs at startup, so newly added files require a restart.

For import errors, make sure all your dependencies are installed and the enricher file has no syntax errors. Check that you're importing from the correct packages.

If the scan method isn't finding any results, add logging statements to debug what's happening. Verify that tools are installed and accessible, API keys are valid if required, and input data is in the expected format.

When graph relationships aren't appearing, check that you're creating both nodes and relationships in the postprocess method. Verify that the relationship type name is correct and that you're passing the right node objects to `create_relationship()`.

## Next steps

Once you've created your enricher and it has been auto-discovered, it becomes available through the API for users to run. Enrichers can be chained together into Flows, where the output of one enricher feeds into the input of another. This enables complex intelligence gathering sequences.

Remember that enrichers are the heart of Flowsint's intelligence gathering capabilities. Well-designed enrichers that handle errors gracefully, log progress clearly, and create meaningful graph relationships make the entire platform more powerful and user-friendly.

If you create new enrichers that you think can help the community, please consider open-sourcing them. Help the community as much as it helps you!