networkx_impl.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. import os
  2. from collections import deque
  3. from dataclasses import dataclass
  4. from typing import final
  5. from lightrag.file_atomic import atomic_write, reap_orphan_tmp_files
  6. from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
  7. from lightrag.utils import logger
  8. from lightrag.base import BaseGraphStorage
  9. import networkx as nx
  10. from .shared_storage import (
  11. get_namespace_lock,
  12. get_update_flag,
  13. set_all_update_flags,
  14. )
  15. from dotenv import load_dotenv
# Use the .env file located in the current working directory.
# This allows each LightRAG instance to use its own .env file.
# OS environment variables take precedence over values from the .env file.
  19. load_dotenv(dotenv_path=".env", override=False)
  20. @final
  21. @dataclass
  22. class NetworkXStorage(BaseGraphStorage):
  23. """File-backed knowledge-graph storage built on ``networkx.Graph``.
  24. Storage model:
  25. A single ``networkx.Graph`` instance lives in process memory; its
  26. full state is serialized to one GraphML file at
  27. ``working_dir/[workspace/]graph_<namespace>.graphml``. That GraphML
  28. file is the **only** cross-process synchronization surface — there
  29. is no shared memory, no message bus, and no network channel
  30. between processes. Cross-process visibility is mediated by (a) an
  31. atomic file write at commit time and (b) a per-namespace
  32. ``storage_updated`` flag distributed through
  33. ``lightrag.kg.shared_storage``.
  34. Concurrency invariants (the code in this file is correct *only* while
  35. all three hold):
  36. 1. **Single writer per workspace.** The document pipeline's
  37. ``busy`` / ``destructive_busy`` flags (see ``AGENTS.md``
  38. *Pipeline concurrency contract*) guarantee at most one process
  39. performs ``upsert_*`` / ``delete_*`` / ``remove_*`` /
  40. ``index_done_callback`` at any time. Every other process is
  41. read-only.
  42. 2. **Eventual consistency is sufficient.** Read-only processes
  43. only need to observe the writer's data *after* the writer's
  44. ``index_done_callback`` completes. Reads landing in the gap
  45. between a writer's in-memory mutation and its commit may
  46. legitimately return the pre-update snapshot.
  47. 3. **networkx operations are fully synchronous.** Under a
  48. single-threaded asyncio event loop, ``graph.add_node`` /
  49. ``graph.remove_node`` / ``graph.degree`` / etc. cannot be
  50. preempted by another coroutine, which gives them implicit
  51. mutual exclusion over ``self._graph``. This is why the methods
  52. below don't have to hold ``_storage_lock`` while calling into
  53. ``graph``.
  54. Cross-process sync protocol (identical in shape to
  55. ``NanoVectorDBStorage`` — see that class's docstring for the canonical
  56. description):
  57. Writer side (``index_done_callback``):
  58. 1. ``write_nx_graph`` atomically writes the GraphML file
  59. (``atomic_write`` lays a tmp file beside the target and
  60. renames it into place — readers either see the previous
  61. file in full or the new file in full, never a torn write).
  62. 2. ``set_all_update_flags`` flips every process's
  63. ``storage_updated`` flag (including the writer's own).
  64. 3. Immediately reset the writer's own flag to ``False`` so
  65. the next call to ``_get_graph`` does not trigger a
  66. self-reload of the data this process just wrote.
  67. Reader side (any method that goes through ``_get_graph``):
  68. 1. Inside ``_storage_lock``, observe
  69. ``storage_updated.value is True``.
  70. 2. **Fully reload** ``self._graph`` from disk via
  71. ``load_nx_graph``. networkx GraphML has no incremental
  72. sync API, so the entire file is re-parsed.
  73. 3. Reset the reader's own flag.
  74. Lock scope:
  75. ``_storage_lock`` is a per-``(namespace, workspace)`` keyed lock
  76. spanning both intra-process coroutines and inter-process workers.
  77. It wraps only the *reload* and *commit* critical sections, not
  78. every ``graph.xxx`` call. Operating on ``graph`` outside the lock
  79. is safe today *because of invariant (3)* — if either premise is
  80. ever broken (e.g. ``graph.xxx`` is moved to a thread pool, or
  81. networkx is swapped for an async graph library), the lock scope
  82. must be widened to cover the mutation/read itself.
  83. Implementation differences from ``NanoVectorDBStorage`` (same design,
  84. different surface):
  85. * No ``client_storage`` property — there is no equivalent live
  86. reference being exposed to callers, so NanoVectorDB's
  87. "do-not-retain-across-await" caveat does not apply here.
  88. * ``write_nx_graph`` passes the tmp path directly to
  89. ``nx.write_graphml``, so the writer needs no equivalent of
  90. NanoVectorDB's "temporarily reassign ``storage_file``" trick.
  91. * Mutation surface is finer-grained (``upsert_node`` /
  92. ``upsert_edge`` / ``upsert_nodes_batch`` /
  93. ``upsert_edges_batch`` / ``delete_node`` / ``remove_nodes`` /
  94. ``remove_edges``); each goes through ``_get_graph`` once and
  95. then operates synchronously on ``self._graph``.
  96. Non-pipeline write paths:
  97. The pipeline's ``busy`` gate serializes mutation calls reached
  98. through the document ingestion and purge flows. The following
  99. entry points are **not** serialized by the pipeline gate and
  100. must be guarded externally:
  101. * ``drop`` — currently gated by the API layer (the
  102. ``/documents/clear`` endpoint takes the pipeline busy
  103. reservation before invoking it).
  104. * ``delete_node`` / ``remove_nodes`` / ``remove_edges`` /
  105. ``upsert_node`` / ``upsert_edge`` when invoked from
  106. ``utils_graph.py`` admin flows (``adelete_by_entity`` /
  107. ``adelete_by_relation`` / entity-edit flows). These flows
  108. are currently not exposed in the WebUI; any future caller
  109. must arrange single-writer serialization the same way the
  110. pipeline does.
  111. """
  112. @staticmethod
  113. def load_nx_graph(file_name) -> nx.Graph:
  114. if os.path.exists(file_name):
  115. return nx.read_graphml(file_name)
  116. return None
  117. @staticmethod
  118. def write_nx_graph(graph: nx.Graph, file_name, workspace="_"):
  119. logger.info(
  120. f"[{workspace}] Writing graph with {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
  121. )
  122. atomic_write(
  123. file_name,
  124. lambda tmp: nx.write_graphml(graph, tmp),
  125. workspace,
  126. )
  127. def __post_init__(self):
  128. working_dir = self.global_config["working_dir"]
  129. if self.workspace:
  130. # Include workspace in the file path for data isolation
  131. workspace_dir = os.path.join(working_dir, self.workspace)
  132. else:
  133. # Default behavior when workspace is empty
  134. workspace_dir = working_dir
  135. self.workspace = ""
  136. os.makedirs(workspace_dir, exist_ok=True)
  137. self._graphml_xml_file = os.path.join(
  138. workspace_dir, f"graph_{self.namespace}.graphml"
  139. )
  140. self._storage_lock = None
  141. self.storage_updated = None
  142. self._graph = None
  143. reap_orphan_tmp_files(self._graphml_xml_file, workspace=self.workspace or "_")
  144. # Load initial graph
  145. preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file)
  146. if preloaded_graph is not None:
  147. logger.info(
  148. f"[{self.workspace}] Loaded graph from {self._graphml_xml_file} with {preloaded_graph.number_of_nodes()} nodes, {preloaded_graph.number_of_edges()} edges"
  149. )
  150. else:
  151. logger.info(
  152. f"[{self.workspace}] Created new empty graph file: {self._graphml_xml_file}"
  153. )
  154. self._graph = preloaded_graph or nx.Graph()
  155. async def initialize(self):
  156. """Initialize storage data"""
  157. # Get the update flag for cross-process update notification
  158. self.storage_updated = await get_update_flag(
  159. self.namespace, workspace=self.workspace
  160. )
  161. # Get the storage lock for use in other methods
  162. self._storage_lock = get_namespace_lock(
  163. self.namespace, workspace=self.workspace
  164. )
  165. async def _get_graph(self):
  166. """Return the live ``networkx.Graph``, reloading from disk if needed.
  167. This is the **single entry point** every public method funnels
  168. through to obtain ``self._graph``. It is also the **only place
  169. readers transition to a fresher on-disk snapshot**: when another
  170. process has committed (via ``index_done_callback``) and flipped
  171. this process's ``storage_updated`` flag, the next call here
  172. rebuilds ``self._graph`` by re-parsing the entire GraphML file.
  173. networkx has no incremental sync API — the reload is
  174. unconditionally a full file reload.
  175. Under the *Single writer* invariant (see class docstring), the
  176. reload branch never fires in the writer process: the writer
  177. resets its own flag at the end of every ``index_done_callback``.
  178. The branch exists for readers.
  179. ``_storage_lock`` is held during the check-and-reload to (a)
  180. serialize concurrent reload attempts by sibling coroutines in
  181. the same process and (b) interlock with ``index_done_callback``
  182. so a reader cannot observe a partially-saved file.
  183. """
  184. async with self._storage_lock:
  185. # Check if data needs to be reloaded
  186. if self.storage_updated.value:
  187. logger.info(
  188. f"[{self.workspace}] Process {os.getpid()} reloading graph {self._graphml_xml_file} due to modifications by another process"
  189. )
  190. # Reload data
  191. self._graph = (
  192. NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
  193. )
  194. # Reset update flag
  195. self.storage_updated.value = False
  196. return self._graph
  197. async def has_node(self, node_id: str) -> bool:
  198. graph = await self._get_graph()
  199. return graph.has_node(node_id)
  200. async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
  201. graph = await self._get_graph()
  202. return graph.has_edge(source_node_id, target_node_id)
  203. async def get_node(self, node_id: str) -> dict[str, str] | None:
  204. graph = await self._get_graph()
  205. return graph.nodes.get(node_id)
  206. async def node_degree(self, node_id: str) -> int:
  207. graph = await self._get_graph()
  208. if graph.has_node(node_id):
  209. return graph.degree(node_id)
  210. return 0
  211. async def edge_degree(self, src_id: str, tgt_id: str) -> int:
  212. graph = await self._get_graph()
  213. src_degree = graph.degree(src_id) if graph.has_node(src_id) else 0
  214. tgt_degree = graph.degree(tgt_id) if graph.has_node(tgt_id) else 0
  215. return src_degree + tgt_degree
  216. async def get_edge(
  217. self, source_node_id: str, target_node_id: str
  218. ) -> dict[str, str] | None:
  219. graph = await self._get_graph()
  220. return graph.edges.get((source_node_id, target_node_id))
  221. async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
  222. graph = await self._get_graph()
  223. if graph.has_node(source_node_id):
  224. return list(graph.edges(source_node_id))
  225. return None
  226. async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
  227. """Insert or update a single node; persistence is deferred.
  228. Persistence:
  229. Changes are in-memory only; cross-process visibility requires
  230. a subsequent ``index_done_callback``. In ``lightrag.py`` this
  231. is handled by ``_insert_done()`` at the end of the document
  232. batch. Callers outside the pipeline must persist explicitly.
  233. Correctness relies on the class docstring *Lock scope* invariant
  234. (synchronous networkx ops + single-writer pipeline gate).
  235. """
  236. graph = await self._get_graph()
  237. graph.add_node(node_id, **node_data)
  238. async def upsert_edge(
  239. self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
  240. ) -> None:
  241. """Insert or update a single edge; persistence is deferred.
  242. Persistence:
  243. Changes are in-memory only; cross-process visibility requires
  244. a subsequent ``index_done_callback``. Callers outside the
  245. pipeline must persist explicitly.
  246. Correctness relies on the class docstring *Lock scope* invariant.
  247. """
  248. graph = await self._get_graph()
  249. graph.add_edge(source_node_id, target_node_id, **edge_data)
  250. async def upsert_nodes_batch(self, nodes: list[tuple[str, dict[str, str]]]) -> None:
  251. """Batch insert/update multiple nodes in a single call.
  252. Much faster than calling upsert_node() in a loop for large imports
  253. because it avoids per-call async event loop overhead.
  254. Persistence:
  255. Changes are in-memory only; cross-process visibility requires
  256. a subsequent ``index_done_callback``. Callers outside the
  257. pipeline must persist explicitly.
  258. Args:
  259. nodes: List of (node_id, node_data) tuples.
  260. """
  261. graph = await self._get_graph()
  262. for node_id, node_data in nodes:
  263. graph.add_node(node_id, **node_data)
  264. async def has_nodes_batch(self, node_ids: list[str]) -> set[str]:
  265. """Check existence of multiple nodes in a single call.
  266. Returns:
  267. Set of node_ids that exist in the graph.
  268. """
  269. graph = await self._get_graph()
  270. return {nid for nid in node_ids if graph.has_node(nid)}
  271. async def upsert_edges_batch(
  272. self, edges: list[tuple[str, str, dict[str, str]]]
  273. ) -> None:
  274. """Batch insert/update multiple edges in a single call.
  275. Persistence:
  276. Changes are in-memory only; cross-process visibility requires
  277. a subsequent ``index_done_callback``. Callers outside the
  278. pipeline must persist explicitly.
  279. Args:
  280. edges: List of (source_id, target_id, edge_data) tuples.
  281. """
  282. graph = await self._get_graph()
  283. for src, tgt, edge_data in edges:
  284. graph.add_edge(src, tgt, **edge_data)
  285. async def delete_node(self, node_id: str) -> None:
  286. """Remove a single node from the graph; persistence is deferred.
  287. Persistence:
  288. Changes are in-memory only; cross-process visibility requires
  289. a subsequent ``index_done_callback``. Callers outside the
  290. pipeline must persist explicitly.
  291. Pipeline-gating depends on the caller: invocations from the
  292. document purge flow are serialized by ``pipeline busy``;
  293. invocations from ``utils_graph.py`` admin flows are **not** —
  294. see class docstring *Non-pipeline write paths*.
  295. """
  296. graph = await self._get_graph()
  297. if graph.has_node(node_id):
  298. graph.remove_node(node_id)
  299. logger.debug(f"[{self.workspace}] Node {node_id} deleted from the graph")
  300. else:
  301. logger.warning(
  302. f"[{self.workspace}] Node {node_id} not found in the graph for deletion"
  303. )
  304. async def remove_nodes(self, nodes: list[str]):
  305. """Delete multiple nodes from the graph.
  306. Persistence:
  307. Changes are in-memory only; cross-process visibility requires
  308. a subsequent ``index_done_callback``. Callers outside the
  309. pipeline must persist explicitly.
  310. Pipeline-gating depends on the caller — see ``delete_node`` and
  311. class docstring *Non-pipeline write paths*.
  312. Args:
  313. nodes: List of node IDs to be deleted
  314. """
  315. graph = await self._get_graph()
  316. for node in nodes:
  317. if graph.has_node(node):
  318. graph.remove_node(node)
  319. async def remove_edges(self, edges: list[tuple[str, str]]):
  320. """Delete multiple edges from the graph.
  321. Persistence:
  322. Changes are in-memory only; cross-process visibility requires
  323. a subsequent ``index_done_callback``. Callers outside the
  324. pipeline must persist explicitly.
  325. Pipeline-gating depends on the caller — see ``delete_node`` and
  326. class docstring *Non-pipeline write paths*.
  327. Args:
  328. edges: List of edges to be deleted, each edge is a (source, target) tuple
  329. """
  330. graph = await self._get_graph()
  331. for source, target in edges:
  332. if graph.has_edge(source, target):
  333. graph.remove_edge(source, target)
  334. async def get_all_labels(self) -> list[str]:
  335. """
  336. Get all node labels(entity names) in the graph
  337. Returns:
  338. [label1, label2, ...] # Alphabetically sorted label list
  339. """
  340. graph = await self._get_graph()
  341. labels = set()
  342. for node in graph.nodes():
  343. labels.add(str(node)) # Add node id as a label
  344. # Return sorted list
  345. return sorted(list(labels))
  346. async def get_popular_labels(self, limit: int = 300) -> list[str]:
  347. """
  348. Get popular labels(entity names) by node degree (most connected entities)
  349. Args:
  350. limit: Maximum number of labels to return
  351. Returns:
  352. List of labels sorted by degree (highest first)
  353. """
  354. graph = await self._get_graph()
  355. # Get degrees of all nodes and sort by degree descending
  356. degrees = dict(graph.degree())
  357. sorted_nodes = sorted(degrees.items(), key=lambda x: x[1], reverse=True)
  358. # Return top labels limited by the specified limit
  359. popular_labels = [str(node) for node, _ in sorted_nodes[:limit]]
  360. logger.debug(
  361. f"[{self.workspace}] Retrieved {len(popular_labels)} popular labels (limit: {limit})"
  362. )
  363. return popular_labels
  364. async def search_labels(self, query: str, limit: int = 50) -> list[str]:
  365. """
  366. Search labels(entity names) with fuzzy matching
  367. Args:
  368. query: Search query string
  369. limit: Maximum number of results to return
  370. Returns:
  371. List of matching labels sorted by relevance
  372. """
  373. graph = await self._get_graph()
  374. query_lower = query.lower().strip()
  375. if not query_lower:
  376. return []
  377. # Collect matching nodes with relevance scores
  378. matches = []
  379. for node in graph.nodes():
  380. node_str = str(node)
  381. node_lower = node_str.lower()
  382. # Skip if no match
  383. if query_lower not in node_lower:
  384. continue
  385. # Calculate relevance score
  386. # Exact match gets highest score
  387. if node_lower == query_lower:
  388. score = 1000
  389. # Prefix match gets high score
  390. elif node_lower.startswith(query_lower):
  391. score = 500
  392. # Contains match gets base score, with bonus for shorter strings
  393. else:
  394. # Shorter strings with matches are more relevant
  395. score = 100 - len(node_str)
  396. # Bonus for word boundary matches
  397. if f" {query_lower}" in node_lower or f"_{query_lower}" in node_lower:
  398. score += 50
  399. matches.append((node_str, score))
  400. # Sort by relevance score (desc) then alphabetically
  401. matches.sort(key=lambda x: (-x[1], x[0]))
  402. # Return top matches limited by the specified limit
  403. search_results = [match[0] for match in matches[:limit]]
  404. logger.debug(
  405. f"[{self.workspace}] Search query '{query}' returned {len(search_results)} results (limit: {limit})"
  406. )
  407. return search_results
  408. async def get_knowledge_graph(
  409. self,
  410. node_label: str,
  411. max_depth: int = 3,
  412. max_nodes: int = None,
  413. ) -> KnowledgeGraph:
  414. """
  415. Retrieve a connected subgraph of nodes where the label includes the specified `node_label`.
  416. Args:
  417. node_label: Label of the starting node,* means all nodes
  418. max_depth: Maximum depth of the subgraph, Defaults to 3
  419. max_nodes: Maxiumu nodes to return by BFS, Defaults to 1000
  420. Returns:
  421. KnowledgeGraph object containing nodes and edges, with an is_truncated flag
  422. indicating whether the graph was truncated due to max_nodes limit
  423. """
  424. # Get max_nodes from global_config if not provided
  425. if max_nodes is None:
  426. max_nodes = self.global_config.get("max_graph_nodes", 1000)
  427. else:
  428. # Limit max_nodes to not exceed global_config max_graph_nodes
  429. max_nodes = min(max_nodes, self.global_config.get("max_graph_nodes", 1000))
  430. graph = await self._get_graph()
  431. result = KnowledgeGraph()
  432. # Handle special case for "*" label
  433. if node_label == "*":
  434. # Get degrees of all nodes
  435. degrees = dict(graph.degree())
  436. # Sort nodes by degree in descending order and take top max_nodes
  437. sorted_nodes = sorted(degrees.items(), key=lambda x: x[1], reverse=True)
  438. # Check if graph is truncated
  439. if len(sorted_nodes) > max_nodes:
  440. result.is_truncated = True
  441. logger.info(
  442. f"[{self.workspace}] Graph truncated: {len(sorted_nodes)} nodes found, limited to {max_nodes}"
  443. )
  444. limited_nodes = [node for node, _ in sorted_nodes[:max_nodes]]
  445. # Create subgraph with the highest degree nodes
  446. subgraph = graph.subgraph(limited_nodes)
  447. else:
  448. # Check if node exists
  449. if node_label not in graph:
  450. logger.warning(
  451. f"[{self.workspace}] Node {node_label} not found in the graph"
  452. )
  453. return KnowledgeGraph() # Return empty graph
  454. # Use modified BFS to get nodes, prioritizing high-degree nodes at the same depth
  455. bfs_nodes = []
  456. visited = set()
  457. # Store (node, depth, degree) in the queue
  458. queue = deque([(node_label, 0, graph.degree(node_label))])
  459. # Flag to track if there are unexplored neighbors due to depth limit
  460. has_unexplored_neighbors = False
  461. # Modified breadth-first search with degree-based prioritization
  462. while queue and len(bfs_nodes) < max_nodes:
  463. # Get the current depth from the first node in queue
  464. current_depth = queue[0][1]
  465. # Collect all nodes at the current depth
  466. current_level_nodes = []
  467. while queue and queue[0][1] == current_depth:
  468. current_level_nodes.append(queue.popleft())
  469. # Sort nodes at current depth by degree (highest first)
  470. current_level_nodes.sort(key=lambda x: x[2], reverse=True)
  471. # Process all nodes at current depth in order of degree
  472. for current_node, depth, degree in current_level_nodes:
  473. if current_node not in visited:
  474. visited.add(current_node)
  475. bfs_nodes.append(current_node)
  476. # Only explore neighbors if we haven't reached max_depth
  477. if depth < max_depth:
  478. # Add neighbor nodes to queue with incremented depth
  479. neighbors = list(graph.neighbors(current_node))
  480. # Filter out already visited neighbors
  481. unvisited_neighbors = [
  482. n for n in neighbors if n not in visited
  483. ]
  484. # Add neighbors to the queue with their degrees
  485. for neighbor in unvisited_neighbors:
  486. neighbor_degree = graph.degree(neighbor)
  487. queue.append((neighbor, depth + 1, neighbor_degree))
  488. else:
  489. # Check if there are unexplored neighbors (skipped due to depth limit)
  490. neighbors = list(graph.neighbors(current_node))
  491. unvisited_neighbors = [
  492. n for n in neighbors if n not in visited
  493. ]
  494. if unvisited_neighbors:
  495. has_unexplored_neighbors = True
  496. # Check if we've reached max_nodes
  497. if len(bfs_nodes) >= max_nodes:
  498. break
  499. # Check if graph is truncated - either due to max_nodes limit or depth limit
  500. if (queue and len(bfs_nodes) >= max_nodes) or has_unexplored_neighbors:
  501. if len(bfs_nodes) >= max_nodes:
  502. result.is_truncated = True
  503. logger.info(
  504. f"[{self.workspace}] Graph truncated: max_nodes limit {max_nodes} reached"
  505. )
  506. else:
  507. logger.info(
  508. f"[{self.workspace}] Graph truncated: found {len(bfs_nodes)} nodes within max_depth {max_depth}"
  509. )
  510. # Create subgraph with BFS discovered nodes
  511. subgraph = graph.subgraph(bfs_nodes)
  512. # Add nodes to result
  513. seen_nodes = set()
  514. seen_edges = set()
  515. for node in subgraph.nodes():
  516. if str(node) in seen_nodes:
  517. continue
  518. node_data = dict(subgraph.nodes[node])
  519. # Get entity_type as labels
  520. labels = []
  521. if "entity_type" in node_data:
  522. if isinstance(node_data["entity_type"], list):
  523. labels.extend(node_data["entity_type"])
  524. else:
  525. labels.append(node_data["entity_type"])
  526. # Create node with properties
  527. node_properties = {k: v for k, v in node_data.items()}
  528. result.nodes.append(
  529. KnowledgeGraphNode(
  530. id=str(node), labels=[str(node)], properties=node_properties
  531. )
  532. )
  533. seen_nodes.add(str(node))
  534. # Add edges to result
  535. for edge in subgraph.edges():
  536. source, target = edge
  537. # Esure unique edge_id for undirect graph
  538. if str(source) > str(target):
  539. source, target = target, source
  540. edge_id = f"{source}-{target}"
  541. if edge_id in seen_edges:
  542. continue
  543. edge_data = dict(subgraph.edges[edge])
  544. # Create edge with complete information
  545. result.edges.append(
  546. KnowledgeGraphEdge(
  547. id=edge_id,
  548. type="DIRECTED",
  549. source=str(source),
  550. target=str(target),
  551. properties=edge_data,
  552. )
  553. )
  554. seen_edges.add(edge_id)
  555. logger.info(
  556. f"[{self.workspace}] Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
  557. )
  558. return result
  559. async def get_all_nodes(self) -> list[dict]:
  560. """Get all nodes in the graph.
  561. Returns:
  562. A list of all nodes, where each node is a dictionary of its properties
  563. """
  564. graph = await self._get_graph()
  565. all_nodes = []
  566. for node_id, node_data in graph.nodes(data=True):
  567. node_data_with_id = node_data.copy()
  568. node_data_with_id["id"] = node_id
  569. all_nodes.append(node_data_with_id)
  570. return all_nodes
  571. async def get_all_edges(self) -> list[dict]:
  572. """Get all edges in the graph.
  573. Returns:
  574. A list of all edges, where each edge is a dictionary of its properties
  575. """
  576. graph = await self._get_graph()
  577. all_edges = []
  578. for u, v, edge_data in graph.edges(data=True):
  579. edge_data_with_nodes = edge_data.copy()
  580. edge_data_with_nodes["source"] = u
  581. edge_data_with_nodes["target"] = v
  582. all_edges.append(edge_data_with_nodes)
  583. return all_edges
  584. async def index_done_callback(self) -> bool:
  585. """Commit in-memory graph to disk and notify other processes.
  586. This is the writer's **commit point** in the cross-process sync
  587. protocol (see class docstring). Two effects, in order:
  588. 1. ``write_nx_graph`` atomically writes the GraphML file
  589. (``atomic_write`` swaps a tmp file into place).
  590. 2. ``set_all_update_flags`` flips every registered process's
  591. ``storage_updated`` flag, then we immediately reset our
  592. own flag to ``False`` so the writer does not self-reload
  593. on the next call to ``_get_graph``.
  594. Two-block structure (intentional, do not collapse):
  595. * **First ``async with``** — early-return path for a
  596. hypothetical second writer. Under the current single-writer
  597. pipeline contract (class docstring, invariant 1) the
  598. ``storage_updated.value`` check is permanently ``False`` in
  599. the writer, so this branch is **dead code in production**.
  600. It is kept as defensive scaffolding for any future
  601. relaxation of the single-writer invariant; removing it
  602. would silently re-enable lost-write bugs the moment a
  603. second writer is introduced.
  604. * **Second ``async with``** — the actual save + notify.
  605. """
  606. async with self._storage_lock:
  607. # Check if storage was updated by another process
  608. if self.storage_updated.value:
  609. # Storage was updated by another process, reload data instead of saving
  610. logger.info(
  611. f"[{self.workspace}] Graph was updated by another process, reloading..."
  612. )
  613. self._graph = (
  614. NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
  615. )
  616. # Reset update flag
  617. self.storage_updated.value = False
  618. return False # Return error
  619. # Acquire lock and perform persistence
  620. async with self._storage_lock:
  621. try:
  622. # Save data to disk
  623. NetworkXStorage.write_nx_graph(
  624. self._graph, self._graphml_xml_file, self.workspace
  625. )
  626. # Notify other processes that data has been updated
  627. await set_all_update_flags(self.namespace, workspace=self.workspace)
  628. # Reset own update flag to avoid self-reloading
  629. self.storage_updated.value = False
  630. return True # Return success
  631. except Exception as e:
  632. logger.error(f"[{self.workspace}] Error saving graph: {e}")
  633. return False # Return error
  634. return True
  635. async def drop(self) -> dict[str, str]:
  636. """Drop all graph data from storage and reinitialize the graph.
  637. This method will:
  638. 1. Remove the graph storage file if it exists
  639. 2. Reset the graph to an empty ``nx.Graph()``
  640. 3. Update flags to notify other processes
  641. 4. Changes are persisted to disk immediately
  642. Caller contract:
  643. ``drop`` is destructive and **not** serialized by this storage
  644. class. The caller must hold the pipeline ``busy`` reservation
  645. (the ``/documents/clear`` endpoint does this) before invoking
  646. it — running ``drop`` concurrently with an active document
  647. pipeline will tear down storage out from under the writer and
  648. silently lose data. See class docstring,
  649. *Non-pipeline write paths*.
  650. Returns:
  651. dict[str, str]: Operation status and message
  652. - On success: {"status": "success", "message": "data dropped"}
  653. - On failure: {"status": "error", "message": "<error details>"}
  654. """
  655. try:
  656. async with self._storage_lock:
  657. # delete _client_file_name
  658. if os.path.exists(self._graphml_xml_file):
  659. os.remove(self._graphml_xml_file)
  660. self._graph = nx.Graph()
  661. # Notify other processes that data has been updated
  662. await set_all_update_flags(self.namespace, workspace=self.workspace)
  663. # Reset own update flag to avoid self-reloading
  664. self.storage_updated.value = False
  665. logger.info(
  666. f"[{self.workspace}] Process {os.getpid()} drop graph file:{self._graphml_xml_file}"
  667. )
  668. return {"status": "success", "message": "data dropped"}
  669. except Exception as e:
  670. logger.error(
  671. f"[{self.workspace}] Error dropping graph file:{self._graphml_xml_file}: {e}"
  672. )
  673. return {"status": "error", "message": str(e)}