| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302 |
- import asyncio
- import configparser
- import hashlib
- import json
- import os
- import uuid
- from dataclasses import dataclass
- from typing import Any, List, final
- import numpy as np
- import pipmaster as pm
- from ..base import BaseVectorStorage
- from ..exceptions import DataMigrationError
- from ..kg.shared_storage import get_data_init_lock, get_namespace_lock
- from ..utils import _cooperative_yield, compute_mdhash_id, logger
- if not pm.is_installed("qdrant-client"):
- pm.install("qdrant-client")
- from qdrant_client import QdrantClient, models # type: ignore
- @dataclass
- class _PendingVectorDoc:
- """Buffered vector upsert waiting for embedding and/or bulk flush."""
- source: dict[str, Any]
- content: str
- vector: list[float] | None = None
# Workspace value stored in payloads when no workspace is configured.
DEFAULT_WORKSPACE = "_"
# Payload field that partitions one physical collection between workspaces.
WORKSPACE_ID_FIELD = "workspace_id"
# Prefix passed to compute_mdhash_id when deriving entity vector ids.
ENTITY_PREFIX = "ent-"
# Payload keys written for every stored vector.
CREATED_AT_FIELD = "created_at"
ID_FIELD = "id"

# Default upsert request limits; overridable through the environment
# variables QDRANT_UPSERT_MAX_PAYLOAD_BYTES / QDRANT_UPSERT_MAX_POINTS_PER_BATCH.
DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES = 16 << 20  # 16 MiB
DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH = 128

# Optional config.ini fallback for the Qdrant URL / API key, read at import time.
config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")
def compute_mdhash_id_for_qdrant(
    content: str, prefix: str = "", style: str = "simple"
) -> str:
    """
    Derive a deterministic, Qdrant-compatible UUID from ``prefix + content``.

    The UUID is built from the first 16 bytes of ``sha256(prefix + content)``
    with its version field forced to 4, so identical inputs always map to the
    same point id.

    :param content: The text to hash (e.g. a document id). Must be non-empty.
    :param prefix: String prepended to ``content`` before hashing; this module
        passes the effective workspace so ids differ per workspace.
    :param style: Output format: "simple" (32 hex digits), "hyphenated"
        (canonical 8-4-4-4-12 form) or "urn" (``urn:uuid:...``).
    :return: The UUID rendered in the requested style.
    :raises ValueError: If ``content`` is empty or ``style`` is unknown.

    Note: ``prefix`` and ``content`` are joined without a separator, so
    ("ab", "c") and ("a", "bc") hash identically. Changing that would change
    every stored point id, so it is intentionally kept.
    """
    if not content:
        raise ValueError("Content must not be empty.")

    digest = hashlib.sha256((prefix + content).encode("utf-8")).digest()
    derived = uuid.UUID(bytes=digest[:16], version=4)

    renderers = (
        ("simple", lambda value: value.hex),
        ("hyphenated", str),
        ("urn", lambda value: f"urn:uuid:{value}"),
    )
    for name, render in renderers:
        if style == name:
            return render(derived)
    raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.")
def workspace_filter_condition(workspace: str) -> models.FieldCondition:
    """
    Build the Qdrant condition that restricts a request to one workspace.

    It matches points whose ``workspace_id`` payload value equals
    ``workspace`` exactly. Callers wrap it in ``models.Filter(must=[...])``.
    """
    exact_workspace = models.MatchValue(value=workspace)
    return models.FieldCondition(key=WORKSPACE_ID_FIELD, match=exact_workspace)
def _find_legacy_collection(
    client: QdrantClient,
    namespace: str,
    workspace: str | None = None,
    model_suffix: str | None = None,
) -> str | None:
    """
    Find a collection created by an older version of LightRAG.

    Candidate names are probed in this order, and the first one that exists
    is returned:

    1. ``lightrag_vdb_{namespace}``, only when ``model_suffix`` is given
       (the current collection name without its model suffix)
    2. ``{workspace}_{namespace}``, only when ``workspace`` is given
    3. ``lightrag_vdb_{namespace}``
    4. ``{namespace}``

    With a model suffix, the ``lightrag_vdb_`` name therefore wins. Without
    one, the workspace-specific name is tried first. Each distinct name is
    probed once, so a name listed twice costs a single
    ``collection_exists`` round trip.

    Args:
        client: QdrantClient instance
        namespace: Base namespace (e.g., "chunks", "entities")
        workspace: Optional workspace identifier
        model_suffix: Optional model suffix of the new collection name

    Returns:
        Name of the first existing candidate collection, or None.
    """
    candidates = [
        f"lightrag_vdb_{namespace}" if model_suffix else None,
        f"{workspace}_{namespace}" if workspace else None,
        f"lightrag_vdb_{namespace}",
        namespace,
    ]
    # dict.fromkeys keeps first-seen order while dropping duplicates; the
    # generator drops placeholders for options that do not apply.
    for candidate in dict.fromkeys(name for name in candidates if name):
        if client.collection_exists(candidate):
            logger.info(
                f"Qdrant: Found legacy collection '{candidate}' "
                f"(namespace={namespace}, workspace={workspace or 'none'})"
            )
            return candidate
    return None
- @final
- @dataclass
- class QdrantVectorDBStorage(BaseVectorStorage):
def __init__(
    self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
):
    """
    Construct the storage, then run ``__post_init__`` explicitly.

    A falsy ``workspace`` is normalized to ``""`` and a falsy ``meta_fields``
    to a fresh empty set before calling ``BaseVectorStorage.__init__``.

    NOTE(review): the class is also decorated with ``@dataclass``. If the
    base class's generated ``__init__`` already calls ``__post_init__``, it
    runs twice here. Confirm against ``BaseVectorStorage``.
    """
    base_fields = {
        "namespace": namespace,
        "workspace": workspace if workspace else "",
        "global_config": global_config,
        "embedding_func": embedding_func,
        "meta_fields": meta_fields if meta_fields else set(),
    }
    super().__init__(**base_fields)
    self.__post_init__()
