nano_vector_db_impl.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. import asyncio
  2. import base64
  3. import os
  4. import zlib
  5. from typing import Any, final
  6. from dataclasses import dataclass
  7. import numpy as np
  8. import time
  9. from lightrag.file_atomic import atomic_write, reap_orphan_tmp_files
  10. from lightrag.utils import (
  11. logger,
  12. compute_mdhash_id,
  13. )
  14. from lightrag.base import BaseVectorStorage
  15. from nano_vectordb import NanoVectorDB
  16. from .shared_storage import (
  17. get_namespace_lock,
  18. get_update_flag,
  19. set_all_update_flags,
  20. )
  21. @dataclass
  22. class _PendingNanoDoc:
  23. """A buffered upsert waiting for deferred embedding and materialization.
  24. ``record`` holds ``__id__`` / ``__created_at__`` plus the ``meta_fields``
  25. (which always include ``content`` for the entity/relation/chunk vdbs), so
  26. the content needed for deferred embedding lives in the record itself — no
  27. separate copy is kept. ``vector`` starts as ``None`` and is filled either
  28. during the lock-held flush or by a lazy ``get_vectors_by_ids`` embedding;
  29. once set it is reused by the next flush instead of re-calling the model.
  30. The compressed ``vector`` / raw ``__vector__`` keys are added to ``record``
  31. only at flush time, right before ``client.upsert``.
  32. """
  33. record: dict[str, Any]
  34. vector: np.ndarray | None = None
  35. @final
  36. @dataclass
  37. class NanoVectorDBStorage(BaseVectorStorage):
  38. """File-backed vector storage built on the in-memory ``NanoVectorDB``.
  39. Storage model:
  40. A single ``NanoVectorDB`` instance lives in process memory; its full
  41. state is serialized to one JSON file at
  42. ``working_dir/[workspace/]vdb_<namespace>.json``. That JSON file is
  43. the **only** cross-process synchronization surface — there is no
  44. shared memory, no message bus, and no network channel between
  45. processes. All cross-process visibility is therefore mediated by
  46. (a) an atomic file write at commit time and (b) a per-namespace
  47. ``storage_updated`` flag distributed through
  48. ``lightrag.kg.shared_storage``.
  49. Concurrency invariants (the code in this file is correct *only* while
  50. all three hold):
  51. 1. **Single writer per workspace.** The document pipeline's
  52. ``busy`` / ``destructive_busy`` flags (see ``AGENTS.md``
  53. *Pipeline concurrency contract*) guarantee that at most one
  54. process performs ``upsert`` / ``delete`` /
  55. ``index_done_callback`` at any time. Every other process is
  56. read-only with respect to this storage.
  57. 2. **Eventual consistency is sufficient.** Read-only processes
  58. only need to observe the writer's data *after* the writer's
  59. ``index_done_callback`` completes. Reads that land in the gap
  60. between a writer's in-memory mutation and its commit may
  61. legitimately return the pre-update snapshot.
  62. 3. **NanoVectorDB operations are fully synchronous.** Under a
  63. single-threaded asyncio event loop, ``client.upsert`` /
  64. ``client.query`` / ``client.delete`` cannot be preempted by
  65. another coroutine, which gives them implicit mutual exclusion
  66. over ``self._client.__storage``. This is why the methods below
  67. don't have to hold ``_storage_lock`` while calling into
  68. ``client``.
  69. Cross-process sync protocol:
  70. Writer side (``index_done_callback``):
  71. 1. Atomically write the in-memory state to disk
  72. (``atomic_write`` swaps a tmp file into place).
  73. 2. Call ``set_all_update_flags`` to flip every process's
  74. ``storage_updated`` flag (including the writer's own).
  75. 3. Immediately reset the writer's own flag to ``False`` so
  76. the next call to ``_get_client`` does not trigger a
  77. self-reload of the data this process just wrote.
  78. Reader side (any method that goes through ``_get_client``):
  79. 1. Inside ``_storage_lock``, observe
  80. ``storage_updated.value is True``.
  81. 2. **Fully reload** ``self._client`` from disk — NanoVectorDB
  82. has no incremental sync API, so the entire JSON file is
  83. re-parsed and a fresh in-memory matrix is rebuilt.
  84. 3. Reset the reader's own flag to ``False`` so concurrent
  85. coroutines in the same process don't double-reload.
  86. Lock scope:
  87. ``_storage_lock`` is a per-``(namespace, workspace)`` keyed lock
  88. spanning both intra-process coroutines and inter-process workers.
  89. It only wraps the *reload* and *commit* critical sections, not
  90. every ``client.xxx`` call. Operating on ``client`` outside the
  91. lock is safe today *because of invariant (3)* — if either premise
  92. is ever broken (e.g. ``client.xxx`` is moved to a thread pool, or
  93. NanoVectorDB is swapped for an async vector library), the lock
  94. scope must be widened to cover the mutation/read itself.
  95. Non-pipeline write paths:
  96. The pipeline's ``busy`` gate serializes ``upsert`` / ``delete`` /
  97. ``index_done_callback`` called from the document ingestion and
  98. purge flows. The following entry points are **not** serialized by
  99. the pipeline gate and must be guarded externally:
  100. * ``drop`` — currently gated by the API layer (the
  101. ``/documents/clear`` endpoint takes the pipeline busy
  102. reservation before invoking it).
  103. * ``delete_entity`` / ``delete_entity_relation`` — currently
  104. not exposed in the WebUI. If you wire them up to a new
  105. caller, that caller must arrange single-writer
  106. serialization the same way the pipeline does.
  107. Deferred-embedding protocol:
  108. ``upsert`` does **not** call the embedding model. It only buffers a
  109. ``_PendingNanoDoc`` (content-bearing record + ``vector=None``) in the
  110. minimal ``self._pending_upserts`` area, overwriting any prior pending
  111. doc for the same id (which also clears a temp vector a previous
  112. ``get_vectors_by_ids`` may have cached). The model is called once per
  113. id at flush time (``_flush_pending_locked``), so repeated upserts of
  114. the same id — and many small upsert calls — embed only once. See
  115. issue #2785 and the ``OpenSearchVectorDBStorage`` equivalent.
  116. Embedding runs **inside ``_storage_lock``** during the flush (not in
  117. ``upsert``): under the single-writer invariant this keeps the content
  118. used for embedding consistent with the record written to disk and
  119. prevents a destructive op from interleaving between embed and write.
  120. The lock is non-reentrant, so ``_flush_pending_locked`` requires the
  121. caller to already hold it and operates on ``self._client`` directly
  122. (never through ``_get_client``).
  123. Reads are read-your-writes: ``get_by_id`` / ``get_by_ids`` /
  124. ``get_vectors_by_ids`` consult ``_pending_upserts`` first.
  125. ``get_vectors_by_ids`` lazily embeds a pending doc on demand and
  126. caches the vector back for the next flush. ``query`` and
  127. ``client_storage`` see only data already materialized into
  128. ``self._client`` — unflushed pending data is intentionally not
  129. queryable. A flush failure (embedding error or count mismatch) raises
  130. through ``index_done_callback`` and leaves the pending buffer intact. A
  131. save IO error also raises, but by then the flushed rows are already in
  132. ``self._client``; ``_client_dirty`` stays ``True`` so ``finalize`` retries the save.
  133. """
  134. def __post_init__(self):
  135. self._validate_embedding_func()
  136. # Initialize basic attributes
  137. self._client = None
  138. self._storage_lock = None
  139. self.storage_updated = None
  140. # Use global config value if specified, otherwise use default
  141. kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
  142. cosine_threshold = kwargs.get("cosine_better_than_threshold")
  143. if cosine_threshold is None:
  144. raise ValueError(
  145. "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
  146. )
  147. self.cosine_better_than_threshold = cosine_threshold
  148. working_dir = self.global_config["working_dir"]
  149. if self.workspace:
  150. # Include workspace in the file path for data isolation
  151. workspace_dir = os.path.join(working_dir, self.workspace)
  152. self.final_namespace = f"{self.workspace}_{self.namespace}"
  153. else:
  154. # Default behavior when workspace is empty
  155. self.final_namespace = self.namespace
  156. self.workspace = ""
  157. workspace_dir = working_dir
  158. os.makedirs(workspace_dir, exist_ok=True)
  159. self._client_file_name = os.path.join(
  160. workspace_dir, f"vdb_{self.namespace}.json"
  161. )
  162. self._max_batch_size = self.global_config["embedding_batch_num"]
  163. # Sweep orphan tmp siblings left behind by hard kills mid-save before
  164. # NanoVectorDB opens the target file.
  165. reap_orphan_tmp_files(self._client_file_name, self.workspace or "_")
  166. self._client = NanoVectorDB(
  167. self.embedding_func.embedding_dim,
  168. storage_file=self._client_file_name,
  169. )
  170. # Minimal pending area for deferred embedding: id -> _PendingNanoDoc.
  171. # Holds only records not yet embedded+materialized into self._client;
  172. # it never duplicates rows already written to the client. Flushed
  173. # under _storage_lock by _flush_pending_locked().
  174. self._pending_upserts: dict[str, _PendingNanoDoc] = {}
  175. # True when self._client has materialized changes that have not been
  176. # successfully saved to disk yet. This lets finalize retry a save even
  177. # after a previous flush cleared the pending buffer.
  178. self._client_dirty = False
  179. async def initialize(self):
  180. """Initialize storage data"""
  181. # Get the update flag for cross-process update notification
  182. self.storage_updated = await get_update_flag(
  183. self.namespace, workspace=self.workspace
  184. )
  185. # Get the storage lock for use in other methods
  186. self._storage_lock = get_namespace_lock(
  187. self.namespace, workspace=self.workspace
  188. )
  189. def _reload_client_from_disk_locked(self, *, for_write: bool = False) -> bool:
  190. """Reload ``self._client`` if another process committed newer data.
  191. Precondition: the caller must already hold ``_storage_lock``. This is
  192. used by write paths as well as reads because deferred upserts mean a
  193. stale writer must merge its pending buffer into the latest on-disk
  194. snapshot, not save over it or return without flushing.
  195. """
  196. if not self.storage_updated.value:
  197. return False
  198. log_message = (
  199. f"[{self.workspace}] Process {os.getpid()} reloading {self.namespace} "
  200. "due to update by another process"
  201. )
  202. if for_write:
  203. logger.warning(log_message)
  204. else:
  205. logger.info(log_message)
  206. self._client = NanoVectorDB(
  207. self.embedding_func.embedding_dim,
  208. storage_file=self._client_file_name,
  209. )
  210. self.storage_updated.value = False
  211. return True
  212. async def _get_client(self):
  213. """Return the live ``NanoVectorDB`` instance, reloading from disk if needed.
  214. This is the **single entry point** every public method funnels
  215. through to obtain ``self._client``. It is also the **only place
  216. readers transition to a fresher on-disk snapshot**: when another
  217. process has committed (via ``index_done_callback``) and flipped
  218. this process's ``storage_updated`` flag, the next call here
  219. rebuilds ``self._client`` by re-parsing the entire JSON file.
  220. NanoVectorDB has no incremental sync API — the reload is
  221. unconditionally a full file reload.
  222. Under the *Single writer* invariant (see class docstring), the
  223. reload branch never fires in the writer process: the writer
  224. resets its own flag at the end of every ``index_done_callback``.
  225. The branch exists for readers.
  226. ``_storage_lock`` is held during the check-and-reload to (a)
  227. serialize concurrent reload attempts by sibling coroutines in
  228. the same process and (b) interlock with ``index_done_callback``
  229. so a reader cannot observe a partially-saved file.
  230. """
  231. async with self._storage_lock:
  232. self._reload_client_from_disk_locked()
  233. return self._client
  234. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  235. """Buffer vectors for deferred embedding; persistence is deferred too.
  236. Embedding is **not** performed here. Each record is buffered in
  237. ``self._pending_upserts`` with ``vector=None`` and the embedding model
  238. is called once per id at flush time (``_flush_pending_locked`` during
  239. ``index_done_callback`` / ``finalize``). This coalesces repeated
  240. upserts of the same id and many small upsert calls into a single
  241. embedding pass (see class docstring, *Deferred-embedding protocol*,
  242. and issue #2785).
  243. Persistence:
  244. Changes live only in this process's memory until the next
  245. ``index_done_callback``. Cross-process readers will not see
  246. them until that commit fires (see class docstring,
  247. *Cross-process sync protocol*). Until the flush, an upserted id
  248. is observable only through the read-your-writes read paths, not
  249. through ``query``.
  250. """
  251. # logger.debug(f"[{self.workspace}] Buffering {len(data)} to {self.namespace}")
  252. if not data:
  253. return
  254. current_time = int(time.time())
  255. pending = [
  256. (
  257. k,
  258. {
  259. "__id__": k,
  260. "__created_at__": current_time,
  261. **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
  262. },
  263. )
  264. for k, v in data.items()
  265. ]
  266. # Buffer under the lock to interlock with the lock-held flush. A new
  267. # _PendingNanoDoc(vector=None) overwrites any prior pending doc for the
  268. # same id, discarding a temp vector a previous get_vectors_by_ids may
  269. # have cached (content-version change -> must re-embed new content).
  270. async with self._storage_lock:
  271. for doc_id, record in pending:
  272. self._pending_upserts[doc_id] = _PendingNanoDoc(record=record)
  273. async def _flush_pending_locked(self) -> None:
  274. """Embed pending docs and materialize them into ``self._client``.
  275. Precondition: the caller **must already hold** ``_storage_lock``. The
  276. lock is non-reentrant, so this helper never calls ``_get_client`` and
  277. operates on ``self._client`` directly. Embedding runs inside the lock
  278. on purpose (see class docstring, *Deferred-embedding protocol*).
  279. Failure handling: if embedding raises or the returned count does not
  280. match, the exception propagates and ``_pending_upserts`` is left intact
  281. so the next flush retries; nothing is written to ``self._client``.
  282. """
  283. if not self._pending_upserts:
  284. return
  285. # Snapshot for stable ordering between the embed list and the write.
  286. pending_items = list(self._pending_upserts.items())
  287. to_embed = [
  288. (doc_id, pdoc) for doc_id, pdoc in pending_items if pdoc.vector is None
  289. ]
  290. if to_embed:
  291. contents = [pdoc.record["content"] for _, pdoc in to_embed]
  292. batches = [
  293. contents[i : i + self._max_batch_size]
  294. for i in range(0, len(contents), self._max_batch_size)
  295. ]
  296. logger.info(
  297. f"[{self.workspace}] {self.namespace} flush: embedding "
  298. f"{len(to_embed)} vectors in {len(batches)} batch(es) "
  299. f"(batch_num={self._max_batch_size})"
  300. )
  301. try:
  302. embeddings_list = await asyncio.gather(
  303. *[
  304. self.embedding_func(batch, context="document")
  305. for batch in batches
  306. ]
  307. )
  308. except Exception as e:
  309. logger.error(
  310. f"[{self.workspace}] Error embedding pending vector ops "
  311. f"(upserts={len(to_embed)}): {e}"
  312. )
  313. raise
  314. embeddings = np.concatenate(embeddings_list)
  315. if len(embeddings) != len(to_embed):
  316. # Explicit raise (not a log): a mismatch would mis-pair vectors
  317. # with records. Keep pending intact so the next flush retries.
  318. raise RuntimeError(
  319. f"[{self.workspace}] embedding is not 1-1 with pending data, "
  320. f"{len(embeddings)} != {len(to_embed)}"
  321. )
  322. for (_, pdoc), embedding in zip(to_embed, embeddings):
  323. pdoc.vector = embedding
  324. list_data = []
  325. for _, pdoc in pending_items:
  326. vector = pdoc.vector
  327. # Compress vector using Float16 + zlib + Base64 for storage optimization
  328. vector_f16 = vector.astype(np.float16)
  329. compressed_vector = zlib.compress(vector_f16.tobytes())
  330. encoded_vector = base64.b64encode(compressed_vector).decode("utf-8")
  331. record = pdoc.record
  332. record["vector"] = encoded_vector
  333. record["__vector__"] = vector
  334. list_data.append(record)
  335. self._client.upsert(datas=list_data)
  336. self._client_dirty = True
  337. # Clear only the entries we just flushed (an upsert that arrived after
  338. # the snapshot would have re-set vector=None and must not be dropped).
  339. for doc_id, pdoc in pending_items:
  340. if self._pending_upserts.get(doc_id) is pdoc:
  341. del self._pending_upserts[doc_id]
  342. def _save_to_disk_locked(self) -> None:
  343. """Atomically persist ``self._client`` and notify other processes.
  344. Precondition: the caller must already hold ``_storage_lock``. Factored
  345. out of ``index_done_callback`` so ``finalize`` reuses the exact same
  346. save+notify sequence. ``NanoVectorDB.save()`` always writes to whatever
  347. path is on the instance, so we temporarily redirect ``storage_file`` to
  348. the per-writer tmp and let ``atomic_write`` own the rename; the original
  349. path is restored on every path (success and exception).
  350. """
  351. def _save_atomic(tmp: str) -> None:
  352. original = self._client.storage_file
  353. self._client.storage_file = tmp
  354. try:
  355. self._client.save()
  356. finally:
  357. self._client.storage_file = original
  358. atomic_write(self._client_file_name, _save_atomic, self.workspace or "_")
  359. async def query(
  360. self, query: str, top_k: int, query_embedding: list[float] = None
  361. ) -> list[dict[str, Any]]:
  362. """Similarity search over data already materialized into ``self._client``.
  363. Buffered (unflushed) upserts are **not** searchable — only rows that a
  364. prior ``index_done_callback`` / ``finalize`` flushed are considered.
  365. Use the read-your-writes paths (``get_by_id`` / ``get_by_ids`` /
  366. ``get_vectors_by_ids``) to observe pending data before a flush.
  367. """
  368. # Use provided embedding or compute it
  369. if query_embedding is not None:
  370. embedding = query_embedding
  371. else:
  372. # Execute embedding outside of lock to avoid improve cocurrent
  373. embedding = await self.embedding_func(
  374. [query], context="query", _priority=5
  375. ) # higher priority for query
  376. embedding = embedding[0]
  377. client = await self._get_client()
  378. results = client.query(
  379. query=embedding,
  380. top_k=top_k,
  381. better_than_threshold=self.cosine_better_than_threshold,
  382. )
  383. results = [
  384. {
  385. **{k: v for k, v in dp.items() if k != "vector"},
  386. "id": dp["__id__"],
  387. "distance": dp["__metrics__"],
  388. "created_at": dp.get("__created_at__"),
  389. }
  390. for dp in results
  391. ]
  392. return results
  393. @property
  394. async def client_storage(self):
  395. """Return a **live reference** to ``NanoVectorDB.__storage``.
  396. The returned dict is the same object NanoVectorDB mutates in
  397. place during ``upsert`` / ``delete``. Reading it outside
  398. ``_storage_lock`` is safe today only because NanoVectorDB
  399. mutations are fully synchronous (see class docstring,
  400. *Lock scope*). Callers must not retain this reference across an
  401. ``await`` that might cross into ``_get_client`` again: a reload
  402. will swap ``self._client`` for a fresh instance and leave the
  403. held reference pointing at the old (now-stale) storage.
  404. """
  405. client = await self._get_client()
  406. return getattr(client, "_NanoVectorDB__storage")
  407. async def delete(self, ids: list[str]):
  408. """Delete vectors with specified IDs.
  409. Persistence:
  410. Changes are in-memory only; cross-process visibility requires a
  411. subsequent ``index_done_callback``. In ``lightrag.py`` this is
  412. handled by ``_insert_done()`` at the end of the document batch.
  413. Callers outside the pipeline must persist explicitly.
  414. Args:
  415. ids: List of vector IDs to be deleted
  416. """
  417. try:
  418. # Hold the lock so the pending-cancel and the client delete are a
  419. # single critical section against a concurrent flush. Operate on
  420. # self._client directly (the lock is non-reentrant; no _get_client).
  421. async with self._storage_lock:
  422. self._reload_client_from_disk_locked(for_write=True)
  423. for doc_id in ids:
  424. self._pending_upserts.pop(doc_id, None)
  425. # Record count before deletion
  426. before_count = len(self._client)
  427. self._client.delete(ids)
  428. # Calculate actual deleted count
  429. after_count = len(self._client)
  430. deleted_count = before_count - after_count
  431. if deleted_count:
  432. self._client_dirty = True
  433. logger.debug(
  434. f"[{self.workspace}] Successfully deleted {deleted_count} vectors from {self.namespace}"
  435. )
  436. except Exception as e:
  437. logger.error(
  438. f"[{self.workspace}] Error while deleting vectors from {self.namespace}: {e}"
  439. )
  440. async def delete_entity(self, entity_name: str) -> None:
  441. """Delete the vector associated with a single entity name.
  442. Persistence:
  443. Changes are in-memory only; cross-process visibility requires
  444. a subsequent ``index_done_callback``. Callers outside the
  445. pipeline must persist explicitly.
  446. Buffer semantics — post-prune with caller short-circuit contract:
  447. The materialized client delete runs first; the matching
  448. pending upsert (if any) is popped **only after** it
  449. succeeds. If the materialized delete raises, the pending
  450. buffer stays intact and the exception is re-raised so the
  451. caller can short-circuit before ``index_done_callback``
  452. flushes a half-cleaned buffer.
  453. **Not pipeline-gated** — see class docstring
  454. *Non-pipeline write paths*. The caller is responsible for
  455. ensuring single-writer serialization.
  456. """
  457. try:
  458. entity_id = compute_mdhash_id(entity_name, prefix="ent-")
  459. logger.debug(
  460. f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}"
  461. )
  462. async with self._storage_lock:
  463. self._reload_client_from_disk_locked(for_write=True)
  464. # Materialized side first so a failure leaves the
  465. # pending buffer intact for the caller's retry path.
  466. if self._client.get([entity_id]):
  467. self._client.delete([entity_id])
  468. self._client_dirty = True
  469. deleted = True
  470. else:
  471. deleted = False
  472. # Materialized delete succeeded — safe to cancel any
  473. # buffered upsert for this entity.
  474. pending_cancelled = (
  475. self._pending_upserts.pop(entity_id, None) is not None
  476. )
  477. if deleted or pending_cancelled:
  478. logger.debug(
  479. f"[{self.workspace}] Successfully deleted entity {entity_name}"
  480. )
  481. else:
  482. logger.debug(
  483. f"[{self.workspace}] Entity {entity_name} not found in storage"
  484. )
  485. except Exception as e:
  486. logger.error(f"[{self.workspace}] Error deleting entity {entity_name}: {e}")
  487. raise
  488. async def delete_entity_relation(self, entity_name: str) -> None:
  489. """Delete every relation vector incident to ``entity_name``.
  490. Persistence:
  491. Changes are in-memory only; cross-process visibility requires
  492. a subsequent ``index_done_callback``. Callers outside the
  493. pipeline must persist explicitly.
  494. Buffer semantics — post-prune with caller short-circuit contract:
  495. The materialized client delete runs first; matching pending
  496. upserts are pruned **only after** it succeeds. If the
  497. materialized delete raises, the pending buffer stays intact
  498. and the exception is re-raised so the caller (e.g.
  499. ``adelete_by_entity``) can short-circuit before
  500. ``_persist_graph_updates`` triggers ``index_done_callback``
  501. on a half-cleaned buffer.
  502. Previously the buffer was pre-pruned and the outer
  503. ``except`` swallowed exceptions into ``logger.error`` — that
  504. combination silently dropped both buffered relation vectors
  505. and the failure signal.
  506. **Not pipeline-gated** — see class docstring
  507. *Non-pipeline write paths*. The caller is responsible for
  508. ensuring single-writer serialization.
  509. """
  510. try:
  511. async with self._storage_lock:
  512. self._reload_client_from_disk_locked(for_write=True)
  513. # Materialized side first so a failure leaves the
  514. # pending buffer intact for the caller's retry path.
  515. # Use .get() for src_id / tgt_id so rows from foreign
  516. # namespaces without those keys silently don't match.
  517. storage = getattr(self._client, "_NanoVectorDB__storage")
  518. ids_to_delete = [
  519. dp["__id__"]
  520. for dp in storage["data"]
  521. if dp.get("src_id") == entity_name
  522. or dp.get("tgt_id") == entity_name
  523. ]
  524. if ids_to_delete:
  525. self._client.delete(ids_to_delete)
  526. self._client_dirty = True
  527. # Materialized delete succeeded — safe to prune matching
  528. # buffered upserts so a subsequent flush won't re-upsert
  529. # the just-deleted relations.
  530. pending_ids = [
  531. doc_id
  532. for doc_id, pdoc in self._pending_upserts.items()
  533. if pdoc.record.get("src_id") == entity_name
  534. or pdoc.record.get("tgt_id") == entity_name
  535. ]
  536. for doc_id in pending_ids:
  537. del self._pending_upserts[doc_id]
  538. total = len(pending_ids) + len(ids_to_delete)
  539. if total:
  540. logger.debug(
  541. f"[{self.workspace}] Deleted {total} relations for {entity_name}"
  542. )
  543. else:
  544. logger.debug(
  545. f"[{self.workspace}] No relations found for entity {entity_name}"
  546. )
  547. except Exception as e:
  548. logger.error(
  549. f"[{self.workspace}] Error deleting relations for {entity_name}: {e}"
  550. )
  551. raise
  552. async def index_done_callback(self) -> bool:
  553. """Flush deferred embeddings, commit to disk, and notify other processes.
  554. This is the writer's **commit point** in the cross-process sync
  555. protocol (see class docstring). Effects, in order:
  556. 1. If another process committed first, reload the latest on-disk
  557. snapshot while preserving this process's pending buffer.
  558. 2. ``_flush_pending_locked`` embeds every buffered upsert (once
  559. per id) and materializes it into ``self._client``. A failure
  560. here **raises** — pending is kept, nothing is written.
  561. 3. ``_save_to_disk_locked`` (``atomic_write``) lays a tmp file
  562. beside the target and renames it into place — readers either
  563. see the previous file in full or the new file in full, never a
  564. torn write. A failure here **also raises**; ``_client_dirty``
  565. stays ``True`` so a later ``finalize`` retries the save.
  566. 4. ``set_all_update_flags`` flips every registered process's
  567. ``storage_updated`` flag, then we immediately reset our own
  568. flag to ``False`` so the writer does not self-reload on the
  569. next call to ``_get_client``.
  570. Either failure surfaces loudly through ``_insert_done`` so the caller
  571. can abort the document batch instead of silently losing vectors. The
  572. bool return is kept for legacy callers but is effectively always
  573. ``True`` on the success path.
  574. """
  575. async with self._storage_lock:
  576. self._reload_client_from_disk_locked(for_write=True)
  577. # Flush + save both raise on failure (embedding mismatch / save IO
  578. # error). The exception propagates out of the lock so _insert_done
  579. # aborts the batch; pending stays intact and _client_dirty stays
  580. # True (if only the save failed) for a later retry.
  581. await self._flush_pending_locked()
  582. self._save_to_disk_locked()
  583. await set_all_update_flags(self.namespace, workspace=self.workspace)
  584. self.storage_updated.value = False
  585. self._client_dirty = False
  586. return True
  587. @staticmethod
  588. def _format_record(dp: dict[str, Any]) -> dict[str, Any]:
  589. """Shape a stored/pending record into the public read result."""
  590. return {
  591. **{k: v for k, v in dp.items() if k not in ("vector", "__vector__")},
  592. "id": dp.get("__id__"),
  593. "created_at": dp.get("__created_at__"),
  594. }
  595. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  596. """Get vector data by its ID (read-your-writes against the pending buffer).
  597. Args:
  598. id: The unique identifier of the vector
  599. Returns:
  600. The vector data if found, or None if not found
  601. """
  602. # Read-your-writes: a buffered upsert is visible before its flush.
  603. async with self._storage_lock:
  604. pending = self._pending_upserts.get(id)
  605. if pending is not None:
  606. return self._format_record(pending.record)
  607. client = await self._get_client()
  608. result = client.get([id])
  609. if result:
  610. return self._format_record(result[0])
  611. return None
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
      """Fetch several vector records by id, honouring unflushed upserts.

      Ids present in the pending-upsert buffer are answered from it (under
      ``_storage_lock``). Only the remaining ids are looked up in the
      materialized client.

      Args:
          ids: List of unique identifiers

      Returns:
          A list aligned one-to-one with ``ids``. Each slot holds the record
          shaped by ``_format_record``, or None when that id was not found.
          Duplicate ids yield duplicate entries. Empty input returns ``[]``.
      """
      if not ids:
          return []
      found: dict[str, dict[str, Any]] = {}
      unresolved: list[str] = []
      async with self._storage_lock:
          for wanted in ids:
              buffered = self._pending_upserts.get(wanted)
              if buffered is None:
                  unresolved.append(wanted)
                  continue
              found[str(wanted)] = self._format_record(buffered.record)
      if unresolved:
          # Outside the lock: _get_client handles its own locking.
          client = await self._get_client()
          stored_records = (
              self._format_record(row) for row in client.get(unresolved) if row
          )
          for record in stored_records:
              record_id = record.get("id")
              if record_id is not None:
                  found[str(record_id)] = record
      return [found.get(str(wanted)) for wanted in ids]
  async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
      """Get vectors by their IDs (read-your-writes), returning only ID and vector.

      Buffered (pending) upserts are served first. A pending doc that already
      carries a vector returns it directly. One that does not is embedded
      here, and the vector is cached back onto the pending doc so the next
      flush reuses it instead of re-embedding. Ids that are not pending are
      looked up in the materialized client, and their stored vector
      (Base64 + zlib + float16) is decoded.

      Each distinct id is resolved once, so a pending doc requested several
      times in ``ids`` is embedded only once.

      Note: pending vectors are returned at float32 precision. Materialized
      vectors are stored float16-compressed, so they come back float16-rounded.

      Args:
          ids: List of unique identifiers

      Returns:
          Dictionary mapping IDs to their vector embeddings.
          Format: {id: [vector_values], ...}. Ids that are neither pending
          nor stored (or whose stored row lacks a vector) are omitted.

      Raises:
          RuntimeError: if the embedding function does not return exactly one
              vector per pending doc that needed embedding.
      """
      if not ids:
          return {}
      vectors_dict: dict[str, list[float]] = {}
      remaining: list[str] = []
      seen: set[str] = set()
      async with self._storage_lock:
          to_embed: list[tuple[str, _PendingNanoDoc]] = []
          for requested_id in ids:
              # Resolve each distinct id once. Otherwise a duplicate pending id
              # would be embedded again (and a duplicate stored id fetched
              # again) only to overwrite the same dict entry.
              if requested_id in seen:
                  continue
              seen.add(requested_id)
              pending = self._pending_upserts.get(requested_id)
              if pending is None:
                  remaining.append(requested_id)
              elif pending.vector is not None:
                  vectors_dict[requested_id] = pending.vector.astype(
                      np.float32
                  ).tolist()
              else:
                  to_embed.append((requested_id, pending))
          if to_embed:
              contents = [pdoc.record["content"] for _, pdoc in to_embed]
              batches = [
                  contents[i : i + self._max_batch_size]
                  for i in range(0, len(contents), self._max_batch_size)
              ]
              embeddings_list = await asyncio.gather(
                  *[
                      self.embedding_func(batch, context="document")
                      for batch in batches
                  ]
              )
              embeddings = np.concatenate(embeddings_list)
              if len(embeddings) != len(to_embed):
                  raise RuntimeError(
                      f"[{self.workspace}] embedding is not 1-1 with pending data, "
                      f"{len(embeddings)} != {len(to_embed)}"
                  )
              for (requested_id, pdoc), embedding in zip(to_embed, embeddings):
                  # Cache the vector back so the next flush reuses it.
                  pdoc.vector = embedding
                  vectors_dict[requested_id] = embedding.astype(np.float32).tolist()
      if remaining:
          # Outside the lock: _get_client handles its own locking.
          client = await self._get_client()
          for result in client.get(remaining):
              if result and "vector" in result and "__id__" in result:
                  # Decompress vector data (Base64 + zlib + Float16 compressed)
                  decoded = base64.b64decode(result["vector"])
                  decompressed = zlib.decompress(decoded)
                  vector_f16 = np.frombuffer(decompressed, dtype=np.float16)
                  vectors_dict[result["__id__"]] = vector_f16.astype(
                      np.float32
                  ).tolist()
      return vectors_dict
  async def drop(self) -> dict[str, str]:
      """Delete all vector data and reset to an empty client.

      Under ``_storage_lock`` this:
      1. Discards every buffered (not yet flushed) upsert.
      2. Deletes the storage file ``_client_file_name`` if it exists.
      3. Replaces ``_client`` with a fresh ``NanoVectorDB`` bound to that
         same file, and clears ``_client_dirty``.
      4. Raises the update flag for all processes via
         ``set_all_update_flags``, then clears this process's own
         ``storage_updated`` flag so it does not reload its fresh client.

      Caller contract:
          ``drop`` is destructive and **not** serialized by this storage
          class. The caller must hold the pipeline ``busy`` reservation
          (the ``/documents/clear`` endpoint does this) before invoking it.
          Running ``drop`` concurrently with an active document pipeline
          tears storage down underneath the writer and silently loses data.
          See class docstring, *Non-pipeline write paths*.

      Returns:
          dict[str, str]: Operation status and message
          - On success: {"status": "success", "message": "data dropped"}
          - On failure: {"status": "error", "message": "<error details>"}
      """
      try:
          async with self._storage_lock:
              # Unflushed upserts are thrown away along with the stored data.
              self._pending_upserts.clear()
              storage_path = self._client_file_name
              if os.path.exists(storage_path):
                  os.remove(storage_path)
              self._client = NanoVectorDB(
                  self.embedding_func.embedding_dim,
                  storage_file=storage_path,
              )
              self._client_dirty = False
              # Signal every process to reload, then opt this process out of
              # reloading the client it just reset.
              await set_all_update_flags(self.namespace, workspace=self.workspace)
              self.storage_updated.value = False
              logger.info(
                  f"[{self.workspace}] Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})"
              )
      except Exception as exc:
          logger.error(f"[{self.workspace}] Error dropping {self.namespace}: {exc}")
          return {"status": "error", "message": str(exc)}
      return {"status": "success", "message": "data dropped"}
  745. async def finalize(self):
  746. """Flush any buffered upserts and persist before shutdown (safety net).
  747. Normally ``index_done_callback`` has already drained the pending buffer
  748. and synced to disk, but two paths land here with work to do:
  749. - **Pending upserts only** (no prior ``index_done_callback``): flush
  750. and save. We reload first so a stale process picks up other writers'
  751. commits before merging its pending buffer in.
  752. - **Unsaved materialized changes** (``_client_dirty=True``): an earlier
  753. ``index_done_callback`` flushed pending into ``self._client`` but
  754. its save raised. Skip the reload — reloading would drop those
  755. materialized-but-unsaved rows — and just retry the save.
  756. Flush / save failures propagate (same contract as
  757. ``index_done_callback``); a partially flushed buffer is preserved for
  758. a future retry.
  759. """
  760. async with self._storage_lock:
  761. if not self._pending_upserts and not self._client_dirty:
  762. return
  763. if self._pending_upserts:
  764. # Only reload when we have nothing un-persisted in self._client.
  765. # A dirty client carries successfully-flushed-but-unsaved rows
  766. # from a prior index_done_callback; reloading would silently
  767. # drop them.
  768. if not self._client_dirty:
  769. self._reload_client_from_disk_locked(for_write=True)
  770. await self._flush_pending_locked()
  771. self._save_to_disk_locked()
  772. await set_all_update_flags(self.namespace, workspace=self.workspace)
  773. self.storage_updated.value = False
  774. self._client_dirty = False