base.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050
  1. from __future__ import annotations
  2. from abc import ABC, abstractmethod
  3. from enum import Enum
  4. import os
  5. from dotenv import load_dotenv
  6. from dataclasses import dataclass, field
  7. from typing import (
  8. Any,
  9. Literal,
  10. TypedDict,
  11. TypeVar,
  12. Optional,
  13. Dict,
  14. List,
  15. AsyncIterator,
  16. )
  17. from .utils import EmbeddingFunc
  18. from .types import KnowledgeGraph
  19. from .constants import (
  20. DEFAULT_TOP_K,
  21. DEFAULT_CHUNK_TOP_K,
  22. DEFAULT_MAX_ENTITY_TOKENS,
  23. DEFAULT_MAX_RELATION_TOKENS,
  24. DEFAULT_MAX_TOTAL_TOKENS,
  25. DEFAULT_OLLAMA_MODEL_NAME,
  26. DEFAULT_OLLAMA_MODEL_TAG,
  27. DEFAULT_OLLAMA_MODEL_SIZE,
  28. DEFAULT_OLLAMA_CREATED_AT,
  29. DEFAULT_OLLAMA_DIGEST,
  30. )
# Load the ".env" file located in the current working directory.
# This lets each LightRAG instance use its own .env file.
# override=False: variables already set in the OS environment take
# precedence over values read from the .env file.
load_dotenv(dotenv_path=".env", override=False)
  35. class OllamaServerInfos:
  36. def __init__(self, name=None, tag=None):
  37. self._lightrag_name = name or os.getenv(
  38. "OLLAMA_EMULATING_MODEL_NAME", DEFAULT_OLLAMA_MODEL_NAME
  39. )
  40. self._lightrag_tag = tag or os.getenv(
  41. "OLLAMA_EMULATING_MODEL_TAG", DEFAULT_OLLAMA_MODEL_TAG
  42. )
  43. self.LIGHTRAG_SIZE = DEFAULT_OLLAMA_MODEL_SIZE
  44. self.LIGHTRAG_CREATED_AT = DEFAULT_OLLAMA_CREATED_AT
  45. self.LIGHTRAG_DIGEST = DEFAULT_OLLAMA_DIGEST
  46. @property
  47. def LIGHTRAG_NAME(self):
  48. return self._lightrag_name
  49. @LIGHTRAG_NAME.setter
  50. def LIGHTRAG_NAME(self, value):
  51. self._lightrag_name = value
  52. @property
  53. def LIGHTRAG_TAG(self):
  54. return self._lightrag_tag
  55. @LIGHTRAG_TAG.setter
  56. def LIGHTRAG_TAG(self, value):
  57. self._lightrag_tag = value
  58. @property
  59. def LIGHTRAG_MODEL(self):
  60. return f"{self._lightrag_name}:{self._lightrag_tag}"
class TextChunkSchema(TypedDict):
    """Typed dictionary describing one text chunk record."""

    # Token count recorded for the chunk.
    tokens: int
    # Text content of the chunk.
    content: str
    # Identifier of the document the chunk belongs to.
    full_doc_id: str
    # Position index of the chunk (presumably within its document -- confirm).
    chunk_order_index: int