@staticmethod
def setup_collection(
    client: QdrantClient,
    collection_name: str,
    namespace: str,
    workspace: str,
    vectors_config: models.VectorParams,
    hnsw_config: models.HnswConfigDiff,
    model_suffix: str,
):
    """
    Set up a Qdrant collection, migrating data from a legacy collection if needed.

    Flow:

    - Case 1: the target collection exists and no distinct legacy collection
      is found, or the "legacy" match is the target itself. Ensure the
      workspace payload index exists and return. Warn if this workspace has
      no points yet.
    - Otherwise, if the target does not exist, refuse to create it when a
      non-empty legacy collection has a different vector size. Then create
      it. Ensure the workspace payload index on the target either way.
    - Case 2: a distinct legacy collection exists. Delete it if it holds no
      points at all. Skip migration (with a warning) if the target already
      holds points for this workspace.
    - Case 3: copy the legacy points into the target collection, restricted
      to this workspace when legacy payloads carry ``workspace_id``, and
      verify the inserted count. A migrated legacy collection is never
      deleted here; that is left to the operator.

    Args:
        client: QdrantClient instance
        collection_name: Name of the final collection
        namespace: Base namespace (e.g., "chunks", "entities")
        workspace: Workspace identifier for data isolation
        vectors_config: Vector configuration parameters for the collection
        hnsw_config: HNSW index configuration diff for the collection
        model_suffix: Model suffix of the final collection name, forwarded
            to ``_find_legacy_collection`` to choose candidate legacy names

    Raises:
        ValueError: If ``namespace`` or ``workspace`` is empty.
        DataMigrationError: On a vector-size mismatch with a non-empty legacy
            collection, or if copying or verifying legacy data fails.
    """
    if not namespace or not workspace:
        raise ValueError("namespace and workspace must be provided")

    workspace_count_filter = models.Filter(
        must=[workspace_filter_condition(workspace)]
    )

    new_collection_exists = client.collection_exists(collection_name)
    legacy_collection = _find_legacy_collection(
        client, namespace, workspace, model_suffix
    )

    # Case 1: only the new collection exists, or the new collection is the
    # legacy match itself. No migration is needed; just ensure the index.
    if (new_collection_exists and not legacy_collection) or (
        collection_name == legacy_collection
    ):
        # create_payload_index returns without error if the index already exists
        client.create_payload_index(
            collection_name=collection_name,
            field_name=WORKSPACE_ID_FIELD,
            field_schema=models.KeywordIndexParams(
                type=models.KeywordIndexType.KEYWORD,
                is_tenant=True,
            ),
        )
        new_workspace_count = client.count(
            collection_name=collection_name,
            count_filter=workspace_count_filter,
            exact=True,
        ).count

        # An empty workspace is expected for a brand-new workspace, but can
        # also signal an unintended embedding-model change.
        if new_workspace_count == 0 and collection_name != legacy_collection:
            logger.warning(
                f"Qdrant: workspace data in collection '{collection_name}' is empty. "
                f"Ensure it is caused by new workspace setup and not an unexpected embedding model change."
            )
        return

    legacy_count = None
    if not new_collection_exists:
        # Check vector dimension compatibility before creating the new collection
        if legacy_collection:
            legacy_count = client.count(
                collection_name=legacy_collection, exact=True
            ).count
            if legacy_count > 0:
                legacy_info = client.get_collection(legacy_collection)
                legacy_dim = legacy_info.config.params.vectors.size
                if vectors_config.size and legacy_dim != vectors_config.size:
                    logger.error(
                        f"Qdrant: Dimension mismatch detected! "
                        f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
                        f"but new embedding model expects {vectors_config.size}d."
                    )
                    raise DataMigrationError(
                        f"Dimension mismatch between legacy collection '{legacy_collection}' "
                        f"and new collection. Expected {vectors_config.size}d but got {legacy_dim}d."
                    )

        client.create_collection(
            collection_name, vectors_config=vectors_config, hnsw_config=hnsw_config
        )
        logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
        if not legacy_collection:
            logger.warning(
                "Qdrant: Ensure this new collection creation is caused by new workspace setup and not an unexpected embedding model change."
            )

    # create_payload_index returns without error if the index already exists
    client.create_payload_index(
        collection_name=collection_name,
        field_name=WORKSPACE_ID_FIELD,
        field_schema=models.KeywordIndexParams(
            type=models.KeywordIndexType.KEYWORD,
            is_tenant=True,
        ),
    )

    if not legacy_collection:
        return

    # Case 2: a distinct legacy collection exists.
    if legacy_count is None:
        legacy_count = client.count(
            collection_name=legacy_collection, exact=True
        ).count

    # Only drop the legacy collection when it holds no points at all.
    if legacy_count == 0:
        client.delete_collection(collection_name=legacy_collection)
        logger.info(
            f"Qdrant: Empty legacy collection '{legacy_collection}' deleted successfully"
        )
        return

    new_workspace_count = client.count(
        collection_name=collection_name,
        count_filter=workspace_count_filter,
        exact=True,
    ).count

    # Skip data migration if the new collection already has workspace data
    if new_workspace_count > 0:
        logger.warning(
            f"Qdrant: Both new and legacy collection have data. "
            f"{legacy_count} records in {legacy_collection} require manual deletion after migration verification."
        )
        return

    # Case 3: migrate legacy data into the new collection.
    # payload_schema only lists INDEXED fields, so if workspace_id is not
    # indexed, a small payload sample is inspected as well. This prevents
    # copying other workspaces' points when workspace_id exists unindexed.
    legacy_info = client.get_collection(legacy_collection)
    has_workspace_index = WORKSPACE_ID_FIELD in (legacy_info.payload_schema or {})

    has_workspace_field = has_workspace_index
    if not has_workspace_index:
        # Assumes that if any point carries workspace_id, all points do.
        sample_points, _ = client.scroll(
            collection_name=legacy_collection,
            limit=10,  # a small sample is sufficient for detection
            with_payload=True,
            with_vectors=False,
        )
        for point in sample_points:
            if point.payload and WORKSPACE_ID_FIELD in point.payload:
                has_workspace_field = True
                logger.info(
                    f"Qdrant: Detected unindexed {WORKSPACE_ID_FIELD} field "
                    f"in legacy collection '{legacy_collection}' via payload sampling"
                )
                break

    legacy_scroll_filter = None
    if has_workspace_field:
        legacy_scroll_filter = models.Filter(
            must=[workspace_filter_condition(workspace)]
        )
        # Recount with the workspace filter so verification compares like with like
        legacy_count = client.count(
            collection_name=legacy_collection,
            count_filter=legacy_scroll_filter,
            exact=True,
        ).count
        logger.info(
            f"Qdrant: Legacy collection has workspace support, "
            f"filtering to {legacy_count} records for workspace '{workspace}'"
        )

    logger.info(
        f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records to migrate."
    )
    logger.info(
        f"Qdrant: Migrating data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
    )

    try:
        migrated_count = 0
        offset = None
        batch_size = 500
        while True:
            points, next_offset = client.scroll(
                collection_name=legacy_collection,
                scroll_filter=legacy_scroll_filter,
                limit=batch_size,
                offset=offset,
                with_vectors=True,
                with_payload=True,
            )
            if not points:
                break

            new_points = []
            for point in points:
                new_payload = dict(point.payload or {})
                new_payload[WORKSPACE_ID_FIELD] = workspace

                # Re-key by the caller id with the workspace prefix, matching
                # the ids the flush path generates for new upserts.
                original_id = new_payload.get(ID_FIELD)
                if original_id:
                    new_point_id = compute_mdhash_id_for_qdrant(
                        original_id, prefix=workspace
                    )
                else:
                    # Fallback: keep the legacy point id unchanged. Qdrant ids
                    # are unsigned integers or UUIDs, so the id must not be
                    # stringified: str() of an integer id is rejected.
                    new_point_id = point.id

                new_points.append(
                    models.PointStruct(
                        id=new_point_id,
                        vector=point.vector,
                        payload=new_payload,
                    )
                )

            client.upsert(
                collection_name=collection_name, points=new_points, wait=True
            )
            migrated_count += len(points)
            logger.info(
                f"Qdrant: {migrated_count}/{legacy_count} records migrated"
            )

            if next_offset is None:
                break
            offset = next_offset

        new_count_after = client.count(
            collection_name=collection_name,
            count_filter=workspace_count_filter,
            exact=True,
        ).count
        inserted_count = new_count_after - new_workspace_count
        if inserted_count != legacy_count:
            error_msg = (
                "Qdrant: Migration verification failed, expected "
                f"{legacy_count} inserted records, got {inserted_count}."
            )
            logger.error(error_msg)
            raise DataMigrationError(error_msg)
    except DataMigrationError:
        # Re-raise as-is to preserve the specific error message
        raise
    except Exception as e:
        logger.error(
            f"Qdrant: Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}': {e}"
        )
        raise DataMigrationError(
            f"Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
        ) from e

    logger.info(
        f"Qdrant: Migration from '{legacy_collection}' to '{collection_name}' completed successfully"
    )
    logger.warning(
        "Qdrant: Manual deletion is required after data migration verification."
    )
