---
title: "Managing enrichers"
description: "Quick start guide to creating enrichers for your OSINT investigations."
category: "Developers"
order: 10
author: "Flowsint Team"
tags: ["tutorial", "developers", "creating-a-new-enricher"]
version: "1.2.8"
last_updated_at: "2026-05-15"
---

## Understanding Enrichers

Enrichers are the high-level business logic layer in Flowsint. While types define data structures and tools wrap external utilities, enrichers orchestrate the entire intelligence gathering workflow. An enricher takes input data of one type, processes it through various tools or APIs, validates and enriches the results, creates graph database nodes and relationships, and returns structured output.

Every enricher in Flowsint follows a two-phase execution model. The scan phase contains the core enriching logic, where tools are executed, APIs are called, and data is gathered. The postprocessing phase then creates Neo4j graph nodes and relationships and returns the processed results. Input validation and normalization happen automatically through Pydantic type validation.

Enrichers differ from tools in several fundamental ways. Tools are low-level wrappers that return raw data without knowledge of the Flowsint ecosystem. Enrichers are high-level workflows that understand types, create graph nodes, handle parameters, and orchestrate multiple tools. When you want to add a new data source, you create a tool. When you want to add a new intelligence workflow, you create an enricher.

## Enricher architecture

The enricher system is built around an abstract base class that defines the interface and execution flow. Every enricher you create inherits from this base class and implements specific methods.

### The Enricher base class

The base class lives at `flowsint-core/src/flowsint_core/core/enricher_base.py` and provides the framework for all enrichers.
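
As a rough illustration of the two-phase flow, the sketch below uses a stand-in class rather than the real base class. The `TwoPhaseRunner` name, its `execute()` method, and the in-memory `graph_log` are assumptions made for this example only; the real base class may drive the phases differently.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class TwoPhaseRunner(ABC):
    """Hypothetical stand-in for the enricher base class (not Flowsint code)."""

    @abstractmethod
    async def scan(self, data: List[Any]) -> List[Any]:
        """Phase 1: gather and process data."""

    @abstractmethod
    def postprocess(self, results: List[Any], input_data: List[Any]) -> List[Any]:
        """Phase 2: record graph structure and return the results."""

    async def execute(self, data: List[Any]) -> List[Any]:
        # Run the phases in order: scan first, then postprocess with both
        # the scan results and the original input.
        results = await self.scan(data)
        return self.postprocess(results, data)


class UppercaseRunner(TwoPhaseRunner):
    def __init__(self) -> None:
        # Stands in for the graph database in this sketch.
        self.graph_log: List[Tuple[str, str, str]] = []

    async def scan(self, data: List[str]) -> List[str]:
        return [item.upper() for item in data]

    def postprocess(self, results: List[str], input_data: List[str]) -> List[str]:
        for source, result in zip(input_data, results):
            self.graph_log.append((source, "BECAME", result))
        return results


runner = UppercaseRunner()
output = asyncio.run(runner.execute(["example.com"]))
```

The point of the split is that `scan` never touches the graph, and `postprocess` never fetches data.
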
Here's what a minimal enricher looks like:

```python
from typing import List

from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_types import Domain, Ip


@flowsint_enricher
class MyEnricher(Enricher):
    """Description of what this enricher does."""

    # Define input and output types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        """Unique identifier for this enricher."""
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        """Category this enricher belongs to."""
        return "Domain"

    @classmethod
    def key(cls) -> str:
        """Primary key field name for this enricher."""
        return "domain"

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Core enriching logic."""
        pass

    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """Create graph nodes and relationships."""
        pass


# Export types for easy access
InputType = MyEnricher.InputType
OutputType = MyEnricher.OutputType
```

The `InputType` and `OutputType` class attributes define what data the enricher accepts and returns. These should be Pydantic types from the `flowsint-types` package, defined as base types (e.g., `Domain`, not `List[Domain]`). The base class uses these type definitions to generate JSON schemas for the API and to handle validation automatically. At the end of the file, you should export the types for easy access by other modules.

### The two phases

Understanding the two execution phases is crucial for writing effective enrichers.

**Scanning** is where the real work happens. This async method receives validated input data as a list of `InputType` instances and executes your intelligence gathering logic. You might instantiate tools, call external APIs, process results, and build up your output data. This phase should focus purely on gathering and processing data without worrying about the graph database.
The base class automatically handles input validation through Pydantic before the scan phase begins.

**Postprocessing** creates the graph database structure. After scanning completes, this method receives both the results and the original input. It creates Neo4j nodes for each entity, establishes relationships between them, and returns the final results. This split keeps graph logic apart from business logic.

## Creating a simple enricher

Let's walk through creating a complete enricher from scratch. We'll build an enricher that converts domains to IP addresses using DNS resolution.

### Setting up the file structure

Enrichers are organized by their input type. Create a new file in the appropriate directory under `flowsint-enrichers/src/flowsint_enrichers/`:

```bash
cd flowsint-enrichers/src/flowsint_enrichers/domain/
touch to_ip.py
```

If you're creating an enricher for a new input type, you may need to create a new directory first.

### Implementing the basic structure

Start with the imports and class definition:

```python
import socket
from typing import List

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain, Ip


@flowsint_enricher
class DomainToIpEnricher(Enricher):
    """Resolves domain names to their IP addresses using DNS."""

    # Define types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"

    @classmethod
    def documentation(cls) -> str:
        return """
        This enricher resolves domain names to their IP addresses using standard DNS queries.
        It accepts a list of domains and returns the corresponding IP addresses.
        """


# Export types at the end of the file
InputType = DomainToIpEnricher.InputType
OutputType = DomainToIpEnricher.OutputType
```

The `name()` method returns a unique identifier for this enricher. Use lowercase with underscores, following the pattern `inputtype_to_outputtype`. The `category()` method groups related enrichers together in the UI. The `key()` method specifies which field serves as the primary identifier, typically matching the input type.

### Implementing the scan logic

The scan method contains your core intelligence gathering logic. It receives a list of validated `InputType` instances and returns a list of `OutputType` instances.

```python
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """
        Resolve each domain to its IP address.

        Args:
            data: List of Domain objects to resolve

        Returns:
            List of Ip objects
        """
        results: List[OutputType] = []

        for domain in data:
            try:
                # Perform DNS resolution
                ip_address = socket.gethostbyname(domain.domain)

                # Create IP object
                ip = Ip(address=ip_address)
                results.append(ip)

                # Log successful resolution
                Logger.info(
                    self.sketch_id,
                    {"message": f"Resolved {domain.domain} to {ip_address}"}
                )
            except socket.gaierror as e:
                # DNS resolution failed
                Logger.info(
                    self.sketch_id,
                    {"message": f"Failed to resolve {domain.domain}: {e}"}
                )
                continue
            except Exception as e:
                # Unexpected error
                Logger.error(
                    self.sketch_id,
                    {"message": f"Error resolving {domain.domain}: {e}"}
                )
                continue

        return results
```

This implementation iterates through each domain, performs DNS resolution, creates an IP object for each successful resolution, and logs both successes and failures. The error handling ensures that a single failure doesn't crash the entire enricher, which is important when processing many domains.

The input data has already been validated by Pydantic before reaching the scan method, so you can trust that all items are proper `Domain` objects.

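
`socket.gethostbyname()` is a blocking call, so inside an `async` scan it holds up the event loop while each lookup completes. One possible approach, sketched here with only the standard library, is to hand each lookup to a worker thread with `asyncio.to_thread()` (Python 3.9+). The `resolve_all` helper and its `resolver` parameter are hypothetical names for illustration, not part of Flowsint.

```python
import asyncio
import socket
from typing import Callable, List, Optional, Tuple


async def resolve_all(
    hostnames: List[str],
    resolver: Callable[[str], str] = socket.gethostbyname,
) -> List[Tuple[str, Optional[str]]]:
    """Resolve hostnames concurrently without blocking the event loop.

    Returns (hostname, address) pairs; address is None when the lookup fails.
    """

    async def resolve_one(name: str) -> Tuple[str, Optional[str]]:
        try:
            # Run the blocking resolver in a worker thread.
            address = await asyncio.to_thread(resolver, name)
            return name, address
        except OSError:
            # socket.gaierror is a subclass of OSError.
            return name, None

    # gather() preserves the order of its arguments in the result list.
    return list(await asyncio.gather(*(resolve_one(n) for n in hostnames)))
```

Calling `await resolve_all([d.domain for d in data])` from a scan method would be one way to use it. Returning `(hostname, address)` pairs keeps each result tied to the hostname that produced it, even when some lookups fail.
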
### Implementing postprocessing

The postprocess method creates graph database nodes and relationships using the new simplified API:

```python
    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """
        Create graph nodes and relationships.

        Args:
            results: IP objects from scan phase
            input_data: Original Domain objects (preprocessed input)

        Returns:
            IP objects (unchanged)
        """
        # Create nodes and relationships.
        # zip() pairs items by position, so this assumes one result per input.
        for domain, ip in zip(input_data, results):
            # Create nodes by passing Pydantic objects directly
            self.create_node(domain)
            self.create_node(ip)

            # Create relationship by passing Pydantic objects directly
            self.create_relationship(domain, ip, "RESOLVES_TO")

            # Log the operation
            self.log_graph_message(
                f"IP found for domain {domain.domain} -> {ip.address}"
            )

        return results
```

Note that `zip()` pairs domains and IPs by position. Because the scan method above skips domains that fail to resolve, the pairs only line up when every domain resolved. In a production enricher, carry the source domain alongside each result so that relationships attach to the right node.

You can pass Pydantic objects directly to `create_node()` and `create_relationship()`. The `create_node()` method accepts a Pydantic object and creates a Neo4j node with the correct label and properties. The `create_relationship()` method takes two Pydantic objects and a relationship type string. Both methods infer the node types, primary keys, and property values from the Pydantic models.

## Creating an enricher with tools

Most enrichers use external tools for data gathering. Let's create an enricher that uses the Subfinder tool for subdomain enumeration.

### Importing the tool

Start by importing the tool along with your other dependencies:

```python
from typing import List

from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain
from tools.network.subfinder import SubfinderTool


@flowsint_enricher
class SubdomainEnricher(Enricher):
    """Enumerates subdomains for given domains using Subfinder."""

    # Define types as base types
    InputType = Domain
    OutputType = Domain

    @classmethod
    def name(cls) -> str:
        return "domain_to_subdomains"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"


# Export types
InputType = SubdomainEnricher.InputType
OutputType = SubdomainEnricher.OutputType
```

### Using the tool in scan

The scan method instantiates and uses the tool:

```python
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """
        Find subdomains using Subfinder tool.

        Args:
            data: List of Domain objects

        Returns:
            List of discovered subdomain Domain objects
        """
        results: List[OutputType] = []

        # Instantiate the tool
        subfinder = SubfinderTool()

        for domain in data:
            Logger.info(
                self.sketch_id,
                {"message": f"Enumerating subdomains for {domain.domain}"}
            )

            try:
                # Launch the tool
                subdomains = subfinder.launch(domain.domain)

                # Convert strings to Domain objects
                for subdomain in subdomains:
                    results.append(Domain(domain=subdomain, root=False))

                Logger.info(
                    self.sketch_id,
                    {"message": f"Found {len(subdomains)} subdomains for {domain.domain}"}
                )
            except Exception as e:
                Logger.error(
                    self.sketch_id,
                    {"message": f"Error enumerating subdomains for {domain.domain}: {e}"}
                )
                continue

        return results
```

Notice how the tool returns raw strings, and the enricher converts them into proper `Domain` objects. This separation of concerns keeps tools simple while enrichers handle type conversion.

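
Raw tool output often contains duplicates, mixed case, or stray whitespace, so the conversion step is a natural place to normalize it. The sketch below is one way to do that under stated assumptions: the `Domain` dataclass is a stand-in that keeps the example runnable without `flowsint-types`, and `to_domains` is a hypothetical helper, not a Flowsint function.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Domain:
    """Stand-in for flowsint_types.Domain, used only in this sketch."""
    domain: str
    root: bool = False


def to_domains(raw: Iterable[str]) -> List[Domain]:
    """Convert raw hostname strings to Domain objects, dropping blanks and duplicates."""
    seen = set()
    converted: List[Domain] = []
    for line in raw:
        # Normalize: trim whitespace, lowercase, drop a trailing root dot.
        name = line.strip().lower().rstrip(".")
        if not name or name in seen:
            continue
        seen.add(name)
        converted.append(Domain(domain=name))
    return converted


domains = to_domains(["API.example.com", "api.example.com.", "", "mail.example.com\n"])
```

Here the first two entries collapse into a single `api.example.com` object, and the empty string is skipped.
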
### Creating graph nodes and relationships

The postprocess phase creates parent-child relationships between domains and subdomains:

```python
    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """
        Create graph nodes and relationships for domains and subdomains.

        Args:
            results: Discovered subdomain Domain objects
            input_data: Original parent Domain objects

        Returns:
            Subdomain Domain objects
        """
        # Create nodes for parent domains
        for domain in input_data:
            self.create_node(domain)

        # Create nodes for subdomains and relationships
        for subdomain in results:
            self.create_node(subdomain)

            # Extract parent domain name and create relationship
            parent_domain_name = self._extract_parent_domain(subdomain.domain)
            parent_domain = Domain(domain=parent_domain_name)

            # Create relationship using Pydantic objects
            self.create_relationship(parent_domain, subdomain, "HAS_SUBDOMAIN")

            # Log the operation
            self.log_graph_message(
                f"Subdomain found: {parent_domain_name} -> {subdomain.domain}"
            )

        return results

    def _extract_parent_domain(self, subdomain: str) -> str:
        """Extract parent domain from subdomain.

        Naive heuristic: keeps the last two labels, so it mishandles
        multi-label suffixes such as "co.uk".
        """
        parts = subdomain.split('.')
        if len(parts) >= 2:
            return '.'.join(parts[-2:])
        return subdomain
```

## Adding parameters to enrichers

Many enrichers need user-configurable parameters. Let's create an enricher that scans ports with configurable options.

### Defining the parameter schema

The `get_params_schema()` class method defines what parameters your enricher accepts:

```python
from typing import List, Dict, Any, Optional

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger  # used by the scan method
from flowsint_types import Ip, Port
from tools.network.naabu import NaabuTool


@flowsint_enricher
class IpToPortsEnricher(Enricher):
    """Scans IP addresses for open ports."""

    # Define types as base types
    InputType = Ip
    OutputType = Port

    @classmethod
    def name(cls) -> str:
        return "ip_to_ports"

    @classmethod
    def category(cls) -> str:
        return "IP"

    @classmethod
    def key(cls) -> str:
        return "address"

    @classmethod
    def get_params_schema(cls) -> List[Dict[str, Any]]:
        """Define configurable parameters for this enricher."""
        return [
            {
                "name": "mode",
                "type": "select",
                "description": "Scan mode: active scanning or passive enumeration",
                "required": True,
                "default": "passive",
                "options": [
                    {"label": "Passive", "value": "passive"},
                    {"label": "Active", "value": "active"},
                ],
            },
            {
                "name": "port_range",
                "type": "string",
                "description": "Port range to scan (e.g., '1-1000' or '80,443,8080')",
                "required": False,
            },
            {
                "name": "top_ports",
                "type": "select",
                "description": "Scan only the most common ports",
                "required": False,
                "options": [
                    {"label": "Top 100", "value": "100"},
                    {"label": "Top 1000", "value": "1000"},
                ],
            },
            {
                "name": "PDCP_API_KEY",
                "type": "vaultSecret",
                "description": "ProjectDiscovery Cloud Platform API key for passive mode",
                "required": False,
            },
        ]


# Export types
InputType = IpToPortsEnricher.InputType
OutputType = IpToPortsEnricher.OutputType
```

Each entry in the parameter schema defines the parameter's type, its description, whether it's required, its default value, and, for select parameters, the available options. The `vaultSecret` type integrates with Flowsint's encrypted credential storage.

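
To make the schema fields concrete, here is a minimal sketch of how a schema of this shape could be applied to user-supplied values: defaults are filled in, required parameters are enforced, and select values are checked against their options. The `resolve_params` helper is hypothetical and only illustrates the semantics; Flowsint's own handling may differ.

```python
from typing import Any, Dict, List


def resolve_params(schema: List[Dict[str, Any]], supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Apply defaults and basic checks from a parameter schema (illustrative only)."""
    resolved: Dict[str, Any] = {}
    for spec in schema:
        name = spec["name"]
        # Fall back to the schema default when the user supplied nothing.
        value = supplied.get(name, spec.get("default"))
        if value is None:
            if spec.get("required", False):
                raise ValueError(f"Missing required parameter: {name}")
            continue
        if spec.get("type") == "select":
            allowed = {option["value"] for option in spec.get("options", [])}
            if value not in allowed:
                raise ValueError(f"Invalid value {value!r} for parameter {name}")
        resolved[name] = value
    return resolved


schema = [
    {
        "name": "mode",
        "type": "select",
        "required": True,
        "default": "passive",
        "options": [
            {"label": "Passive", "value": "passive"},
            {"label": "Active", "value": "active"},
        ],
    },
    {"name": "port_range", "type": "string", "required": False},
]

params = resolve_params(schema, {"port_range": "1-1000"})
```

In this example `mode` is not supplied, so it takes its default of `"passive"`, while an unknown value such as `"stealth"` would raise a `ValueError`.
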
### Using parameters in your enricher

Parameters are accessed through `self.params` in your scan method:

```python
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """
        Scan IPs for open ports using configured parameters.

        Args:
            data: List of Ip objects to scan

        Returns:
            List of Port objects
        """
        results: List[OutputType] = []

        # Extract parameters
        mode = self.params.get("mode", "passive")
        port_range = self.params.get("port_range")
        top_ports = self.params.get("top_ports")
        api_key = self.get_secret("PDCP_API_KEY")

        # Instantiate tool
        naabu = NaabuTool()

        for ip in data:
            Logger.info(
                self.sketch_id,
                {"message": f"Scanning {ip.address} in {mode} mode"}
            )

            try:
                # Launch tool with parameters
                scan_results = naabu.launch(
                    target=ip.address,
                    mode=mode,
                    port_range=port_range,
                    top_ports=top_ports,
                    api_key=api_key
                )

                # Convert tool results to Port objects
                for result in scan_results:
                    port = Port(
                        number=result.get("port"),
                        protocol=result.get("protocol", "tcp").upper(),
                        state="open",
                        service=result.get("service"),
                        banner=result.get("version")
                    )
                    results.append(port)
            except Exception as e:
                Logger.error(
                    self.sketch_id,
                    {"message": f"Error scanning {ip.address}: {e}"}
                )
                continue

        return results
```

## Handling multiple output types

Some enrichers produce multiple types of results.
You can define a custom return type using Pydantic:

```python
from typing import List

from pydantic import BaseModel

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Website, Email, Phone


class CrawlerResults(BaseModel):
    """Results from web crawler including multiple entity types."""
    website: Website
    emails: List[Email] = []
    phones: List[Phone] = []


@flowsint_enricher
class WebsiteToCrawlerEnricher(Enricher):
    """Crawls websites to extract emails and phone numbers."""

    # Define types as base types
    InputType = Website
    OutputType = CrawlerResults

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Crawl websites and extract contact information."""
        from tools.network.reconcrawl import ReconCrawlTool

        results: List[OutputType] = []
        crawler_tool = ReconCrawlTool()

        for website in data:
            try:
                # Launch crawler
                crawl_data = crawler_tool.launch(website.url)

                # Extract entities
                emails = [Email(email=e) for e in crawl_data.get("emails", [])]
                phones = [Phone(number=p) for p in crawl_data.get("phones", [])]

                # Create result object
                result = CrawlerResults(
                    website=website,
                    emails=emails,
                    phones=phones
                )
                results.append(result)
            except Exception as e:
                Logger.error(self.sketch_id, {"message": f"Crawl error: {e}"})

        return results

    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """Create nodes for all discovered entities."""
        for result in results:
            # Create website node using Pydantic object
            self.create_node(result.website)

            # Create email nodes and relationships
            for email in result.emails:
                self.create_node(email)
                self.create_relationship(result.website, email, "HAS_EMAIL")

            # Create phone nodes and relationships
            for phone in result.phones:
                self.create_node(phone)
                self.create_relationship(result.website, phone, "HAS_PHONE")

            self.log_graph_message(
                f"Processed {len(result.emails)} emails and {len(result.phones)} phones "
                f"from {result.website.url}"
            )

        return results


# Export types
InputType = WebsiteToCrawlerEnricher.InputType
OutputType = WebsiteToCrawlerEnricher.OutputType
```

## Registering your enricher

You don't need to register your enricher anywhere: adding the `@flowsint_enricher` decorator to your enricher class triggers auto-discovery.

```python
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher


@flowsint_enricher
class MyEnricher(Enricher):
    ...
```

## Testing your enricher

Creating tests helps ensure your enricher works correctly and makes debugging easier. Create a test file in `flowsint-enrichers/tests/`:

```python
# tests/test_domain_to_ip.py
import pytest

from flowsint_enrichers.domain.to_ip import DomainToIpEnricher
from flowsint_types import Domain, Ip


def test_enricher_metadata():
    """Test enricher metadata is correctly defined."""
    assert DomainToIpEnricher.name() == "domain_to_ip"
    assert DomainToIpEnricher.category() == "Domain"
    assert DomainToIpEnricher.key() == "domain"


def test_type_definitions():
    """Test InputType and OutputType are correctly defined."""
    assert DomainToIpEnricher.InputType == Domain
    assert DomainToIpEnricher.OutputType == Ip


@pytest.mark.asyncio
async def test_scan():
    """Test DNS resolution works (performs a real lookup, so it needs network access)."""
    enricher = DomainToIpEnricher(sketch_id="test", scan_id="test")
    input_data = [Domain(domain="example.com")]
    results = await enricher.scan(input_data)

    assert len(results) > 0
    assert isinstance(results[0], Ip)
    assert results[0].address  # Should have an IP address
```

These tests verify that your enricher's metadata is correct, that type definitions are properly set, and that the scan logic produces the expected results. Input validation is handled automatically by Pydantic, so you don't need to test preprocessing separately.

## Best practices

When creating enrichers, think carefully about error handling.
Intelligence gathering involves many external systems that can fail in unpredictable ways. Your enricher should handle errors gracefully, log failures clearly, and continue processing remaining items rather than crashing entirely.

Always use the Logger utility for tracking progress and errors. The logger integrates with Flowsint's monitoring system and helps users understand what's happening during long-running enrichers. Log successful operations at the info level and errors at the error level.

Define `InputType` and `OutputType` as base types (e.g., `Domain`, not `List[Domain]`). The base class automatically handles the list wrapping and validation. This makes the type definitions cleaner and more intuitive.

Always export your types at the end of the file using:

```python
InputType = YourEnricher.InputType
OutputType = YourEnricher.OutputType
```

Use the simplified graph API by passing Pydantic objects directly to `create_node()` and `create_relationship()`. This eliminates boilerplate code and reduces errors. The methods automatically infer node types, primary keys, and properties from your Pydantic models.

Separate concerns between the two phases. Scanning should focus on gathering and processing data. Postprocessing should create graph structures. Input validation happens automatically through Pydantic, so you don't need to handle it manually.

Use type hints everywhere. They provide automatic validation, better IDE support, and serve as inline documentation. The `InputType` and `OutputType` class attributes should always be Pydantic types from the `flowsint-types` package.

When working with tools, remember that they return raw data structures. Your enricher is responsible for converting tool output into proper Flowsint types. This type conversion is typically done in the scan phase.

Document your enricher thoroughly. The class docstring, documentation method, and parameter descriptions all appear in the UI.
Clear documentation helps users understand what your enricher does and how to configure it.

### Handling API rate limits

When working with rate-limited APIs, add delays between requests:

```python
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Scan with rate limiting to respect API limits."""
        import asyncio

        results: List[OutputType] = []
        delay_seconds = 1  # Delay between requests

        for item in data:
            # _query_api is a placeholder for your own API call
            result = await self._query_api(item)
            if result:
                results.append(result)

            # Respect rate limits
            await asyncio.sleep(delay_seconds)

        return results
```

### Fallback data sources

Implement fallback logic when primary sources fail:

```python
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Try multiple data sources with fallback logic."""
        results: List[OutputType] = []

        for domain in data:
            # Try primary source (placeholder for your own lookup)
            result = self._query_primary_source(domain)

            if not result:
                # Fall back to secondary source
                Logger.info(
                    self.sketch_id,
                    {"message": f"Primary source failed for {domain}, trying fallback"}
                )
                result = self._query_fallback_source(domain)

            if result:
                results.append(result)

        return results
```

## Troubleshooting

If your enricher doesn't appear in the API, verify that you've applied the `@flowsint_enricher` decorator to your class, that the file lives under `flowsint-enrichers/src/flowsint_enrichers/`, and that you've restarted the API server. Auto-discovery via `load_all_enrichers()` runs at startup, so file additions require a restart.

For import errors, make sure all your dependencies are installed and the enricher file has no syntax errors. Check that you're importing from the correct packages.

If the scan method isn't finding any results, add logging statements to debug what's happening. Verify that tools are installed and accessible, API keys are valid if required, and input data is in the expected format.

When graph relationships aren't appearing, check that you're creating both nodes and relationships in the postprocess method.
Verify that the relationship type name is correct and that you're passing the right node objects to `create_relationship()`.

## Next steps

Once you've created and registered your enricher, it becomes available through the API for users to run. Enrichers can be chained together into Flows, where the output of one enricher feeds into the input of another. This enables complex intelligence gathering sequences.

Remember that enrichers are the heart of Flowsint's intelligence gathering capabilities. Well-designed enrichers that handle errors gracefully, log progress clearly, and create meaningful graph relationships make the entire platform more powerful and user-friendly.

If you create new enrichers that you think can help the community, please consider open-sourcing them. Help the community as much as it helps you!