| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- """Storage data migration helpers for :class:`LightRAG`.
- Mixed into LightRAG and runs once at startup (``initialize_storages`` →
- ``check_and_migrate_data``) to upgrade legacy data layouts:
- - Backfill ``full_entities`` / ``full_relations`` from the graph + doc_status
- history when those KV stores are empty (entity-relation migration).
- - Rebuild ``entity_chunks`` / ``relation_chunks`` indexes by walking nodes/
- edges in the graph storage when they are empty
- (chunk-tracking migration).
- """
- from __future__ import annotations
- from lightrag.base import DocStatus
- from lightrag.constants import GRAPH_FIELD_SEP
- from lightrag.kg.shared_storage import get_data_init_lock
- from lightrag.utils import logger, make_relation_chunk_key
class _StorageMigrationMixin:
    """Mixin that owns one-shot data migrations on :class:`LightRAG`.

    Mixed into LightRAG only. Relies on attributes that the main class
    initializes in ``__post_init__`` (``doc_status``, ``full_entities``,
    ``full_relations``, ``chunk_entity_relation_graph``, ``entity_chunks``,
    ``relation_chunks``).
    """

    async def check_and_migrate_data(self):
        """Check whether legacy data needs migrating and migrate it if so.

        Runs while holding ``get_data_init_lock()``. Nothing happens when the
        graph reports no entity labels. Otherwise two stages run in order:

        1. Chunk-tracking migration: seed ``entity_chunks`` /
           ``relation_chunks`` when they are empty.
        2. Entity-relation migration: backfill ``full_entities`` /
           ``full_relations`` when none of the first few PROCESSED documents
           has a record in either store.

        Raises:
            Exception: whatever the failing stage raised. Each failure is
                logged exactly once with a stage-specific message and then
                re-raised unchanged.
        """
        async with get_data_init_lock():
            try:
                all_entity_labels = (
                    await self.chunk_entity_relation_graph.get_all_labels()
                )
            except Exception as e:
                logger.error(f"Error in data migration check: {e}")
                raise

            if not all_entity_labels:
                logger.debug("No entities found in graph, skipping migration check")
                return

            # Stage 1: chunk tracking. Runs BEFORE the entity-relation check
            # and regardless of whether that backfill turns out to be needed.
            try:
                await self._migrate_chunk_tracking_storage()
            except Exception as e:
                logger.error(f"Error during chunk_tracking migration: {e}")
                raise

            # Stage 2: entity-relation backfill.
            try:
                processed_docs = await self.doc_status.get_docs_by_status(
                    DocStatus.PROCESSED
                )
                if not processed_docs:
                    logger.debug("No processed documents found, skipping migration")
                    return

                if not await self._entity_relation_migration_needed(processed_docs):
                    logger.debug(
                        "Full entities/relations data already exists, no migration needed"
                    )
                    return

                logger.info(
                    f"Data migration needed: found {len(all_entity_labels)} entities in graph but no full_entities/full_relations data"
                )
                await self._migrate_entity_relation_data(processed_docs)
            except Exception as e:
                logger.error(f"Error during migration check: {e}")
                raise

    async def _entity_relation_migration_needed(
        self, processed_docs: dict, sample_size: int = 5
    ) -> bool:
        """Return True if the entity-relation backfill should run.

        Only the first ``sample_size`` document IDs of *processed_docs* are
        probed. If any of them already has a ``full_entities`` or
        ``full_relations`` record, the stores are treated as populated and
        False is returned.
        """
        for doc_id in list(processed_docs)[:sample_size]:
            if await self.full_entities.get_by_id(doc_id):
                return False
            if await self.full_relations.get_by_id(doc_id):
                return False
        return True

    async def _migrate_entity_relation_data(self, processed_docs: dict):
        """Backfill ``full_entities`` / ``full_relations`` from the graph.

        Each document's ``chunks_list`` (when present) maps chunk IDs back to
        their document. Every graph node is then credited to the documents
        owning the chunks listed in its ``source_id``, and the same is done
        for every edge. The per-document results are upserted, and both stores
        are flushed with ``index_done_callback()``.

        Args:
            processed_docs: mapping of doc_id to a doc-status object. Only the
                object's optional ``chunks_list`` attribute is read.
        """
        logger.info(f"Starting data migration for {len(processed_docs)} documents")

        chunk_to_doc: dict[str, str] = {}
        for doc_id, doc_status in processed_docs.items():
            for chunk_id in getattr(doc_status, "chunks_list", None) or []:
                chunk_to_doc[chunk_id] = doc_id

        def owning_docs(record: dict) -> set:
            # Documents whose chunks appear in the record's source_id.
            candidates = (
                chunk_to_doc.get(chunk_id)
                for chunk_id in self._split_chunk_ids(record.get("source_id"))
            )
            return {doc_id for doc_id in candidates if doc_id}

        doc_entities: dict[str, set] = {}  # doc_id -> entity names
        doc_relations: dict[str, set] = {}  # doc_id -> (src, tgt) tuples

        all_nodes = await self.chunk_entity_relation_graph.get_all_nodes()
        all_edges = await self.chunk_entity_relation_graph.get_all_edges()

        for node in all_nodes:
            entity_id = node.get("entity_id") or node.get("id")
            if not entity_id:
                continue
            for doc_id in owning_docs(node):
                doc_entities.setdefault(doc_id, set()).add(entity_id)

        for edge in all_edges:
            src, tgt = self._edge_endpoints(edge)
            if not src or not tgt:
                continue
            # Sort endpoints so (A, B) and (B, A) count as the same pair.
            pair = tuple(sorted((src, tgt)))
            for doc_id in owning_docs(edge):
                doc_relations.setdefault(doc_id, set()).add(pair)

        if doc_entities:
            await self.full_entities.upsert(
                {
                    doc_id: {
                        "entity_names": list(entity_set),
                        "count": len(entity_set),
                    }
                    for doc_id, entity_set in doc_entities.items()
                }
            )

        if doc_relations:
            await self.full_relations.upsert(
                {
                    doc_id: {
                        # Tuples were only needed for set membership; store lists.
                        "relation_pairs": [list(pair) for pair in relation_set],
                        "count": len(relation_set),
                    }
                    for doc_id, relation_set in doc_relations.items()
                }
            )

        migration_count = len(doc_entities.keys() | doc_relations.keys())

        # Persist the migrated data.
        await self.full_entities.index_done_callback()
        await self.full_relations.index_done_callback()
        logger.info(
            f"Data migration completed: migrated {migration_count} documents with entities/relations"
        )

    async def _migrate_chunk_tracking_storage(self, batch_size: int = 500) -> None:
        """Seed the entity/relation chunk-tracking KV stores from the graph.

        A store is only populated when its ``is_empty()`` reports it empty.

        - Entities are keyed by node ``entity_id``, falling back to ``id``.
        - Relations are keyed by ``make_relation_chunk_key(src, tgt)``.

        Each value is ``{"chunk_ids": [...], "count": n}``, built from the
        record's ``source_id``.

        Args:
            batch_size: number of records per ``upsert`` call (default 500).

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
            Exception: re-raised from either ``is_empty()`` check.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        # Explicit None checks: if a storage class defines __len__, an empty
        # store would be falsy and seeding would be skipped exactly when needed.
        if self.entity_chunks is None or self.relation_chunks is None:
            return

        try:
            need_entity_migration = await self.entity_chunks.is_empty()
        except Exception as exc:  # pragma: no cover - defensive logging
            logger.error(f"Failed to check entity chunks storage: {exc}")
            raise
        try:
            need_relation_migration = await self.relation_chunks.is_empty()
        except Exception as exc:  # pragma: no cover - defensive logging
            logger.error(f"Failed to check relation chunks storage: {exc}")
            raise

        if need_entity_migration:
            try:
                nodes = await self.chunk_entity_relation_graph.get_all_nodes()
            except Exception as exc:
                # Best effort: log and continue with no nodes rather than abort.
                logger.error(f"Failed to fetch nodes for chunk migration: {exc}")
                nodes = []
            logger.info(f"Starting chunk_tracking data migration: {len(nodes)} nodes")
            await self._seed_chunk_tracking(
                self.entity_chunks,
                nodes,
                key_fn=lambda node: node.get("entity_id") or node.get("id"),
                label="entity",
                batch_size=batch_size,
            )

        if need_relation_migration:
            try:
                edges = await self.chunk_entity_relation_graph.get_all_edges()
            except Exception as exc:
                # Best effort: log and continue with no edges rather than abort.
                logger.error(f"Failed to fetch edges for chunk migration: {exc}")
                edges = []
            logger.info(f"Starting chunk_tracking data migration: {len(edges)} edges")

            def relation_key(edge: dict):
                src, tgt = self._edge_endpoints(edge)
                if not src or not tgt:
                    return None
                return make_relation_chunk_key(src, tgt)

            await self._seed_chunk_tracking(
                self.relation_chunks,
                edges,
                key_fn=relation_key,
                label="relation",
                batch_size=batch_size,
            )

    async def _seed_chunk_tracking(
        self, storage, records: list, key_fn, label: str, batch_size: int
    ) -> int:
        """Upsert chunk-tracking rows for *records* into *storage* in batches.

        Args:
            storage: target KV store (needs ``upsert`` / ``index_done_callback``).
            records: node or edge dicts from the graph storage.
            key_fn: maps a record to its storage key. A falsy result skips
                the record.
            label: ``"entity"`` or ``"relation"``; used in log messages.
            batch_size: maximum number of records per ``upsert`` call.

        Returns:
            Number of rows upserted. Records with no chunk IDs in their
            ``source_id`` are skipped. ``index_done_callback()`` is called
            only when at least one row was written.
        """
        total_records = len(records)
        total_batches = (total_records + batch_size - 1) // batch_size
        total_migrated = 0

        for batch_idx, start in enumerate(range(0, total_records, batch_size)):
            upsert_payload: dict[str, dict[str, object]] = {}
            for record in records[start : start + batch_size]:
                key = key_fn(record)
                if not key:
                    continue
                chunk_ids = self._split_chunk_ids(record.get("source_id"))
                if not chunk_ids:
                    continue
                upsert_payload[key] = {
                    "chunk_ids": chunk_ids,
                    "count": len(chunk_ids),
                }

            if upsert_payload:
                await storage.upsert(upsert_payload)
                total_migrated += len(upsert_payload)
                logger.info(
                    f"Processed {label} batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_records})"
                )

        if total_migrated > 0:
            # Persist the seeded data.
            await storage.index_done_callback()
            logger.info(
                f"{label.capitalize()} chunk_tracking migration completed: {total_migrated} records persisted"
            )
        return total_migrated

    @staticmethod
    def _edge_endpoints(edge: dict) -> tuple:
        """Return ``(src, tgt)`` for a graph edge dict.

        Accepts ``source``/``src_id``/``src`` and ``target``/``tgt_id``/``tgt``
        key variants, first truthy value wins. Missing endpoints come back
        as ``None``, or as whatever falsy value the last variant held.
        """
        src = edge.get("source") or edge.get("src_id") or edge.get("src")
        tgt = edge.get("target") or edge.get("tgt_id") or edge.get("tgt")
        return src, tgt

    @staticmethod
    def _split_chunk_ids(raw_source) -> list:
        """Split a ``source_id`` value on ``GRAPH_FIELD_SEP``, dropping empties.

        A missing or empty ``source_id`` (``None``/``""``) yields ``[]``.
        """
        if not raw_source:
            return []
        return [
            chunk_id for chunk_id in raw_source.split(GRAPH_FIELD_SEP) if chunk_id
        ]
|