# Unconstrained generic type variable available for annotations.
T = TypeVar("T")
  67. @dataclass
  68. class QueryParam:
  69. """Configuration parameters for query execution in LightRAG."""
  70. mode: Literal["local", "global", "hybrid", "naive", "mix", "bypass"] = "mix"
  71. """Specifies the retrieval mode:
  72. - "local": Focuses on context-dependent information.
  73. - "global": Utilizes global knowledge.
  74. - "hybrid": Combines local and global retrieval methods.
  75. - "naive": Performs a basic search without advanced techniques.
  76. - "mix": Integrates knowledge graph and vector retrieval.
  77. """
  78. only_need_context: bool = False
  79. """If True, only returns the retrieved context without generating a response."""
  80. only_need_prompt: bool = False
  81. """If True, only returns the generated prompt without producing a response."""
  82. response_type: str = "Multiple Paragraphs"
  83. """Defines the response format. Examples: 'Multiple Paragraphs', 'Single Paragraph', 'Bullet Points'."""
  84. stream: bool = False
  85. """If True, enables streaming output for real-time responses."""
  86. top_k: int = int(os.getenv("TOP_K", str(DEFAULT_TOP_K)))
  87. """Number of top items to retrieve. Represents entities in 'local' mode and relationships in 'global' mode."""
  88. chunk_top_k: int = int(os.getenv("CHUNK_TOP_K", str(DEFAULT_CHUNK_TOP_K)))
  89. """Number of text chunks to retrieve initially from vector search and keep after reranking.
  90. If None, defaults to top_k value.
  91. """
  92. max_entity_tokens: int = int(
  93. os.getenv("MAX_ENTITY_TOKENS", str(DEFAULT_MAX_ENTITY_TOKENS))
  94. )
  95. """Maximum number of tokens allocated for entity context in unified token control system."""
  96. max_relation_tokens: int = int(
  97. os.getenv("MAX_RELATION_TOKENS", str(DEFAULT_MAX_RELATION_TOKENS))
  98. )
  99. """Maximum number of tokens allocated for relationship context in unified token control system."""
  100. max_total_tokens: int = int(
  101. os.getenv("MAX_TOTAL_TOKENS", str(DEFAULT_MAX_TOTAL_TOKENS))
  102. )
  103. """Maximum total tokens budget for the entire query context (entities + relations + chunks + system prompt)."""
  104. hl_keywords: list[str] = field(default_factory=list)
  105. """List of high-level keywords to prioritize in retrieval."""
  106. ll_keywords: list[str] = field(default_factory=list)
  107. """List of low-level keywords to refine retrieval focus."""
  108. # History mesages is only send to LLM for context, not used for retrieval
  109. conversation_history: list[dict[str, str]] = field(default_factory=list)
  110. """Stores past conversation history to maintain context.
  111. Format: [{"role": "user/assistant", "content": "message"}].
  112. """
  113. user_prompt: str | None = None
  114. """User-provided prompt for the query.
  115. Addition instructions for LLM. If provided, this will be inject into the prompt template.
  116. It's purpose is the let user customize the way LLM generate the response.
  117. """
  118. enable_rerank: bool = os.getenv("RERANK_BY_DEFAULT", "true").lower() == "true"
  119. """Enable reranking for retrieved text chunks. If True but no rerank model is configured, a warning will be issued.
  120. Default is True to enable reranking when rerank model is available.
  121. """
  122. include_references: bool = False
  123. """If True, includes reference list in the response for supported endpoints.
  124. This parameter controls whether the API response includes a references field
  125. containing citation information for the retrieved content.
  126. """
  127. @dataclass
  128. class StorageNameSpace(ABC):
  129. namespace: str
  130. workspace: str
  131. global_config: dict[str, Any]
  132. async def initialize(self):
  133. """Initialize the storage"""
  134. pass
  135. async def finalize(self):
  136. """Finalize the storage"""
  137. pass
  138. @abstractmethod
  139. async def index_done_callback(self) -> None:
  140. """Commit the storage operations after indexing"""
  141. @abstractmethod
  142. async def drop(self) -> dict[str, str]:
  143. """Drop all data from storage and clean up resources
  144. This abstract method defines the contract for dropping all data from a storage implementation.
  145. Each storage type must implement this method to:
  146. 1. Clear all data from memory and/or external storage
  147. 2. Remove any associated storage files if applicable
  148. 3. Reset the storage to its initial state
  149. 4. Handle cleanup of any resources
  150. 5. Notify other processes if necessary
  151. 6. This action should persistent the data to disk immediately.
  152. Returns:
  153. dict[str, str]: Operation status and message with the following format:
  154. {
  155. "status": str, # "success" or "error"
  156. "message": str # "data dropped" on success, error details on failure
  157. }
  158. Implementation specific:
  159. - On success: return {"status": "success", "message": "data dropped"}
  160. - On failure: return {"status": "error", "message": "<error details>"}
  161. - If not supported: return {"status": "error", "message": "unsupported"}
  162. """
  163. @dataclass
  164. class BaseVectorStorage(StorageNameSpace, ABC):
  165. embedding_func: EmbeddingFunc
  166. cosine_better_than_threshold: float = field(default=0.2)
  167. meta_fields: set[str] = field(default_factory=set)
  168. def _validate_embedding_func(self):
  169. """Validate that embedding_func is provided.
  170. This method should be called at the beginning of __post_init__
  171. in all vector storage implementations.
  172. Raises:
  173. ValueError: If embedding_func is None
  174. """
  175. if self.embedding_func is None:
  176. raise ValueError(
  177. "embedding_func is required for vector storage. "
  178. "Please provide a valid EmbeddingFunc instance."
  179. )
  180. def _generate_collection_suffix(self) -> str | None:
  181. """Generates collection/table suffix from embedding_func.
  182. Return suffix if model_name exists in embedding_func, otherwise return None.
  183. Note: embedding_func is guaranteed to exist (validated in __post_init__).
  184. Returns:
  185. str | None: Suffix string e.g. "text_embedding_3_large_3072d", or None if model_name not available
  186. """
  187. import re
  188. # Check if model_name exists (model_name is optional in EmbeddingFunc)
  189. model_name = getattr(self.embedding_func, "model_name", None)
  190. if not model_name:
  191. return None
  192. # embedding_dim is required in EmbeddingFunc
  193. embedding_dim = self.embedding_func.embedding_dim
  194. # Generate suffix: clean model name and append dimension
  195. safe_model_name = re.sub(r"[^a-zA-Z0-9_]", "_", model_name.lower())
  196. return f"{safe_model_name}_{embedding_dim}d"
  197. @abstractmethod
  198. async def query(
  199. self, query: str, top_k: int, query_embedding: list[float] = None
  200. ) -> list[dict[str, Any]]:
  201. """Query the vector storage and retrieve top_k results.
  202. Args:
  203. query: The query string to search for
  204. top_k: Number of top results to return
  205. query_embedding: Optional pre-computed embedding for the query.
  206. If provided, skips embedding computation for better performance.
  207. """
  208. @abstractmethod
  209. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  210. """Insert or update vectors in the storage.
  211. Importance notes for in-memory storage:
  212. 1. Changes will be persisted to disk during the next index_done_callback
  213. 2. Only one process should updating the storage at a time before index_done_callback,
  214. KG-storage-log should be used to avoid data corruption
  215. Multi-worker note:
  216. Backends that buffer writes in process memory (e.g.
  217. OpenSearchVectorDBStorage as of #3043) keep the buffer
  218. process-local. In a multi-worker deployment (e.g.
  219. lightrag-gunicorn) other workers will not observe these writes
  220. until the writing worker has called index_done_callback().
  221. Callers that depend on cross-worker read-after-write visibility
  222. must explicitly await index_done_callback() before relying on
  223. reads from another worker.
  224. """
  225. @abstractmethod
  226. async def delete_entity(self, entity_name: str) -> None:
  227. """Delete a single entity by its name.
  228. Importance notes for in-memory storage:
  229. 1. Changes will be persisted to disk during the next index_done_callback
  230. 2. Only one process should updating the storage at a time before index_done_callback,
  231. KG-storage-log should be used to avoid data corruption
  232. Multi-worker note: see ``upsert`` -- buffered tombstones are
  233. process-local until index_done_callback() runs.
  234. """
  235. @abstractmethod
  236. async def delete_entity_relation(self, entity_name: str) -> None:
  237. """Delete relations for a given entity.
  238. Importance notes for in-memory storage:
  239. 1. Changes will be persisted to disk during the next index_done_callback
  240. 2. Only one process should updating the storage at a time before index_done_callback,
  241. KG-storage-log should be used to avoid data corruption
  242. Multi-worker note: see ``upsert`` -- backends may prune their
  243. in-process buffer in addition to issuing a server-side delete,
  244. so cross-worker visibility still follows the index_done_callback
  245. contract.
  246. """
  247. @abstractmethod
  248. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  249. """Get vector data by its ID
  250. Args:
  251. id: The unique identifier of the vector
  252. Returns:
  253. The vector data if found, or None if not found
  254. """
  255. pass
  256. @abstractmethod
  257. async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
  258. """Get multiple vector data by their IDs
  259. Args:
  260. ids: List of unique identifiers
  261. Returns:
  262. List of vector data objects that were found
  263. """
  264. pass
  265. @abstractmethod
  266. async def delete(self, ids: list[str]):
  267. """Delete vectors with specified IDs
  268. Importance notes for in-memory storage:
  269. 1. Changes will be persisted to disk during the next index_done_callback
  270. 2. Only one process should updating the storage at a time before index_done_callback,
  271. KG-storage-log should be used to avoid data corruption
  272. Multi-worker note: see ``upsert`` -- buffered tombstones are
  273. process-local until index_done_callback() runs.
  274. Args:
  275. ids: List of vector IDs to be deleted
  276. """
  277. @abstractmethod
  278. async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
  279. """Get vectors by their IDs, returning only ID and vector data for efficiency
  280. Args:
  281. ids: List of unique identifiers
  282. Returns:
  283. Dictionary mapping IDs to their vector embeddings
  284. Format: {id: [vector_values], ...}
  285. """
  286. pass
  287. @dataclass
  288. class BaseKVStorage(StorageNameSpace, ABC):
  289. embedding_func: EmbeddingFunc
  290. @abstractmethod
  291. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  292. """Get value by id"""
  293. @abstractmethod
  294. async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
  295. """Get values by ids"""
  296. @abstractmethod
  297. async def filter_keys(self, keys: set[str]) -> set[str]:
  298. """Return un-exist keys"""
  299. @abstractmethod
  300. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  301. """Upsert data
  302. Importance notes for in-memory storage:
  303. 1. Changes will be persisted to disk during the next index_done_callback
  304. 2. update flags to notify other processes that data persistence is needed
  305. Multi-worker note:
  306. Backends that buffer writes in process memory (e.g.
  307. OpenSearchKVStorage as of the KV-batching change derived from
  308. #2822) keep the buffer process-local. In a multi-worker
  309. deployment (e.g. lightrag-gunicorn) other workers will not
  310. observe these writes until the writing worker has called
  311. index_done_callback(). Callers that depend on cross-worker
  312. read-after-write visibility must explicitly await
  313. index_done_callback() before relying on reads from another
  314. worker.
  315. """
  316. @abstractmethod
  317. async def delete(self, ids: list[str]) -> None:
  318. """Delete specific records from storage by their IDs
  319. Importance notes for in-memory storage:
  320. 1. Changes will be persisted to disk during the next index_done_callback
  321. 2. update flags to notify other processes that data persistence is needed
  322. Multi-worker note: see ``upsert`` -- buffered tombstones are
  323. process-local until index_done_callback() runs.
  324. Args:
  325. ids (list[str]): List of document IDs to be deleted from storage
  326. Returns:
  327. None
  328. """
  329. @abstractmethod
  330. async def is_empty(self) -> bool:
  331. """Check if the storage is empty
  332. Returns:
  333. bool: True if storage contains no data, False otherwise
  334. """
  335. @dataclass
  336. class BaseGraphStorage(StorageNameSpace, ABC):
  337. """All operations related to edges in graph should be undirected."""
  338. embedding_func: EmbeddingFunc
  339. @abstractmethod
  340. async def has_node(self, node_id: str) -> bool:
  341. """Check if a node exists in the graph.
  342. Args:
  343. node_id: The ID of the node to check
  344. Returns:
  345. True if the node exists, False otherwise
  346. """
  347. @abstractmethod
  348. async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
  349. """Check if an edge exists between two nodes.
  350. Args:
  351. source_node_id: The ID of the source node
  352. target_node_id: The ID of the target node
  353. Returns:
  354. True if the edge exists, False otherwise
  355. """
  356. @abstractmethod
  357. async def node_degree(self, node_id: str) -> int:
  358. """Get the degree (number of connected edges) of a node.
  359. Args:
  360. node_id: The ID of the node
  361. Returns:
  362. The number of edges connected to the node
  363. """
  364. @abstractmethod
  365. async def edge_degree(self, src_id: str, tgt_id: str) -> int:
  366. """Get the total degree of an edge (sum of degrees of its source and target nodes).
  367. Args:
  368. src_id: The ID of the source node
  369. tgt_id: The ID of the target node
  370. Returns:
  371. The sum of the degrees of the source and target nodes
  372. """
  373. @abstractmethod
  374. async def get_node(self, node_id: str) -> dict[str, str] | None:
  375. """Get node by its ID, returning only node properties.
  376. Args:
  377. node_id: The ID of the node to retrieve
  378. Returns:
  379. A dictionary of node properties if found, None otherwise
  380. """
  381. @abstractmethod
  382. async def get_edge(
  383. self, source_node_id: str, target_node_id: str
  384. ) -> dict[str, str] | None:
  385. """Get edge properties between two nodes.
  386. Args:
  387. source_node_id: The ID of the source node
  388. target_node_id: The ID of the target node
  389. Returns:
  390. A dictionary of edge properties if found, None otherwise
  391. """
  392. @abstractmethod
  393. async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
  394. """Get all edges connected to a node.
  395. Args:
  396. source_node_id: The ID of the node to get edges for
  397. Returns:
  398. A list of (source_id, target_id) tuples representing edges,
  399. or None if the node doesn't exist
  400. """
  401. async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]:
  402. """Get nodes as a batch using UNWIND
  403. Default implementation fetches nodes one by one.
  404. Override this method for better performance in storage backends
  405. that support batch operations.
  406. """
  407. result = {}
  408. for node_id in node_ids:
  409. node = await self.get_node(node_id)
  410. if node is not None:
  411. result[node_id] = node
  412. return result
  413. async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]:
  414. """Node degrees as a batch using UNWIND
  415. Default implementation fetches node degrees one by one.
  416. Override this method for better performance in storage backends
  417. that support batch operations.
  418. """
  419. result = {}
  420. for node_id in node_ids:
  421. degree = await self.node_degree(node_id)
  422. result[node_id] = degree
  423. return result
  424. async def edge_degrees_batch(
  425. self, edge_pairs: list[tuple[str, str]]
  426. ) -> dict[tuple[str, str], int]:
  427. """Edge degrees as a batch using UNWIND also uses node_degrees_batch
  428. Default implementation calculates edge degrees one by one.
  429. Override this method for better performance in storage backends
  430. that support batch operations.
  431. """
  432. result = {}
  433. for src_id, tgt_id in edge_pairs:
  434. degree = await self.edge_degree(src_id, tgt_id)
  435. result[(src_id, tgt_id)] = degree
  436. return result
  437. async def get_edges_batch(
  438. self, pairs: list[dict[str, str]]
  439. ) -> dict[tuple[str, str], dict]:
  440. """Get edges as a batch using UNWIND
  441. Default implementation fetches edges one by one.
  442. Override this method for better performance in storage backends
  443. that support batch operations.
  444. """
  445. result = {}
  446. for pair in pairs:
  447. src_id = pair["src"]
  448. tgt_id = pair["tgt"]
  449. edge = await self.get_edge(src_id, tgt_id)
  450. if edge is not None:
  451. result[(src_id, tgt_id)] = edge
  452. return result
  453. async def get_nodes_edges_batch(
  454. self, node_ids: list[str]
  455. ) -> dict[str, list[tuple[str, str]]]:
  456. """Get nodes edges as a batch using UNWIND
  457. Default implementation fetches node edges one by one.
  458. Override this method for better performance in storage backends
  459. that support batch operations.
  460. """
  461. result = {}
  462. for node_id in node_ids:
  463. edges = await self.get_node_edges(node_id)
  464. result[node_id] = edges if edges is not None else []
  465. return result
  466. @abstractmethod
  467. async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
  468. """Insert a new node or update an existing node in the graph.
  469. Importance notes for in-memory storage:
  470. 1. Changes will be persisted to disk during the next index_done_callback
  471. 2. Only one process should updating the storage at a time before index_done_callback,
  472. KG-storage-log should be used to avoid data corruption
  473. Args:
  474. node_id: The ID of the node to insert or update
  475. node_data: A dictionary of node properties
  476. """
  477. async def upsert_nodes_batch(self, nodes: list[tuple[str, dict[str, str]]]) -> None:
  478. """Insert or update multiple nodes in a single batch call.
  479. Default implementation falls back to calling upsert_node() serially.
  480. Override in storage backends that support native batch operations for
  481. better performance when importing large knowledge graphs.
  482. Args:
  483. nodes: List of (node_id, node_data) tuples.
  484. """
  485. for node_id, node_data in nodes:
  486. await self.upsert_node(node_id, node_data=node_data)
  487. async def has_nodes_batch(self, node_ids: list[str]) -> set[str]:
  488. """Check existence of multiple nodes in a single batch call.
  489. Default implementation falls back to calling has_node() serially.
  490. Override in storage backends that support native batch operations for
  491. better performance when importing large knowledge graphs.
  492. Args:
  493. node_ids: List of node IDs to check.
  494. Returns:
  495. Set of node_ids that exist in the graph.
  496. """
  497. existing: set[str] = set()
  498. for node_id in node_ids:
  499. if await self.has_node(node_id):
  500. existing.add(node_id)
  501. return existing
  502. async def upsert_edges_batch(
  503. self, edges: list[tuple[str, str, dict[str, str]]]
  504. ) -> None:
  505. """Insert or update multiple edges in a single batch call.
  506. Default implementation falls back to calling upsert_edge() serially.
  507. Override in storage backends that support native batch operations for
  508. better performance when importing large knowledge graphs.
  509. Args:
  510. edges: List of (source_node_id, target_node_id, edge_data) tuples.
  511. """
  512. for source_node_id, target_node_id, edge_data in edges:
  513. await self.upsert_edge(source_node_id, target_node_id, edge_data=edge_data)
  514. @abstractmethod
  515. async def upsert_edge(
  516. self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
  517. ) -> None:
  518. """Insert a new edge or update an existing edge in the graph.
  519. Importance notes for in-memory storage:
  520. 1. Changes will be persisted to disk during the next index_done_callback
  521. 2. Only one process should updating the storage at a time before index_done_callback,
  522. KG-storage-log should be used to avoid data corruption
  523. Args:
  524. source_node_id: The ID of the source node
  525. target_node_id: The ID of the target node
  526. edge_data: A dictionary of edge properties
  527. """
  528. @abstractmethod
  529. async def delete_node(self, node_id: str) -> None:
  530. """Delete a node from the graph.
  531. Importance notes for in-memory storage:
  532. 1. Changes will be persisted to disk during the next index_done_callback
  533. 2. Only one process should updating the storage at a time before index_done_callback,
  534. KG-storage-log should be used to avoid data corruption
  535. Args:
  536. node_id: The ID of the node to delete
  537. """
  538. @abstractmethod
  539. async def remove_nodes(self, nodes: list[str]):
  540. """Delete multiple nodes
  541. Importance notes:
  542. 1. Changes will be persisted to disk during the next index_done_callback
  543. 2. Only one process should updating the storage at a time before index_done_callback,
  544. KG-storage-log should be used to avoid data corruption
  545. Args:
  546. nodes: List of node IDs to be deleted
  547. """
  548. @abstractmethod
  549. async def remove_edges(self, edges: list[tuple[str, str]]):
  550. """Delete multiple edges
  551. Importance notes:
  552. 1. Changes will be persisted to disk during the next index_done_callback
  553. 2. Only one process should updating the storage at a time before index_done_callback,
  554. KG-storage-log should be used to avoid data corruption
  555. Args:
  556. edges: List of edges to be deleted, each edge is a (source, target) tuple
  557. """
  558. @abstractmethod
  559. async def get_all_labels(self) -> list[str]:
  560. """Get all labels(entity names) in the graph.
  561. Do not use this method for large graph, use get_popular_labels or search_labels instead.
  562. Returns:
  563. A list of all node labels in the graph, sorted alphabetically
  564. """
  565. @abstractmethod
  566. async def get_knowledge_graph(
  567. self, node_label: str, max_depth: int = 3, max_nodes: int = 1000
  568. ) -> KnowledgeGraph:
  569. """
  570. Retrieve a connected subgraph of nodes where the label includes the specified `node_label`.
  571. Args:
  572. node_label: Label(entity name) of the starting node,* means all nodes
  573. max_depth: Maximum depth of the subgraph, Defaults to 3
  574. max_nodes: Maxiumu nodes to return, Defaults to 1000(BFS if possible)
  575. Returns:
  576. KnowledgeGraph object containing nodes and edges, with an is_truncated flag
  577. indicating whether the graph was truncated due to max_nodes limit
  578. """
  579. @abstractmethod
  580. async def get_all_nodes(self) -> list[dict]:
  581. """Get all nodes in the graph.
  582. Returns:
  583. A list of all nodes, where each node is a dictionary of its properties
  584. (Edge is bidirectional for some storage implementation; deduplication must be handled by the caller)
  585. """
  586. @abstractmethod
  587. async def get_all_edges(self) -> list[dict]:
  588. """Get all edges in the graph.
  589. Returns:
  590. A list of all edges, where each edge is a dictionary of its properties
  591. """
  592. @abstractmethod
  593. async def get_popular_labels(self, limit: int = 300) -> list[str]:
  594. """Get popular labels(entity names) by node degree (most connected entities)
  595. Args:
  596. limit: Maximum number of labels to return
  597. Returns:
  598. List of labels sorted by degree (highest first)
  599. """
  600. @abstractmethod
  601. async def search_labels(self, query: str, limit: int = 50) -> list[str]:
  602. """Search labels(entity names) with fuzzy matching
  603. Args:
  604. query: Search query string
  605. limit: Maximum number of results to return
  606. Returns:
  607. List of matching labels sorted by relevance
  608. """
