qdrant_impl.py 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302
  1. import asyncio
  2. import configparser
  3. import hashlib
  4. import json
  5. import os
  6. import uuid
  7. from dataclasses import dataclass
  8. from typing import Any, List, final
  9. import numpy as np
  10. import pipmaster as pm
  11. from ..base import BaseVectorStorage
  12. from ..exceptions import DataMigrationError
  13. from ..kg.shared_storage import get_data_init_lock, get_namespace_lock
  14. from ..utils import _cooperative_yield, compute_mdhash_id, logger
  15. if not pm.is_installed("qdrant-client"):
  16. pm.install("qdrant-client")
  17. from qdrant_client import QdrantClient, models # type: ignore
  18. @dataclass
  19. class _PendingVectorDoc:
  20. """Buffered vector upsert waiting for embedding and/or bulk flush."""
  21. source: dict[str, Any]
  22. content: str
  23. vector: list[float] | None = None
  # Workspace value used when no workspace is configured.
  DEFAULT_WORKSPACE = "_"

  # Payload field names and ID prefixes.
  WORKSPACE_ID_FIELD = "workspace_id"
  ENTITY_PREFIX = "ent-"
  CREATED_AT_FIELD = "created_at"
  ID_FIELD = "id"

  # Default upsert batching limits (overridable through environment variables
  # read in QdrantVectorDBStorage.__post_init__).
  DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES = 16 << 20  # 16MB
  DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH = 128

  # Optional file-based settings; a missing config.ini is silently ignored
  # by ConfigParser.read.
  config = configparser.ConfigParser()
  config.read("config.ini", encoding="utf-8")
  33. def compute_mdhash_id_for_qdrant(
  34. content: str, prefix: str = "", style: str = "simple"
  35. ) -> str:
  36. """
  37. Generate a UUID based on the content and support multiple formats.
  38. :param content: The content used to generate the UUID.
  39. :param style: The format of the UUID, optional values are "simple", "hyphenated", "urn".
  40. :return: A UUID that meets the requirements of Qdrant.
  41. """
  42. if not content:
  43. raise ValueError("Content must not be empty.")
  44. # Use the hash value of the content to create a UUID.
  45. hashed_content = hashlib.sha256((prefix + content).encode("utf-8")).digest()
  46. generated_uuid = uuid.UUID(bytes=hashed_content[:16], version=4)
  47. # Return the UUID according to the specified format.
  48. if style == "simple":
  49. return generated_uuid.hex
  50. elif style == "hyphenated":
  51. return str(generated_uuid)
  52. elif style == "urn":
  53. return f"urn:uuid:{generated_uuid}"
  54. else:
  55. raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.")
  def workspace_filter_condition(workspace: str) -> models.FieldCondition:
      """
      Build the Qdrant field condition that matches points whose
      ``workspace_id`` payload field equals ``workspace``.
      """
      exact_match = models.MatchValue(value=workspace)
      return models.FieldCondition(key=WORKSPACE_ID_FIELD, match=exact_match)
  def _find_legacy_collection(
      client: QdrantClient,
      namespace: str,
      workspace: str | None = None,
      model_suffix: str | None = None,
  ) -> str | None:
      """
      Find legacy collection with backward compatibility support.

      Candidate names are probed in this order and the first one that exists
      is returned:

      1. ``{workspace}_{namespace}`` - only when ``workspace`` is provided
      2. ``lightrag_vdb_{namespace}``
      3. ``{namespace}``

      ``lightrag_vdb_{namespace}`` is moved ahead of the workspace-prefixed
      name when ``model_suffix`` is provided. Each distinct name is probed
      at most once.

      Args:
          client: QdrantClient instance
          namespace: Base namespace (e.g., "chunks", "entities")
          workspace: Optional workspace identifier
          model_suffix: Optional model suffix for new collection

      Returns:
          Collection name if found, None otherwise
      """
      candidates = [
          f"lightrag_vdb_{namespace}" if model_suffix else None,
          f"{workspace}_{namespace}" if workspace else None,
          f"lightrag_vdb_{namespace}",
          namespace,
      ]
      # dict.fromkeys preserves first-seen order while dropping repeats, so the
      # same collection name never costs a second collection_exists call.
      unique_candidates = dict.fromkeys(name for name in candidates if name)

      for candidate in unique_candidates:
          if client.collection_exists(candidate):
              logger.info(
                  f"Qdrant: Found legacy collection '{candidate}' "
                  f"(namespace={namespace}, workspace={workspace or 'none'})"
              )
              return candidate
      return None
  100. @final
  101. @dataclass
  102. class QdrantVectorDBStorage(BaseVectorStorage):
  def __init__(
      self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
  ):
      """
      Forward the arguments to the base-class initializer, then run
      ``__post_init__`` explicitly.

      A missing ``workspace`` is passed on as "" and a missing ``meta_fields``
      as an empty set.
      """
      base_kwargs = {
          "namespace": namespace,
          "workspace": workspace or "",
          "global_config": global_config,
          "embedding_func": embedding_func,
          "meta_fields": meta_fields or set(),
      }
      super().__init__(**base_kwargs)
      self.__post_init__()
  @staticmethod
  def setup_collection(
      client: QdrantClient,
      collection_name: str,
      namespace: str,
      workspace: str,
      vectors_config: models.VectorParams,
      hnsw_config: models.HnswConfigDiff,
      model_suffix: str,
  ):
      """
      Setup Qdrant collection with migration support from legacy collections.

      Ensure final collection has workspace isolation index.
      Check vector dimension compatibility before new collection creation.
      Drop legacy collection if it exists and is empty.
      Only migrate data from legacy collection to new collection when the new
      collection holds no data for this workspace and the legacy collection is
      not empty.

      Args:
          client: QdrantClient instance
          collection_name: Name of the final collection
          namespace: Base namespace (e.g., "chunks", "entities")
          workspace: Workspace identifier for data isolation
          vectors_config: Vector configuration parameters for the collection
          hnsw_config: HNSW index configuration diff for the collection
          model_suffix: Model suffix of the final collection; forwarded to the
              legacy collection lookup

      Raises:
          ValueError: If namespace or workspace is empty.
          DataMigrationError: On a vector dimension mismatch with the legacy
              collection, on a failed post-migration count verification, or
              when any other error occurs while migrating.
      """
      if not namespace or not workspace:
          raise ValueError("namespace and workspace must be provided")

      def ensure_workspace_index() -> None:
          # create_payload_index return without error if index already exists
          client.create_payload_index(
              collection_name=collection_name,
              field_name=WORKSPACE_ID_FIELD,
              field_schema=models.KeywordIndexParams(
                  type=models.KeywordIndexType.KEYWORD,
                  is_tenant=True,
              ),
          )

      workspace_count_filter = models.Filter(
          must=[workspace_filter_condition(workspace)]
      )

      new_collection_exists = client.collection_exists(collection_name)
      legacy_collection = _find_legacy_collection(
          client, namespace, workspace, model_suffix
      )

      # Case 1: Only new collection exists or new collection is the same as legacy collection
      # No data migration needed, and ensuring index is created then return
      if (new_collection_exists and not legacy_collection) or (
          collection_name == legacy_collection
      ):
          ensure_workspace_index()
          new_workspace_count = client.count(
              collection_name=collection_name,
              count_filter=workspace_count_filter,
              exact=True,
          ).count

          # An empty workspace in a suffixed collection may indicate an
          # embedding model change, so surface it to the operator.
          if new_workspace_count == 0 and not (collection_name == legacy_collection):
              logger.warning(
                  f"Qdrant: workspace data in collection '{collection_name}' is empty. "
                  f"Ensure it is caused by new workspace setup and not an unexpected embedding model change."
              )
          return

      legacy_count = None
      if not new_collection_exists:
          # Check vector dimension compatibility before creating new collection
          if legacy_collection:
              legacy_count = client.count(
                  collection_name=legacy_collection, exact=True
              ).count
              if legacy_count > 0:
                  legacy_info = client.get_collection(legacy_collection)
                  legacy_dim = legacy_info.config.params.vectors.size

                  if vectors_config.size and legacy_dim != vectors_config.size:
                      logger.error(
                          f"Qdrant: Dimension mismatch detected! "
                          f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
                          f"but new embedding model expects {vectors_config.size}d."
                      )
                      raise DataMigrationError(
                          f"Dimension mismatch between legacy collection '{legacy_collection}' "
                          f"and new collection. Expected {vectors_config.size}d but got {legacy_dim}d."
                      )

          client.create_collection(
              collection_name, vectors_config=vectors_config, hnsw_config=hnsw_config
          )
          logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
          if not legacy_collection:
              logger.warning(
                  "Qdrant: Ensure this new collection creation is caused by new workspace setup and not an unexpected embedding model change."
              )

      ensure_workspace_index()

      # Case 2: Legacy collection exist
      if legacy_collection:
          # Only drop legacy collection if it's empty
          if legacy_count is None:
              legacy_count = client.count(
                  collection_name=legacy_collection, exact=True
              ).count
          if legacy_count == 0:
              client.delete_collection(collection_name=legacy_collection)
              logger.info(
                  f"Qdrant: Empty legacy collection '{legacy_collection}' deleted successfully"
              )
              return

          new_workspace_count = client.count(
              collection_name=collection_name,
              count_filter=workspace_count_filter,
              exact=True,
          ).count

          # Skip data migration if new collection already has workspace data
          if new_workspace_count > 0:
              logger.warning(
                  f"Qdrant: Both new and legacy collection have data. "
                  f"{legacy_count} records in {legacy_collection} require manual deletion after migration verification."
              )
              return

          # Case 3: Only legacy exists - migrate data from legacy collection to new collection
          # Check if legacy collection has workspace_id to determine migration strategy
          # Note: payload_schema only reflects INDEXED fields, so we also sample
          # actual payloads to detect unindexed workspace_id fields
          legacy_info = client.get_collection(legacy_collection)
          has_workspace_index = WORKSPACE_ID_FIELD in (
              legacy_info.payload_schema or {}
          )

          # Detect workspace_id field presence by sampling payloads if not indexed
          # This prevents cross-workspace data leakage when workspace_id exists but isn't indexed
          has_workspace_field = has_workspace_index
          if not has_workspace_index:
              # Sample a small batch of points to check for workspace_id in payloads
              # (assumes that if any point carries workspace_id, all of them do)
              sample_points, _ = client.scroll(
                  collection_name=legacy_collection,
                  limit=10,  # Small sample is sufficient for detection
                  with_payload=True,
                  with_vectors=False,
              )
              for point in sample_points:
                  if point.payload and WORKSPACE_ID_FIELD in point.payload:
                      has_workspace_field = True
                      logger.info(
                          f"Qdrant: Detected unindexed {WORKSPACE_ID_FIELD} field "
                          f"in legacy collection '{legacy_collection}' via payload sampling"
                      )
                      break

          # Build workspace filter if legacy collection has workspace support
          # This prevents cross-workspace data leakage during migration
          legacy_scroll_filter = None
          if has_workspace_field:
              legacy_scroll_filter = models.Filter(
                  must=[workspace_filter_condition(workspace)]
              )
              # Recount with workspace filter for accurate migration tracking
              legacy_count = client.count(
                  collection_name=legacy_collection,
                  count_filter=legacy_scroll_filter,
                  exact=True,
              ).count
              logger.info(
                  f"Qdrant: Legacy collection has workspace support, "
                  f"filtering to {legacy_count} records for workspace '{workspace}'"
              )

          logger.info(
              f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records to migrate."
          )
          logger.info(
              f"Qdrant: Migrating data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
          )

          try:
              # Batch migration (500 records per batch)
              migrated_count = 0
              offset = None
              batch_size = 500

              while True:
                  # Scroll through legacy data with optional workspace filter
                  points, next_offset = client.scroll(
                      collection_name=legacy_collection,
                      scroll_filter=legacy_scroll_filter,
                      limit=batch_size,
                      offset=offset,
                      with_vectors=True,
                      with_payload=True,
                  )
                  if not points:
                      break

                  # Transform points for new collection
                  new_points = []
                  for point in points:
                      # Set workspace_id in payload
                      new_payload = dict(point.payload or {})
                      new_payload[WORKSPACE_ID_FIELD] = workspace

                      # Create new point with workspace-prefixed ID
                      original_id = new_payload.get(ID_FIELD)
                      if original_id:
                          new_point_id = compute_mdhash_id_for_qdrant(
                              original_id, prefix=workspace
                          )
                      else:
                          # Fallback: reuse the original point ID unchanged.
                          # Qdrant only accepts unsigned integers or UUID
                          # strings as IDs, so stringifying an integer ID
                          # would produce an ID the server rejects.
                          new_point_id = point.id

                      new_points.append(
                          models.PointStruct(
                              id=new_point_id,
                              vector=point.vector,
                              payload=new_payload,
                          )
                      )

                  # Upsert to new collection
                  client.upsert(
                      collection_name=collection_name, points=new_points, wait=True
                  )

                  migrated_count += len(points)
                  logger.info(
                      f"Qdrant: {migrated_count}/{legacy_count} records migrated"
                  )

                  # Check if we've reached the end
                  if next_offset is None:
                      break
                  offset = next_offset

              # Verify that the workspace gained exactly the expected number
              # of records.
              new_count_after = client.count(
                  collection_name=collection_name,
                  count_filter=workspace_count_filter,
                  exact=True,
              ).count
              inserted_count = new_count_after - new_workspace_count
              if inserted_count != legacy_count:
                  error_msg = (
                      "Qdrant: Migration verification failed, expected "
                      f"{legacy_count} inserted records, got {inserted_count}."
                  )
                  logger.error(error_msg)
                  raise DataMigrationError(error_msg)

          except DataMigrationError:
              # Re-raise DataMigrationError as-is to preserve specific error messages
              raise
          except Exception as e:
              logger.error(
                  f"Qdrant: Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}': {e}"
              )
              raise DataMigrationError(
                  f"Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
              ) from e

          logger.info(
              f"Qdrant: Migration from '{legacy_collection}' to '{collection_name}' completed successfully"
          )
          logger.warning(
              "Qdrant: Manual deletion is required after data migration verification."
          )
  def __post_init__(self):
      """
      Resolve the effective workspace and collection name, read tuning
      options, and reset the client and buffer state.

      Raises:
          ValueError: If ``cosine_better_than_threshold`` is missing from
              ``vector_db_storage_cls_kwargs`` (or an env limit is not an int).
      """
      self._validate_embedding_func()

      # QDRANT_WORKSPACE, when set to a non-blank value, takes priority over
      # the workspace passed to the constructor.
      env_workspace = (os.environ.get("QDRANT_WORKSPACE") or "").strip()
      if env_workspace:
          effective_workspace = env_workspace
          logger.info(
              f"Using QDRANT_WORKSPACE environment variable: '{effective_workspace}' (overriding '{self.workspace}/{self.namespace}')"
          )
      else:
          effective_workspace = self.workspace
          if effective_workspace:
              logger.debug(
                  f"Using passed workspace parameter: '{effective_workspace}'"
              )
      self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE

      # Collection name with model isolation,
      # e.g. "lightrag_vdb_chunks_text_embedding_ada_002_1536d".
      self.model_suffix = self._generate_collection_suffix()
      if not self.model_suffix:
          # No suffix available: fall back to the legacy collection name.
          self.final_namespace = f"lightrag_vdb_{self.namespace}"
          logger.warning(
              f"Qdrant collection: {self.final_namespace} missing suffix. Pls add model_name to embedding_func for proper workspace data isolation."
          )
      else:
          self.final_namespace = f"lightrag_vdb_{self.namespace}_{self.model_suffix}"
          logger.info(f"Qdrant collection: {self.final_namespace}")

      storage_kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
      threshold = storage_kwargs.get("cosine_better_than_threshold")
      if threshold is None:
          raise ValueError(
              "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
          )
      self.cosine_better_than_threshold = threshold

      # The client is created later, in initialize().
      self._client = None
      self._max_batch_size = self.global_config["embedding_batch_num"]

      # Upsert batching limits; a non-positive value disables that limit.
      payload_default = str(DEFAULT_QDRANT_UPSERT_MAX_PAYLOAD_BYTES)
      self._max_upsert_payload_bytes = int(
          os.getenv("QDRANT_UPSERT_MAX_PAYLOAD_BYTES", payload_default)
      )
      points_default = str(DEFAULT_QDRANT_UPSERT_MAX_POINTS_PER_BATCH)
      self._max_upsert_points_per_batch = int(
          os.getenv("QDRANT_UPSERT_MAX_POINTS_PER_BATCH", points_default)
      )
      if self._max_upsert_payload_bytes <= 0:
          logger.warning(
              f"QDRANT_UPSERT_MAX_PAYLOAD_BYTES={self._max_upsert_payload_bytes} is non-positive, disable payload-size splitting"
          )
      if self._max_upsert_points_per_batch <= 0:
          logger.warning(
              f"QDRANT_UPSERT_MAX_POINTS_PER_BATCH={self._max_upsert_points_per_batch} is non-positive, disable point-count splitting"
          )

      self._initialized = False

      # Deferred-embedding buffers, keyed by the caller's document id, and the
      # flush lock. The lock is created in initialize() from both
      # final_namespace and the effective workspace, because one physical
      # collection is shared across workspaces via the workspace_id field.
      self._pending_vector_docs: dict[str, _PendingVectorDoc] = {}
      self._pending_vector_deletes: set[str] = set()
      self._flush_lock = None
  440. @staticmethod
  441. def _to_json_serializable(value: Any) -> Any:
  442. """Convert nested values to JSON-serializable types for payload size estimation."""
  443. if isinstance(value, np.ndarray):
  444. return value.tolist()
  445. if isinstance(value, np.integer):
  446. return int(value)
  447. if isinstance(value, np.floating):
  448. return float(value)
  449. if isinstance(value, dict):
  450. return {
  451. str(k): QdrantVectorDBStorage._to_json_serializable(v)
  452. for k, v in value.items()
  453. }
  454. if isinstance(value, (list, tuple)):
  455. return [QdrantVectorDBStorage._to_json_serializable(v) for v in value]
  456. return value
  457. @staticmethod
  458. def _estimate_point_payload_bytes(point: models.PointStruct) -> int:
  459. """Estimate serialized JSON byte size of a single Qdrant point."""
  460. point_obj = {
  461. "id": point.id,
  462. "vector": QdrantVectorDBStorage._to_json_serializable(point.vector),
  463. "payload": QdrantVectorDBStorage._to_json_serializable(point.payload or {}),
  464. }
  465. return len(
  466. json.dumps(
  467. point_obj,
  468. ensure_ascii=False,
  469. separators=(",", ":"),
  470. ).encode("utf-8")
  471. )
  472. @staticmethod
  473. def _build_upsert_batches(
  474. points: list[models.PointStruct],
  475. max_payload_bytes: int,
  476. max_points_per_batch: int,
  477. ) -> list[tuple[list[models.PointStruct], int]]:
  478. """Split points into batches using payload size and point count limits."""
  479. if not points:
  480. return []
  481. payload_limit = max_payload_bytes if max_payload_bytes > 0 else float("inf")
  482. points_limit = (
  483. max_points_per_batch if max_points_per_batch > 0 else float("inf")
  484. )
  485. batches: list[tuple[list[models.PointStruct], int]] = []
  486. current_batch: list[models.PointStruct] = []
  487. # JSON array overhead ("[]")
  488. current_estimated_bytes = 2
  489. for point in points:
  490. point_size = QdrantVectorDBStorage._estimate_point_payload_bytes(point)
  491. point_with_array_overhead = point_size + 2
  492. point_id = str(point.id)
  493. if point_with_array_overhead > payload_limit:
  494. raise ValueError(
  495. f"Single Qdrant point exceeds payload limit: id={point_id}, "
  496. f"estimated_bytes={point_with_array_overhead}, "
  497. f"limit={int(payload_limit)}"
  498. )
  499. # If current batch not empty, a comma is needed before next element.
  500. separator_overhead = 1 if current_batch else 0
  501. next_batch_size = current_estimated_bytes + separator_overhead + point_size
  502. if current_batch and (
  503. len(current_batch) >= points_limit or next_batch_size > payload_limit
  504. ):
  505. batches.append((current_batch, current_estimated_bytes))
  506. current_batch = []
  507. current_estimated_bytes = 2
  508. next_batch_size = current_estimated_bytes + point_size
  509. current_batch.append(point)
  510. current_estimated_bytes = next_batch_size
  511. if current_batch:
  512. batches.append((current_batch, current_estimated_bytes))
  513. return batches
  async def initialize(self):
      """
      Initialize Qdrant collection.

      Under the shared data-init lock, creates the QdrantClient on first use
      (URL and API key come from QDRANT_URL / QDRANT_API_KEY, falling back to
      the [qdrant] section of config.ini) and runs setup_collection. The
      flush lock is then created if it does not exist yet.

      The flush-lock check runs on every call, including calls made after
      the collection is already initialized, so a retry can still create the
      lock if an earlier attempt failed after the collection was set up.

      Raises:
          Exception: Any error from client creation or collection setup is
              logged and re-raised.
      """
      async with get_data_init_lock():
          if not self._initialized:
              try:
                  # Create QdrantClient if not already created
                  if self._client is None:
                      self._client = QdrantClient(
                          url=os.environ.get(
                              "QDRANT_URL", config.get("qdrant", "uri", fallback=None)
                          ),
                          api_key=os.environ.get(
                              "QDRANT_API_KEY",
                              config.get("qdrant", "apikey", fallback=None),
                          ),
                      )
                      logger.debug(
                          f"[{self.workspace}] QdrantClient created successfully"
                      )

                  # Setup collection (create if not exists and configure indexes)
                  # Pass namespace and workspace for backward-compatible migration support
                  QdrantVectorDBStorage.setup_collection(
                      self._client,
                      self.final_namespace,
                      namespace=self.namespace,
                      workspace=self.effective_workspace,
                      vectors_config=models.VectorParams(
                          size=self.embedding_func.embedding_dim,
                          distance=models.Distance.COSINE,
                      ),
                      hnsw_config=models.HnswConfigDiff(
                          payload_m=16,
                          m=0,
                      ),
                      model_suffix=self.model_suffix,
                  )

                  self._initialized = True
                  logger.info(
                      f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully"
                  )
              except Exception as e:
                  logger.error(
                      f"[{self.workspace}] Failed to initialize Qdrant collection '{self.namespace}': {e}"
                  )
                  raise

      # Keyed by collection name and effective workspace.
      if self._flush_lock is None:
          self._flush_lock = get_namespace_lock(
              namespace=self.final_namespace,
              workspace=self.effective_workspace,
          )
  async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
      """
      Stage vector docs in the in-memory buffer; nothing is embedded or sent
      to Qdrant here.

      Each entry of ``data`` maps a caller-supplied doc id to its fields. The
      stored payload holds the id, the effective workspace, a creation
      timestamp, and those fields listed in ``meta_fields``; the "content"
      field is kept as the text to embed. The buffer is keyed by the caller's
      doc id, so a later upsert of the same id replaces the earlier one.
      """
      if not data:
          return

      import time

      created_at = int(time.time())

      staged: list[tuple[str, _PendingVectorDoc]] = []
      for position, (doc_id, fields) in enumerate(data.items(), start=1):
          payload = {
              ID_FIELD: doc_id,
              WORKSPACE_ID_FIELD: self.effective_workspace,
              CREATED_AT_FIELD: created_at,
          }
          payload.update(
              (name, value)
              for name, value in fields.items()
              if name in self.meta_fields
          )
          staged.append(
              (doc_id, _PendingVectorDoc(source=payload, content=fields["content"]))
          )
          # presumably lets other tasks run during large batches; see helper
          await _cooperative_yield(position)

      # An upsert cancels a pending delete of the same id, and the fresh
      # _PendingVectorDoc (vector=None) replaces any previously buffered
      # entry together with its cached vector.
      async with self._flush_lock:
          for doc_id, pending in staged:
              self._pending_vector_deletes.discard(doc_id)
              self._pending_vector_docs[doc_id] = pending
  async def query(
      self, query: str, top_k: int, query_embedding: list[float] = None
  ) -> list[dict[str, Any]]:
      """
      Run a similarity search through Qdrant ``query_points``.

      Only the server-side collection is searched, restricted to the
      effective workspace and to scores passing
      ``cosine_better_than_threshold``. Buffered upserts and deletes are not
      consulted, so they are not visible until they have been flushed.

      Args:
          query: Query text; embedded only when ``query_embedding`` is None.
          top_k: Maximum number of hits to return.
          query_embedding: Optional precomputed embedding of ``query``.

      Returns:
          One dict per hit: the point payload plus "distance" (the hit score)
          and the created-at field (None when the payload lacks it).
      """
      if query_embedding is None:
          # higher priority for query
          embedded = await self.embedding_func(
              [query], context="query", _priority=5
          )
          search_vector = embedded[0]
      else:
          search_vector = query_embedding

      workspace_only = models.Filter(
          must=[workspace_filter_condition(self.effective_workspace)]
      )
      response = self._client.query_points(
          collection_name=self.final_namespace,
          query=search_vector,
          limit=top_k,
          with_payload=True,
          score_threshold=self.cosine_better_than_threshold,
          query_filter=workspace_only,
      )

      hits: list[dict[str, Any]] = []
      for hit in response.points:
          row = {**hit.payload}
          row["distance"] = hit.score
          row[CREATED_AT_FIELD] = hit.payload.get(CREATED_AT_FIELD)
          hits.append(row)
      return hits
  async def index_done_callback(self) -> None:
      """Flush the buffered vector upserts and deletes to Qdrant."""
      flush = self._flush_pending_vector_ops
      await flush()
  638. async def _flush_pending_vector_ops(self) -> None:
  639. """Flush buffered vector upserts and deletes via batched client calls.
  640. Embedding runs *inside* this lock (not in `upsert` or lock-free):
  641. it makes deferred embedding and the upsert atomic against
  642. concurrent upserts and destructive mutations. Reuses
  643. ``_build_upsert_batches`` to respect Qdrant's payload size limit.
  644. Any failure (embed or server write) raises and leaves both
  645. buffers intact; the next ``index_done_callback`` retries.
  646. Concurrency invariant: ``_flush_lock`` is a non-reentrant asyncio
  647. lock. Callers MUST NOT hold it when invoking this method --
  648. re-entry would deadlock. The only in-tree callers are
  649. ``index_done_callback`` and ``finalize``, both lock-free.
  650. """
  651. async with self._flush_lock:
  652. if not self._pending_vector_docs and not self._pending_vector_deletes:
  653. return
  654. if self._client is None:
  655. return
  656. pending_docs = self._pending_vector_docs
  657. pending_deletes = self._pending_vector_deletes
  658. docs_to_embed: list[tuple[str, _PendingVectorDoc]] = [
  659. (doc_id, pdoc)
  660. for doc_id, pdoc in pending_docs.items()
  661. if pdoc.vector is None
  662. ]
  663. if docs_to_embed:
  664. contents = [pdoc.content for _, pdoc in docs_to_embed]
  665. batches = [
  666. contents[i : i + self._max_batch_size]
  667. for i in range(0, len(contents), self._max_batch_size)
  668. ]
  669. logger.info(
  670. f"[{self.workspace}] {self.namespace} flush: embedding "
  671. f"{len(docs_to_embed)} vectors in {len(batches)} batch(es) "
  672. f"(batch_num={self._max_batch_size})"
  673. )
  674. try:
  675. embeddings_list = await asyncio.gather(
  676. *[
  677. self.embedding_func(batch, context="document")
  678. for batch in batches
  679. ]
  680. )
  681. except Exception as e:
  682. logger.error(
  683. f"[{self.workspace}] Error embedding pending vector ops "
  684. f"(upserts={len(docs_to_embed)}): {e}"
  685. )
  686. raise
  687. embeddings = np.concatenate(embeddings_list)
  688. if len(embeddings) != len(docs_to_embed):
  689. raise RuntimeError(
  690. f"[{self.workspace}] Embedding count mismatch: expected "
  691. f"{len(docs_to_embed)}, got {len(embeddings)}"
  692. )
  693. for i, ((_, pdoc), embedding) in enumerate(
  694. zip(docs_to_embed, embeddings), start=1
  695. ):
  696. # Cache the raw numpy row so a second flush after a
  697. # server-side error doesn't re-embed.
  698. pdoc.vector = np.array(embedding, dtype=np.float32).tolist()
  699. await _cooperative_yield(i)
  700. # Build PointStruct list, converting caller-supplied ids to
  701. # Qdrant UUIDs only now (the buffer keeps caller ids so
  702. # read-your-writes works against the same key).
  703. list_points: list[models.PointStruct] = []
  704. committed_ids: list[str] = []
  705. for doc_id, pdoc in pending_docs.items():
  706. if pdoc.vector is None:
  707. continue
  708. committed_ids.append(doc_id)
  709. list_points.append(
  710. models.PointStruct(
  711. id=compute_mdhash_id_for_qdrant(
  712. doc_id, prefix=self.effective_workspace
  713. ),
  714. vector=pdoc.vector,
  715. payload=dict(pdoc.source),
  716. )
  717. )
  718. try:
  719. if list_points:
  720. point_batches = self._build_upsert_batches(
  721. list_points,
  722. max_payload_bytes=self._max_upsert_payload_bytes,
  723. max_points_per_batch=self._max_upsert_points_per_batch,
  724. )
  725. if len(point_batches) > 1:
  726. logger.info(
  727. f"[{self.workspace}] Qdrant upsert split into {len(point_batches)} batches "
  728. f"for {len(list_points)} points (max_payload_bytes={self._max_upsert_payload_bytes}, "
  729. f"max_points_per_batch={self._max_upsert_points_per_batch})"
  730. )
  731. for batch_index, (points_batch, estimated_bytes) in enumerate(
  732. point_batches, 1
  733. ):
  734. logger.debug(
  735. f"[{self.workspace}] Qdrant upsert batch {batch_index}/{len(point_batches)}: "
  736. f"points={len(points_batch)}, estimated_payload_bytes={estimated_bytes}"
  737. )
  738. # Fail-fast: any batch failure raises immediately
  739. # and stops subsequent batches; the full buffer is
  740. # retained so the next flush retries.
  741. self._client.upsert(
  742. collection_name=self.final_namespace,
  743. points=points_batch,
  744. wait=True,
  745. )
  746. if pending_deletes:
  747. qdrant_delete_ids = [
  748. compute_mdhash_id_for_qdrant(
  749. doc_id, prefix=self.effective_workspace
  750. )
  751. for doc_id in pending_deletes
  752. ]
  753. self._client.delete(
  754. collection_name=self.final_namespace,
  755. points_selector=models.PointIdsList(points=qdrant_delete_ids),
  756. wait=True,
  757. )
  758. except Exception as e:
  759. logger.error(
  760. f"[{self.workspace}] Error flushing vector ops "
  761. f"(upserts={len(pending_docs)}, "
  762. f"deletes={len(pending_deletes)}): {e}"
  763. )
  764. raise
  765. for doc_id in committed_ids:
  766. pending_docs.pop(doc_id, None)
  767. pending_deletes.clear()
  async def delete(self, ids: List[str]) -> None:
      """Queue vector deletions; they reach Qdrant on the next batched flush.

      Under ``_flush_lock``, each id is removed from the pending-upsert
      buffer and recorded in the pending-delete buffer. An empty ``ids`` is
      a no-op.

      Args:
          ids: Caller-side vector ids to delete. A ``set`` is also accepted
              and is converted to a list first.
      """
      if not ids:
          return
      targets = list(ids) if isinstance(ids, set) else ids
      async with self._flush_lock:
          for target in targets:
              self._pending_vector_docs.pop(target, None)
              self._pending_vector_deletes.add(target)
      logger.debug(
          f"[{self.workspace}] Buffered delete for {len(targets)} vectors in {self.namespace}"
      )
  async def delete_entity(self, entity_name: str) -> None:
      """Queue the delete of one entity's vector.

      The vector id is derived from ``entity_name`` via ``compute_mdhash_id``
      with ``ENTITY_PREFIX``. Under ``_flush_lock`` the id is recorded in the
      pending-delete buffer and any pending upsert for it is discarded.

      Args:
          entity_name: Name of the entity whose vector should be deleted.
      """
      vector_key = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX)
      async with self._flush_lock:
          self._pending_vector_deletes.add(vector_key)
          self._pending_vector_docs.pop(vector_key, None)
      logger.debug(
          f"[{self.workspace}] Buffered delete for entity {entity_name} (id={vector_key})"
      )
  async def delete_entity_relation(self, entity_name: str) -> None:
      """Remove every relation vector that has ``entity_name`` as src or tgt.

      The entire method runs under ``_flush_lock`` so the server-side
      scroll + delete cannot interleave with an in-flight bulk upsert.
      Server-side failures propagate to the caller (they are not logged and
      swallowed); the caller decides whether to retry.

      Buffer semantics -- post-prune with a caller short-circuit contract:
          Pending upserts in ``_pending_vector_docs`` whose ``src_id`` or
          ``tgt_id`` matches are pruned **only after** the scroll + delete
          loop has completed. If any iteration raises, the pending buffer is
          left untouched so that a higher-level failure does not silently
          drop buffered relation vectors nobody asked to discard. The
          trade-off: partial server-side deletes combined with preserved
          pending upserts could re-insert deleted relations on the next
          flush, so correctness relies on the caller short-circuiting before
          ``index_done_callback`` can run. The single in-tree caller,
          ``adelete_by_entity`` in ``utils_graph.py``, honors this: its
          ``except`` clause skips both ``delete_node`` and
          ``_persist_graph_updates``, leaving graph and buffer consistent
          with a "delete never happened" state until the next retry.

      When no client is attached (pre-init / post-finalize) only the buffer
      is pruned.

      Args:
          entity_name: Entity whose incoming and outgoing relation vectors
              should be removed.
      """

      def _prune_buffered_relations() -> None:
          # Collect the keys first; the dict must not change size while it
          # is being iterated.
          stale_keys = []
          for key, entry in self._pending_vector_docs.items():
              origin = entry.source
              if (
                  origin.get("src_id") == entity_name
                  or origin.get("tgt_id") == entity_name
              ):
                  stale_keys.append(key)
          for key in stale_keys:
              self._pending_vector_docs.pop(key, None)

      async with self._flush_lock:
          if self._client is None:
              # pre-init / post-finalize: only buffer state remains, so the
              # delete intent is applied there.
              _prune_buffered_relations()
              return

          endpoint_matches = [
              models.FieldCondition(
                  key="src_id", match=models.MatchValue(value=entity_name)
              ),
              models.FieldCondition(
                  key="tgt_id", match=models.MatchValue(value=entity_name)
              ),
          ]
          relation_filter = models.Filter(
              must=[workspace_filter_condition(self.effective_workspace)],
              should=endpoint_matches,
          )

          removed = 0
          cursor = None
          page_size = 1000
          while True:
              page, following_cursor = self._client.scroll(
                  collection_name=self.final_namespace,
                  scroll_filter=relation_filter,
                  with_payload=False,
                  with_vectors=False,
                  limit=page_size,
                  offset=cursor,
              )
              if not page:
                  break

              page_ids = [hit.id for hit in page]
              self._client.delete(
                  collection_name=self.final_namespace,
                  points_selector=models.PointIdsList(points=page_ids),
                  wait=True,
              )
              removed += len(page_ids)

              if following_cursor is None:
                  break
              cursor = following_cursor

          # The server-side scroll + delete fully succeeded, so it is now
          # safe to prune matching pending upserts; otherwise the next flush
          # would re-upsert the relations that were just deleted. Had the
          # loop raised, this point is never reached and the buffer stays
          # available for the caller's retry path.
          _prune_buffered_relations()

          if removed > 0:
              logger.debug(
                  f"[{self.workspace}] Deleted {removed} relations for {entity_name}"
              )
          else:
              logger.debug(
                  f"[{self.workspace}] No relations found for entity {entity_name}"
              )
  881. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  882. """Get vector data by its ID, with read-your-writes against the buffer."""
  883. async with self._flush_lock:
  884. if id in self._pending_vector_deletes:
  885. return None
  886. pending = self._pending_vector_docs.get(id)
  887. if pending is not None:
  888. # Buffer hits return the source payload (no vector); the
  889. # Qdrant fallback path also returns just the payload.
  890. payload = dict(pending.source)
  891. payload.setdefault(CREATED_AT_FIELD, None)
  892. return payload
  893. try:
  894. qdrant_id = compute_mdhash_id_for_qdrant(
  895. id, prefix=self.effective_workspace
  896. )
  897. result = self._client.retrieve(
  898. collection_name=self.final_namespace,
  899. ids=[qdrant_id],
  900. with_payload=True,
  901. )
  902. if not result:
  903. return None
  904. payload = result[0].payload
  905. if CREATED_AT_FIELD not in payload:
  906. payload[CREATED_AT_FIELD] = None
  907. return payload
  908. except Exception as e:
  909. logger.error(
  910. f"[{self.workspace}] Error retrieving vector data for ID {id}: {e}"
  911. )
  912. return None
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
      """Fetch payloads for several ids (read-your-writes), in request order.

      Each id is resolved against the buffers first, under ``_flush_lock``:
      an id pending deletion resolves to ``None`` and an id with a pending
      upsert resolves to a copy of its buffered source payload. All other
      ids are retrieved from Qdrant in one call and matched back either by
      the ``ID_FIELD`` stored in the payload or, failing that, by the
      derived Qdrant point id.

      Args:
          ids: Caller-side vector ids.

      Returns:
          One entry per requested id, in the same order; an entry is
          ``None`` when the id is pending deletion or was not found. Every
          returned payload carries ``CREATED_AT_FIELD``. An empty list is
          returned when ``ids`` is empty or when the server lookup raises
          (the error is logged).
      """
      if not ids:
          return []

      from_buffer: dict[str, dict[str, Any] | None] = {}
      to_fetch: list[str] = []
      async with self._flush_lock:
          for key in ids:
              if key in self._pending_vector_deletes:
                  from_buffer[key] = None
                  continue
              entry = self._pending_vector_docs.get(key)
              if entry is None:
                  to_fetch.append(key)
              else:
                  snapshot = dict(entry.source)
                  snapshot.setdefault(CREATED_AT_FIELD, None)
                  from_buffer[key] = snapshot

      by_source_id: dict[str, dict[str, Any]] = {}
      by_point_id: dict[str, dict[str, Any]] = {}
      if to_fetch:
          try:
              point_ids = [
                  compute_mdhash_id_for_qdrant(key, prefix=self.effective_workspace)
                  for key in to_fetch
              ]
              fetched = self._client.retrieve(
                  collection_name=self.final_namespace,
                  ids=point_ids,
                  with_payload=True,
              )
              for record in fetched:
                  record_payload = dict(record.payload or {})
                  record_payload.setdefault(CREATED_AT_FIELD, None)

                  point_key = "" if record.id is None else str(record.id)
                  if point_key:
                      by_point_id[point_key] = record_payload

                  source_id = record_payload.get(ID_FIELD)
                  if source_id is not None:
                      by_source_id[str(source_id)] = record_payload
          except Exception as e:
              logger.error(
                  f"[{self.workspace}] Error retrieving vector data for IDs {to_fetch}: {e}"
              )
              return []

      # Re-assemble in request order; buffer results take precedence.
      ordered: list[dict[str, Any] | None] = []
      for key in ids:
          if key in from_buffer:
              ordered.append(from_buffer[key])
              continue
          hit = by_source_id.get(str(key))
          if hit is None:
              # Fall back to the Qdrant point id when the payload did not
              # yield a match on ID_FIELD.
              hit = by_point_id.get(
                  compute_mdhash_id_for_qdrant(
                      key, prefix=self.effective_workspace
                  )
              )
          ordered.append(hit)
      return ordered
  async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
      """Return the embedding vectors for ``ids``, with read-your-writes.

      Ids buffered for deletion are omitted. Ids with a buffered upsert are
      answered from the buffer; if such a document has not been embedded
      yet it is embedded lazily inside the lock and the vector is cached on
      the buffered ``_PendingVectorDoc`` so the next flush will not embed
      it again. All other ids are retrieved from Qdrant and keyed by the
      ``ID_FIELD`` found in each point's payload.

      Visibility caveat for ids not in the buffer: the server-side
      ``retrieve`` fallback runs *outside* ``_flush_lock``. A concurrent
      ``delete()`` landing between lock release and the server read only
      buffers the delete -- the old vector stays on disk until the next
      flush -- so a stale vector may be returned for an id that is already
      buffered for deletion. This is best-effort
      read-after-uncommitted-delete and matches the ``query()`` contract:
      callers needing strict consistency must ``index_done_callback()``
      first.

      Args:
          ids: Caller-side vector ids.

      Returns:
          Mapping of id to vector for every id that could be resolved. If
          the server lookup fails, the error is logged and whatever was
          resolved from the buffer is returned.

      Raises:
          RuntimeError: the embedding function returned a different number
              of vectors than documents submitted.
          Exception: errors raised by the embedding function during lazy
              embedding are logged and re-raised.
      """
      if not ids:
          return {}

      vectors: dict[str, list[float]] = {}
      server_lookup: list[str] = []

      async with self._flush_lock:
          needs_embedding: list[tuple[str, _PendingVectorDoc]] = []
          for key in ids:
              if key in self._pending_vector_deletes:
                  continue
              entry = self._pending_vector_docs.get(key)
              if entry is None:
                  server_lookup.append(key)
              elif entry.vector is None:
                  needs_embedding.append((key, entry))
              else:
                  vectors[key] = entry.vector

          if needs_embedding:
              texts = [entry.content for _, entry in needs_embedding]
              text_batches = [
                  texts[start : start + self._max_batch_size]
                  for start in range(0, len(texts), self._max_batch_size)
              ]
              try:
                  embedded_batches = await asyncio.gather(
                      *(
                          self.embedding_func(chunk, context="document")
                          for chunk in text_batches
                      )
                  )
              except Exception as e:
                  logger.error(
                      f"[{self.workspace}] Error lazily embedding pending vectors "
                      f"(upserts={len(needs_embedding)}): {e}"
                  )
                  raise

              embedded = np.concatenate(embedded_batches)
              if len(embedded) != len(needs_embedding):
                  raise RuntimeError(
                      f"[{self.workspace}] Embedding count mismatch: expected "
                      f"{len(needs_embedding)}, got {len(embedded)}"
                  )

              position = 0
              for (key, entry), row in zip(needs_embedding, embedded):
                  position += 1
                  entry.vector = np.array(row, dtype=np.float32).tolist()
                  vectors[key] = entry.vector
                  await _cooperative_yield(position)

      if not server_lookup:
          return vectors

      try:
          point_ids = [
              compute_mdhash_id_for_qdrant(key, prefix=self.effective_workspace)
              for key in server_lookup
          ]
          fetched = self._client.retrieve(
              collection_name=self.final_namespace,
              ids=point_ids,
              with_vectors=True,
              with_payload=True,
          )
          for record in fetched:
              if not (record and record.vector is not None and record.payload):
                  continue
              source_id = record.payload.get(ID_FIELD)
              if not source_id:
                  continue
              raw_vector = record.vector
              if isinstance(raw_vector, np.ndarray):
                  raw_vector = raw_vector.tolist()
              vectors[source_id] = raw_vector
      except Exception as e:
          logger.error(f"[{self.workspace}] Error getting vectors: {e}")
      return vectors
  1061. async def finalize(self):
  1062. """Flush pending vector ops; surface unflushed data as RuntimeError.
  1063. Qdrant has no client connection that needs explicit release here
  1064. (the QdrantClient is held by the storage instance and torn down
  1065. on GC), but we still need to fail loudly when a transient bulk
  1066. error left writes buffered. ``_flush_pending_vector_ops`` is
  1067. all-or-nothing: it either clears both buffers or raises with
  1068. them intact, but we still defensively check both buffers after a
  1069. successful flush in case a future refactor breaks that invariant.
  1070. """
  1071. flush_error: Exception | None = None
  1072. try:
  1073. await self._flush_pending_vector_ops()
  1074. except Exception as e:
  1075. flush_error = e
  1076. async with self._flush_lock:
  1077. pending_docs = len(self._pending_vector_docs)
  1078. pending_deletes = len(self._pending_vector_deletes)
  1079. if flush_error is not None:
  1080. raise RuntimeError(
  1081. f"[{self.workspace}] QdrantVectorDBStorage.finalize() flush raised; "
  1082. f"{pending_docs} pending upserts and {pending_deletes} pending "
  1083. f"deletes were left buffered (data lost)"
  1084. ) from flush_error
  1085. if pending_docs or pending_deletes:
  1086. raise RuntimeError(
  1087. f"[{self.workspace}] QdrantVectorDBStorage.finalize() left "
  1088. f"{pending_docs} pending upserts and {pending_deletes} pending "
  1089. f"deletes buffered after final flush attempt (these writes have been lost)"
  1090. )
  async def drop(self) -> dict[str, str]:
      """Delete all vector data belonging to the current workspace. Destructive.

      Every point matching ``effective_workspace`` is deleted from the
      shared Qdrant collection ``final_namespace``. Qdrant partitions one
      physical collection across workspaces via the ``workspace_id``
      payload field, so sibling workspaces in the same collection are
      untouched. The collection and its vector index are NOT recreated --
      they were provisioned at ``initialize()`` and stay in place.

      MUST only be called while ``pipeline_status`` is idle (see the
      Pipeline concurrency contract in ``AGENTS.md``); the only in-tree
      caller, ``clear_documents``, enforces this.

      The pending-write buffers are cleared *before* the server-side delete
      is issued so a concurrent flush on this instance cannot resurrect the
      dropped data. Consequently, if the server-side delete fails, the
      buffered writes are lost as well and cannot be recovered by retrying
      ``drop()``. This matches the contract of ``drop()`` ("discard
      everything for this workspace") and the other lazy-embedding
      backends.

      Caveat -- only this instance's buffers are cleared. Other
      ``QdrantVectorDBStorage`` instances aliased onto the same
      ``(final_namespace, effective_workspace)`` (multi-worker processes,
      or distinct workspaces collapsed by ``QDRANT_WORKSPACE``) keep their
      own buffers; a sibling whose earlier flush failed and left its
      buffers intact will, on its next flush, upsert those stale points
      back into the freshly emptied workspace. Direct callers that bypass
      the idle precondition MUST flush every aliased instance first.

      Returns:
          dict[str, str]: ``{"status": "success"|"error", "message": str}``.
          Errors are logged and reported through the dict, never raised.
      """
      try:
          async with self._flush_lock:
              # Throw away buffered writes before wiping the workspace; a
              # concurrent flush would otherwise bring them back.
              self._pending_vector_docs.clear()
              self._pending_vector_deletes.clear()

              # Server-side delete of every point in this workspace.
              workspace_only = models.Filter(
                  must=[workspace_filter_condition(self.effective_workspace)]
              )
              self._client.delete(
                  collection_name=self.final_namespace,
                  points_selector=models.FilterSelector(filter=workspace_only),
                  wait=True,
              )

          logger.info(
              f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
          )
      except Exception as e:
          logger.error(
              f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
          )
          return {"status": "error", "message": str(e)}
      return {"status": "success", "message": "data dropped"}
  1142. except Exception as e:
  1143. logger.error(
  1144. f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
  1145. )
  1146. return {"status": "error", "message": str(e)}