def __post_init__(self):
    """
    Derive per-instance settings once the base storage fields are set.

    This resolves:

    - the effective workspace (a non-blank ``QDRANT_WORKSPACE`` overrides the
      passed workspace, with ``DEFAULT_WORKSPACE`` as the fallback);
    - the collection name, ``lightrag_vdb_{namespace}[_{model_suffix}]``;
    - the cosine threshold;
    - the embedding batch size and the upsert limits.

    It also sets up empty deferred-write buffers. The Qdrant client and the
    flush lock are only created later, in ``initialize()``.

    Raises:
        ValueError: If ``cosine_better_than_threshold`` is missing from
            ``vector_db_storage_cls_kwargs``, or an upsert-limit environment
            variable is not an integer.
        KeyError: If ``global_config`` lacks ``embedding_batch_num``.
    """
    self._validate_embedding_func()

    # A non-blank QDRANT_WORKSPACE takes precedence over the constructor's
    # workspace, so an administrator can pin every Qdrant storage instance
    # to one workspace.
    env_workspace = (os.environ.get("QDRANT_WORKSPACE") or "").strip()
    if env_workspace:
        effective_workspace = env_workspace
        logger.info(
            f"Using QDRANT_WORKSPACE environment variable: '{effective_workspace}' (overriding '{self.workspace}/{self.namespace}')"
        )
    else:
        effective_workspace = self.workspace
        if effective_workspace:
            logger.debug(
                f"Using passed workspace parameter: '{effective_workspace}'"
            )
    self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE

    # The suffix from _generate_collection_suffix isolates collections per
    # embedding model, e.g. "lightrag_vdb_chunks_text_embedding_ada_002_1536d".
    self.model_suffix = self._generate_collection_suffix()
    if not self.model_suffix:
        # Without a suffix, fall back to the legacy collection name.
        self.final_namespace = f"lightrag_vdb_{self.namespace}"
        logger.warning(
            f"Qdrant collection: {self.final_namespace} missing suffix. Pls add model_name to embedding_func for proper workspace data isolation."
        )
    else:
        self.final_namespace = f"lightrag_vdb_{self.namespace}_{self.model_suffix}"
        logger.info(f"Qdrant collection: {self.final_namespace}")

    storage_kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
    threshold = storage_kwargs.get("cosine_better_than_threshold")
    if threshold is None:
        raise ValueError(
            "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
        )
    self.cosine_better_than_threshold = threshold

    # The client is created lazily by initialize().
    self._client = None
    self._max_batch_size = self.global_config["embedding_batch_num"]

    def env_int(name: str, default: int) -> int:
        return int(os.getenv(name, str(default)))

    # A non-positive limit disables that split in _build_upsert_batches.
    self._max_upsert_payload_bytes = env_int(
        "QDRANT_UPSERT_MAX_PAYLOAD_BYTES", DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES
    )
    self._max_upsert_points_per_batch = env_int(
        "QDRANT_UPSERT_MAX_POINTS_PER_BATCH", DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH
    )
    if self._max_upsert_payload_bytes <= 0:
        logger.warning(
            f"QDRANT_UPSERT_MAX_PAYLOAD_BYTES={self._max_upsert_payload_bytes} is non-positive, disable payload-size splitting"
        )
    if self._max_upsert_points_per_batch <= 0:
        logger.warning(
            f"QDRANT_UPSERT_MAX_POINTS_PER_BATCH={self._max_upsert_points_per_batch} is non-positive, disable point-count splitting"
        )

    self._initialized = False

    # Deferred-embedding buffers, keyed by caller doc id. One physical
    # collection is shared across workspaces via the workspace_id payload,
    # so the flush lock (created in initialize()) is scoped to both
    # final_namespace and effective_workspace.
    self._pending_vector_docs: dict[str, _PendingVectorDoc] = {}
    self._pending_vector_deletes: set[str] = set()
    self._flush_lock = None
