| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050 |
- from __future__ import annotations
- from abc import ABC, abstractmethod
- from enum import Enum
- import os
- from dotenv import load_dotenv
- from dataclasses import dataclass, field
- from typing import (
- Any,
- Literal,
- TypedDict,
- TypeVar,
- Optional,
- Dict,
- List,
- AsyncIterator,
- )
- from .utils import EmbeddingFunc
- from .types import KnowledgeGraph
- from .constants import (
- DEFAULT_TOP_K,
- DEFAULT_CHUNK_TOP_K,
- DEFAULT_MAX_ENTITY_TOKENS,
- DEFAULT_MAX_RELATION_TOKENS,
- DEFAULT_MAX_TOTAL_TOKENS,
- DEFAULT_OLLAMA_MODEL_NAME,
- DEFAULT_OLLAMA_MODEL_TAG,
- DEFAULT_OLLAMA_MODEL_SIZE,
- DEFAULT_OLLAMA_CREATED_AT,
- DEFAULT_OLLAMA_DIGEST,
- )
- # use the .env that is inside the current folder
- # allows to use different .env file for each lightrag instance
- # the OS environment variables take precedence over the .env file
- load_dotenv(dotenv_path=".env", override=False)
- class OllamaServerInfos:
- def __init__(self, name=None, tag=None):
- self._lightrag_name = name or os.getenv(
- "OLLAMA_EMULATING_MODEL_NAME", DEFAULT_OLLAMA_MODEL_NAME
- )
- self._lightrag_tag = tag or os.getenv(
- "OLLAMA_EMULATING_MODEL_TAG", DEFAULT_OLLAMA_MODEL_TAG
- )
- self.LIGHTRAG_SIZE = DEFAULT_OLLAMA_MODEL_SIZE
- self.LIGHTRAG_CREATED_AT = DEFAULT_OLLAMA_CREATED_AT
- self.LIGHTRAG_DIGEST = DEFAULT_OLLAMA_DIGEST
- @property
- def LIGHTRAG_NAME(self):
- return self._lightrag_name
- @LIGHTRAG_NAME.setter
- def LIGHTRAG_NAME(self, value):
- self._lightrag_name = value
- @property
- def LIGHTRAG_TAG(self):
- return self._lightrag_tag
- @LIGHTRAG_TAG.setter
- def LIGHTRAG_TAG(self, value):
- self._lightrag_tag = value
- @property
- def LIGHTRAG_MODEL(self):
- return f"{self._lightrag_name}:{self._lightrag_tag}"
- class TextChunkSchema(TypedDict):
- tokens: int
- content: str
- full_doc_id: str
- chunk_order_index: int
- T = TypeVar("T")
- @dataclass
- class QueryParam:
- """Configuration parameters for query execution in LightRAG."""
- mode: Literal["local", "global", "hybrid", "naive", "mix", "bypass"] = "mix"
- """Specifies the retrieval mode:
- - "local": Focuses on context-dependent information.
- - "global": Utilizes global knowledge.
- - "hybrid": Combines local and global retrieval methods.
- - "naive": Performs a basic search without advanced techniques.
- - "mix": Integrates knowledge graph and vector retrieval.
- """
- only_need_context: bool = False
- """If True, only returns the retrieved context without generating a response."""
- only_need_prompt: bool = False
- """If True, only returns the generated prompt without producing a response."""
- response_type: str = "Multiple Paragraphs"
- """Defines the response format. Examples: 'Multiple Paragraphs', 'Single Paragraph', 'Bullet Points'."""
- stream: bool = False
- """If True, enables streaming output for real-time responses."""
- top_k: int = int(os.getenv("TOP_K", str(DEFAULT_TOP_K)))
- """Number of top items to retrieve. Represents entities in 'local' mode and relationships in 'global' mode."""
- chunk_top_k: int = int(os.getenv("CHUNK_TOP_K", str(DEFAULT_CHUNK_TOP_K)))
- """Number of text chunks to retrieve initially from vector search and keep after reranking.
- If None, defaults to top_k value.
- """
- max_entity_tokens: int = int(
- os.getenv("MAX_ENTITY_TOKENS", str(DEFAULT_MAX_ENTITY_TOKENS))
- )
- """Maximum number of tokens allocated for entity context in unified token control system."""
- max_relation_tokens: int = int(
- os.getenv("MAX_RELATION_TOKENS", str(DEFAULT_MAX_RELATION_TOKENS))
- )
- """Maximum number of tokens allocated for relationship context in unified token control system."""
- max_total_tokens: int = int(
- os.getenv("MAX_TOTAL_TOKENS", str(DEFAULT_MAX_TOTAL_TOKENS))
- )
- """Maximum total tokens budget for the entire query context (entities + relations + chunks + system prompt)."""
- hl_keywords: list[str] = field(default_factory=list)
- """List of high-level keywords to prioritize in retrieval."""
- ll_keywords: list[str] = field(default_factory=list)
- """List of low-level keywords to refine retrieval focus."""
- # History mesages is only send to LLM for context, not used for retrieval
- conversation_history: list[dict[str, str]] = field(default_factory=list)
- """Stores past conversation history to maintain context.
- Format: [{"role": "user/assistant", "content": "message"}].
- """
- user_prompt: str | None = None
- """User-provided prompt for the query.
- Addition instructions for LLM. If provided, this will be inject into the prompt template.
- It's purpose is the let user customize the way LLM generate the response.
- """
- enable_rerank: bool = os.getenv("RERANK_BY_DEFAULT", "true").lower() == "true"
- """Enable reranking for retrieved text chunks. If True but no rerank model is configured, a warning will be issued.
- Default is True to enable reranking when rerank model is available.
- """
- include_references: bool = False
- """If True, includes reference list in the response for supported endpoints.
- This parameter controls whether the API response includes a references field
- containing citation information for the retrieved content.
- """
- @dataclass
- class StorageNameSpace(ABC):
- namespace: str
- workspace: str
- global_config: dict[str, Any]
- async def initialize(self):
- """Initialize the storage"""
- pass
- async def finalize(self):
- """Finalize the storage"""
- pass
- @abstractmethod
- async def index_done_callback(self) -> None:
- """Commit the storage operations after indexing"""
- @abstractmethod
- async def drop(self) -> dict[str, str]:
- """Drop all data from storage and clean up resources
- This abstract method defines the contract for dropping all data from a storage implementation.
- Each storage type must implement this method to:
- 1. Clear all data from memory and/or external storage
- 2. Remove any associated storage files if applicable
- 3. Reset the storage to its initial state
- 4. Handle cleanup of any resources
- 5. Notify other processes if necessary
- 6. This action should persistent the data to disk immediately.
- Returns:
- dict[str, str]: Operation status and message with the following format:
- {
- "status": str, # "success" or "error"
- "message": str # "data dropped" on success, error details on failure
- }
- Implementation specific:
- - On success: return {"status": "success", "message": "data dropped"}
- - On failure: return {"status": "error", "message": "<error details>"}
- - If not supported: return {"status": "error", "message": "unsupported"}
- """
- @dataclass
- class BaseVectorStorage(StorageNameSpace, ABC):
- embedding_func: EmbeddingFunc
- cosine_better_than_threshold: float = field(default=0.2)
- meta_fields: set[str] = field(default_factory=set)
- def _validate_embedding_func(self):
- """Validate that embedding_func is provided.
- This method should be called at the beginning of __post_init__
- in all vector storage implementations.
- Raises:
- ValueError: If embedding_func is None
- """
- if self.embedding_func is None:
- raise ValueError(
- "embedding_func is required for vector storage. "
- "Please provide a valid EmbeddingFunc instance."
- )
- def _generate_collection_suffix(self) -> str | None:
- """Generates collection/table suffix from embedding_func.
- Return suffix if model_name exists in embedding_func, otherwise return None.
- Note: embedding_func is guaranteed to exist (validated in __post_init__).
- Returns:
- str | None: Suffix string e.g. "text_embedding_3_large_3072d", or None if model_name not available
- """
- import re
- # Check if model_name exists (model_name is optional in EmbeddingFunc)
- model_name = getattr(self.embedding_func, "model_name", None)
- if not model_name:
- return None
- # embedding_dim is required in EmbeddingFunc
- embedding_dim = self.embedding_func.embedding_dim
- # Generate suffix: clean model name and append dimension
- safe_model_name = re.sub(r"[^a-zA-Z0-9_]", "_", model_name.lower())
- return f"{safe_model_name}_{embedding_dim}d"
- @abstractmethod
- async def query(
- self, query: str, top_k: int, query_embedding: list[float] = None
- ) -> list[dict[str, Any]]:
- """Query the vector storage and retrieve top_k results.
- Args:
- query: The query string to search for
- top_k: Number of top results to return
- query_embedding: Optional pre-computed embedding for the query.
- If provided, skips embedding computation for better performance.
- """
- @abstractmethod
- async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
- """Insert or update vectors in the storage.
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Multi-worker note:
- Backends that buffer writes in process memory (e.g.
- OpenSearchVectorDBStorage as of #3043) keep the buffer
- process-local. In a multi-worker deployment (e.g.
- lightrag-gunicorn) other workers will not observe these writes
- until the writing worker has called index_done_callback().
- Callers that depend on cross-worker read-after-write visibility
- must explicitly await index_done_callback() before relying on
- reads from another worker.
- """
- @abstractmethod
- async def delete_entity(self, entity_name: str) -> None:
- """Delete a single entity by its name.
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Multi-worker note: see ``upsert`` -- buffered tombstones are
- process-local until index_done_callback() runs.
- """
- @abstractmethod
- async def delete_entity_relation(self, entity_name: str) -> None:
- """Delete relations for a given entity.
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Multi-worker note: see ``upsert`` -- backends may prune their
- in-process buffer in addition to issuing a server-side delete,
- so cross-worker visibility still follows the index_done_callback
- contract.
- """
- @abstractmethod
- async def get_by_id(self, id: str) -> dict[str, Any] | None:
- """Get vector data by its ID
- Args:
- id: The unique identifier of the vector
- Returns:
- The vector data if found, or None if not found
- """
- pass
- @abstractmethod
- async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
- """Get multiple vector data by their IDs
- Args:
- ids: List of unique identifiers
- Returns:
- List of vector data objects that were found
- """
- pass
- @abstractmethod
- async def delete(self, ids: list[str]):
- """Delete vectors with specified IDs
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Multi-worker note: see ``upsert`` -- buffered tombstones are
- process-local until index_done_callback() runs.
- Args:
- ids: List of vector IDs to be deleted
- """
- @abstractmethod
- async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
- """Get vectors by their IDs, returning only ID and vector data for efficiency
- Args:
- ids: List of unique identifiers
- Returns:
- Dictionary mapping IDs to their vector embeddings
- Format: {id: [vector_values], ...}
- """
- pass
- @dataclass
- class BaseKVStorage(StorageNameSpace, ABC):
- embedding_func: EmbeddingFunc
- @abstractmethod
- async def get_by_id(self, id: str) -> dict[str, Any] | None:
- """Get value by id"""
- @abstractmethod
- async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
- """Get values by ids"""
- @abstractmethod
- async def filter_keys(self, keys: set[str]) -> set[str]:
- """Return un-exist keys"""
- @abstractmethod
- async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
- """Upsert data
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. update flags to notify other processes that data persistence is needed
- Multi-worker note:
- Backends that buffer writes in process memory (e.g.
- OpenSearchKVStorage as of the KV-batching change derived from
- #2822) keep the buffer process-local. In a multi-worker
- deployment (e.g. lightrag-gunicorn) other workers will not
- observe these writes until the writing worker has called
- index_done_callback(). Callers that depend on cross-worker
- read-after-write visibility must explicitly await
- index_done_callback() before relying on reads from another
- worker.
- """
- @abstractmethod
- async def delete(self, ids: list[str]) -> None:
- """Delete specific records from storage by their IDs
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. update flags to notify other processes that data persistence is needed
- Multi-worker note: see ``upsert`` -- buffered tombstones are
- process-local until index_done_callback() runs.
- Args:
- ids (list[str]): List of document IDs to be deleted from storage
- Returns:
- None
- """
- @abstractmethod
- async def is_empty(self) -> bool:
- """Check if the storage is empty
- Returns:
- bool: True if storage contains no data, False otherwise
- """
- @dataclass
- class BaseGraphStorage(StorageNameSpace, ABC):
- """All operations related to edges in graph should be undirected."""
- embedding_func: EmbeddingFunc
- @abstractmethod
- async def has_node(self, node_id: str) -> bool:
- """Check if a node exists in the graph.
- Args:
- node_id: The ID of the node to check
- Returns:
- True if the node exists, False otherwise
- """
- @abstractmethod
- async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
- """Check if an edge exists between two nodes.
- Args:
- source_node_id: The ID of the source node
- target_node_id: The ID of the target node
- Returns:
- True if the edge exists, False otherwise
- """
- @abstractmethod
- async def node_degree(self, node_id: str) -> int:
- """Get the degree (number of connected edges) of a node.
- Args:
- node_id: The ID of the node
- Returns:
- The number of edges connected to the node
- """
- @abstractmethod
- async def edge_degree(self, src_id: str, tgt_id: str) -> int:
- """Get the total degree of an edge (sum of degrees of its source and target nodes).
- Args:
- src_id: The ID of the source node
- tgt_id: The ID of the target node
- Returns:
- The sum of the degrees of the source and target nodes
- """
- @abstractmethod
- async def get_node(self, node_id: str) -> dict[str, str] | None:
- """Get node by its ID, returning only node properties.
- Args:
- node_id: The ID of the node to retrieve
- Returns:
- A dictionary of node properties if found, None otherwise
- """
- @abstractmethod
- async def get_edge(
- self, source_node_id: str, target_node_id: str
- ) -> dict[str, str] | None:
- """Get edge properties between two nodes.
- Args:
- source_node_id: The ID of the source node
- target_node_id: The ID of the target node
- Returns:
- A dictionary of edge properties if found, None otherwise
- """
- @abstractmethod
- async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
- """Get all edges connected to a node.
- Args:
- source_node_id: The ID of the node to get edges for
- Returns:
- A list of (source_id, target_id) tuples representing edges,
- or None if the node doesn't exist
- """
- async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]:
- """Get nodes as a batch using UNWIND
- Default implementation fetches nodes one by one.
- Override this method for better performance in storage backends
- that support batch operations.
- """
- result = {}
- for node_id in node_ids:
- node = await self.get_node(node_id)
- if node is not None:
- result[node_id] = node
- return result
- async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]:
- """Node degrees as a batch using UNWIND
- Default implementation fetches node degrees one by one.
- Override this method for better performance in storage backends
- that support batch operations.
- """
- result = {}
- for node_id in node_ids:
- degree = await self.node_degree(node_id)
- result[node_id] = degree
- return result
- async def edge_degrees_batch(
- self, edge_pairs: list[tuple[str, str]]
- ) -> dict[tuple[str, str], int]:
- """Edge degrees as a batch using UNWIND also uses node_degrees_batch
- Default implementation calculates edge degrees one by one.
- Override this method for better performance in storage backends
- that support batch operations.
- """
- result = {}
- for src_id, tgt_id in edge_pairs:
- degree = await self.edge_degree(src_id, tgt_id)
- result[(src_id, tgt_id)] = degree
- return result
- async def get_edges_batch(
- self, pairs: list[dict[str, str]]
- ) -> dict[tuple[str, str], dict]:
- """Get edges as a batch using UNWIND
- Default implementation fetches edges one by one.
- Override this method for better performance in storage backends
- that support batch operations.
- """
- result = {}
- for pair in pairs:
- src_id = pair["src"]
- tgt_id = pair["tgt"]
- edge = await self.get_edge(src_id, tgt_id)
- if edge is not None:
- result[(src_id, tgt_id)] = edge
- return result
- async def get_nodes_edges_batch(
- self, node_ids: list[str]
- ) -> dict[str, list[tuple[str, str]]]:
- """Get nodes edges as a batch using UNWIND
- Default implementation fetches node edges one by one.
- Override this method for better performance in storage backends
- that support batch operations.
- """
- result = {}
- for node_id in node_ids:
- edges = await self.get_node_edges(node_id)
- result[node_id] = edges if edges is not None else []
- return result
- @abstractmethod
- async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
- """Insert a new node or update an existing node in the graph.
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Args:
- node_id: The ID of the node to insert or update
- node_data: A dictionary of node properties
- """
- async def upsert_nodes_batch(self, nodes: list[tuple[str, dict[str, str]]]) -> None:
- """Insert or update multiple nodes in a single batch call.
- Default implementation falls back to calling upsert_node() serially.
- Override in storage backends that support native batch operations for
- better performance when importing large knowledge graphs.
- Args:
- nodes: List of (node_id, node_data) tuples.
- """
- for node_id, node_data in nodes:
- await self.upsert_node(node_id, node_data=node_data)
- async def has_nodes_batch(self, node_ids: list[str]) -> set[str]:
- """Check existence of multiple nodes in a single batch call.
- Default implementation falls back to calling has_node() serially.
- Override in storage backends that support native batch operations for
- better performance when importing large knowledge graphs.
- Args:
- node_ids: List of node IDs to check.
- Returns:
- Set of node_ids that exist in the graph.
- """
- existing: set[str] = set()
- for node_id in node_ids:
- if await self.has_node(node_id):
- existing.add(node_id)
- return existing
- async def upsert_edges_batch(
- self, edges: list[tuple[str, str, dict[str, str]]]
- ) -> None:
- """Insert or update multiple edges in a single batch call.
- Default implementation falls back to calling upsert_edge() serially.
- Override in storage backends that support native batch operations for
- better performance when importing large knowledge graphs.
- Args:
- edges: List of (source_node_id, target_node_id, edge_data) tuples.
- """
- for source_node_id, target_node_id, edge_data in edges:
- await self.upsert_edge(source_node_id, target_node_id, edge_data=edge_data)
- @abstractmethod
- async def upsert_edge(
- self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
- ) -> None:
- """Insert a new edge or update an existing edge in the graph.
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Args:
- source_node_id: The ID of the source node
- target_node_id: The ID of the target node
- edge_data: A dictionary of edge properties
- """
- @abstractmethod
- async def delete_node(self, node_id: str) -> None:
- """Delete a node from the graph.
- Importance notes for in-memory storage:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Args:
- node_id: The ID of the node to delete
- """
- @abstractmethod
- async def remove_nodes(self, nodes: list[str]):
- """Delete multiple nodes
- Importance notes:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Args:
- nodes: List of node IDs to be deleted
- """
- @abstractmethod
- async def remove_edges(self, edges: list[tuple[str, str]]):
- """Delete multiple edges
- Importance notes:
- 1. Changes will be persisted to disk during the next index_done_callback
- 2. Only one process should updating the storage at a time before index_done_callback,
- KG-storage-log should be used to avoid data corruption
- Args:
- edges: List of edges to be deleted, each edge is a (source, target) tuple
- """
- @abstractmethod
- async def get_all_labels(self) -> list[str]:
- """Get all labels(entity names) in the graph.
- Do not use this method for large graph, use get_popular_labels or search_labels instead.
- Returns:
- A list of all node labels in the graph, sorted alphabetically
- """
- @abstractmethod
- async def get_knowledge_graph(
- self, node_label: str, max_depth: int = 3, max_nodes: int = 1000
- ) -> KnowledgeGraph:
- """
- Retrieve a connected subgraph of nodes where the label includes the specified `node_label`.
- Args:
- node_label: Label(entity name) of the starting node,* means all nodes
- max_depth: Maximum depth of the subgraph, Defaults to 3
- max_nodes: Maxiumu nodes to return, Defaults to 1000(BFS if possible)
- Returns:
- KnowledgeGraph object containing nodes and edges, with an is_truncated flag
- indicating whether the graph was truncated due to max_nodes limit
- """
- @abstractmethod
- async def get_all_nodes(self) -> list[dict]:
- """Get all nodes in the graph.
- Returns:
- A list of all nodes, where each node is a dictionary of its properties
- (Edge is bidirectional for some storage implementation; deduplication must be handled by the caller)
- """
- @abstractmethod
- async def get_all_edges(self) -> list[dict]:
- """Get all edges in the graph.
- Returns:
- A list of all edges, where each edge is a dictionary of its properties
- """
- @abstractmethod
- async def get_popular_labels(self, limit: int = 300) -> list[str]:
- """Get popular labels(entity names) by node degree (most connected entities)
- Args:
- limit: Maximum number of labels to return
- Returns:
- List of labels sorted by degree (highest first)
- """
- @abstractmethod
- async def search_labels(self, query: str, limit: int = 50) -> list[str]:
- """Search labels(entity names) with fuzzy matching
- Args:
- query: Search query string
- limit: Maximum number of results to return
- Returns:
- List of matching labels sorted by relevance
- """
- class DocStatus(str, Enum):
- """Document processing status.
- Pipeline order: PENDING -> PARSING -> ANALYZING (optional) -> PROCESSING -> PROCESSED | FAILED.
- PREPROCESSED is deprecated, kept for backward compatibility.
- """
- PENDING = "pending"
- PARSING = "parsing" # Phase 1: content extraction (parse_native/mineru/docling)
- ANALYZING = "analyzing" # Phase 2: multimodal analysis (VLM)
- PROCESSING = "processing" # Phase 3: entity/relation extraction
- PREPROCESSED = "preprocessed" # Deprecated: use ANALYZING in new pipeline
- PROCESSED = "processed"
- FAILED = "failed"
- @dataclass
- class DocProcessingStatus:
- """Document processing status data structure"""
- content_summary: str
- """First 100 chars of document content, used for preview"""
- content_length: int
- """Total length of document"""
- file_path: str
- """Canonical basename of the document.
- Always a hint-stripped basename (e.g. ``abc.docx``) or the literal
- ``"unknown_source"`` sentinel; never carries directory components or
- parser ``[hint]`` segments. UI display, filename-based dedup, and
- citation paths all share this value.
- """
- status: DocStatus
- """Current processing status"""
- created_at: str
- """ISO format timestamp when document was created"""
- updated_at: str
- """ISO format timestamp when document was last updated"""
- track_id: str | None = None
- """Tracking ID for monitoring progress"""
- chunks_count: int | None = None
- """Number of chunks after splitting, used for processing"""
- chunks_list: list[str] | None = field(default_factory=list)
- """List of chunk IDs associated with this document, used for deletion"""
- error_msg: str | None = None
- """Error message if failed"""
- metadata: dict[str, Any] = field(default_factory=dict)
- """Additional metadata"""
- multimodal_processed: bool | None = field(default=None, repr=False)
- content_hash: str | None = None
- """MD5 hash of the underlying document content (raw text or source file).
- Used together with file_path basename for duplicate detection. Empty for
- pending_parse records whose content has not been extracted yet.
- """
- """Internal field: indicates if multimodal processing is complete. Not shown in repr() but accessible for debugging."""
- def __post_init__(self):
- """
- Handle status conversion based on multimodal_processed field.
- Business rules:
- - If multimodal_processed is False and status is PROCESSED,
- then change status to PREPROCESSED
- - The multimodal_processed field is kept (with repr=False) for internal use and debugging
- """
- # Apply status conversion logic
- if self.multimodal_processed is not None:
- if (
- self.multimodal_processed is False
- and self.status == DocStatus.PROCESSED
- ):
- self.status = DocStatus.PREPROCESSED
- @dataclass
- class DocStatusStorage(BaseKVStorage, ABC):
- """Base class for document status storage"""
- @staticmethod
- def resolve_status_filter_values(
- status_filter: DocStatus | None = None,
- status_filters: list[DocStatus] | None = None,
- ) -> set[str] | None:
- """Normalize single- and multi-status filters into comparable values.
- `status_filters` takes precedence over `status_filter`. Empty multi-status
- filters are treated as no filter for backward-compatible request handling.
- """
- if status_filters:
- return {status.value for status in status_filters}
- if status_filter is not None:
- return {status_filter.value}
- return None
- @abstractmethod
- async def get_status_counts(self) -> dict[str, int]:
- """Get counts of documents in each status"""
- @abstractmethod
- async def get_docs_by_status(
- self, status: DocStatus
- ) -> dict[str, DocProcessingStatus]:
- """Get all documents with a specific status"""
- @abstractmethod
- async def get_docs_by_statuses(
- self, statuses: list[DocStatus]
- ) -> dict[str, DocProcessingStatus]:
- """Get all documents matching any of the given statuses"""
- @abstractmethod
- async def get_docs_by_track_id(
- self, track_id: str
- ) -> dict[str, DocProcessingStatus]:
- """Get all documents with a specific track_id"""
- @abstractmethod
- async def get_docs_paginated(
- self,
- status_filter: DocStatus | None = None,
- status_filters: list[DocStatus] | None = None,
- page: int = 1,
- page_size: int = 50,
- sort_field: str = "updated_at",
- sort_direction: str = "desc",
- ) -> tuple[list[tuple[str, DocProcessingStatus]], int]:
- """Get documents with pagination support
- Args:
- status_filter: Legacy single-status filter, ignored when status_filters is set
- status_filters: Filter by multiple document statuses, None for all statuses
- page: Page number (1-based)
- page_size: Number of documents per page (10-200)
- sort_field: Field to sort by ('created_at', 'updated_at', 'id')
- sort_direction: Sort direction ('asc' or 'desc')
- Returns:
- Tuple of (list of (doc_id, DocProcessingStatus) tuples, total_count)
- """
- @abstractmethod
- async def get_all_status_counts(self) -> dict[str, int]:
- """Get counts of documents in each status for all documents
- Returns:
- Dictionary mapping status names to counts
- """
- @abstractmethod
- async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
- """Get document by file path
- Args:
- file_path: The file path to search for
- Returns:
- dict[str, Any] | None: Document data if found, None otherwise
- Returns the same format as get_by_ids method
- """
- @abstractmethod
- async def get_doc_by_file_basename(
- self, basename: str
- ) -> tuple[str, dict[str, Any]] | None:
- """Get document by canonical file basename.
- Used for filename-based deduplication. Callers must pass the canonical
- basename; storage implementations only compare against the canonical
- ``file_path`` persisted by the business layer.
- Args:
- basename: The filename basename to search for (e.g. "report.pdf").
- Returns:
- (doc_id, doc_data) when a matching record exists, otherwise None.
- """
- @abstractmethod
- async def get_doc_by_content_hash(
- self, content_hash: str
- ) -> tuple[str, dict[str, Any]] | None:
- """Get document by content_hash field.
- Used for content-hash deduplication of full documents.
- Args:
- content_hash: The content hash value to search for.
- Returns:
- (doc_id, doc_data) when a matching record exists, otherwise None.
- """
- class StoragesStatus(str, Enum):
- """Storages status"""
- NOT_CREATED = "not_created"
- CREATED = "created"
- INITIALIZED = "initialized"
- FINALIZED = "finalized"
- @dataclass
- class DeletionResult:
- """Represents the result of a deletion operation."""
- status: Literal["success", "not_found", "not_allowed", "fail"]
- doc_id: str
- message: str
- status_code: int = 200
- file_path: str | None = None
- # Unified Query Result Data Structures for Reference List Support
- @dataclass
- class QueryResult:
- """
- Unified query result data structure for all query modes.
- Attributes:
- content: Text content for non-streaming responses
- response_iterator: Streaming response iterator for streaming responses
- raw_data: Complete structured data including references and metadata
- is_streaming: Whether this is a streaming result
- """
- content: Optional[str] = None
- response_iterator: Optional[AsyncIterator[str]] = None
- raw_data: Optional[Dict[str, Any]] = None
- is_streaming: bool = False
- @property
- def reference_list(self) -> List[Dict[str, str]]:
- """
- Convenient property to extract reference list from raw_data.
- Returns:
- List[Dict[str, str]]: Reference list in format:
- [{"reference_id": "1", "file_path": "/path/to/file.pdf"}, ...]
- """
- if self.raw_data:
- return self.raw_data.get("data", {}).get("references", [])
- return []
- @property
- def metadata(self) -> Dict[str, Any]:
- """
- Convenient property to extract metadata from raw_data.
- Returns:
- Dict[str, Any]: Query metadata including query_mode, keywords, etc.
- """
- if self.raw_data:
- return self.raw_data.get("metadata", {})
- return {}
- @dataclass
- class QueryContextResult:
- """
- Unified query context result data structure.
- Attributes:
- context: LLM context string
- raw_data: Complete structured data including reference_list
- """
- context: str
- raw_data: Dict[str, Any]
- @property
- def reference_list(self) -> List[Dict[str, str]]:
- """Convenient property to extract reference list from raw_data."""
- return self.raw_data.get("data", {}).get("references", [])
|