---
title: "Managing enrichers"
description: "Quick-start guide to creating enrichers for your OSINT investigations."
category: "Developers"
order: 10
author: "Flowsint Team"
tags: ["tutorial", "developers", "creating-a-new-enricher"]
version: "1.2.8"
last_updated_at: "2026-05-15"
---

## Understanding enrichers

Enrichers are the high-level business logic layer in Flowsint. Types define data structures and tools wrap external utilities, while enrichers orchestrate the entire intelligence-gathering workflow. An enricher takes input data of one type and processes it through various tools or APIs. It then validates and enriches the results, creates graph database nodes and relationships, and returns structured output.

Every enricher in Flowsint follows a two-phase execution model. The scan phase contains the core enrichment logic, where tools are executed, APIs are called, and data is gathered. The postprocessing phase then creates Neo4j graph nodes and relationships and returns the processed results. Input validation and normalization happen automatically through Pydantic type validation before the scan phase runs.

Enrichers differ from tools in several fundamental ways. Tools are low-level wrappers that return raw data without knowledge of the Flowsint ecosystem. Enrichers are high-level workflows that understand types, create graph nodes, handle parameters, and orchestrate multiple tools. When you want to add a new data source, you create a tool. When you want to add a new intelligence workflow, you create an enricher.
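
The division of labor between tools and enrichers can be sketched in a few lines of plain Python. Everything here is hypothetical and runs without Flowsint. `lookup_tool` fakes a data source with a hard-coded table, and the `IpAddress` dataclass stands in for a Flowsint type such as `Ip`. The only point is that the tool returns raw strings, while the enricher-like function orchestrates it and turns its output into typed objects.

```python
from dataclasses import dataclass
from typing import List


def lookup_tool(domain: str) -> List[str]:
    """'Tool': returns raw strings and knows nothing about Flowsint types or graphs."""
    # Hard-coded stand-in for a real data source (documentation IP addresses)
    fake_dns = {"example.com": ["192.0.2.1", "192.0.2.2"]}
    return fake_dns.get(domain, [])


@dataclass(frozen=True)
class IpAddress:
    """Stand-in for a typed Flowsint entity such as Ip."""
    address: str


def domain_to_ips(domains: List[str]) -> List[IpAddress]:
    """'Enricher': calls the tool for each input and converts raw output into typed objects."""
    results: List[IpAddress] = []
    for domain in domains:
        results.extend(IpAddress(address=raw) for raw in lookup_tool(domain))
    return results


print(domain_to_ips(["example.com", "unknown.test"]))
# [IpAddress(address='192.0.2.1'), IpAddress(address='192.0.2.2')]
```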

## Enricher architecture

The enricher system is built around an abstract base class that defines the interface and execution flow. Every enricher you create inherits from this base class and implements the methods shown below.

### The Enricher base class

The base class lives at `flowsint-core/src/flowsint_core/core/enricher_base.py` and provides the framework for all enrichers. Here's what a minimal enricher looks like:

```python
from typing import List

from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_types import Domain, Ip


@flowsint_enricher
class MyEnricher(Enricher):
    """Description of what this enricher does."""

    # Define input and output types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        """Unique identifier for this enricher."""
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        """Category this enricher belongs to."""
        return "Domain"

    @classmethod
    def key(cls) -> str:
        """Primary key field name for this enricher."""
        return "domain"

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Core enriching logic."""
        pass

    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """Create graph nodes and relationships."""
        pass


# Export types for easy access
InputType = MyEnricher.InputType
OutputType = MyEnricher.OutputType
```

The `InputType` and `OutputType` class attributes define what data the enricher accepts and returns. They should be Pydantic types from the `flowsint-types` package, declared as base types (e.g., `Domain`, not `List[Domain]`). The base class uses these definitions to generate JSON schemas for the API and to validate input automatically.

At the end of the file, export the types so that other modules can access them easily.

### The two phases

Understanding the two execution phases is crucial for writing effective enrichers.

**Scanning** is where the real work happens. This async method receives validated input data as a list of `InputType` instances and executes your intelligence-gathering logic. You might instantiate tools, call external APIs, process results, and build up your output data. This phase should focus purely on gathering and processing data without worrying about the graph database. The base class validates the input through Pydantic before the scan phase begins.

**Postprocessing** creates the graph database structure. After scanning completes, this method receives both the results and the original input. It creates Neo4j nodes for each entity, establishes relationships between them, and returns the final results. This split keeps graph logic separate from data-gathering logic.
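
The ordering of the two phases can be pictured with a toy model. The sketch below is not Flowsint's `Enricher` base class, and it leaves out validation and everything else that class does internally. `MiniEnricher`, its `execute` method, and the in-memory `graph` list are hypothetical stand-ins that only show the control flow: `scan` is awaited first, then `postprocess` receives both its results and the original input.

```python
import asyncio
from typing import Any, List, Tuple


class MiniEnricher:
    """Toy stand-in for an enricher base class: models only the scan -> postprocess order."""

    def __init__(self) -> None:
        # In-memory edge list standing in for the Neo4j graph
        self.graph: List[Tuple[Any, str, Any]] = []

    async def scan(self, data: List[Any]) -> List[Any]:
        raise NotImplementedError

    def postprocess(self, results: List[Any], input_data: List[Any]) -> List[Any]:
        return results

    async def execute(self, data: List[Any]) -> List[Any]:
        results = await self.scan(data)  # phase 1: gather and process data
        return self.postprocess(results, data)  # phase 2: write the graph, return results


class UppercaseEnricher(MiniEnricher):
    async def scan(self, data: List[str]) -> List[str]:
        return [item.upper() for item in data]

    def postprocess(self, results: List[str], input_data: List[str]) -> List[str]:
        # scan() returns exactly one result per input here, so zip pairs them correctly
        for source, target in zip(input_data, results):
            self.graph.append((source, "BECOMES", target))
        return results


enricher = UppercaseEnricher()
print(asyncio.run(enricher.execute(["a", "b"])))  # ['A', 'B']
print(enricher.graph)  # [('a', 'BECOMES', 'A'), ('b', 'BECOMES', 'B')]
```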

## Creating a simple enricher

Let's walk through creating a complete enricher from scratch. We'll build an enricher that converts domains to IP addresses using DNS resolution.

### Setting up the file structure

Enrichers are organized by their input type. Create a new file in the appropriate directory under `flowsint-enrichers/src/flowsint_enrichers/`:

```bash
cd flowsint-enrichers/src/flowsint_enrichers/domain/
touch to_ip.py
```

If you're creating an enricher for a new input type, you may need to create a new directory first.

### Implementing the basic structure

Start with the imports and class definition:

```python
import socket
from typing import List

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain, Ip


@flowsint_enricher
class DomainToIpEnricher(Enricher):
    """Resolves domain names to their IP addresses using DNS."""

    # Define types as base types (not lists)
    InputType = Domain
    OutputType = Ip

    @classmethod
    def name(cls) -> str:
        return "domain_to_ip"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"

    @classmethod
    def documentation(cls) -> str:
        return """
        This enricher resolves domain names to their IP addresses using
        standard DNS queries. It accepts a list of domains and returns
        the corresponding IP addresses.
        """


# Export types at the end of the file
InputType = DomainToIpEnricher.InputType
OutputType = DomainToIpEnricher.OutputType
```

The `name()` method returns a unique identifier for this enricher. Use lowercase with underscores, following the pattern `inputtype_to_outputtype`. The `category()` method groups related enrichers together in the UI. The `key()` method names the input field that serves as the primary identifier, here the `domain` field of the `Domain` type. The `documentation()` method returns longer help text that is shown in the UI.

### Implementing the scan logic

The scan method contains your core intelligence-gathering logic. It receives a list of validated `InputType` instances and returns a list of `OutputType` instances.

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Resolve each domain to its IP address.

    Args:
        data: List of Domain objects to resolve

    Returns:
        List of Ip objects
    """
    results: List[OutputType] = []
    # Remember which domain produced each IP so postprocess() can link them
    # correctly even when some domains fail to resolve. _resolved_pairs is an
    # attribute of this example, not part of the Enricher base class.
    self._resolved_pairs = []

    for domain in data:
        try:
            # Perform DNS resolution (a blocking call)
            ip_address = socket.gethostbyname(domain.domain)
            # Create IP object and remember its source domain
            ip = Ip(address=ip_address)
            results.append(ip)
            self._resolved_pairs.append((domain, ip))
            # Log successful resolution
            Logger.info(
                self.sketch_id,
                {"message": f"Resolved {domain.domain} to {ip_address}"}
            )
        except socket.gaierror as e:
            # DNS resolution failed: log it and move on
            Logger.info(
                self.sketch_id,
                {"message": f"Failed to resolve {domain.domain}: {e}"}
            )
            continue
        except Exception as e:
            # Unexpected error: log it and move on
            Logger.error(
                self.sketch_id,
                {"message": f"Error resolving {domain.domain}: {e}"}
            )
            continue

    return results
```

This implementation iterates through each domain, performs DNS resolution, creates an `Ip` object for each successful lookup, and records which domain it came from. It logs both successes and failures. Because each domain is handled in its own `try` block, one failure doesn't crash the entire enricher, which matters when processing many domains. Note that `socket.gethostbyname` blocks the event loop while each lookup runs, so domains are resolved one at a time.

The input data has already been validated by Pydantic before reaching the scan method, so you can trust that all items are proper `Domain` objects.

### Implementing postprocessing

The postprocess method creates graph database nodes and relationships using the simplified graph API:

```python
def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
    """
    Create graph nodes and relationships.

    Args:
        results: IP objects from scan phase
        input_data: Original Domain objects (preprocessed input)

    Returns:
        IP objects (unchanged)
    """
    # Use the (domain, ip) pairs recorded in scan(). Zipping input_data with
    # results would misalign them as soon as one domain failed to resolve,
    # because scan() skips failures.
    for domain, ip in getattr(self, "_resolved_pairs", []):
        # Create nodes by passing Pydantic objects directly
        self.create_node(domain)
        self.create_node(ip)
        # Create relationship by passing Pydantic objects directly
        self.create_relationship(domain, ip, "RESOLVES_TO")
        # Log the operation
        self.log_graph_message(
            f"IP found for domain {domain.domain} -> {ip.address}"
        )
    return results
```

You can pass Pydantic objects directly to `create_node()` and `create_relationship()`. `create_node()` creates a Neo4j node with the correct label and properties. `create_relationship()` takes two Pydantic objects and a relationship type string. Both methods infer node types, primary keys, and property values from the Pydantic models.
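
If resolving domains one at a time becomes a bottleneck, one option is to run each blocking lookup in a worker thread with `asyncio.to_thread` (Python 3.9+) and gather the lookups concurrently. The sketch below does this with the standard library only. `resolve_all` and `fake_resolver` are hypothetical helpers, not Flowsint APIs, and plain strings stand in for the `Domain` and `Ip` types. The resolver is injected as a parameter so the example runs offline.

```python
import asyncio
import socket
from typing import Callable, List, Optional, Tuple


async def resolve_all(
    domains: List[str],
    resolver: Callable[[str], str] = socket.gethostbyname,
) -> List[Tuple[str, str]]:
    """Resolve domains concurrently; return (domain, ip) pairs for successful lookups only."""

    async def resolve_one(domain: str) -> Optional[Tuple[str, str]]:
        try:
            # Run the blocking resolver in a worker thread so the event loop stays responsive
            ip = await asyncio.to_thread(resolver, domain)
        except OSError:
            # socket.gaierror is a subclass of OSError: treat it as "no answer"
            return None
        return (domain, ip)

    # gather() returns results in input order; each result already carries its domain
    outcomes = await asyncio.gather(*(resolve_one(d) for d in domains))
    return [pair for pair in outcomes if pair is not None]


def fake_resolver(name: str) -> str:
    """Offline stand-in for socket.gethostbyname, using documentation addresses."""
    table = {"a.example": "192.0.2.1", "c.example": "192.0.2.3"}
    if name not in table:
        raise socket.gaierror(f"cannot resolve {name}")
    return table[name]


pairs = asyncio.run(resolve_all(["a.example", "b.example", "c.example"], fake_resolver))
print(pairs)  # [('a.example', '192.0.2.1'), ('c.example', '192.0.2.3')]
```

Because each lookup returns its own domain alongside the IP, a failed domain simply drops out without shifting the others. For very large inputs you would probably also cap concurrency, for example with an `asyncio.Semaphore`.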

## Creating an enricher with tools

Many enrichers use external tools for data gathering. Let's create an enricher that uses the Subfinder tool for subdomain enumeration.

### Importing the tool

Start by importing the tool along with your other dependencies:

```python
from typing import List

from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain
from tools.network.subfinder import SubfinderTool


@flowsint_enricher
class SubdomainEnricher(Enricher):
    """Enumerates subdomains for given domains using Subfinder."""

    # Define types as base types
    InputType = Domain
    OutputType = Domain

    @classmethod
    def name(cls) -> str:
        return "domain_to_subdomains"

    @classmethod
    def category(cls) -> str:
        return "Domain"

    @classmethod
    def key(cls) -> str:
        return "domain"


# Export types
InputType = SubdomainEnricher.InputType
OutputType = SubdomainEnricher.OutputType
```

### Using the tool in scan

The scan method instantiates and uses the tool:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Find subdomains using Subfinder tool.

    Args:
        data: List of Domain objects

    Returns:
        List of discovered subdomain Domain objects
    """
    results: List[OutputType] = []
    # Instantiate the tool
    subfinder = SubfinderTool()

    for domain in data:
        Logger.info(
            self.sketch_id,
            {"message": f"Enumerating subdomains for {domain.domain}"}
        )
        try:
            # Launch the tool
            subdomains = subfinder.launch(domain.domain)
            # Convert strings to Domain objects
            for subdomain in subdomains:
                results.append(Domain(domain=subdomain, root=False))
            Logger.info(
                self.sketch_id,
                {"message": f"Found {len(subdomains)} subdomains for {domain.domain}"}
            )
        except Exception as e:
            Logger.error(
                self.sketch_id,
                {"message": f"Error enumerating subdomains for {domain.domain}: {e}"}
            )
            continue

    return results
```

Notice how the tool returns raw strings and the enricher converts them into proper `Domain` objects. This separation of concerns keeps tools simple while enrichers handle type conversion.
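
Raw tool output may not be clean. Depending on the tool and its flags, lines can contain uppercase letters, trailing dots, duplicates, wildcard entries, or names outside the queried domain. The sketch below is a minimal normalization step that you could run before wrapping the strings in `Domain` objects. It assumes the tool returns plain hostname strings, and `normalize_subdomains` is a hypothetical helper, not part of Flowsint or Subfinder.

```python
from typing import Iterable, List, Set


def normalize_subdomains(raw: Iterable[str], parent: str) -> List[str]:
    """Clean raw tool output into unique, lowercase, strict subdomains of `parent`."""
    parent = parent.strip().lower().rstrip(".")
    seen: Set[str] = set()
    cleaned: List[str] = []
    for line in raw:
        name = line.strip().lower().rstrip(".")
        # Drop wildcard prefixes such as "*.dev.example.com"
        if name.startswith("*."):
            name = name[2:]
        # Keep only names under the parent; this also skips blanks and look-alikes
        if not name.endswith("." + parent):
            continue
        # Preserve first-seen order while removing duplicates
        if name not in seen:
            seen.add(name)
            cleaned.append(name)
    return cleaned


raw_output = ["WWW.Example.com", "api.example.com.", "*.dev.example.com", "", "example.org", "www.example.com"]
print(normalize_subdomains(raw_output, "example.com"))
# ['www.example.com', 'api.example.com', 'dev.example.com']
```

Matching on `"." + parent` rather than on `parent` alone keeps look-alike names such as `badexample.com` out of the results.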

### Creating graph nodes and relationships

The postprocess phase creates parent-child relationships between each input domain and the subdomains discovered under it:

```python
def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
    """
    Create graph nodes and relationships for domains and subdomains.

    Args:
        results: Discovered subdomain Domain objects
        input_data: Original parent Domain objects

    Returns:
        Subdomain Domain objects
    """
    # Create nodes for parent domains
    for domain in input_data:
        self.create_node(domain)

    # Create nodes for subdomains and link each one to its parent
    for subdomain in results:
        self.create_node(subdomain)
        parent_domain = self._find_parent_domain(subdomain.domain, input_data)
        if parent_domain is None:
            # Not under any input domain: keep the node, skip the relationship
            continue
        # Create relationship using Pydantic objects
        self.create_relationship(parent_domain, subdomain, "HAS_SUBDOMAIN")
        # Log the operation
        self.log_graph_message(
            f"Subdomain found: {parent_domain.domain} -> {subdomain.domain}"
        )
    return results


def _find_parent_domain(self, subdomain: str, parents: List[InputType]):
    """
    Return the most specific input domain that `subdomain` belongs to, or None.

    Matching against the input domains (instead of keeping the last two labels)
    also works for names like `www.example.co.uk` and for inputs that are
    themselves subdomains.
    """
    matches = [p for p in parents if subdomain.endswith("." + p.domain)]
    return max(matches, key=lambda p: len(p.domain), default=None)
```
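
Taking the last two labels of a name as its parent returns the wrong answer in two cases. The first is names under multi-label public suffixes such as `co.uk`. The second is subdomains of an input that is itself a subdomain. The string-only sketch below contrasts that heuristic with matching each discovered name against the input domains by suffix. Both functions are hypothetical illustrations, not Flowsint code.

```python
from typing import Iterable, Optional


def naive_parent(subdomain: str) -> str:
    """Last-two-labels heuristic."""
    parts = subdomain.split(".")
    return ".".join(parts[-2:]) if len(parts) >= 2 else subdomain


def find_parent(subdomain: str, parents: Iterable[str]) -> Optional[str]:
    """Return the longest input domain that `subdomain` is a subdomain of, if any."""
    matches = [p for p in parents if subdomain.endswith("." + p)]
    return max(matches, key=len, default=None)


print(naive_parent("www.example.co.uk"))                    # co.uk
print(find_parent("www.example.co.uk", ["example.co.uk"]))  # example.co.uk
print(naive_parent("v1.api.example.com"))                   # example.com
print(find_parent("v1.api.example.com", ["example.com", "api.example.com"]))  # api.example.com
```

Choosing the longest match attaches each subdomain to the most specific input it belongs to, and a name that matches no input returns `None` instead of a fabricated parent.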

## Adding parameters to enrichers

Many enrichers need user-configurable parameters. Let's create an enricher that scans ports with configurable options.

### Defining the parameter schema

The `get_params_schema()` class method defines what parameters your enricher accepts:

```python
from typing import Any, Dict, List

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Ip, Port
from tools.network.naabu import NaabuTool


@flowsint_enricher
class IpToPortsEnricher(Enricher):
    """Scans IP addresses for open ports."""

    # Define types as base types
    InputType = Ip
    OutputType = Port

    @classmethod
    def name(cls) -> str:
        return "ip_to_ports"

    @classmethod
    def category(cls) -> str:
        return "IP"

    @classmethod
    def key(cls) -> str:
        return "address"

    @classmethod
    def get_params_schema(cls) -> List[Dict[str, Any]]:
        """Define configurable parameters for this enricher."""
        return [
            {
                "name": "mode",
                "type": "select",
                "description": "Scan mode: active scanning or passive enumeration",
                "required": True,
                "default": "passive",
                "options": [
                    {"label": "Passive", "value": "passive"},
                    {"label": "Active", "value": "active"},
                ],
            },
            {
                "name": "port_range",
                "type": "string",
                "description": "Port range to scan (e.g., '1-1000' or '80,443,8080')",
                "required": False,
            },
            {
                "name": "top_ports",
                "type": "select",
                "description": "Scan only the most common ports",
                "required": False,
                "options": [
                    {"label": "Top 100", "value": "100"},
                    {"label": "Top 1000", "value": "1000"},
                ],
            },
            {
                "name": "PDCP_API_KEY",
                "type": "vaultSecret",
                "description": "ProjectDiscovery Cloud Platform API key for passive mode",
                "required": False,
            },
        ]


# Export types
InputType = IpToPortsEnricher.InputType
OutputType = IpToPortsEnricher.OutputType
```

For each parameter, the schema defines its type, its description, whether it is required, an optional default value, and, for `select` parameters, the available options. The `vaultSecret` type integrates with Flowsint's encrypted credential storage.
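
This guide does not describe exactly how Flowsint validates parameter values. The sketch below only illustrates what the schema fields mean by resolving a dict of user-supplied values against a schema of the same shape. `default` fills in missing values, `required` rejects missing ones, and a `select` value must match one of the options' `value` fields. `resolve_params` and `SCHEMA` are hypothetical, and the schema is trimmed from the example above.

```python
from typing import Any, Dict, List

# A trimmed copy of the schema above (descriptions and the secret omitted)
SCHEMA: List[Dict[str, Any]] = [
    {
        "name": "mode",
        "type": "select",
        "required": True,
        "default": "passive",
        "options": [
            {"label": "Passive", "value": "passive"},
            {"label": "Active", "value": "active"},
        ],
    },
    {"name": "port_range", "type": "string", "required": False},
    {
        "name": "top_ports",
        "type": "select",
        "required": False,
        "options": [
            {"label": "Top 100", "value": "100"},
            {"label": "Top 1000", "value": "1000"},
        ],
    },
]


def resolve_params(schema: List[Dict[str, Any]], supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Apply defaults, enforce required parameters, and check select options."""
    resolved: Dict[str, Any] = {}
    for spec in schema:
        name = spec["name"]
        value = supplied.get(name, spec.get("default"))
        if value is None:
            if spec.get("required"):
                raise ValueError(f"Missing required parameter: {name}")
            continue  # optional and not provided: leave it out
        if spec["type"] == "select":
            allowed = {option["value"] for option in spec.get("options", [])}
            if value not in allowed:
                raise ValueError(f"Invalid value {value!r} for {name}; expected one of {sorted(allowed)}")
        resolved[name] = value
    return resolved


print(resolve_params(SCHEMA, {}))  # {'mode': 'passive'}
try:
    resolve_params(SCHEMA, {"mode": "stealth"})
except ValueError as error:
    print(error)  # Invalid value 'stealth' for mode; expected one of ['active', 'passive']
```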

### Using parameters in your enricher

Parameters are accessed through `self.params` in your scan method. Secrets declared with the `vaultSecret` type are read with `self.get_secret()`:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """
    Scan IPs for open ports using configured parameters.

    Args:
        data: List of Ip objects to scan

    Returns:
        List of Port objects
    """
    results: List[OutputType] = []

    # Extract parameters
    mode = self.params.get("mode", "passive")
    port_range = self.params.get("port_range")
    top_ports = self.params.get("top_ports")
    api_key = self.get_secret("PDCP_API_KEY")

    # Instantiate tool
    naabu = NaabuTool()

    for ip in data:
        Logger.info(
            self.sketch_id,
            {"message": f"Scanning {ip.address} in {mode} mode"}
        )
        try:
            # Launch tool with parameters
            scan_results = naabu.launch(
                target=ip.address,
                mode=mode,
                port_range=port_range,
                top_ports=top_ports,
                api_key=api_key
            )
            # Convert tool results to Port objects
            for result in scan_results:
                port = Port(
                    number=result.get("port"),
                    protocol=result.get("protocol", "tcp").upper(),
                    state="open",
                    service=result.get("service"),
                    banner=result.get("version")
                )
                results.append(port)
        except Exception as e:
            Logger.error(
                self.sketch_id,
                {"message": f"Error scanning {ip.address}: {e}"}
            )
            continue

    return results
```
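
The `port_range` value is passed to the tool unchanged. If you want to reject malformed input in the enricher before launching a scan, a small parser can expand the two formats mentioned in the schema description (`1-1000` and `80,443,8080`) and check the bounds. `parse_port_range` is a hypothetical helper. This guide doesn't say whether the tool expects the raw string or an expanded list, so treat the sketch as validation only.

```python
from typing import List, Set


def parse_port_range(spec: str) -> List[int]:
    """Expand a spec such as '1-1000' or '80,443,8080' into sorted, unique port numbers."""
    ports: Set[int] = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate stray commas such as "80,,443"
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
            if start > end:
                raise ValueError(f"Invalid range: {part!r}")
            ports.update(range(start, end + 1))
        else:
            ports.add(int(part))  # int() raises ValueError on non-numeric input
    if not ports:
        raise ValueError("No ports given")
    if min(ports) < 1 or max(ports) > 65535:
        raise ValueError("Ports must be between 1 and 65535")
    return sorted(ports)


print(parse_port_range("20-22, 80"))  # [20, 21, 22, 80]
```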

## Handling multiple output types

Some enrichers produce several kinds of entities at once. In that case, define a custom Pydantic model that groups them and use it as the `OutputType`:

```python
from pydantic import BaseModel
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from typing import List
from flowsint_types import Website, Email, Phone


class CrawlerResults(BaseModel):
    """Results from web crawler including multiple entity types."""

    website: Website
    emails: List[Email] = []
    phones: List[Phone] = []


@flowsint_enricher
class WebsiteToCrawlerEnricher(Enricher):
    """Crawls websites to extract emails and phone numbers."""

    # Define types as base types
    InputType = Website
    OutputType = CrawlerResults

    # name(), category(), and key() are omitted for brevity;
    # define them as in the previous examples.

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Crawl websites and extract contact information."""
        from tools.network.reconcrawl import ReconCrawlTool

        results: List[OutputType] = []
        crawler_tool = ReconCrawlTool()

        for website in data:
            try:
                # Launch crawler
                crawl_data = crawler_tool.launch(website.url)
                # Extract entities
                emails = [Email(email=e) for e in crawl_data.get("emails", [])]
                phones = [Phone(number=p) for p in crawl_data.get("phones", [])]
                # Create result object
                result = CrawlerResults(
                    website=website,
                    emails=emails,
                    phones=phones
                )
                results.append(result)
            except Exception as e:
                # Log and continue with the next website
                Logger.error(self.sketch_id, {"message": f"Crawl error for {website.url}: {e}"})
                continue

        return results

    def postprocess(self, results: List[OutputType], input_data: List[InputType]) -> List[OutputType]:
        """Create nodes for all discovered entities."""
        for result in results:
            # Create website node using Pydantic object
            self.create_node(result.website)

            # Create email nodes and relationships
            for email in result.emails:
                self.create_node(email)
                self.create_relationship(result.website, email, "HAS_EMAIL")

            # Create phone nodes and relationships
            for phone in result.phones:
                self.create_node(phone)
                self.create_relationship(result.website, phone, "HAS_PHONE")

            self.log_graph_message(
                f"Processed {len(result.emails)} emails and {len(result.phones)} phones from {result.website.url}"
            )
        return results


# Export types
InputType = WebsiteToCrawlerEnricher.InputType
OutputType = WebsiteToCrawlerEnricher.OutputType
```
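
A composite result can be read as a bundle of edges that share one source node. The sketch below mirrors the shape of `CrawlerResults` with a plain dataclass and strings, which is an assumption made so that it runs without `flowsint_types`. It flattens one result into the same `HAS_EMAIL` and `HAS_PHONE` triples that `postprocess` creates. `CrawlResult` and `to_edges` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class CrawlResult:
    """Stand-in for CrawlerResults, using plain strings instead of Flowsint types."""
    website: str
    # Dataclasses reject a bare [] default, so each instance gets a fresh list
    emails: List[str] = field(default_factory=list)
    phones: List[str] = field(default_factory=list)


def to_edges(result: CrawlResult) -> List[Tuple[str, str, str]]:
    """Flatten one composite result into (source, relationship, target) triples."""
    edges = [(result.website, "HAS_EMAIL", email) for email in result.emails]
    edges += [(result.website, "HAS_PHONE", phone) for phone in result.phones]
    return edges


print(to_edges(CrawlResult("https://example.com", ["info@example.com"], ["+1 555 0100"])))
# [('https://example.com', 'HAS_EMAIL', 'info@example.com'), ('https://example.com', 'HAS_PHONE', '+1 555 0100')]
```

The `default_factory` is needed here because dataclasses refuse mutable defaults such as `[]`. Pydantic copies such defaults for each instance, which is why `CrawlerResults` can use `= []` directly.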

## Registering your enricher

You don't need to register your enricher manually. Adding the `@flowsint_enricher` decorator to your enricher class is enough for auto-discovery to pick it up:

```python
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher


@flowsint_enricher
class MyEnricher(Enricher):
    ...
```
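
Decorator-based registries usually work by recording each class in a module-level mapping when its module is imported. That is also why a newly added file is only picked up once discovery has imported it. The sketch below shows this general pattern. `REGISTRY` and `register` are illustrative names, and Flowsint's `flowsint_enricher` decorator may be implemented differently.

```python
from typing import Dict, Type

# Module-level registry: enricher name -> class
REGISTRY: Dict[str, Type] = {}


def register(cls: Type) -> Type:
    """Class decorator: record `cls` under its name() and return it unchanged."""
    key = cls.name()
    if key in REGISTRY:
        raise ValueError(f"Duplicate enricher name: {key}")
    REGISTRY[key] = cls
    return cls


@register
class DomainToIp:
    @classmethod
    def name(cls) -> str:
        return "domain_to_ip"


# Registration happened as a side effect of defining (importing) the class
print(sorted(REGISTRY))  # ['domain_to_ip']
```

Rejecting duplicates, as this sketch does, is one way to catch two enrichers that accidentally share the same `name()`.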

## Testing your enricher

Creating tests helps ensure your enricher works correctly and makes debugging easier. Create a test file in `flowsint-enrichers/tests/`:

```python
# tests/test_domain_to_ip.py
import pytest

from flowsint_enrichers.domain.to_ip import DomainToIpEnricher
from flowsint_types import Domain, Ip


def test_enricher_metadata():
    """Test enricher metadata is correctly defined."""
    assert DomainToIpEnricher.name() == "domain_to_ip"
    assert DomainToIpEnricher.category() == "Domain"
    assert DomainToIpEnricher.key() == "domain"


def test_type_definitions():
    """Test InputType and OutputType are correctly defined."""
    assert DomainToIpEnricher.InputType == Domain
    assert DomainToIpEnricher.OutputType == Ip


@pytest.mark.asyncio
async def test_scan():
    """Test DNS resolution works (requires network access)."""
    enricher = DomainToIpEnricher(sketch_id="test", scan_id="test")
    input_data = [Domain(domain="example.com")]
    results = await enricher.scan(input_data)
    assert len(results) > 0
    assert isinstance(results[0], Ip)
    assert results[0].address  # Should have an IP address
```

These tests verify that your enricher's metadata is correct, that its type definitions are set properly, and that the scan logic produces the expected results. Only `test_scan` is a coroutine. Running it requires the `pytest-asyncio` plugin, which provides the `@pytest.mark.asyncio` marker, and because it performs a real DNS lookup, it also needs network access. Input validation is handled automatically by Pydantic, so you don't need to test preprocessing separately.
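
A scan test that hits real DNS can fail offline or change when records change. A common alternative is to replace the lookup with `unittest.mock.patch` for the duration of the test. So that the sketch runs on its own, it uses a toy `resolve` coroutine instead of `DomainToIpEnricher`. With the real enricher, you would wrap `enricher.scan(...)` in the same `mock.patch("socket.gethostbyname", ...)` block. That works as long as the enricher calls `socket.gethostbyname` through the `socket` module, as in the example in this guide.

```python
import asyncio
import socket
from typing import List
from unittest import mock


async def resolve(names: List[str]) -> List[str]:
    """Toy stand-in for an enricher's scan(): resolve names, skipping failures."""
    addresses: List[str] = []
    for name in names:
        try:
            addresses.append(socket.gethostbyname(name))
        except socket.gaierror:
            continue
    return addresses


def test_resolve_offline() -> None:
    def fake_lookup(name: str) -> str:
        if name == "example.com":
            return "192.0.2.10"  # documentation address
        raise socket.gaierror(f"cannot resolve {name}")

    # Replace socket.gethostbyname inside this block only: no network traffic
    with mock.patch("socket.gethostbyname", side_effect=fake_lookup) as lookup:
        result = asyncio.run(resolve(["example.com", "missing.invalid"]))

    assert result == ["192.0.2.10"]
    assert lookup.call_count == 2


test_resolve_offline()
```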

## Best practices

When creating enrichers, think carefully about error handling. Intelligence gathering involves many external systems that can fail in unpredictable ways. Your enricher should handle errors gracefully, log failures clearly, and continue processing the remaining items rather than crashing entirely.

Always use the `Logger` utility to track progress and errors. The logger integrates with Flowsint's monitoring system and helps users understand what's happening during long-running enrichers. Log successful operations at the info level and errors at the error level.

Define `InputType` and `OutputType` as base types (e.g., `Domain`, not `List[Domain]`). The base class handles the list wrapping and validation, which keeps the type definitions clean and intuitive.

Always export your types at the end of the file:

```python
InputType = YourEnricher.InputType
OutputType = YourEnricher.OutputType
```

Use the simplified graph API by passing Pydantic objects directly to `create_node()` and `create_relationship()`. This eliminates boilerplate and reduces errors, because the methods infer node types, primary keys, and properties from your Pydantic models.

Separate concerns between the two phases. Scanning should focus on gathering and processing data, and postprocessing should create graph structures. Input validation happens automatically through Pydantic, so you don't need to handle it manually.

Use type hints everywhere. They provide automatic validation, better IDE support, and inline documentation. The `InputType` and `OutputType` class attributes should always be Pydantic types from the `flowsint-types` package.

When working with tools, remember that they return raw data structures. Your enricher is responsible for converting tool output into proper Flowsint types, which is typically done in the scan phase.

Document your enricher thoroughly. The class docstring, the `documentation()` method, and parameter descriptions all appear in the UI. Clear documentation helps users understand what your enricher does and how to configure it.

### Handling API rate limits

When working with rate-limited APIs, add delays between requests:

```python
import asyncio


async def scan(self, data: List[InputType]) -> List[OutputType]:
    """Scan with rate limiting to respect API limits."""
    results: List[OutputType] = []
    delay_seconds = 1  # Delay between requests

    for item in data:
        # self._query_api stands for your own API helper
        result = await self._query_api(item)
        if result:
            results.append(result)
        # Respect rate limits
        await asyncio.sleep(delay_seconds)

    return results
```
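
A fixed sleep after every request works for a sequential loop. However, it also sleeps after the last item and offers no protection if you later send requests concurrently. The sketch below uses only the standard library to build a small limiter that spaces calls at least `interval` seconds apart, including across concurrent tasks. `RateLimiter` is a hypothetical helper. Real APIs may also signal limits explicitly (for example with HTTP 429 responses), and those signals are worth honoring as well.

```python
import asyncio
import time


class RateLimiter:
    """Space out calls by at least `interval` seconds, including across concurrent tasks."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0

    async def wait(self) -> None:
        # The lock makes concurrent callers take turns reserving time slots
        async with self._lock:
            now = time.monotonic()
            if now < self._next_allowed:
                await asyncio.sleep(self._next_allowed - now)
            self._next_allowed = max(now, self._next_allowed) + self.interval


async def main() -> float:
    # Create the limiter inside the running event loop
    limiter = RateLimiter(interval=0.05)
    start = time.monotonic()
    for _ in range(3):
        await limiter.wait()
        # ... a real enricher would send one API request here ...
    return time.monotonic() - start


elapsed = asyncio.run(main())
print(f"3 calls took {elapsed:.2f}s")
```

The first call goes through immediately and each later call waits for its slot, so three calls with a 0.05 s interval take roughly 0.1 s in total.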

### Fallback data sources

Implement fallback logic for when the primary source fails:

```python
async def scan(self, data: List[InputType]) -> List[OutputType]:
    """Try multiple data sources with fallback logic."""
    results: List[OutputType] = []

    for domain in data:
        # Try the primary source (self._query_primary_source is your own helper)
        result = self._query_primary_source(domain)

        if not result:
            # Fall back to the secondary source
            Logger.info(
                self.sketch_id,
                {"message": f"Primary source failed for {domain.domain}, trying fallback"}
            )
            result = self._query_fallback_source(domain)

        if result:
            results.append(result)

    return results
```
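
With more than two sources, the same idea generalizes to an ordered list of sources that are tried until one returns a non-empty result. The sketch below also treats an exception raised by a source like an empty answer, and it reports which source answered so that the enricher can log it. `first_successful`, `primary`, and `secondary` are hypothetical functions.

```python
from typing import Callable, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def first_successful(
    item: T,
    sources: Sequence[Tuple[str, Callable[[T], Optional[R]]]],
) -> Tuple[Optional[str], Optional[R]]:
    """Query sources in order; return (source_name, result) for the first non-empty result."""
    for name, source in sources:
        try:
            result = source(item)
        except Exception:
            # An error counts as "no answer": move on to the next source
            continue
        if result:
            return name, result
    return None, None


# Hypothetical sources: the primary one is down, the secondary one answers
def primary(domain: str) -> Optional[dict]:
    raise TimeoutError("primary source unavailable")


def secondary(domain: str) -> Optional[dict]:
    return {"domain": domain, "registrar": "Example Registrar"}


print(first_successful("example.com", [("primary", primary), ("secondary", secondary)]))
# ('secondary', {'domain': 'example.com', 'registrar': 'Example Registrar'})
```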

## Troubleshooting

If your enricher doesn't appear in the API, verify three things. You must have applied the `@flowsint_enricher` decorator to your class, the file must live under `flowsint-enrichers/src/flowsint_enrichers/`, and you must have restarted the API server. Auto-discovery via `load_all_enrichers()` runs at startup, so new files are only picked up after a restart.

For import errors, make sure all your dependencies are installed and the enricher file has no syntax errors. Check that you're importing from the correct packages.

If the scan method isn't finding any results, add logging statements to see what's happening. Verify that tools are installed and accessible, that API keys are valid where required, and that the input data is in the expected format.

When graph relationships aren't appearing, check that you're creating both nodes and relationships in the postprocess method. Verify that the relationship type name is correct and that you're passing the right node objects to `create_relationship()`.

## Next steps

Once you've created and registered your enricher, it becomes available through the API for users to run. Enrichers can be chained together into Flows, where the output of one enricher feeds into the input of another. This enables complex intelligence-gathering sequences.

Enrichers are the heart of Flowsint's intelligence-gathering capabilities. Well-designed enrichers that handle errors gracefully, log progress clearly, and create meaningful graph relationships make the entire platform more powerful and user-friendly.

If you create enrichers that could help the community, please consider open-sourcing them. Help the community as much as it helps you!