- @staticmethod
- def _to_json_serializable(value: Any) -> Any:
- """Convert nested values to JSON-serializable types for payload size estimation."""
- if isinstance(value, np.ndarray):
- return value.tolist()
- if isinstance(value, np.integer):
- return int(value)
- if isinstance(value, np.floating):
- return float(value)
- if isinstance(value, dict):
- return {
- str(k): QdrantVectorDBStorage._to_json_serializable(v)
- for k, v in value.items()
- }
- if isinstance(value, (list, tuple)):
- return [QdrantVectorDBStorage._to_json_serializable(v) for v in value]
- return value
@staticmethod
def _estimate_point_payload_bytes(point: models.PointStruct) -> int:
    """
    Estimate a point's request size as the UTF-8 length of its compact JSON.

    Only ``id``, ``vector`` and ``payload`` are included, and a missing
    payload counts as ``{}``. The JSON has no whitespace between tokens and
    keeps non-ASCII characters unescaped.
    """
    to_plain = QdrantVectorDBStorage._to_json_serializable
    document = {
        "id": point.id,
        "vector": to_plain(point.vector),
        "payload": to_plain(point.payload or {}),
    }
    serialized = json.dumps(document, ensure_ascii=False, separators=(",", ":"))
    return len(serialized.encode("utf-8"))
@staticmethod
def _build_upsert_batches(
    points: list[models.PointStruct],
    max_payload_bytes: int,
    max_points_per_batch: int,
) -> list[tuple[list[models.PointStruct], int]]:
    """
    Group points into upsert batches bounded by estimated JSON size and point count.

    Each batch is returned with its estimated size, i.e. the bytes of a JSON
    array holding it: 2 bracket bytes, plus 1 comma between elements, plus
    each point's own estimate. A non-positive limit disables that constraint.

    Raises:
        ValueError: If a single point wrapped in ``[]`` exceeds
            ``max_payload_bytes``.
    """
    if not points:
        return []

    no_limit = float("inf")
    byte_cap = no_limit if max_payload_bytes <= 0 else max_payload_bytes
    count_cap = no_limit if max_points_per_batch <= 0 else max_points_per_batch
    brackets = 2  # bytes for the enclosing "[" and "]"

    batches: list[tuple[list[models.PointStruct], int]] = []
    batch: list[models.PointStruct] = []
    batch_bytes = brackets

    for point in points:
        size = QdrantVectorDBStorage._estimate_point_payload_bytes(point)
        if size + brackets > byte_cap:
            raise ValueError(
                f"Single Qdrant point exceeds payload limit: id={point.id!s}, "
                f"estimated_bytes={size + brackets}, "
                f"limit={int(byte_cap)}"
            )

        if not batch:
            grown = brackets + size
        else:
            grown = batch_bytes + 1 + size  # +1 for the separating comma
            if len(batch) >= count_cap or grown > byte_cap:
                # Close the current batch and start a new one with this point.
                batches.append((batch, batch_bytes))
                batch = []
                grown = brackets + size

        batch.append(point)
        batch_bytes = grown

    if batch:
        batches.append((batch, batch_bytes))
    return batches
async def initialize(self):
    """
    Create the Qdrant client, prepare the collection and bind the flush lock.

    Runs under the shared data-init lock and returns immediately once the
    instance is initialized. The client is built from ``QDRANT_URL`` /
    ``QDRANT_API_KEY``, falling back to ``[qdrant] uri`` / ``apikey`` in
    config.ini. Setup errors are logged and re-raised.
    """
    async with get_data_init_lock():
        if self._initialized:
            return

        try:
            if self._client is None:
                url = os.environ.get(
                    "QDRANT_URL", config.get("qdrant", "uri", fallback=None)
                )
                api_key = os.environ.get(
                    "QDRANT_API_KEY",
                    config.get("qdrant", "apikey", fallback=None),
                )
                self._client = QdrantClient(url=url, api_key=api_key)
                logger.debug(
                    f"[{self.workspace}] QdrantClient created successfully"
                )

            vector_params = models.VectorParams(
                size=self.embedding_func.embedding_dim,
                distance=models.Distance.COSINE,
            )
            # Following Qdrant's multitenancy pattern: m=0 skips the global
            # HNSW graph, and payload_m=16 builds graphs per tenant
            # (workspace_id) instead.
            hnsw_params = models.HnswConfigDiff(
                payload_m=16,
                m=0,
            )
            # namespace/workspace are passed for backward-compatible migration.
            QdrantVectorDBStorage.setup_collection(
                self._client,
                self.final_namespace,
                namespace=self.namespace,
                workspace=self.effective_workspace,
                vectors_config=vector_params,
                hnsw_config=hnsw_params,
                model_suffix=self.model_suffix,
            )

            self._initialized = True
            logger.info(
                f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully"
            )
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Failed to initialize Qdrant collection '{self.namespace}': {e}"
            )
            raise

    if self._flush_lock is None:
        self._flush_lock = get_namespace_lock(
            namespace=self.final_namespace,
            workspace=self.effective_workspace,
        )
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """
    Stage vector documents for the next flush; no embedding happens here.

    ``data`` maps caller doc id to fields. ``fields["content"]`` is the text
    to embed. Only keys listed in ``self.meta_fields`` are copied into the
    payload, alongside ``id``, ``workspace_id`` and ``created_at``.

    Embedding is deferred so that repeated upserts of the same id, or many
    small upsert calls, collapse into one embedding pass at flush time. The
    buffer stays keyed by the caller's id; conversion to a Qdrant UUID
    happens during the flush.

    NOTE(review): meta fields are merged last, so a meta field named
    ``id``/``workspace_id``/``created_at`` would override the fixed value.

    Raises:
        KeyError: If an entry lacks ``"content"``.
    """
    if not data:
        return

    import time

    created_at = int(time.time())
    staged: list[tuple[str, _PendingVectorDoc]] = []
    for position, (doc_id, fields) in enumerate(data.items(), start=1):
        payload = {
            ID_FIELD: doc_id,
            WORKSPACE_ID_FIELD: self.effective_workspace,
            CREATED_AT_FIELD: created_at,
            **{
                name: value
                for name, value in fields.items()
                if name in self.meta_fields
            },
        }
        staged.append(
            (doc_id, _PendingVectorDoc(source=payload, content=fields["content"]))
        )
        await _cooperative_yield(position)

    # A new upsert cancels any pending delete of the same id. Replacing the
    # buffered _PendingVectorDoc also discards any vector cached on the
    # previous entry for that id.
    async with self._flush_lock:
        self._pending_vector_deletes.difference_update(
            doc_id for doc_id, _ in staged
        )
        self._pending_vector_docs.update(staged)