class DocStatus(str, Enum):
    """Document processing status.

    Members are also ``str`` instances, so they compare equal to their
    string values.

    Pipeline order: PENDING -> PARSING -> ANALYZING (optional) -> PROCESSING -> PROCESSED | FAILED.
    PREPROCESSED is deprecated and kept only for backward compatibility.
    """

    PENDING = "pending"
    PARSING = "parsing"  # Phase 1: content extraction (parse_native/mineru/docling)
    ANALYZING = "analyzing"  # Phase 2: multimodal analysis (VLM)
    PROCESSING = "processing"  # Phase 3: entity/relation extraction
    PREPROCESSED = "preprocessed"  # Deprecated: use ANALYZING in the new pipeline
    PROCESSED = "processed"
    FAILED = "failed"
  621. @dataclass
  622. class DocProcessingStatus:
  623. """Document processing status data structure"""
  624. content_summary: str
  625. """First 100 chars of document content, used for preview"""
  626. content_length: int
  627. """Total length of document"""
  628. file_path: str
  629. """Canonical basename of the document.
  630. Always a hint-stripped basename (e.g. ``abc.docx``) or the literal
  631. ``"unknown_source"`` sentinel; never carries directory components or
  632. parser ``[hint]`` segments. UI display, filename-based dedup, and
  633. citation paths all share this value.
  634. """
  635. status: DocStatus
  636. """Current processing status"""
  637. created_at: str
  638. """ISO format timestamp when document was created"""
  639. updated_at: str
  640. """ISO format timestamp when document was last updated"""
  641. track_id: str | None = None
  642. """Tracking ID for monitoring progress"""
  643. chunks_count: int | None = None
  644. """Number of chunks after splitting, used for processing"""
  645. chunks_list: list[str] | None = field(default_factory=list)
  646. """List of chunk IDs associated with this document, used for deletion"""
  647. error_msg: str | None = None
  648. """Error message if failed"""
  649. metadata: dict[str, Any] = field(default_factory=dict)
  650. """Additional metadata"""
  651. multimodal_processed: bool | None = field(default=None, repr=False)
  652. content_hash: str | None = None
  653. """MD5 hash of the underlying document content (raw text or source file).
  654. Used together with file_path basename for duplicate detection. Empty for
  655. pending_parse records whose content has not been extracted yet.
  656. """
  657. """Internal field: indicates if multimodal processing is complete. Not shown in repr() but accessible for debugging."""
  658. def __post_init__(self):
  659. """
  660. Handle status conversion based on multimodal_processed field.
  661. Business rules:
  662. - If multimodal_processed is False and status is PROCESSED,
  663. then change status to PREPROCESSED
  664. - The multimodal_processed field is kept (with repr=False) for internal use and debugging
  665. """
  666. # Apply status conversion logic
  667. if self.multimodal_processed is not None:
  668. if (
  669. self.multimodal_processed is False
  670. and self.status == DocStatus.PROCESSED
  671. ):
  672. self.status = DocStatus.PREPROCESSED
  673. @dataclass
  674. class DocStatusStorage(BaseKVStorage, ABC):
  675. """Base class for document status storage"""
  676. @staticmethod
  677. def resolve_status_filter_values(
  678. status_filter: DocStatus | None = None,
  679. status_filters: list[DocStatus] | None = None,
  680. ) -> set[str] | None:
  681. """Normalize single- and multi-status filters into comparable values.
  682. `status_filters` takes precedence over `status_filter`. Empty multi-status
  683. filters are treated as no filter for backward-compatible request handling.
  684. """
  685. if status_filters:
  686. return {status.value for status in status_filters}
  687. if status_filter is not None:
  688. return {status_filter.value}
  689. return None
  690. @abstractmethod
  691. async def get_status_counts(self) -> dict[str, int]:
  692. """Get counts of documents in each status"""
  693. @abstractmethod
  694. async def get_docs_by_status(
  695. self, status: DocStatus
  696. ) -> dict[str, DocProcessingStatus]:
  697. """Get all documents with a specific status"""
  698. @abstractmethod
  699. async def get_docs_by_statuses(
  700. self, statuses: list[DocStatus]
  701. ) -> dict[str, DocProcessingStatus]:
  702. """Get all documents matching any of the given statuses"""
  703. @abstractmethod
  704. async def get_docs_by_track_id(
  705. self, track_id: str
  706. ) -> dict[str, DocProcessingStatus]:
  707. """Get all documents with a specific track_id"""
  708. @abstractmethod
  709. async def get_docs_paginated(
  710. self,
  711. status_filter: DocStatus | None = None,
  712. status_filters: list[DocStatus] | None = None,
  713. page: int = 1,
  714. page_size: int = 50,
  715. sort_field: str = "updated_at",
  716. sort_direction: str = "desc",
  717. ) -> tuple[list[tuple[str, DocProcessingStatus]], int]:
  718. """Get documents with pagination support
  719. Args:
  720. status_filter: Legacy single-status filter, ignored when status_filters is set
  721. status_filters: Filter by multiple document statuses, None for all statuses
  722. page: Page number (1-based)
  723. page_size: Number of documents per page (10-200)
  724. sort_field: Field to sort by ('created_at', 'updated_at', 'id')
  725. sort_direction: Sort direction ('asc' or 'desc')
  726. Returns:
  727. Tuple of (list of (doc_id, DocProcessingStatus) tuples, total_count)
  728. """
  729. @abstractmethod
  730. async def get_all_status_counts(self) -> dict[str, int]:
  731. """Get counts of documents in each status for all documents
  732. Returns:
  733. Dictionary mapping status names to counts
  734. """
  735. @abstractmethod
  736. async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
  737. """Get document by file path
  738. Args:
  739. file_path: The file path to search for
  740. Returns:
  741. dict[str, Any] | None: Document data if found, None otherwise
  742. Returns the same format as get_by_ids method
  743. """
  744. @abstractmethod
  745. async def get_doc_by_file_basename(
  746. self, basename: str
  747. ) -> tuple[str, dict[str, Any]] | None:
  748. """Get document by canonical file basename.
  749. Used for filename-based deduplication. Callers must pass the canonical
  750. basename; storage implementations only compare against the canonical
  751. ``file_path`` persisted by the business layer.
  752. Args:
  753. basename: The filename basename to search for (e.g. "report.pdf").
  754. Returns:
  755. (doc_id, doc_data) when a matching record exists, otherwise None.
  756. """
  757. @abstractmethod
  758. async def get_doc_by_content_hash(
  759. self, content_hash: str
  760. ) -> tuple[str, dict[str, Any]] | None:
  761. """Get document by content_hash field.
  762. Used for content-hash deduplication of full documents.
  763. Args:
  764. content_hash: The content hash value to search for.
  765. Returns:
  766. (doc_id, doc_data) when a matching record exists, otherwise None.
  767. """
  768. class StoragesStatus(str, Enum):
  769. """Storages status"""
  770. NOT_CREATED = "not_created"
  771. CREATED = "created"
  772. INITIALIZED = "initialized"
  773. FINALIZED = "finalized"
  774. @dataclass
  775. class DeletionResult:
  776. """Represents the result of a deletion operation."""
  777. status: Literal["success", "not_found", "not_allowed", "fail"]
  778. doc_id: str
  779. message: str
  780. status_code: int = 200
  781. file_path: str | None = None
  782. # Unified Query Result Data Structures for Reference List Support
  783. @dataclass
  784. class QueryResult:
  785. """
  786. Unified query result data structure for all query modes.
  787. Attributes:
  788. content: Text content for non-streaming responses
  789. response_iterator: Streaming response iterator for streaming responses
  790. raw_data: Complete structured data including references and metadata
  791. is_streaming: Whether this is a streaming result
  792. """
  793. content: Optional[str] = None
  794. response_iterator: Optional[AsyncIterator[str]] = None
  795. raw_data: Optional[Dict[str, Any]] = None
  796. is_streaming: bool = False
  797. @property
  798. def reference_list(self) -> List[Dict[str, str]]:
  799. """
  800. Convenient property to extract reference list from raw_data.
  801. Returns:
  802. List[Dict[str, str]]: Reference list in format:
  803. [{"reference_id": "1", "file_path": "/path/to/file.pdf"}, ...]
  804. """
  805. if self.raw_data:
  806. return self.raw_data.get("data", {}).get("references", [])
  807. return []
  808. @property
  809. def metadata(self) -> Dict[str, Any]:
  810. """
  811. Convenient property to extract metadata from raw_data.
  812. Returns:
  813. Dict[str, Any]: Query metadata including query_mode, keywords, etc.
  814. """
  815. if self.raw_data:
  816. return self.raw_data.get("metadata", {})
  817. return {}
  818. @dataclass
  819. class QueryContextResult:
  820. """
  821. Unified query context result data structure.
  822. Attributes:
  823. context: LLM context string
  824. raw_data: Complete structured data including reference_list
  825. """
  826. context: str
  827. raw_data: Dict[str, Any]
  828. @property
  829. def reference_list(self) -> List[Dict[str, str]]:
  830. """Convenient property to extract reference list from raw_data."""
  831. return self.raw_data.get("data", {}).get("references", [])