| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789 |
- import os
- from collections import deque
- from dataclasses import dataclass
- from typing import final
- from lightrag.file_atomic import atomic_write, reap_orphan_tmp_files
- from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
- from lightrag.utils import logger
- from lightrag.base import BaseGraphStorage
- import networkx as nx
- from .shared_storage import (
- get_namespace_lock,
- get_update_flag,
- set_all_update_flags,
- )
- from dotenv import load_dotenv
- # use the .env that is inside the current folder
- # allows to use different .env file for each lightrag instance
- # the OS environment variables take precedence over the .env file
- load_dotenv(dotenv_path=".env", override=False)
@final
@dataclass
class NetworkXStorage(BaseGraphStorage):
    """File-backed knowledge-graph storage built on ``networkx.Graph``.

    Storage model:
        A single ``networkx.Graph`` instance lives in process memory; its
        full state is serialized to one GraphML file at
        ``working_dir/[workspace/]graph_<namespace>.graphml``. That GraphML
        file is the **only** cross-process synchronization surface — there
        is no shared memory, no message bus, and no network channel
        between processes. Cross-process visibility is mediated by (a) an
        atomic file write at commit time and (b) a per-namespace
        ``storage_updated`` flag distributed through
        ``lightrag.kg.shared_storage``.

    Concurrency invariants (the code in this file is correct *only* while
    all three hold):
        1. **Single writer per workspace.** The document pipeline's
           ``busy`` / ``destructive_busy`` flags (see ``AGENTS.md``
           *Pipeline concurrency contract*) guarantee at most one process
           performs ``upsert_*`` / ``delete_*`` / ``remove_*`` /
           ``index_done_callback`` at any time. Every other process is
           read-only.
        2. **Eventual consistency is sufficient.** Read-only processes
           only need to observe the writer's data *after* the writer's
           ``index_done_callback`` completes. Reads landing in the gap
           between a writer's in-memory mutation and its commit may
           legitimately return the pre-update snapshot.
        3. **networkx operations are fully synchronous.** Under a
           single-threaded asyncio event loop, ``graph.add_node`` /
           ``graph.remove_node`` / ``graph.degree`` / etc. cannot be
           preempted by another coroutine, which gives them implicit
           mutual exclusion over ``self._graph``. This is why the methods
           below don't have to hold ``_storage_lock`` while calling into
           ``graph``.

    Cross-process sync protocol (identical in shape to
    ``NanoVectorDBStorage`` — see that class's docstring for the canonical
    description):
        Writer side (``index_done_callback``):
            1. ``write_nx_graph`` atomically writes the GraphML file
               (``atomic_write`` lays a tmp file beside the target and
               renames it into place — readers either see the previous
               file in full or the new file in full, never a torn write).
            2. ``set_all_update_flags`` flips every process's
               ``storage_updated`` flag (including the writer's own).
            3. Immediately reset the writer's own flag to ``False`` so
               the next call to ``_get_graph`` does not trigger a
               self-reload of the data this process just wrote.
        Reader side (any method that goes through ``_get_graph``):
            1. Inside ``_storage_lock``, observe
               ``storage_updated.value is True``.
            2. **Fully reload** ``self._graph`` from disk via
               ``load_nx_graph``. networkx GraphML has no incremental
               sync API, so the entire file is re-parsed.
            3. Reset the reader's own flag.

    Lock scope:
        ``_storage_lock`` is a per-``(namespace, workspace)`` keyed lock
        spanning both intra-process coroutines and inter-process workers.
        It wraps only the *reload* and *commit* critical sections, not
        every ``graph.xxx`` call. Operating on ``graph`` outside the lock
        is safe today *because of invariant (3)* — if either premise is
        ever broken (e.g. ``graph.xxx`` is moved to a thread pool, or
        networkx is swapped for an async graph library), the lock scope
        must be widened to cover the mutation/read itself.

    Implementation differences from ``NanoVectorDBStorage`` (same design,
    different surface):
        * No ``client_storage`` property — there is no equivalent live
          reference being exposed to callers, so NanoVectorDB's
          "do-not-retain-across-await" caveat does not apply here.
        * ``write_nx_graph`` passes the tmp path directly to
          ``nx.write_graphml``, so the writer needs no equivalent of
          NanoVectorDB's "temporarily reassign ``storage_file``" trick.
        * Mutation surface is finer-grained (``upsert_node`` /
          ``upsert_edge`` / ``upsert_nodes_batch`` /
          ``upsert_edges_batch`` / ``delete_node`` / ``remove_nodes`` /
          ``remove_edges``); each goes through ``_get_graph`` once and
          then operates synchronously on ``self._graph``.

    Non-pipeline write paths:
        The pipeline's ``busy`` gate serializes mutation calls reached
        through the document ingestion and purge flows. The following
        entry points are **not** serialized by the pipeline gate and
        must be guarded externally:
            * ``drop`` — currently gated by the API layer (the
              ``/documents/clear`` endpoint takes the pipeline busy
              reservation before invoking it).
            * ``delete_node`` / ``remove_nodes`` / ``remove_edges`` /
              ``upsert_node`` / ``upsert_edge`` when invoked from
              ``utils_graph.py`` admin flows (``adelete_by_entity`` /
              ``adelete_by_relation`` / entity-edit flows). These flows
              are currently not exposed in the WebUI; any future caller
              must arrange single-writer serialization the same way the
              pipeline does.
    """

    @staticmethod
    def load_nx_graph(file_name) -> nx.Graph | None:
        """Parse the GraphML file at ``file_name``.

        Returns:
            The graph read by ``nx.read_graphml``, or ``None`` when the
            file does not exist.
        """
        if os.path.exists(file_name):
            return nx.read_graphml(file_name)
        return None

    @staticmethod
    def write_nx_graph(graph: nx.Graph, file_name, workspace="_") -> None:
        """Serialize ``graph`` as GraphML to ``file_name`` via ``atomic_write``.

        Args:
            graph: Graph to persist.
            file_name: Destination GraphML path.
            workspace: Workspace label used in the log line and forwarded
                to ``atomic_write``.
        """
        logger.info(
            f"[{workspace}] Writing graph with {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
        )
        atomic_write(
            file_name,
            lambda tmp: nx.write_graphml(graph, tmp),
            workspace,
        )

    def __post_init__(self):
        """Resolve the GraphML path, reap orphan tmp files and preload the graph."""
        working_dir = self.global_config["working_dir"]
        if self.workspace:
            # Include workspace in the file path for data isolation
            workspace_dir = os.path.join(working_dir, self.workspace)
        else:
            # Default behavior when workspace is empty
            workspace_dir = working_dir
            self.workspace = ""

        os.makedirs(workspace_dir, exist_ok=True)
        self._graphml_xml_file = os.path.join(
            workspace_dir, f"graph_{self.namespace}.graphml"
        )
        # Both are populated later by ``initialize``.
        self._storage_lock = None
        self.storage_updated = None
        self._graph = None

        reap_orphan_tmp_files(self._graphml_xml_file, workspace=self.workspace or "_")

        # Load initial graph
        preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file)
        if preloaded_graph is not None:
            logger.info(
                f"[{self.workspace}] Loaded graph from {self._graphml_xml_file} with {preloaded_graph.number_of_nodes()} nodes, {preloaded_graph.number_of_edges()} edges"
            )
        else:
            logger.info(
                f"[{self.workspace}] Created new empty graph file: {self._graphml_xml_file}"
            )
        # A missing file (None) and a node-less graph (falsy) both start
        # from a fresh empty graph.
        self._graph = preloaded_graph or nx.Graph()

    async def initialize(self):
        """Initialize storage data"""
        # Get the update flag for cross-process update notification
        self.storage_updated = await get_update_flag(
            self.namespace, workspace=self.workspace
        )
        # Get the storage lock for use in other methods
        self._storage_lock = get_namespace_lock(
            self.namespace, workspace=self.workspace
        )

    async def _get_graph(self):
        """Return the live ``networkx.Graph``, reloading from disk if needed.

        This is the **single entry point** every public method funnels
        through to obtain ``self._graph``. It is also the **only place
        readers transition to a fresher on-disk snapshot**: when another
        process has committed (via ``index_done_callback``) and flipped
        this process's ``storage_updated`` flag, the next call here
        rebuilds ``self._graph`` by re-parsing the entire GraphML file.
        networkx has no incremental sync API — the reload is
        unconditionally a full file reload.

        Under the *Single writer* invariant (see class docstring), the
        reload branch never fires in the writer process: the writer
        resets its own flag at the end of every ``index_done_callback``.
        The branch exists for readers.

        ``_storage_lock`` is held during the check-and-reload to (a)
        serialize concurrent reload attempts by sibling coroutines in
        the same process and (b) interlock with ``index_done_callback``
        so a reader cannot observe a partially-saved file.
        """
        async with self._storage_lock:
            # Check if data needs to be reloaded
            if self.storage_updated.value:
                logger.info(
                    f"[{self.workspace}] Process {os.getpid()} reloading graph {self._graphml_xml_file} due to modifications by another process"
                )
                # Reload data
                self._graph = (
                    NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
                )
                # Reset update flag
                self.storage_updated.value = False

            return self._graph

    async def has_node(self, node_id: str) -> bool:
        """Return whether ``node_id`` is a node of the graph."""
        graph = await self._get_graph()
        return graph.has_node(node_id)

    async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
        """Return whether an edge between the two nodes exists."""
        graph = await self._get_graph()
        return graph.has_edge(source_node_id, target_node_id)

    async def get_node(self, node_id: str) -> dict[str, str] | None:
        """Return the attribute dict of ``node_id``, or ``None`` if absent."""
        graph = await self._get_graph()
        return graph.nodes.get(node_id)

    async def node_degree(self, node_id: str) -> int:
        """Return the degree of ``node_id``; 0 when the node is absent."""
        graph = await self._get_graph()
        if graph.has_node(node_id):
            return graph.degree(node_id)
        return 0

    async def edge_degree(self, src_id: str, tgt_id: str) -> int:
        """Return the sum of both endpoint degrees; absent nodes count as 0."""
        graph = await self._get_graph()
        src_degree = graph.degree(src_id) if graph.has_node(src_id) else 0
        tgt_degree = graph.degree(tgt_id) if graph.has_node(tgt_id) else 0
        return src_degree + tgt_degree

    async def get_edge(
        self, source_node_id: str, target_node_id: str
    ) -> dict[str, str] | None:
        """Return the attribute dict of the edge, or ``None`` if absent."""
        graph = await self._get_graph()
        return graph.edges.get((source_node_id, target_node_id))

    async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
        """Return the edges incident to ``source_node_id``.

        Returns:
            A list of ``(source, target)`` tuples, or ``None`` when the
            node is not in the graph.
        """
        graph = await self._get_graph()
        if graph.has_node(source_node_id):
            return list(graph.edges(source_node_id))
        return None

    async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
        """Insert or update a single node; persistence is deferred.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. In ``lightrag.py`` this
            is handled by ``_insert_done()`` at the end of the document
            batch. Callers outside the pipeline must persist explicitly.

        Correctness relies on the class docstring *Lock scope* invariant
        (synchronous networkx ops + single-writer pipeline gate).
        """
        graph = await self._get_graph()
        graph.add_node(node_id, **node_data)

    async def upsert_edge(
        self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
    ) -> None:
        """Insert or update a single edge; persistence is deferred.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. Callers outside the
            pipeline must persist explicitly.

        Correctness relies on the class docstring *Lock scope* invariant.
        """
        graph = await self._get_graph()
        graph.add_edge(source_node_id, target_node_id, **edge_data)

    async def upsert_nodes_batch(self, nodes: list[tuple[str, dict[str, str]]]) -> None:
        """Batch insert/update multiple nodes in a single call.

        Much faster than calling upsert_node() in a loop for large imports
        because it avoids per-call async event loop overhead.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. Callers outside the
            pipeline must persist explicitly.

        Args:
            nodes: List of (node_id, node_data) tuples.
        """
        graph = await self._get_graph()
        for node_id, node_data in nodes:
            graph.add_node(node_id, **node_data)

    async def has_nodes_batch(self, node_ids: list[str]) -> set[str]:
        """Check existence of multiple nodes in a single call.

        Returns:
            Set of node_ids that exist in the graph.
        """
        graph = await self._get_graph()
        return {nid for nid in node_ids if graph.has_node(nid)}

    async def upsert_edges_batch(
        self, edges: list[tuple[str, str, dict[str, str]]]
    ) -> None:
        """Batch insert/update multiple edges in a single call.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. Callers outside the
            pipeline must persist explicitly.

        Args:
            edges: List of (source_id, target_id, edge_data) tuples.
        """
        graph = await self._get_graph()
        for src, tgt, edge_data in edges:
            graph.add_edge(src, tgt, **edge_data)

    async def delete_node(self, node_id: str) -> None:
        """Remove a single node from the graph; persistence is deferred.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. Callers outside the
            pipeline must persist explicitly.

        Pipeline-gating depends on the caller: invocations from the
        document purge flow are serialized by ``pipeline busy``;
        invocations from ``utils_graph.py`` admin flows are **not** —
        see class docstring *Non-pipeline write paths*.
        """
        graph = await self._get_graph()
        if graph.has_node(node_id):
            graph.remove_node(node_id)
            logger.debug(f"[{self.workspace}] Node {node_id} deleted from the graph")
        else:
            logger.warning(
                f"[{self.workspace}] Node {node_id} not found in the graph for deletion"
            )

    async def remove_nodes(self, nodes: list[str]) -> None:
        """Delete multiple nodes from the graph.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. Callers outside the
            pipeline must persist explicitly.

        Pipeline-gating depends on the caller — see ``delete_node`` and
        class docstring *Non-pipeline write paths*.

        Args:
            nodes: List of node IDs to be deleted
        """
        graph = await self._get_graph()
        for node in nodes:
            # Unknown ids are skipped silently.
            if graph.has_node(node):
                graph.remove_node(node)

    async def remove_edges(self, edges: list[tuple[str, str]]) -> None:
        """Delete multiple edges from the graph.

        Persistence:
            Changes are in-memory only; cross-process visibility requires
            a subsequent ``index_done_callback``. Callers outside the
            pipeline must persist explicitly.

        Pipeline-gating depends on the caller — see ``delete_node`` and
        class docstring *Non-pipeline write paths*.

        Args:
            edges: List of edges to be deleted, each edge is a (source, target) tuple
        """
        graph = await self._get_graph()
        for source, target in edges:
            # Unknown edges are skipped silently.
            if graph.has_edge(source, target):
                graph.remove_edge(source, target)

    async def get_all_labels(self) -> list[str]:
        """
        Get all node labels(entity names) in the graph

        Returns:
            [label1, label2, ...]  # Alphabetically sorted label list
        """
        graph = await self._get_graph()
        # Node ids double as labels; the set collapses ids whose string
        # forms coincide.
        return sorted({str(node) for node in graph.nodes()})

    async def get_popular_labels(self, limit: int = 300) -> list[str]:
        """
        Get popular labels(entity names) by node degree (most connected entities)

        Args:
            limit: Maximum number of labels to return

        Returns:
            List of labels sorted by degree (highest first)
        """
        graph = await self._get_graph()

        # Get degrees of all nodes and sort by degree descending
        degrees = dict(graph.degree())
        sorted_nodes = sorted(degrees.items(), key=lambda x: x[1], reverse=True)

        # Return top labels limited by the specified limit
        popular_labels = [str(node) for node, _ in sorted_nodes[:limit]]

        logger.debug(
            f"[{self.workspace}] Retrieved {len(popular_labels)} popular labels (limit: {limit})"
        )
        return popular_labels

    async def search_labels(self, query: str, limit: int = 50) -> list[str]:
        """
        Search labels(entity names) with case-insensitive substring matching

        Args:
            query: Search query string
            limit: Maximum number of results to return

        Returns:
            List of matching labels sorted by relevance (score descending,
            then alphabetically); empty when the stripped query is empty
        """
        graph = await self._get_graph()
        query_lower = query.lower().strip()

        if not query_lower:
            return []

        # Collect matching nodes with relevance scores
        matches = []
        for node in graph.nodes():
            node_str = str(node)
            node_lower = node_str.lower()

            # Skip if no match
            if query_lower not in node_lower:
                continue

            # Calculate relevance score
            if node_lower == query_lower:
                # Exact match gets highest score
                score = 1000
            elif node_lower.startswith(query_lower):
                # Prefix match gets high score
                score = 500
            else:
                # Contains match gets base score; shorter strings with
                # matches are more relevant
                score = 100 - len(node_str)
                # Bonus for word boundary matches
                if f" {query_lower}" in node_lower or f"_{query_lower}" in node_lower:
                    score += 50

            matches.append((node_str, score))

        # Sort by relevance score (desc) then alphabetically
        matches.sort(key=lambda x: (-x[1], x[0]))

        # Return top matches limited by the specified limit
        search_results = [match[0] for match in matches[:limit]]

        logger.debug(
            f"[{self.workspace}] Search query '{query}' returned {len(search_results)} results (limit: {limit})"
        )
        return search_results

    async def get_knowledge_graph(
        self,
        node_label: str,
        max_depth: int = 3,
        max_nodes: int | None = None,
    ) -> KnowledgeGraph:
        """
        Retrieve a connected subgraph around the node whose id equals `node_label`.

        Args:
            node_label: Id of the starting node; "*" selects the
                highest-degree nodes of the whole graph instead
            max_depth: Maximum BFS depth of the subgraph, defaults to 3
            max_nodes: Maximum number of nodes to return. Defaults to, and
                is capped by, ``global_config["max_graph_nodes"]`` (1000
                when that key is not configured)

        Returns:
            KnowledgeGraph object containing nodes and edges, with an is_truncated flag
            indicating whether the graph was truncated due to max_nodes limit.
            An empty KnowledgeGraph is returned when `node_label` is not in the graph.
        """
        # Get max_nodes from global_config if not provided
        if max_nodes is None:
            max_nodes = self.global_config.get("max_graph_nodes", 1000)
        else:
            # Limit max_nodes to not exceed global_config max_graph_nodes
            max_nodes = min(max_nodes, self.global_config.get("max_graph_nodes", 1000))

        graph = await self._get_graph()

        result = KnowledgeGraph()

        # Handle special case for "*" label
        if node_label == "*":
            # Get degrees of all nodes
            degrees = dict(graph.degree())
            # Sort nodes by degree in descending order and take top max_nodes
            sorted_nodes = sorted(degrees.items(), key=lambda x: x[1], reverse=True)

            # Check if graph is truncated
            if len(sorted_nodes) > max_nodes:
                result.is_truncated = True
                logger.info(
                    f"[{self.workspace}] Graph truncated: {len(sorted_nodes)} nodes found, limited to {max_nodes}"
                )

            limited_nodes = [node for node, _ in sorted_nodes[:max_nodes]]
            # Create subgraph with the highest degree nodes
            subgraph = graph.subgraph(limited_nodes)
        else:
            # Check if node exists
            if node_label not in graph:
                logger.warning(
                    f"[{self.workspace}] Node {node_label} not found in the graph"
                )
                return KnowledgeGraph()  # Return empty graph

            # Use modified BFS to get nodes, prioritizing high-degree nodes at the same depth
            bfs_nodes = []
            visited = set()
            # Store (node, depth, degree) in the queue
            queue = deque([(node_label, 0, graph.degree(node_label))])

            # Flag to track if there are unexplored neighbors due to depth limit
            has_unexplored_neighbors = False
            # Flag set when the max_nodes cap is hit while unvisited nodes
            # of the current level are still pending. Those nodes have
            # already been popped from ``queue``, so an empty queue alone
            # does not prove that nothing was dropped.
            level_cut_short = False

            # Modified breadth-first search with degree-based prioritization
            while queue and len(bfs_nodes) < max_nodes:
                # Get the current depth from the first node in queue
                current_depth = queue[0][1]

                # Collect all nodes at the current depth
                current_level_nodes = []
                while queue and queue[0][1] == current_depth:
                    current_level_nodes.append(queue.popleft())

                # Sort nodes at current depth by degree (highest first)
                current_level_nodes.sort(key=lambda x: x[2], reverse=True)

                # Process all nodes at current depth in order of degree
                for position, (current_node, depth, _degree) in enumerate(
                    current_level_nodes
                ):
                    if current_node not in visited:
                        visited.add(current_node)
                        bfs_nodes.append(current_node)

                        unvisited_neighbors = [
                            n for n in graph.neighbors(current_node) if n not in visited
                        ]
                        if depth < max_depth:
                            # Add neighbors to the queue with their degrees
                            for neighbor in unvisited_neighbors:
                                queue.append(
                                    (neighbor, depth + 1, graph.degree(neighbor))
                                )
                        elif unvisited_neighbors:
                            # Neighbors exist but are skipped due to depth limit
                            has_unexplored_neighbors = True

                    # Check if we've reached max_nodes
                    if len(bfs_nodes) >= max_nodes:
                        level_cut_short = any(
                            pending not in visited
                            for pending, _, _ in current_level_nodes[position + 1 :]
                        )
                        break

            # Check if graph is truncated - either due to max_nodes limit or depth limit
            reached_max_nodes = len(bfs_nodes) >= max_nodes
            if (
                (queue and reached_max_nodes)
                or level_cut_short
                or has_unexplored_neighbors
            ):
                if reached_max_nodes:
                    result.is_truncated = True
                    logger.info(
                        f"[{self.workspace}] Graph truncated: max_nodes limit {max_nodes} reached"
                    )
                else:
                    logger.info(
                        f"[{self.workspace}] Graph truncated: found {len(bfs_nodes)} nodes within max_depth {max_depth}"
                    )

            # Create subgraph with BFS discovered nodes
            subgraph = graph.subgraph(bfs_nodes)

        # Add nodes to result
        seen_nodes = set()
        seen_edges = set()
        for node in subgraph.nodes():
            node_key = str(node)
            if node_key in seen_nodes:
                continue

            # The node id is used as its single label; properties are a
            # copy of the node's attribute dict.
            result.nodes.append(
                KnowledgeGraphNode(
                    id=node_key,
                    labels=[node_key],
                    properties=dict(subgraph.nodes[node]),
                )
            )
            seen_nodes.add(node_key)

        # Add edges to result
        for edge in subgraph.edges():
            source, target = edge
            # Ensure a unique edge_id for the undirected graph by ordering
            # the endpoints
            if str(source) > str(target):
                source, target = target, source
            edge_id = f"{source}-{target}"
            if edge_id in seen_edges:
                continue

            edge_data = dict(subgraph.edges[edge])

            # Create edge with complete information
            result.edges.append(
                KnowledgeGraphEdge(
                    id=edge_id,
                    type="DIRECTED",
                    source=str(source),
                    target=str(target),
                    properties=edge_data,
                )
            )
            seen_edges.add(edge_id)

        logger.info(
            f"[{self.workspace}] Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
        )
        return result

    async def get_all_nodes(self) -> list[dict]:
        """Get all nodes in the graph.

        Returns:
            A list of all nodes, where each node is a dictionary of its
            properties plus an ``"id"`` key holding the node id
        """
        graph = await self._get_graph()
        all_nodes = []
        for node_id, node_data in graph.nodes(data=True):
            # Copy so the caller cannot mutate the live graph attributes.
            node_data_with_id = node_data.copy()
            node_data_with_id["id"] = node_id
            all_nodes.append(node_data_with_id)
        return all_nodes

    async def get_all_edges(self) -> list[dict]:
        """Get all edges in the graph.

        Returns:
            A list of all edges, where each edge is a dictionary of its
            properties plus ``"source"`` and ``"target"`` keys
        """
        graph = await self._get_graph()
        all_edges = []
        for u, v, edge_data in graph.edges(data=True):
            # Copy so the caller cannot mutate the live graph attributes.
            edge_data_with_nodes = edge_data.copy()
            edge_data_with_nodes["source"] = u
            edge_data_with_nodes["target"] = v
            all_edges.append(edge_data_with_nodes)
        return all_edges

    async def index_done_callback(self) -> bool:
        """Commit in-memory graph to disk and notify other processes.

        This is the writer's **commit point** in the cross-process sync
        protocol (see class docstring). Two effects, in order:

        1. ``write_nx_graph`` atomically writes the GraphML file
           (``atomic_write`` swaps a tmp file into place).
        2. ``set_all_update_flags`` flips every registered process's
           ``storage_updated`` flag, then we immediately reset our
           own flag to ``False`` so the writer does not self-reload
           on the next call to ``_get_graph``.

        Two-block structure (intentional, do not collapse):
            * **First ``async with``** — early-return path for a
              hypothetical second writer. Under the current single-writer
              pipeline contract (class docstring, invariant 1) the
              ``storage_updated.value`` check is permanently ``False`` in
              the writer, so this branch is **dead code in production**.
              It is kept as defensive scaffolding for any future
              relaxation of the single-writer invariant; removing it
              would silently re-enable lost-write bugs the moment a
              second writer is introduced.
            * **Second ``async with``** — the actual save + notify.

        Returns:
            ``True`` when the graph was written and other processes were
            notified; ``False`` when the graph was reloaded instead of
            saved, or when saving raised an exception (which is logged).
        """
        async with self._storage_lock:
            # Check if storage was updated by another process
            if self.storage_updated.value:
                # Storage was updated by another process, reload data instead of saving
                logger.info(
                    f"[{self.workspace}] Graph was updated by another process, reloading..."
                )
                self._graph = (
                    NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
                )
                # Reset update flag
                self.storage_updated.value = False
                return False  # Return error

        # Acquire lock and perform persistence
        async with self._storage_lock:
            try:
                # Save data to disk
                NetworkXStorage.write_nx_graph(
                    self._graph, self._graphml_xml_file, self.workspace
                )
                # Notify other processes that data has been updated
                await set_all_update_flags(self.namespace, workspace=self.workspace)
                # Reset own update flag to avoid self-reloading
                self.storage_updated.value = False
                return True  # Return success
            except Exception as e:
                logger.error(f"[{self.workspace}] Error saving graph: {e}")
                return False  # Return error

    async def drop(self) -> dict[str, str]:
        """Drop all graph data from storage and reinitialize the graph.

        This method will:
        1. Remove the graph storage file if it exists
        2. Reset the graph to an empty ``nx.Graph()``
        3. Update flags to notify other processes
        4. Changes are persisted to disk immediately

        Caller contract:
            ``drop`` is destructive and **not** serialized by this storage
            class. The caller must hold the pipeline ``busy`` reservation
            (the ``/documents/clear`` endpoint does this) before invoking
            it — running ``drop`` concurrently with an active document
            pipeline will tear down storage out from under the writer and
            silently lose data. See class docstring,
            *Non-pipeline write paths*.

        Returns:
            dict[str, str]: Operation status and message
            - On success: {"status": "success", "message": "data dropped"}
            - On failure: {"status": "error", "message": "<error details>"}
        """
        try:
            async with self._storage_lock:
                # Delete the GraphML storage file
                if os.path.exists(self._graphml_xml_file):
                    os.remove(self._graphml_xml_file)
                self._graph = nx.Graph()
                # Notify other processes that data has been updated
                await set_all_update_flags(self.namespace, workspace=self.workspace)
                # Reset own update flag to avoid self-reloading
                self.storage_updated.value = False
                logger.info(
                    f"[{self.workspace}] Process {os.getpid()} drop graph file:{self._graphml_xml_file}"
                )
            return {"status": "success", "message": "data dropped"}
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error dropping graph file:{self._graphml_xml_file}: {e}"
            )
            return {"status": "error", "message": str(e)}
|