async def query(
    self, query: str, top_k: int, query_embedding: list[float] = None
) -> list[dict[str, Any]]:
    """
    Return up to ``top_k`` stored vectors most similar to ``query``.

    Uses Qdrant ``query_points``, restricted to this workspace and to scores
    at or above ``cosine_better_than_threshold``. Each result is the stored
    payload plus ``distance`` (the Qdrant score) and a guaranteed
    ``created_at`` key (None if absent). A precomputed ``query_embedding``
    skips the embedding call.

    Only server-side data is searched. Buffered upserts and deletes become
    visible once ``index_done_callback`` / ``finalize`` flushes them. Callers
    that need read-your-writes for a freshly upserted id should use
    ``get_by_id`` / ``get_by_ids`` (which consult the buffer) or flush first.
    """
    if query_embedding is None:
        # Queries ask for a higher embedding priority than document embedding.
        embedded = await self.embedding_func(
            [query], context="query", _priority=5
        )
        embedding = embedded[0]
    else:
        embedding = query_embedding

    same_workspace = models.Filter(
        must=[workspace_filter_condition(self.effective_workspace)]
    )
    response = self._client.query_points(
        collection_name=self.final_namespace,
        query=embedding,
        limit=top_k,
        with_payload=True,
        score_threshold=self.cosine_better_than_threshold,
        query_filter=same_workspace,
    )

    matches = []
    for hit in response.points:
        record = dict(hit.payload)
        record["distance"] = hit.score
        record[CREATED_AT_FIELD] = hit.payload.get(CREATED_AT_FIELD)
        matches.append(record)
    return matches
async def index_done_callback(self) -> None:
    """
    Flush buffered vector upserts and deletes to Qdrant.

    Qdrant persists data by itself once it is written, so draining the
    in-memory buffers is the only work needed here.
    """
    return await self._flush_pending_vector_ops()
async def _flush_pending_vector_ops(self) -> None:
    """
    Write buffered vector upserts and deletes to Qdrant in batched calls.

    The following steps all run while holding ``_flush_lock``:

    1. Embed every buffered doc that has no cached vector, in
       ``embedding_batch_num``-sized batches gathered concurrently.
    2. Upsert all buffered docs. ``_build_upsert_batches`` splits them so
       each request respects the configured payload-size and point-count
       limits.
    3. Delete all buffered delete ids.

    Caller ids are mapped to Qdrant point ids (``compute_mdhash_id_for_qdrant``
    with the effective workspace as prefix) only at this stage. Holding the
    lock across the embedding ``await`` keeps the embed-then-write sequence
    atomic with respect to concurrent ``upsert``/``delete`` calls.

    On any failure the error is raised and both buffers keep all their
    entries; the next ``index_done_callback`` retries. If only the server
    write fails, the freshly computed vectors stay cached on the pending
    docs, so the retry does not re-embed. Buffers are cleared only after
    every request has succeeded. Returns immediately when nothing is
    buffered or no client exists yet.

    Concurrency invariant: ``_flush_lock`` is treated as a non-reentrant
    lock. Callers (``index_done_callback``, ``finalize``) must NOT already
    hold it, or this call deadlocks.
    """
    async with self._flush_lock:
        docs = self._pending_vector_docs
        deletes = self._pending_vector_deletes
        if not docs and not deletes:
            return
        if self._client is None:
            return

        unembedded: list[tuple[str, _PendingVectorDoc]] = [
            (doc_id, pdoc) for doc_id, pdoc in docs.items() if pdoc.vector is None
        ]
        if unembedded:
            texts = [pdoc.content for _, pdoc in unembedded]
            step = self._max_batch_size
            text_batches = [
                texts[start : start + step] for start in range(0, len(texts), step)
            ]
            logger.info(
                f"[{self.workspace}] {self.namespace} flush: embedding "
                f"{len(unembedded)} vectors in {len(text_batches)} batch(es) "
                f"(batch_num={self._max_batch_size})"
            )
            try:
                per_batch = await asyncio.gather(
                    *(
                        self.embedding_func(batch, context="document")
                        for batch in text_batches
                    )
                )
            except Exception as e:
                logger.error(
                    f"[{self.workspace}] Error embedding pending vector ops "
                    f"(upserts={len(unembedded)}): {e}"
                )
                raise

            vectors = np.concatenate(per_batch)
            if len(vectors) != len(unembedded):
                raise RuntimeError(
                    f"[{self.workspace}] Embedding count mismatch: expected "
                    f"{len(unembedded)}, got {len(vectors)}"
                )
            for position, ((_, pdoc), row) in enumerate(
                zip(unembedded, vectors), start=1
            ):
                # Cache the vector as a float32-rounded plain Python list, so a
                # flush retried after a server-side error skips re-embedding.
                pdoc.vector = np.array(row, dtype=np.float32).tolist()
                await _cooperative_yield(position)

        # The buffer stays keyed by caller ids (so read-your-writes lookups
        # use the same key); Qdrant UUIDs are derived only here.
        ready = [
            (doc_id, pdoc) for doc_id, pdoc in docs.items() if pdoc.vector is not None
        ]
        flushed_ids = [doc_id for doc_id, _ in ready]
        points = [
            models.PointStruct(
                id=compute_mdhash_id_for_qdrant(
                    doc_id, prefix=self.effective_workspace
                ),
                vector=pdoc.vector,
                payload=dict(pdoc.source),
            )
            for doc_id, pdoc in ready
        ]

        try:
            if points:
                point_batches = self._build_upsert_batches(
                    points,
                    max_payload_bytes=self._max_upsert_payload_bytes,
                    max_points_per_batch=self._max_upsert_points_per_batch,
                )
                batch_total = len(point_batches)
                if batch_total > 1:
                    logger.info(
                        f"[{self.workspace}] Qdrant upsert split into {batch_total} batches "
                        f"for {len(points)} points (max_payload_bytes={self._max_upsert_payload_bytes}, "
                        f"max_points_per_batch={self._max_upsert_points_per_batch})"
                    )
                for batch_number, (batch_points, batch_bytes) in enumerate(
                    point_batches, 1
                ):
                    logger.debug(
                        f"[{self.workspace}] Qdrant upsert batch {batch_number}/{batch_total}: "
                        f"points={len(batch_points)}, estimated_payload_bytes={batch_bytes}"
                    )
                    # Fail fast: a failed batch aborts the remaining ones, and
                    # the whole buffer is kept so the next flush retries it.
                    self._client.upsert(
                        collection_name=self.final_namespace,
                        points=batch_points,
                        wait=True,
                    )

            if deletes:
                delete_point_ids = [
                    compute_mdhash_id_for_qdrant(
                        doc_id, prefix=self.effective_workspace
                    )
                    for doc_id in deletes
                ]
                self._client.delete(
                    collection_name=self.final_namespace,
                    points_selector=models.PointIdsList(points=delete_point_ids),
                    wait=True,
                )
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error flushing vector ops "
                f"(upserts={len(docs)}, "
                f"deletes={len(deletes)}): {e}"
            )
            raise

        for doc_id in flushed_ids:
            docs.pop(doc_id, None)
        deletes.clear()
