faiss_impl.py 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163
  1. import glob
  2. import os
  3. import time
  4. import asyncio
  5. from typing import Any, final
  6. import json
  7. import numpy as np
  8. from dataclasses import dataclass
  9. from lightrag.file_atomic import atomic_write, reap_orphan_tmp_files
  10. from lightrag.utils import logger, compute_mdhash_id
  11. from lightrag.base import BaseVectorStorage
  12. from .shared_storage import (
  13. get_namespace_lock,
  14. get_update_flag,
  15. set_all_update_flags,
  16. )
  17. # You must manually install faiss-cpu or faiss-gpu before using FAISS vector db
  18. import faiss # type: ignore
  19. @dataclass
  20. class _PendingFaissDoc:
  21. """A buffered upsert waiting for deferred embedding and materialization.
  22. ``record`` holds ``__id__`` / ``__created_at__`` plus the ``meta_fields``
  23. (which always include ``content`` for the entity/relation/chunk vdbs), so
  24. the content needed for deferred embedding lives in the record itself — no
  25. separate copy is kept. ``vector`` starts as ``None`` and is filled either
  26. during the lock-held flush or by a lazy ``get_vectors_by_ids`` embedding;
  27. once set it is always an **already-L2-normalized float32 1D ndarray**, so
  28. the next flush can ``vstack`` and ``index.add`` without re-normalizing.
  29. ``__vector__`` is materialized into the metadata dict only at flush time,
  30. right before ``self._index.add``.
  31. """
  32. record: dict[str, Any]
  33. vector: np.ndarray | None = None
  34. @final
  35. @dataclass
  36. class FaissVectorDBStorage(BaseVectorStorage):
  37. """Faiss-backed vector storage for LightRAG.
  38. Uses cosine similarity by storing L2-normalized vectors in an
  39. ``IndexFlatIP`` (inner-product search on normalized vectors == cosine).
  40. Storage model:
  41. Two on-disk files per ``(workspace, namespace)``:
  42. * ``working_dir/[workspace/]faiss_index_<namespace>.index`` —
  43. the Faiss index (binary, written by ``faiss.write_index``).
  44. * ``…<namespace>.index.meta.json`` — the ``_id_to_meta`` dict
  45. serialized as JSON, **without** the ``__vector__`` field
  46. (vectors are reconstructed from the Faiss index on load).
  47. In memory the storage is split across two fields:
  48. * ``self._index`` — the Faiss index.
  49. * ``self._id_to_meta`` — ``dict[int_faiss_id, metadata]``.
  50. Both files are the **only** cross-process synchronization surface
  51. — there is no shared memory between processes. Cross-process
  52. visibility is mediated by (a) per-file atomic writes and (b) a
  53. per-namespace ``storage_updated`` flag distributed through
  54. ``lightrag.kg.shared_storage``.
  55. **Cross-file atomicity is not guaranteed**: the two ``atomic_write``
  56. renames in ``_save_faiss_index`` are independent, so a crash
  57. between them can leave ``.index`` and ``.meta.json`` referring to
  58. different snapshots. ``_load_faiss_index`` tolerates both
  59. directions on load: ``meta > index`` rows are dropped silently;
  60. ``index > meta`` (the more dangerous case) is logged as a warning
  61. but **not** auto-repaired — orphan vectors remain in the loaded
  62. index but are unreachable via custom-id lookups. Repair semantics
  63. (truncate index vs rebuild meta) are deliberately left to a
  64. follow-up PR.
  65. Concurrency invariants (the code here is correct *only* while all
  66. three hold):
  67. 1. **Single writer per workspace.** The document pipeline's
  68. ``busy`` / ``destructive_busy`` flags (see ``AGENTS.md``
  69. *Pipeline concurrency contract*) guarantee at most one process
  70. performs ``upsert`` / ``delete`` / ``index_done_callback`` at
  71. any time. Every other process is read-only.
  72. 2. **Eventual consistency is sufficient.** Read-only processes
  73. only need to observe the writer's data *after* the writer's
  74. ``index_done_callback`` completes. Reads in the gap between a
  75. writer's in-memory mutation and its commit may legitimately
  76. return the pre-update snapshot.
  77. 3. **Faiss + dict mutations are synchronous.** Under a
  78. single-threaded asyncio event loop, ``index.add`` /
  79. ``index.search`` / ``self._id_to_meta`` mutations cannot be
  80. preempted by another coroutine, which gives them implicit
  81. mutual exclusion. This is why most methods don't hold
  82. ``_storage_lock`` while touching ``self._index`` /
  83. ``self._id_to_meta``.
  84. Cross-process sync protocol:
  85. Writer side (``index_done_callback``):
  86. 1. ``_save_faiss_index`` writes both files atomically (per
  87. file; cross-file atomicity is best-effort, see above).
  88. 2. ``set_all_update_flags`` flips every process's
  89. ``storage_updated`` flag (including the writer's own).
  90. 3. Reset the writer's own flag to ``False`` so the next
  91. ``_get_index`` does not trigger a self-reload of what we
  92. just wrote.
  93. Reader side (any method that goes through ``_get_index``):
  94. 1. Inside ``_storage_lock``, observe
  95. ``storage_updated.value is True``.
  96. 2. **Fully reload**: re-init ``self._index`` from
  97. ``IndexFlatIP``, clear ``self._id_to_meta``, then call
  98. ``_load_faiss_index`` to re-parse both files. Faiss has no
  99. incremental sync API.
  100. 3. Reset the reader's own flag.
  101. Lock scope:
  102. ``_storage_lock`` is a per-``(namespace, workspace)`` keyed lock
  103. spanning both intra-process coroutines and inter-process workers.
  104. It wraps:
  105. * ``_get_index`` reload checks.
  106. * Pending-buffer mutations in ``upsert`` and pending-buffer
  107. reads in ``get_by_id`` / ``get_by_ids`` /
  108. ``get_vectors_by_ids`` (read-your-writes).
  109. * The single critical section in ``index_done_callback`` and
  110. ``finalize`` (reload → flush → save → notify).
  111. * The pending-cancel + rebuild critical sections in
  112. ``delete`` / ``delete_entity_relation``.
  113. * The entire ``drop`` body.
  114. The lock is **non-reentrant**, so ``_flush_pending_locked`` /
  115. ``_remove_faiss_ids_locked`` / ``_save_faiss_index`` /
  116. ``_reload_index_from_disk_locked`` all require the caller to
  117. already hold it and never re-enter via ``_get_index``. Routine
  118. ``index.search`` outside ``_get_index`` and the synchronous
  119. ``client_storage`` read rely on invariant (3) above — if either
  120. premise is broken (e.g. Faiss calls moved to a thread pool),
  121. the lock scope must be widened.
  122. Caveat — synchronous ``client_storage`` reads:
  123. ``client_storage`` is a synchronous property and does **not** go
  124. through ``_get_index``, so in a reader process it can return data
  125. older than the latest committed snapshot until some other method
  126. triggers a reload. The async read methods (``get_by_id`` /
  127. ``get_by_ids`` / ``get_vectors_by_ids``) now funnel through
  128. ``_get_index`` after checking the pending buffer, so they observe
  129. the latest on-disk snapshot.
  130. Deferred-embedding protocol:
  131. ``upsert`` does **not** call the embedding model. It only buffers
  132. a ``_PendingFaissDoc`` (content-bearing record + ``vector=None``)
  133. in the minimal ``self._pending_upserts`` area, overwriting any
  134. prior pending doc for the same id (which also clears a temp
  135. vector a previous ``get_vectors_by_ids`` may have cached). The
  136. model is called once per id at flush time
  137. (``_flush_pending_locked``), so repeated upserts of the same id —
  138. and many small upsert calls — embed only once. See issue #2785
  139. and the ``NanoVectorDBStorage`` / ``OpenSearchVectorDBStorage``
  140. equivalents.
  141. Embedding runs **inside ``_storage_lock``** during the flush (not
  142. in ``upsert``): under the single-writer invariant this keeps the
  143. content used for embedding consistent with the rows written to
  144. disk and prevents a destructive op from interleaving between
  145. embed and write. The lock is non-reentrant, so
  146. ``_flush_pending_locked`` requires the caller to already hold it
  147. and operates on ``self._index`` / ``self._id_to_meta`` directly
  148. (never through ``_get_index``).
  149. Vector storage invariant: once a ``_PendingFaissDoc.vector`` is
  150. set it is an **already-L2-normalized float32 1D ndarray** — both
  151. flush and lazy ``get_vectors_by_ids`` normalize the entire batch
  152. with ``faiss.normalize_L2`` before caching back, so a later flush
  153. can ``vstack`` and ``index.add`` without re-normalizing.
  154. Reads are read-your-writes: ``get_by_id`` / ``get_by_ids`` /
  155. ``get_vectors_by_ids`` consult ``_pending_upserts`` first, then
  156. funnel through ``_get_index`` for the materialized fallback.
  157. ``get_vectors_by_ids`` lazily embeds a pending doc on demand and
  158. caches the (normalized) vector back for the next flush.
  159. ``query`` and ``client_storage`` see only data already
  160. materialized into ``self._index`` / ``self._id_to_meta`` —
  161. unflushed pending data is intentionally not queryable.
  162. A flush failure (embedding error, count mismatch, or save IO
  163. error) raises through ``index_done_callback``; the pending buffer
  164. is preserved on flush failure, and if only the save failed
  165. ``_index_dirty`` stays ``True`` so a subsequent ``finalize``
  166. retries the save without re-embedding.
  167. Non-pipeline write paths:
  168. The pipeline ``busy`` gate serializes ``upsert`` / ``delete`` /
  169. ``index_done_callback`` called from document ingestion and purge.
  170. The following entry points are **not** serialized by the pipeline
  171. and must be guarded externally:
  172. * ``drop`` — gated by the API layer (``/documents/clear``
  173. takes the pipeline busy reservation before invoking it).
  174. * ``delete_entity`` / ``delete_entity_relation`` — currently
  175. not exposed in the WebUI. Any future caller must arrange
  176. single-writer serialization the same way the pipeline does.
  177. """
  178. def __post_init__(self):
  179. self._validate_embedding_func()
  180. # Grab config values if available
  181. kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
  182. cosine_threshold = kwargs.get("cosine_better_than_threshold")
  183. if cosine_threshold is None:
  184. raise ValueError(
  185. "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
  186. )
  187. self.cosine_better_than_threshold = cosine_threshold
  188. # Where to save index file if you want persistent storage
  189. working_dir = self.global_config["working_dir"]
  190. if self.workspace:
  191. # Include workspace in the file path for data isolation
  192. workspace_dir = os.path.join(working_dir, self.workspace)
  193. else:
  194. # Default behavior when workspace is empty
  195. workspace_dir = working_dir
  196. self.workspace = ""
  197. os.makedirs(workspace_dir, exist_ok=True)
  198. self._faiss_index_file = os.path.join(
  199. workspace_dir, f"faiss_index_{self.namespace}.index"
  200. )
  201. self._meta_file = self._faiss_index_file + ".meta.json"
  202. self._max_batch_size = self.global_config["embedding_batch_num"]
  203. # Embedding dimension (e.g. 768) must match your embedding function
  204. self._dim = self.embedding_func.embedding_dim
  205. # Create an empty Faiss index for inner product (useful for normalized vectors = cosine similarity).
  206. # If you have a large number of vectors, you might want IVF or other indexes.
  207. # For demonstration, we use a simple IndexFlatIP.
  208. self._index = faiss.IndexFlatIP(self._dim)
  209. # Keep a local store for metadata, IDs, etc.
  210. # Maps <int faiss_id> → metadata (including your original ID).
  211. self._id_to_meta = {}
  212. # Minimal pending area for deferred embedding: custom-id -> _PendingFaissDoc.
  213. # Holds only records not yet embedded+materialized into self._index;
  214. # it never duplicates rows already added to the Faiss index. Flushed
  215. # under _storage_lock by _flush_pending_locked().
  216. self._pending_upserts: dict[str, _PendingFaissDoc] = {}
  217. # True when self._index / self._id_to_meta have materialized changes
  218. # that have not been successfully saved to disk yet. This lets
  219. # finalize retry a save even after a previous flush cleared the
  220. # pending buffer (see _flush_pending_locked / index_done_callback).
  221. self._index_dirty = False
  222. # Sweep orphan tmp siblings left behind by hard kills mid-save.
  223. # The meta file also needs an extra pattern: legacy versions of this
  224. # storage wrote a fixed "<meta>.tmp" suffix without further dot-segments,
  225. # which the default ".tmp.*" pattern does not match.
  226. reap_orphan_tmp_files(self._faiss_index_file, self.workspace or "_")
  227. reap_orphan_tmp_files(
  228. self._meta_file,
  229. self.workspace or "_",
  230. extra_patterns=(glob.escape(self._meta_file) + ".tmp",),
  231. )
  232. self._load_faiss_index()
  233. async def initialize(self):
  234. """Initialize storage data"""
  235. # Get the update flag for cross-process update notification
  236. self.storage_updated = await get_update_flag(
  237. self.namespace, workspace=self.workspace
  238. )
  239. # Get the storage lock for use in other methods
  240. self._storage_lock = get_namespace_lock(
  241. self.namespace, workspace=self.workspace
  242. )
  243. def _reload_index_from_disk_locked(self, *, for_write: bool = False) -> bool:
  244. """Reload ``self._index`` + ``self._id_to_meta`` if another process committed newer data.
  245. Precondition: the caller must already hold ``_storage_lock``. This is
  246. used by write paths as well as reads because deferred upserts mean a
  247. stale writer must merge its pending buffer into the latest on-disk
  248. snapshot, not save over it or return without flushing.
  249. Returns True if a reload happened, False if the local snapshot was
  250. already current.
  251. """
  252. if not self.storage_updated.value:
  253. return False
  254. log_message = (
  255. f"[{self.workspace}] Process {os.getpid()} FAISS reloading {self.namespace} "
  256. "due to update by another process"
  257. )
  258. if for_write:
  259. logger.warning(log_message)
  260. else:
  261. logger.info(log_message)
  262. self._index = faiss.IndexFlatIP(self._dim)
  263. self._id_to_meta = {}
  264. self._load_faiss_index()
  265. self.storage_updated.value = False
  266. return True
  267. async def _get_index(self):
  268. """Return the live Faiss index, reloading from disk if needed.
  269. Read paths (``query`` / ``get_by_id`` / ``get_by_ids`` /
  270. ``get_vectors_by_ids``) funnel through this method so that a stale
  271. reader picks up any commit made by another process before reading
  272. ``self._index`` / ``self._id_to_meta``. Faiss has no incremental
  273. sync API — the reload is unconditionally a full reload of both
  274. files via ``_reload_index_from_disk_locked``.
  275. Under the *Single writer* invariant (see class docstring), the
  276. reload branch never fires in the writer process: the writer
  277. resets its own flag at the end of every ``index_done_callback``.
  278. The branch exists for readers.
  279. ``_storage_lock`` is held during the check-and-reload to (a)
  280. serialize concurrent reload attempts by sibling coroutines and
  281. (b) interlock with ``index_done_callback`` so a reader cannot
  282. observe a partially-saved file pair.
  283. """
  284. async with self._storage_lock:
  285. self._reload_index_from_disk_locked()
  286. return self._index
  287. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  288. """Buffer vectors for deferred embedding; persistence is deferred too.
  289. ``data`` shape::
  290. {
  291. "custom_id_1": {"content": <text>, ...metadata...},
  292. "custom_id_2": {"content": <text>, ...metadata...},
  293. ...
  294. }
  295. Embedding is **not** performed here. Each record is buffered in
  296. ``self._pending_upserts`` with ``vector=None`` and the embedding
  297. model is called once per id at flush time (``_flush_pending_locked``
  298. during ``index_done_callback`` / ``finalize``). This coalesces
  299. repeated upserts of the same id and many small upsert calls into a
  300. single embedding pass (see class docstring,
  301. *Deferred-embedding protocol*, and issue #2785).
  302. Persistence:
  303. Changes live only in this process's memory until the next
  304. ``index_done_callback``. Cross-process readers will not see
  305. them until that commit fires (see class docstring,
  306. *Cross-process sync protocol*). Until the flush, an upserted
  307. id is observable only through the read-your-writes read paths,
  308. not through ``query``.
  309. """
  310. if not data:
  311. return
  312. current_time = int(time.time())
  313. pending = [
  314. (
  315. k,
  316. {
  317. "__id__": k,
  318. "__created_at__": current_time,
  319. **{mf: v[mf] for mf in self.meta_fields if mf in v},
  320. },
  321. )
  322. for k, v in data.items()
  323. ]
  324. # Buffer under the lock to interlock with the lock-held flush. A new
  325. # _PendingFaissDoc(vector=None) overwrites any prior pending doc for
  326. # the same id, discarding a temp vector a previous get_vectors_by_ids
  327. # may have cached (content-version change -> must re-embed new content).
  328. async with self._storage_lock:
  329. for doc_id, record in pending:
  330. self._pending_upserts[doc_id] = _PendingFaissDoc(record=record)
  331. async def query(
  332. self, query: str, top_k: int, query_embedding: list[float] = None
  333. ) -> list[dict[str, Any]]:
  334. """Similarity search over data already materialized into ``self._index``.
  335. Buffered (unflushed) upserts are intentionally **not** searchable —
  336. only rows that a prior ``index_done_callback`` / ``finalize``
  337. flushed are considered. Use the read-your-writes paths
  338. (``get_by_id`` / ``get_by_ids`` / ``get_vectors_by_ids``) to observe
  339. pending data before a flush.
  340. Returns top_k results with their metadata + similarity distance.
  341. """
  342. if query_embedding is not None:
  343. embedding = np.array([query_embedding], dtype=np.float32)
  344. else:
  345. embedding = await self.embedding_func(
  346. [query], context="query", _priority=5
  347. ) # higher priority for query
  348. # embedding is shape (1, dim)
  349. embedding = np.array(embedding, dtype=np.float32)
  350. faiss.normalize_L2(embedding) # we do in-place normalization
  351. # Perform the similarity search
  352. index = await self._get_index()
  353. distances, indices = index.search(embedding, top_k)
  354. distances = distances[0]
  355. indices = indices[0]
  356. results = []
  357. for dist, idx in zip(distances, indices):
  358. if idx == -1:
  359. # Faiss returns -1 if no neighbor
  360. continue
  361. # Cosine similarity threshold
  362. if dist < self.cosine_better_than_threshold:
  363. continue
  364. meta = self._id_to_meta.get(idx)
  365. if not meta:
  366. # Orphan vector: a row lives at this fid in self._index but
  367. # has no metadata in self._id_to_meta. This happens after an
  368. # index > meta skew on reload (see _load_faiss_index). The
  369. # vector is reachable via faiss search but not via custom id;
  370. # surfacing it as {"id": None, ...} would leak a ghost row to
  371. # callers, so we silently skip — the skew was already warned
  372. # about at load time.
  373. continue
  374. # Filter out __vector__ from query results to avoid returning large vector data
  375. filtered_meta = {k: v for k, v in meta.items() if k != "__vector__"}
  376. results.append(
  377. {
  378. **filtered_meta,
  379. "id": meta.get("__id__"),
  380. "distance": float(dist),
  381. "created_at": meta.get("__created_at__"),
  382. }
  383. )
  384. return results
  385. @property
  386. def client_storage(self):
  387. """Return a snapshot view of the materialized metadata dict for debugging.
  388. **Buffered (unflushed) upserts are intentionally not visible here**
  389. — only rows that a prior ``index_done_callback`` / ``finalize``
  390. flushed into ``self._id_to_meta`` are returned. Use the
  391. read-your-writes paths (``get_by_id`` / ``get_by_ids`` /
  392. ``get_vectors_by_ids``) to observe pending data before a flush.
  393. The outer list is a fresh shallow copy taken at access time, but
  394. each element is still a **live reference** into
  395. ``self._id_to_meta``; callers must not mutate them and must not
  396. retain them across operations that may rebuild the index
  397. (``upsert`` flush, ``delete``, ``_remove_faiss_ids_locked``,
  398. ``_get_index`` reload), since a rebuild swaps ``self._index`` and
  399. replaces ``self._id_to_meta`` with a new dict.
  400. This property is **synchronous and does not call** ``_get_index``,
  401. so in a reader process it can return data older than the latest
  402. committed snapshot until some other method triggers a reload.
  403. """
  404. return {"data": list(self._id_to_meta.values())}
  405. async def delete(self, ids: list[str]):
  406. """Delete vectors for the provided custom IDs.
  407. Persistence:
  408. Changes are in-memory only; cross-process visibility requires
  409. a subsequent ``index_done_callback``. In ``lightrag.py`` this
  410. is handled by ``_insert_done()`` at the end of the document
  411. batch. Callers outside the pipeline must persist explicitly.
  412. Errors propagate to the caller — Faiss delete is destructive enough
  413. that document deletion / status updates must not proceed if the
  414. vectors were not actually removed. (This intentionally diverges
  415. from Nano, whose delete swallows + logs.)
  416. Args:
  417. ids: List of custom IDs to be deleted.
  418. """
  419. # Hold the lock so the pending-cancel and the rebuild are a single
  420. # critical section against a concurrent flush. Operate on
  421. # self._index / self._id_to_meta directly (the lock is
  422. # non-reentrant; no _get_index).
  423. async with self._storage_lock:
  424. self._reload_index_from_disk_locked(for_write=True)
  425. for doc_id in ids:
  426. self._pending_upserts.pop(doc_id, None)
  427. # Use the find-all variant so legacy/corrupt stores with
  428. # duplicate __id__ rows still get fully cleaned.
  429. to_remove: list[int] = []
  430. for cid in ids:
  431. to_remove.extend(self._find_faiss_ids_by_custom_id(cid))
  432. if to_remove:
  433. self._remove_faiss_ids_locked(to_remove)
  434. self._index_dirty = True
  435. logger.debug(
  436. f"[{self.workspace}] Successfully deleted {len(to_remove)} vectors from {self.namespace}"
  437. )
  438. async def delete_entity(self, entity_name: str) -> None:
  439. """Delete the vector associated with a single entity name.
  440. Thin wrapper over ``delete([entity_id])`` where ``entity_id`` is
  441. ``compute_mdhash_id(entity_name, prefix="ent-")``.
  442. Persistence:
  443. Changes are in-memory only; cross-process visibility requires
  444. a subsequent ``index_done_callback``. Callers outside the
  445. pipeline must persist explicitly.
  446. **Not pipeline-gated** — see class docstring
  447. *Non-pipeline write paths*. The caller is responsible for
  448. ensuring single-writer serialization.
  449. """
  450. entity_id = compute_mdhash_id(entity_name, prefix="ent-")
  451. logger.debug(
  452. f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}"
  453. )
  454. await self.delete([entity_id])
  455. async def delete_entity_relation(self, entity_name: str) -> None:
  456. """Delete every relation vector incident to ``entity_name``.
  457. Scans both ``self._pending_upserts`` (so buffered relation upserts
  458. get cancelled) and ``self._id_to_meta`` (the materialized rows) for
  459. entries whose ``src_id`` or ``tgt_id`` matches, then rebuilds the
  460. index without them.
  461. Persistence:
  462. Changes are in-memory only; cross-process visibility requires
  463. a subsequent ``index_done_callback``. Callers outside the
  464. pipeline must persist explicitly.
  465. Errors propagate (same rationale as ``delete``).
  466. Buffer semantics — post-prune with caller short-circuit contract:
  467. The materialized index rebuild runs first; matching pending
  468. upserts are pruned **only after** it succeeds. If the
  469. rebuild raises, the pending buffer stays intact so the
  470. caller (``adelete_by_entity`` in ``utils_graph.py``) can
  471. short-circuit before ``_persist_graph_updates`` flushes a
  472. half-cleaned buffer.
  473. **Not pipeline-gated** — see class docstring
  474. *Non-pipeline write paths*. The caller is responsible for
  475. ensuring single-writer serialization.
  476. """
  477. async with self._storage_lock:
  478. self._reload_index_from_disk_locked(for_write=True)
  479. # Materialized side first so a failure leaves the pending
  480. # buffer intact for the caller's retry path. .get() so rows
  481. # from foreign namespaces (no src_id / tgt_id) silently
  482. # don't match.
  483. relations = [
  484. fid
  485. for fid, meta in self._id_to_meta.items()
  486. if meta.get("src_id") == entity_name
  487. or meta.get("tgt_id") == entity_name
  488. ]
  489. if relations:
  490. self._remove_faiss_ids_locked(relations)
  491. self._index_dirty = True
  492. # Materialized rebuild succeeded — safe to prune matching
  493. # buffered upserts (their records carry src_id / tgt_id from
  494. # the relationships vdb meta_fields).
  495. pending_ids = [
  496. doc_id
  497. for doc_id, pdoc in self._pending_upserts.items()
  498. if pdoc.record.get("src_id") == entity_name
  499. or pdoc.record.get("tgt_id") == entity_name
  500. ]
  501. for doc_id in pending_ids:
  502. del self._pending_upserts[doc_id]
  503. total = len(pending_ids) + len(relations)
  504. if total:
  505. logger.debug(
  506. f"[{self.workspace}] Deleted {total} relations for {entity_name}"
  507. )
  508. else:
  509. logger.debug(
  510. f"[{self.workspace}] No relations found for entity {entity_name}"
  511. )
  512. # --------------------------------------------------------------------------------
  513. # Internal helper methods
  514. # --------------------------------------------------------------------------------
  515. def _find_faiss_id_by_custom_id(self, custom_id: str):
  516. """Return the first Faiss internal ID matching ``custom_id``, or ``None``.
  517. Adequate for read paths (any of N duplicate rows would carry the same
  518. ``__id__`` so returning one is fine semantically). Write paths that
  519. need to remove **all** duplicates — flush overwrite, ``delete`` —
  520. must use :py:meth:`_find_faiss_ids_by_custom_id` (plural) instead.
  521. """
  522. for fid, meta in self._id_to_meta.items():
  523. if meta.get("__id__") == custom_id:
  524. return fid
  525. return None
  526. def _find_faiss_ids_by_custom_id(self, custom_id: str) -> list[int]:
  527. """Return **every** Faiss internal ID whose metadata's ``__id__`` matches.
  528. In a healthy store every custom id maps to at most one fid (each flush
  529. rebuilds the index without the prior fid before adding the new one).
  530. This plural variant exists to defend against legacy / externally
  531. corrupted stores where multiple fids share a ``__id__`` — a re-upsert
  532. or ``delete`` using only the first match would leave stale duplicates
  533. behind. Used by ``_flush_pending_locked`` and ``delete``.
  534. """
  535. return [
  536. fid
  537. for fid, meta in self._id_to_meta.items()
  538. if meta.get("__id__") == custom_id
  539. ]
  def _remove_faiss_ids_locked(self, fid_list) -> None:
      """Remove internal Faiss ids by rebuilding the index without them.

      Precondition: the caller must already hold ``_storage_lock``. The
      method is synchronous (no ``await``), so nothing can interleave with
      the rebuild. ``IndexFlatIP`` has no in-place removal API, so the
      surviving vectors are collected and added to a fresh index.

      The replacement index is built in a local variable and only assigned
      to ``self._index`` together with the new ``self._id_to_meta`` once
      ``index.add`` has succeeded. If collecting vectors or ``add`` raises,
      the previous index and metadata are still in place and still agree
      with each other.

      Surviving rows are renumbered ``0..k-1`` in ``self._id_to_meta``
      iteration order. A row that has no cached ``__vector__`` and whose fid
      is not backed by the current index cannot be rebuilt; it is dropped
      with a warning.

      This helper never sets ``self._index_dirty``; callers that mutate via
      this helper are responsible for doing so themselves.

      Args:
          fid_list: Collection of internal Faiss ids to remove. An empty
              collection makes the call a no-op.
      """
      if not fid_list:
          return

      fid_set = set(fid_list)
      vectors_to_keep = []
      new_id_to_meta = {}
      for old_fid, vec_meta in self._id_to_meta.items():
          if old_fid in fid_set:
              continue
          if "__vector__" in vec_meta:
              vec = vec_meta["__vector__"]
          elif old_fid < self._index.ntotal:
              # Recover the vector from the current index and cache it on the
              # row so later rebuilds do not need the index for it.
              vec = self._index.reconstruct(old_fid).tolist()
              vec_meta["__vector__"] = vec
          else:
              logger.warning(
                  f"[{self.workspace}] Skipping fid={old_fid} during rebuild: "
                  f"no vector and fid exceeds index size ({self._index.ntotal})"
              )
              continue
          # The new fid is the row's position in the rebuilt index.
          new_id_to_meta[len(vectors_to_keep)] = vec_meta
          vectors_to_keep.append(vec)

      # Build the replacement off to the side; self._index stays valid (and
      # consistent with self._id_to_meta) until both are swapped below.
      new_index = faiss.IndexFlatIP(self._dim)
      if vectors_to_keep:
          new_index.add(np.array(vectors_to_keep, dtype=np.float32))

      self._index = new_index
      self._id_to_meta = new_id_to_meta
  580. async def _flush_pending_locked(self) -> None:
  581. """Embed pending docs and materialize them into ``self._index`` + ``self._id_to_meta``.
  582. Precondition: the caller **must already hold** ``_storage_lock``. The
  583. lock is non-reentrant, so this helper never calls ``_get_index`` and
  584. operates on ``self._index`` / ``self._id_to_meta`` directly. Embedding
  585. runs inside the lock on purpose (see class docstring,
  586. *Deferred-embedding protocol*).
  587. Invariant: once ``_PendingFaissDoc.vector`` is set it is an **already
  588. L2-normalized float32 1D ndarray**. The flush honours this — vectors
  589. cached by a prior ``get_vectors_by_ids`` are not re-normalized; only
  590. newly embedded vectors go through ``faiss.normalize_L2``.
  591. Failure handling:
  592. * Embedding error / count mismatch → raises before any mutation
  593. to ``self._index`` / ``self._id_to_meta``; ``_pending_upserts``
  594. is left intact and ``self._index_dirty`` is not touched.
  595. * Rebuild / ``index.add`` failure → raises mid-write. The
  596. materialized state may already be partially mutated (e.g.
  597. ``_remove_faiss_ids_locked`` ran and dropped the prior fids
  598. for re-upserted ids), but ``_index_dirty`` is **not** set
  599. because we deliberately treat ``_pending_upserts`` as the
  600. source of truth on this path: pending stays intact, and the
  601. next ``finalize`` call re-enters ``_flush_pending_locked``,
  602. which will rebuild the affected rows from the cached vectors
  603. and re-add them — self-healing without re-embedding. The
  604. dirty flag is reserved for "materialized but unsaved",
  605. which is only true after ``index.add`` completes.
  606. """
  607. if not self._pending_upserts:
  608. return
  609. # Snapshot for stable ordering between the embed list and the write.
  610. pending_items = list(self._pending_upserts.items())
  611. to_embed = [
  612. (doc_id, pdoc) for doc_id, pdoc in pending_items if pdoc.vector is None
  613. ]
  614. if to_embed:
  615. contents = [pdoc.record["content"] for _, pdoc in to_embed]
  616. batches = [
  617. contents[i : i + self._max_batch_size]
  618. for i in range(0, len(contents), self._max_batch_size)
  619. ]
  620. logger.info(
  621. f"[{self.workspace}] {self.namespace} flush: embedding "
  622. f"{len(to_embed)} vectors in {len(batches)} batch(es) "
  623. f"(batch_num={self._max_batch_size})"
  624. )
  625. try:
  626. embeddings_list = await asyncio.gather(
  627. *[
  628. self.embedding_func(batch, context="document")
  629. for batch in batches
  630. ]
  631. )
  632. except Exception as e:
  633. logger.error(
  634. f"[{self.workspace}] Error embedding pending vector ops "
  635. f"(upserts={len(to_embed)}): {e}"
  636. )
  637. raise
  638. arr = np.concatenate(embeddings_list, axis=0).astype(np.float32)
  639. if len(arr) != len(to_embed):
  640. # Explicit raise (not a log): a mismatch would mis-pair vectors
  641. # with records. Keep pending intact so the next flush retries.
  642. raise RuntimeError(
  643. f"[{self.workspace}] embedding is not 1-1 with pending data, "
  644. f"{len(arr)} != {len(to_embed)}"
  645. )
  646. # Batch in-place normalize once (faiss.normalize_L2 requires 2D).
  647. faiss.normalize_L2(arr)
  648. for i, (_, pdoc) in enumerate(to_embed):
  649. pdoc.vector = arr[i].copy()
  650. # All pending vectors are now non-None and already-normalized float32.
  651. # Remove every existing fid in self._id_to_meta whose custom id is
  652. # being re-upserted (find-all so duplicate __id__ rows from a legacy /
  653. # corrupt store still get fully cleaned), then add the new vectors in
  654. # a single batch.
  655. existing_fids: list[int] = []
  656. for doc_id, _ in pending_items:
  657. existing_fids.extend(self._find_faiss_ids_by_custom_id(doc_id))
  658. self._remove_faiss_ids_locked(existing_fids)
  659. matrix = np.vstack([pdoc.vector for _, pdoc in pending_items]).astype(
  660. np.float32
  661. )
  662. start_idx = self._index.ntotal
  663. self._index.add(matrix)
  664. for i, (_, pdoc) in enumerate(pending_items):
  665. fid = start_idx + i
  666. record = pdoc.record
  667. record["__vector__"] = matrix[i].tolist()
  668. self._id_to_meta[fid] = record
  669. self._index_dirty = True
  670. # Clear only the entries we just flushed. Today the non-reentrant
  671. # _storage_lock locks out concurrent upserts for the entire flush
  672. # (including the asyncio.gather await), so the `is pdoc` identity
  673. # check is always True — it's kept as defensive scaffolding so that
  674. # if the lock scope is ever relaxed (e.g. embedding moved outside the
  675. # lock), a concurrent upsert that re-set vector=None would not be
  676. # silently dropped here.
  677. for doc_id, pdoc in pending_items:
  678. if self._pending_upserts.get(doc_id) is pdoc:
  679. del self._pending_upserts[doc_id]
  def _save_faiss_index(self):
      """Persist ``self._index`` and ``self._id_to_meta`` to their on-disk files.

      Precondition: the caller already holds ``_storage_lock``.

      The index file is written first, then the metadata JSON; each goes
      through its own ``atomic_write`` call. The two writes are independent,
      so **cross-file consistency is not guaranteed**: a crash between them
      can leave the ``.index`` file with more vectors than the ``.meta.json``
      describes. ``_load_faiss_index`` tolerates such skew (rows without a
      backing vector are skipped, and the ``index > meta`` direction is
      logged but not auto-repaired).

      ``__vector__`` entries are stripped from the persisted metadata because
      the vectors already live in the index file and are reconstructed on
      load.
      """
      workspace_tag = self.workspace or "_"

      def _write_index(tmp):
          faiss.write_index(self._index, tmp)

      atomic_write(self._faiss_index_file, _write_index, workspace_tag)

      # JSON object keys must be strings, so fids are stringified here and
      # converted back to int on load.
      meta_without_vectors = {
          str(fid): {key: value for key, value in meta.items() if key != "__vector__"}
          for fid, meta in self._id_to_meta.items()
      }

      def _write_meta(tmp: str) -> None:
          with open(tmp, "w", encoding="utf-8") as f:
              json.dump(meta_without_vectors, f)

      atomic_write(self._meta_file, _write_meta, workspace_tag)
  def _load_faiss_index(self):
      """Populate ``self._index`` and ``self._id_to_meta`` from the on-disk files.

      Behaviour by case:
          * Index file missing: log a warning and return without touching
            the in-memory state.
          * Dimension mismatch between the loaded index and ``self._dim``:
            log the error and raise ``ValueError``.
          * Any other failure (unreadable index, missing or invalid metadata
            JSON, ...): log it and fall back to an empty ``IndexFlatIP`` with
            empty metadata.

      Metadata rows whose fid is not backed by the loaded index are skipped
      with a warning. Rows stored without ``__vector__`` get it reconstructed
      from the index. If the index holds more vectors than there are
      metadata rows, a warning is logged and nothing is repaired.

      Raises:
          ValueError: If the stored index dimension differs from ``self._dim``.
      """
      if not os.path.exists(self._faiss_index_file):
          logger.warning(
              f"[{self.workspace}] No existing Faiss index file found for {self.namespace}"
          )
          return

      # Lets the broad handler below tell the deliberate dimension error
      # (re-raised) apart from load failures (recovered with an empty index).
      dim_mismatch = False
      try:
          self._index = faiss.read_index(self._faiss_index_file)

          # The stored index must match the embedding function's dimension.
          if self._index.d != self._dim:
              error_msg = (
                  f"Dimension mismatch: loaded Faiss index has dimension {self._index.d}, "
                  f"but embedding function expects dimension {self._dim}. "
                  f"Please ensure the embedding model matches the stored index or rebuild the index."
              )
              logger.error(error_msg)
              dim_mismatch = True
              raise ValueError(error_msg)

          with open(self._meta_file, "r", encoding="utf-8") as f:
              stored_dict = json.load(f)

          # JSON keys are strings; restore int fids and re-attach vectors.
          self._id_to_meta = {}
          for fid_str, meta in stored_dict.items():
              fid = int(fid_str)
              if fid < self._index.ntotal:
                  if "__vector__" not in meta:
                      meta["__vector__"] = self._index.reconstruct(fid).tolist()
                  self._id_to_meta[fid] = meta
              else:
                  logger.warning(
                      f"[{self.workspace}] Skipping metadata row fid={fid}: "
                      f"exceeds index size ({self._index.ntotal})"
                  )

          # index > meta skew: the index and metadata files are written
          # separately, so a crash between the two writes can leave extra
          # vectors in the index. Report it only; choosing a repair strategy
          # (truncate index vs rebuild meta) is out of scope here.
          if self._index.ntotal > len(self._id_to_meta):
              logger.warning(
                  f"[{self.workspace}] FAISS index has {self._index.ntotal} vectors "
                  f"but only {len(self._id_to_meta)} metadata rows — index > meta "
                  f"skew from a prior crash between the .index and .meta.json "
                  f"writes. Not auto-repairing; orphan vectors remain in the index "
                  f"but unreachable via custom-id lookups."
              )

          logger.info(
              f"[{self.workspace}] Faiss index loaded with {self._index.ntotal} vectors from {self._faiss_index_file}"
          )
      except Exception as e:
          if dim_mismatch:
              raise
          logger.error(
              f"[{self.workspace}] Failed to load Faiss index or metadata: {e}"
          )
          logger.warning(f"[{self.workspace}] Starting with an empty Faiss index.")
          self._index = faiss.IndexFlatIP(self._dim)
          self._id_to_meta = {}
  async def index_done_callback(self) -> bool:
      """Flush buffered upserts, write the store to disk, and signal other processes.

      Everything below runs while holding ``_storage_lock``, in this order:

      1. ``_reload_index_from_disk_locked(for_write=True)`` so that a commit
         made by another process is picked up before this process's pending
         buffer is merged in.
      2. ``_flush_pending_locked`` embeds the buffered upserts and
         materializes them into ``self._index`` / ``self._id_to_meta``. If it
         raises, the exception propagates and nothing is saved.
      3. ``_save_faiss_index`` writes the index and metadata files. If it
         raises, the exception propagates; the flush has already emptied
         ``_pending_upserts`` and ``_index_dirty`` stays ``True``, so a later
         ``finalize`` can retry the save without re-embedding.
      4. ``set_all_update_flags`` notifies every registered process, after
         which this process's own ``storage_updated`` flag is reset so it
         does not reload its own commit.
      5. ``_index_dirty`` is cleared.

      Returns:
          Always ``True`` when no step raised; kept for legacy callers.

      Raises:
          Exception: Whatever the flush or the save raises, so the caller can
              abort the batch instead of silently losing vectors.
      """
      async with self._storage_lock:
          # NOTE(review): ``finalize`` skips this reload when ``_index_dirty``
          # is set, to avoid dropping materialized-but-unsaved rows. Here the
          # reload is unconditional; confirm ``_reload_index_from_disk_locked``
          # itself protects a dirty index on this path.
          self._reload_index_from_disk_locked(for_write=True)

          # No try/except on purpose: flush and save failures must leave the
          # lock as exceptions.
          await self._flush_pending_locked()
          self._save_faiss_index()

          await set_all_update_flags(self.namespace, workspace=self.workspace)
          self.storage_updated.value = False
          self._index_dirty = False
      return True
  814. @staticmethod
  815. def _format_record(dp: dict[str, Any]) -> dict[str, Any]:
  816. """Shape a stored/pending record into the public read result."""
  817. return {
  818. **{k: v for k, v in dp.items() if k != "__vector__"},
  819. "id": dp.get("__id__"),
  820. "created_at": dp.get("__created_at__"),
  821. }
  822. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  823. """Get vector data by its ID (read-your-writes against the pending buffer).
  824. Args:
  825. id: The unique identifier of the vector
  826. Returns:
  827. The vector data if found, or None if not found
  828. """
  829. # Read-your-writes: a buffered upsert is visible before its flush.
  830. async with self._storage_lock:
  831. pending = self._pending_upserts.get(id)
  832. if pending is not None:
  833. return self._format_record(pending.record)
  834. await self._get_index() # reload-if-stale
  835. fid = self._find_faiss_id_by_custom_id(id)
  836. if fid is None:
  837. return None
  838. metadata = self._id_to_meta.get(fid)
  839. return self._format_record(metadata) if metadata else None
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
      """Fetch several records by custom id, preferring the pending buffer.

      Ids with a buffered (not yet flushed) upsert are answered from the
      pending buffer; only the rest are looked up in the materialized store,
      after ``_get_index`` has run.

      Args:
          ids: List of unique identifiers.

      Returns:
          One entry per requested id, in the same order as ``ids``: the
          formatted record, or ``None`` where the id was not found. An empty
          ``ids`` yields an empty list.
      """
      if not ids:
          return []

      found: dict[str, dict[str, Any]] = {}
      unbuffered: list[str] = []

      async with self._storage_lock:
          for requested_id in ids:
              buffered = self._pending_upserts.get(requested_id)
              if buffered is None:
                  unbuffered.append(requested_id)
              else:
                  found[str(requested_id)] = self._format_record(buffered.record)

      if unbuffered:
          await self._get_index()  # reload-if-stale
          for cid in unbuffered:
              fid = self._find_faiss_id_by_custom_id(cid)
              metadata = None if fid is None else self._id_to_meta.get(fid)
              if metadata:
                  found[str(cid)] = self._format_record(metadata)

      return [found.get(str(requested_id)) for requested_id in ids]
  async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
      """Return the vectors for ``ids``, preferring the pending buffer.

      * Buffered upsert with a cached vector: that vector is returned.
      * Buffered upsert without a vector: its content is embedded here,
        L2-normalized, cached back onto the pending doc (so the next flush
        reuses it instead of re-embedding) and returned.
      * Everything else: looked up in the materialized store after
        ``_get_index`` has run; the stored normalized vector is returned.

      Duplicate entries in ``ids`` are collapsed before embedding and before
      the store lookup, so each id is embedded / scanned at most once per
      call. Every returned vector is a fresh list: mutating the result never
      alters the vectors cached in ``self._id_to_meta`` or on pending docs.

      Args:
          ids: List of unique identifiers.

      Returns:
          ``{id: [vector_values], ...}`` for every id that was found. Ids
          that are unknown or have no vector are omitted. Empty ``ids``
          yields ``{}``.

      Raises:
          RuntimeError: If the embedding function returns a different number
              of vectors than there are documents to embed.
      """
      if not ids:
          return {}

      vectors_dict: dict[str, list[float]] = {}
      # Dicts are used as insertion-ordered sets to drop duplicate ids.
      remaining: dict[str, None] = {}

      async with self._storage_lock:
          to_embed: dict[str, _PendingFaissDoc] = {}
          for requested_id in ids:
              pending = self._pending_upserts.get(requested_id)
              if pending is None:
                  remaining[requested_id] = None
              elif pending.vector is not None:
                  vectors_dict[requested_id] = pending.vector.astype(
                      np.float32
                  ).tolist()
              else:
                  to_embed[requested_id] = pending

          if to_embed:
              contents = [pdoc.record["content"] for pdoc in to_embed.values()]
              batches = [
                  contents[i : i + self._max_batch_size]
                  for i in range(0, len(contents), self._max_batch_size)
              ]
              embeddings_list = await asyncio.gather(
                  *[
                      self.embedding_func(batch, context="document")
                      for batch in batches
                  ]
              )
              arr = np.concatenate(embeddings_list, axis=0).astype(np.float32)
              if len(arr) != len(to_embed):
                  raise RuntimeError(
                      f"[{self.workspace}] embedding is not 1-1 with pending data, "
                      f"{len(arr)} != {len(to_embed)}"
                  )
              # Normalize the batch in place once; _flush_pending_locked
              # relies on cached pending vectors being already normalized.
              faiss.normalize_L2(arr)
              for i, (requested_id, pdoc) in enumerate(to_embed.items()):
                  # Cache the normalized vector so the next flush reuses it.
                  pdoc.vector = arr[i].copy()
                  vectors_dict[requested_id] = arr[i].tolist()

      if remaining:
          # Must run outside the lock block above: the lock is non-reentrant.
          await self._get_index()  # reload-if-stale
          for cid in remaining:
              fid = self._find_faiss_id_by_custom_id(cid)
              if fid is None or fid not in self._id_to_meta:
                  continue
              metadata = self._id_to_meta[fid]
              if "__vector__" in metadata:
                  # Copy: the stored list is reused when the index is
                  # rebuilt, so it must not be handed out by reference.
                  vectors_dict[cid] = list(metadata["__vector__"])
      return vectors_dict
  async def drop(self) -> dict[str, str]:
      """Drop all vector data from storage and reinitialize the index.

      While holding ``_storage_lock`` this method:

      1. Discards buffered (unflushed) upserts.
      2. Resets ``self._index`` to a fresh ``IndexFlatIP`` and clears
         ``self._id_to_meta``.
      3. Removes the on-disk index and metadata files. A file that is
         already absent (never written, or removed by another process in the
         meantime) is not an error.
      4. Clears ``_index_dirty``, notifies other processes via
         ``set_all_update_flags`` and resets this process's own
         ``storage_updated`` flag.

      The store is left empty without re-reading anything from disk: after
      the files are removed there is nothing valid to load.

      Caller contract:
          ``drop`` is destructive and is not coordinated with the document
          pipeline by this class. The caller must ensure no pipeline is
          writing to this storage while it runs; otherwise data can be lost.

      Returns:
          dict[str, str]: Operation status and message
          - On success: {"status": "success", "message": "data dropped"}
          - On failure: {"status": "error", "message": "<error details>"}
          Exceptions are never propagated; they are logged and reported
          through the error result.
      """
      try:
          async with self._storage_lock:
              # Discard buffered (unflushed) upserts along with the data.
              self._pending_upserts.clear()

              self._index = faiss.IndexFlatIP(self._dim)
              self._id_to_meta = {}

              # EAFP instead of exists()+remove(): avoids failing the whole
              # drop when a file disappears between the check and the removal.
              for path in (self._faiss_index_file, self._meta_file):
                  try:
                      os.remove(path)
                  except FileNotFoundError:
                      pass  # Already gone: the desired end state.

              self._index_dirty = False

              # Notify other processes, then clear our own flag so this
              # process does not reload in response to its own drop.
              await set_all_update_flags(self.namespace, workspace=self.workspace)
              self.storage_updated.value = False

              logger.info(
                  f"[{self.workspace}] Process {os.getpid()} drop FAISS index {self.namespace}"
              )
          return {"status": "success", "message": "data dropped"}
      except Exception as e:
          logger.error(
              f"[{self.workspace}] Error dropping FAISS index {self.namespace}: {e}"
          )
          return {"status": "error", "message": str(e)}
  async def finalize(self):
      """Shutdown safety net: flush buffered upserts and persist unsaved changes.

      Usually ``index_done_callback`` has already emptied the pending buffer
      and saved to disk, in which case this returns immediately. Two
      situations leave work to do:

      - **Pending upserts exist**: flush them, then save. If the in-memory
        index has no unsaved changes, the on-disk snapshot is reloaded first
        so commits from other writers are picked up before the pending
        buffer is merged in.
      - **``_index_dirty`` is set**: an earlier flush materialized rows but
        the save did not complete. The reload is skipped, because it would
        discard those materialized-but-unsaved rows, and the save is retried.

      After a successful save, other processes are notified via
      ``set_all_update_flags``, this process's own ``storage_updated`` flag
      is reset, and ``_index_dirty`` is cleared.

      Flush and save failures propagate to the caller.
      """
      async with self._storage_lock:
          has_pending = bool(self._pending_upserts)
          if not (has_pending or self._index_dirty):
              return

          if has_pending:
              # Reload only while self._index holds nothing unsaved; a dirty
              # index carries flushed-but-unsaved rows that a reload would
              # silently drop.
              if not self._index_dirty:
                  self._reload_index_from_disk_locked(for_write=True)
              await self._flush_pending_locked()

          self._save_faiss_index()
          await set_all_update_flags(self.namespace, workspace=self.workspace)
          self.storage_updated.value = False
          self._index_dirty = False