async def delete(self, ids: List[str]) -> None:
    """
    Queue vector deletions for the next flush.

    Each caller id is removed from the pending-upsert buffer, so an
    unflushed upsert cannot resurrect it, and is added to the pending-delete
    set. Ids are mapped to Qdrant point ids at flush time.
    """
    if not ids:
        return
    if isinstance(ids, set):
        # Iterate over a list copy of a set argument.
        ids = list(ids)

    async with self._flush_lock:
        pending_docs = self._pending_vector_docs
        pending_deletes = self._pending_vector_deletes
        for doc_id in ids:
            pending_docs.pop(doc_id, None)
            pending_deletes.add(doc_id)

    logger.debug(
        f"[{self.workspace}] Buffered delete for {len(ids)} vectors in {self.namespace}"
    )
async def delete_entity(self, entity_name: str) -> None:
    """Queue deletion of one entity's vector, keyed by its hashed id.

    The id is derived with ``compute_mdhash_id(entity_name,
    prefix=ENTITY_PREFIX)``. Any buffered upsert for that id is discarded,
    and the id is added to the pending-delete set for the next flush.
    """
    vector_id = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX)
    async with self._flush_lock:
        # Drop a not-yet-flushed upsert first so it cannot re-create the point.
        self._pending_vector_docs.pop(vector_id, None)
        self._pending_vector_deletes.add(vector_id)
    logger.debug(
        f"[{self.workspace}] Buffered delete for entity {entity_name} (id={vector_id})"
    )
async def delete_entity_relation(self, entity_name: str) -> None:
    """Remove every relation vector that has ``entity_name`` as src or tgt.

    The whole method runs under ``_flush_lock``, so the server-side
    scroll/delete pass cannot interleave with an in-flight bulk upsert.
    Server errors propagate to the caller rather than being logged and
    swallowed; retrying is the caller's decision.

    Buffer handling (prune only after the server pass succeeds):
        Buffered upserts in ``_pending_vector_docs`` that reference the
        entity are discarded only once the scroll/delete loop has run to
        completion. If any server call raises, the buffer is left intact.
        A failure therefore never silently throws away relation vectors
        the caller never asked to discard.

        The trade-off: a partially applied server delete combined with the
        retained buffer can re-insert deleted relations on the next flush.
        Correctness therefore relies on the caller short-circuiting before
        ``index_done_callback`` can run.

        The in-tree caller ``adelete_by_entity`` in ``utils_graph.py`` does
        this. Its ``except`` clause skips both ``delete_node`` and
        ``_persist_graph_updates``. On failure, the graph and the buffer stay
        in the "delete never happened" state, so a retry converges.

    When ``self._client`` is ``None`` (pre-init / post-finalize), only
    buffered state exists. In that case only the buffer is pruned.
    """

    def _prune_buffered_relations() -> None:
        # Discard buffered upserts whose src_id or tgt_id is the entity.
        # Keys are collected before popping because the dict is mutated.
        doomed_keys = [
            key
            for key, pdoc in self._pending_vector_docs.items()
            if pdoc.source.get("src_id") == entity_name
            or pdoc.source.get("tgt_id") == entity_name
        ]
        for key in doomed_keys:
            self._pending_vector_docs.pop(key, None)

    async with self._flush_lock:
        if self._client is None:
            # No server to talk to: apply the delete intent to the buffer only.
            _prune_buffered_relations()
            return

        # Points must belong to this workspace AND match at least one of
        # the ``should`` conditions (entity as source or as target).
        endpoint_filter = models.Filter(
            must=[workspace_filter_condition(self.effective_workspace)],
            should=[
                models.FieldCondition(
                    key="src_id", match=models.MatchValue(value=entity_name)
                ),
                models.FieldCondition(
                    key="tgt_id", match=models.MatchValue(value=entity_name)
                ),
            ],
        )

        page_size = 1000
        cursor = None
        removed = 0
        while True:
            page, next_cursor = self._client.scroll(
                collection_name=self.final_namespace,
                scroll_filter=endpoint_filter,
                with_payload=False,
                with_vectors=False,
                limit=page_size,
                offset=cursor,
            )
            if not page:
                break
            point_ids = [point.id for point in page]
            self._client.delete(
                collection_name=self.final_namespace,
                points_selector=models.PointIdsList(points=point_ids),
                wait=True,
            )
            removed += len(point_ids)
            if next_cursor is None:
                break
            cursor = next_cursor

        # Reached only when every scroll/delete call succeeded. It is now safe
        # to drop matching buffered upserts so the next flush does not
        # re-insert the relations just deleted on the server.
        _prune_buffered_relations()

    if removed > 0:
        logger.debug(
            f"[{self.workspace}] Deleted {removed} relations for {entity_name}"
        )
    else:
        logger.debug(
            f"[{self.workspace}] No relations found for entity {entity_name}"
        )
async def get_by_id(self, id: str) -> dict[str, Any] | None:
    """Fetch one record's payload, honoring not-yet-flushed buffered writes.

    Lookup order:
    - An id buffered for deletion yields ``None``.
    - An id with a buffered upsert yields a copy of its source payload.
      No vector is included, matching the server path.
    - Otherwise the point is retrieved from Qdrant.

    ``CREATED_AT_FIELD`` is filled with ``None`` when absent. Server-side
    errors are logged and reported as ``None``.
    """
    async with self._flush_lock:
        if id in self._pending_vector_deletes:
            return None
        buffered = self._pending_vector_docs.get(id)
        if buffered is not None:
            snapshot = dict(buffered.source)
            snapshot.setdefault(CREATED_AT_FIELD, None)
            return snapshot

    try:
        point_id = compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
        hits = self._client.retrieve(
            collection_name=self.final_namespace,
            ids=[point_id],
            with_payload=True,
        )
        if not hits:
            return None
        record = hits[0].payload
        if CREATED_AT_FIELD not in record:
            record[CREATED_AT_FIELD] = None
        return record
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error retrieving vector data for ID {id}: {e}"
        )
        return None
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
    """Get multiple vector payloads by id (read-your-writes), preserving order.

    The returned list is aligned with ``ids``. Each entry is:
    - ``None`` when the id is buffered for deletion.
    - A copy of the buffered source payload when the id has a pending upsert.
    - Otherwise the Qdrant payload, or ``None`` if the point was not found.

    ``CREATED_AT_FIELD`` is filled with ``None`` when absent.

    If the Qdrant fallback fails (including when no client is available),
    the error is logged and ids that needed the server come back as
    ``None``. Buffered hits that were already resolved are still returned,
    and positional alignment with ``ids`` is kept. Previously the whole
    result collapsed to ``[]`` in that case.
    """
    if not ids:
        return []

    buffered: dict[str, dict[str, Any] | None] = {}
    remaining: list[str] = []
    async with self._flush_lock:
        for doc_id in ids:
            if doc_id in self._pending_vector_deletes:
                buffered[doc_id] = None
                continue
            pending = self._pending_vector_docs.get(doc_id)
            if pending is not None:
                payload = dict(pending.source)
                payload.setdefault(CREATED_AT_FIELD, None)
                buffered[doc_id] = payload
                continue
            remaining.append(doc_id)

    # Server reads happen outside the lock. Results are indexed both by the
    # original id stored in the payload and by the Qdrant point id, so either
    # can be used to match them back to ``ids``.
    payload_by_original_id: dict[str, dict[str, Any]] = {}
    payload_by_qdrant_id: dict[str, dict[str, Any]] = {}
    if remaining:
        try:
            qdrant_ids = [
                compute_mdhash_id_for_qdrant(doc_id, prefix=self.effective_workspace)
                for doc_id in remaining
            ]
            results = self._client.retrieve(
                collection_name=self.final_namespace,
                ids=qdrant_ids,
                with_payload=True,
            )
            for point in results:
                payload = dict(point.payload or {})
                if CREATED_AT_FIELD not in payload:
                    payload[CREATED_AT_FIELD] = None
                qdrant_point_id = str(point.id) if point.id is not None else ""
                if qdrant_point_id:
                    payload_by_qdrant_id[qdrant_point_id] = payload
                original_id = payload.get(ID_FIELD)
                if original_id is not None:
                    payload_by_original_id[str(original_id)] = payload
        except Exception as e:
            logger.error(
                f"[{self.workspace}] Error retrieving vector data for IDs {remaining}: {e}"
            )
            # Treat server-fetched ids as not found, but keep buffered hits
            # and alignment instead of discarding everything.
            payload_by_original_id.clear()
            payload_by_qdrant_id.clear()

    ordered_payloads: list[dict[str, Any] | None] = []
    for doc_id in ids:
        if doc_id in buffered:
            ordered_payloads.append(buffered[doc_id])
            continue
        payload = payload_by_original_id.get(str(doc_id))
        if payload is None:
            payload = payload_by_qdrant_id.get(
                compute_mdhash_id_for_qdrant(doc_id, prefix=self.effective_workspace)
            )
        ordered_payloads.append(payload)
    return ordered_payloads
async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
    """Return embeddings for ``ids`` as a dict, honoring buffered writes.

    Under ``_flush_lock``:
    - Ids buffered for deletion are skipped.
    - Buffered upserts that already carry a vector are returned directly.
    - Buffered upserts without a vector are embedded now, in batches of
      ``self._max_batch_size``. The vector is cached on the buffered
      ``_PendingVectorDoc`` so the next flush does not re-embed it.

    Embedding errors are logged and re-raised. A ``RuntimeError`` is
    raised if the embedding count does not match the number of texts.

    Ids not in the buffer are read from Qdrant *after* the lock is
    released. A concurrent ``delete()`` only buffers the delete, and the
    old vector stays on the server until the next flush. This method may
    therefore return a stale vector for an id already buffered for
    deletion. This is best-effort, like ``query()``; callers needing
    strict consistency must call ``index_done_callback()`` first. Server
    errors are logged and whatever was collected so far is returned.
    """
    if not ids:
        return {}

    vectors: dict[str, list[float]] = {}
    server_ids: list[str] = []
    async with self._flush_lock:
        needs_embedding: list[tuple[str, _PendingVectorDoc]] = []
        for doc_id in ids:
            if doc_id in self._pending_vector_deletes:
                continue
            buffered = self._pending_vector_docs.get(doc_id)
            if buffered is None:
                server_ids.append(doc_id)
            elif buffered.vector is None:
                needs_embedding.append((doc_id, buffered))
            else:
                vectors[doc_id] = buffered.vector

        if needs_embedding:
            texts = [pdoc.content for _, pdoc in needs_embedding]
            step = self._max_batch_size
            chunks = [texts[start : start + step] for start in range(0, len(texts), step)]
            try:
                embeddings_list = await asyncio.gather(
                    *(self.embedding_func(chunk, context="document") for chunk in chunks)
                )
            except Exception as e:
                logger.error(
                    f"[{self.workspace}] Error lazily embedding pending vectors "
                    f"(upserts={len(needs_embedding)}): {e}"
                )
                raise
            embeddings = np.concatenate(embeddings_list)
            if len(embeddings) != len(needs_embedding):
                raise RuntimeError(
                    f"[{self.workspace}] Embedding count mismatch: expected "
                    f"{len(needs_embedding)}, got {len(embeddings)}"
                )
            for position, ((doc_id, pdoc), embedding) in enumerate(
                zip(needs_embedding, embeddings), start=1
            ):
                pdoc.vector = np.array(embedding, dtype=np.float32).tolist()
                vectors[doc_id] = pdoc.vector
                await _cooperative_yield(position)

    if not server_ids:
        return vectors

    try:
        point_ids = [
            compute_mdhash_id_for_qdrant(doc_id, prefix=self.effective_workspace)
            for doc_id in server_ids
        ]
        points = self._client.retrieve(
            collection_name=self.final_namespace,
            ids=point_ids,
            with_vectors=True,
            with_payload=True,
        )
        for point in points:
            if not point or point.vector is None or not point.payload:
                continue
            original_id = point.payload.get(ID_FIELD)
            if not original_id:
                continue
            raw_vector = point.vector
            if isinstance(raw_vector, np.ndarray):
                raw_vector = raw_vector.tolist()
            vectors[original_id] = raw_vector
    except Exception as e:
        logger.error(f"[{self.workspace}] Error getting vectors: {e}")
    return vectors
async def finalize(self):
    """Flush buffered vector operations and fail loudly if any remain.

    No Qdrant connection is released here; the client is held by the
    storage instance and left to garbage collection. The flush is expected
    to be all-or-nothing: it either empties both buffers or raises with
    them intact. Both buffers are still re-checked after a successful
    flush, in case that invariant is ever broken.

    Raises:
        RuntimeError: if the flush raised (chained from the original
            error), or if either buffer is non-empty after a flush that
            reported success.
    """
    flush_error: Exception | None
    try:
        await self._flush_pending_vector_ops()
    except Exception as exc:
        flush_error = exc
    else:
        flush_error = None

    async with self._flush_lock:
        leftover_docs = len(self._pending_vector_docs)
        leftover_deletes = len(self._pending_vector_deletes)

    if flush_error is not None:
        raise RuntimeError(
            f"[{self.workspace}] QdrantVectorDBStorage.finalize() flush raised; "
            f"{leftover_docs} pending upserts and {leftover_deletes} pending "
            f"deletes were left buffered (data lost)"
        ) from flush_error
    if leftover_docs or leftover_deletes:
        raise RuntimeError(
            f"[{self.workspace}] QdrantVectorDBStorage.finalize() left "
            f"{leftover_docs} pending upserts and {leftover_deletes} pending "
            f"deletes buffered after final flush attempt (these writes have been lost)"
        )
async def drop(self) -> dict[str, str]:
    """Destructively delete every vector of the current workspace.

    Qdrant shares one physical collection (``final_namespace``) across
    workspaces, partitioned by the ``workspace_id`` payload field. Only
    points matching ``effective_workspace`` are removed; sibling
    workspaces are untouched. The collection and its vector index are not
    recreated; they remain as provisioned by ``initialize()``.

    Precondition: ``pipeline_status`` must be idle (see the pipeline
    concurrency contract in ``AGENTS.md``). The only in-tree caller,
    ``clear_documents``, enforces this.

    Buffered writes on this instance are discarded *before* the
    server-side delete, so a later flush here cannot resurrect dropped
    data. If the server-side delete then fails, those buffered writes are
    gone too; retrying ``drop()`` cannot recover them. That matches the
    "discard everything" contract shared with the other lazy-embedding
    backends.

    Caveat: only this instance's buffers are cleared. Other instances
    aliased onto the same ``(final_namespace, effective_workspace)`` keep
    their own buffers. Examples are other worker processes, or workspaces
    collapsed together by ``QDRANT_WORKSPACE``. Such a sibling, holding
    buffers from a failed flush, would write stale points back on its next
    flush. Direct callers bypassing the idle precondition must flush every
    aliased instance first.

    Returns:
        dict[str, str]: ``{"status": "success"|"error", "message": str}``
    """
    try:
        async with self._flush_lock:
            # Empty the write buffers first so nothing buffered on this
            # instance can be written back after the wipe.
            self._pending_vector_docs.clear()
            self._pending_vector_deletes.clear()
            workspace_only = models.Filter(
                must=[workspace_filter_condition(self.effective_workspace)]
            )
            self._client.delete(
                collection_name=self.final_namespace,
                points_selector=models.FilterSelector(filter=workspace_only),
                wait=True,
            )
        logger.info(
            f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
        )
        return {"status": "success", "message": "data dropped"}
    except Exception as e:
        logger.error(
            f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
        )
        return {"status": "error", "message": str(e)}
|