milvus_impl.py 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030
  1. import asyncio
  2. import os
  3. from typing import Any, final, Optional, Dict
  4. from dataclasses import dataclass, fields
  5. import numpy as np
  6. from lightrag.utils import logger, compute_mdhash_id, _cooperative_yield
  7. from ..base import BaseVectorStorage
  8. from ..constants import DEFAULT_MAX_FILE_PATH_LENGTH
  9. from ..kg.shared_storage import get_data_init_lock, get_namespace_lock
  10. import pipmaster as pm
  11. if not pm.is_installed("pymilvus"):
  12. pm.install("pymilvus>=2.6.2")
  13. import configparser
  14. from pymilvus import MilvusClient, DataType, CollectionSchema, FieldSchema # type: ignore
  15. from packaging import version
  16. config = configparser.ConfigParser()
  17. config.read("config.ini", "utf-8")
  18. @dataclass
  19. class _PendingVectorDoc:
  20. """Buffered vector upsert waiting for embedding and/or bulk flush."""
  21. source: dict[str, Any]
  22. content: str
  23. vector: list[float] | None = None
  24. # Supported index types
  25. SUPPORTED_INDEX_TYPES = {
  26. "AUTOINDEX",
  27. "HNSW",
  28. "HNSW_SQ",
  29. "HNSW_PQ",
  30. "HNSW_PRQ",
  31. "IVF_FLAT",
  32. "IVF_SQ8",
  33. "IVF_PQ",
  34. "DISKANN",
  35. "SCANN",
  36. }
  37. # Supported metric types
  38. SUPPORTED_METRIC_TYPES = {"COSINE", "L2", "IP"}
  39. # HNSW_SQ quantization types
  40. SUPPORTED_SQ_TYPES = {"SQ4U", "SQ6", "SQ8", "BF16", "FP16"}
  41. SUPPORTED_REFINE_TYPES = {"SQ6", "SQ8", "BF16", "FP16", "FP32"}
  42. # Index type version requirements
  43. # Important: HNSW_SQ was first introduced in Milvus 2.6.8 (not 2.5)
  44. INDEX_VERSION_REQUIREMENTS = {
  45. "HNSW_SQ": "2.6.8", # HNSW_SQ requires Milvus 2.6.8+ (supports sq_types such as SQ4U, SQ6, SQ8, BF16, FP16)
  46. }
  47. def _get_env_bool(key: str, default: bool = False) -> bool:
  48. """Parse environment variable as boolean"""
  49. val = os.environ.get(key, "").lower()
  50. if val in ("true", "1", "yes", "on"):
  51. return True
  52. elif val in ("false", "0", "no", "off"):
  53. return False
  54. return default
  55. def _get_env_int(key: str, default: int) -> int:
  56. """Parse environment variable as integer"""
  57. val = os.environ.get(key, "")
  58. if val:
  59. try:
  60. return int(val)
  61. except ValueError:
  62. logger.warning(
  63. f"Invalid integer value for {key}: {val}, using default {default}"
  64. )
  65. return default
  66. @dataclass
  67. class MilvusIndexConfig:
  68. """
  69. Milvus vector index configuration class
  70. Supports configuration via environment variables or initialization parameters.
  71. Initialization parameters take precedence over environment variables.
  72. """
  73. # Base configuration
  74. index_type: Optional[str] = None
  75. metric_type: Optional[str] = None
  76. # HNSW series parameters
  77. hnsw_m: Optional[int] = None
  78. hnsw_ef_construction: Optional[int] = None
  79. hnsw_ef: Optional[int] = None
  80. # HNSW_SQ specific parameters
  81. sq_type: Optional[str] = None
  82. sq_refine: Optional[bool] = None
  83. sq_refine_type: Optional[str] = None
  84. sq_refine_k: Optional[int] = None
  85. # IVF series parameters
  86. ivf_nlist: Optional[int] = None
  87. ivf_nprobe: Optional[int] = None
  88. def __post_init__(self):
  89. """Load configuration from environment variables (init parameters take precedence)"""
  90. # Index type
  91. self.index_type = (
  92. self.index_type or os.environ.get("MILVUS_INDEX_TYPE", "AUTOINDEX")
  93. ).upper()
  94. # Metric type
  95. self.metric_type = (
  96. self.metric_type or os.environ.get("MILVUS_METRIC_TYPE", "COSINE")
  97. ).upper()
  98. # HNSW parameters
  99. # Defaults aligned with Milvus 2.4+ official documentation
  100. if self.hnsw_m is None:
  101. self.hnsw_m = _get_env_int("MILVUS_HNSW_M", 16)
  102. if self.hnsw_ef_construction is None:
  103. self.hnsw_ef_construction = _get_env_int("MILVUS_HNSW_EF_CONSTRUCTION", 360)
  104. if self.hnsw_ef is None:
  105. self.hnsw_ef = _get_env_int("MILVUS_HNSW_EF", 200)
  106. # HNSW_SQ parameters
  107. if self.sq_type is None:
  108. self.sq_type = os.environ.get("MILVUS_HNSW_SQ_TYPE", "SQ8").upper()
  109. if self.sq_refine is None:
  110. self.sq_refine = _get_env_bool("MILVUS_HNSW_SQ_REFINE", False)
  111. if self.sq_refine_type is None:
  112. self.sq_refine_type = os.environ.get(
  113. "MILVUS_HNSW_SQ_REFINE_TYPE", "FP32"
  114. ).upper()
  115. if self.sq_refine_k is None:
  116. self.sq_refine_k = _get_env_int("MILVUS_HNSW_SQ_REFINE_K", 10)
  117. # IVF parameters
  118. if self.ivf_nlist is None:
  119. self.ivf_nlist = _get_env_int("MILVUS_IVF_NLIST", 1024)
  120. if self.ivf_nprobe is None:
  121. self.ivf_nprobe = _get_env_int("MILVUS_IVF_NPROBE", 16)
  122. # Validate configuration
  123. self._validate()
  124. def _validate(self):
  125. """Validate configuration validity"""
  126. if self.index_type not in SUPPORTED_INDEX_TYPES:
  127. raise ValueError(
  128. f"Unsupported index type: {self.index_type}. "
  129. f"Supported: {SUPPORTED_INDEX_TYPES}"
  130. )
  131. if self.metric_type not in SUPPORTED_METRIC_TYPES:
  132. raise ValueError(
  133. f"Unsupported metric type: {self.metric_type}. "
  134. f"Supported: {SUPPORTED_METRIC_TYPES}"
  135. )
  136. if self.index_type == "HNSW_SQ":
  137. if self.sq_type not in SUPPORTED_SQ_TYPES:
  138. raise ValueError(
  139. f"Unsupported sq_type: {self.sq_type}. "
  140. f"Supported: {SUPPORTED_SQ_TYPES}"
  141. )
  142. if self.sq_refine and self.sq_refine_type not in SUPPORTED_REFINE_TYPES:
  143. raise ValueError(
  144. f"Unsupported refine_type: {self.sq_refine_type}. "
  145. f"Supported: {SUPPORTED_REFINE_TYPES}"
  146. )
  147. # Parameter range validation
  148. if not (2 <= self.hnsw_m <= 2048):
  149. raise ValueError(f"hnsw_m must be in [2, 2048], got {self.hnsw_m}")
  150. if self.hnsw_ef_construction < 1:
  151. raise ValueError(
  152. f"hnsw_ef_construction must be >= 1, got {self.hnsw_ef_construction}"
  153. )
  154. if self.ivf_nlist < 1 or self.ivf_nlist > 65536:
  155. raise ValueError(f"ivf_nlist must be in [1, 65536], got {self.ivf_nlist}")
  156. def validate_milvus_version(self, server_version: str) -> None:
  157. """
  158. Validate Milvus server version supports the configured index type
  159. Args:
  160. server_version: Milvus server version string (e.g., "2.6.9")
  161. Raises:
  162. ValueError: Version does not meet index type requirements
  163. """
  164. current_ver = version.parse(
  165. server_version.split("-")[0]
  166. ) # Handle "2.6.9-dev" format
  167. # Check HNSW_SQ index type version requirements (requires 2.6.8+)
  168. if self.index_type == "HNSW_SQ":
  169. required = INDEX_VERSION_REQUIREMENTS["HNSW_SQ"]
  170. if current_ver < version.parse(required):
  171. raise ValueError(
  172. f"HNSW_SQ requires Milvus {required}+, "
  173. f"current version: {server_version}"
  174. )
  175. logger.info(
  176. f"Milvus version {server_version} validated for index type "
  177. f"{self.index_type}"
  178. + (f" with sq_type {self.sq_type}" if self.index_type == "HNSW_SQ" else "")
  179. )
  180. def build_index_params(self, index_params, field_name: str = "vector"):
  181. """
  182. Build pymilvus index parameters
  183. Args:
  184. index_params: IndexParams instance (from compatibility helper or client.prepare_index_params())
  185. field_name: Vector field name
  186. Returns:
  187. IndexParams object, or a dict fallback when direct API creation is needed.
  188. """
  189. if index_params is None:
  190. if self.index_type == "AUTOINDEX":
  191. logger.info(
  192. "Using AUTOINDEX with direct API fallback because IndexParams is unavailable"
  193. )
  194. return {
  195. "field_name": field_name,
  196. "index_type": self.index_type,
  197. "metric_type": self.metric_type,
  198. "params": {},
  199. }
  200. raise RuntimeError(
  201. f"IndexParams not available but required for index type "
  202. f"'{self.index_type}'. Ensure pymilvus is installed correctly."
  203. )
  204. params: Dict[str, Any] = {}
  205. # HNSW series indexes
  206. if self.index_type in ("HNSW", "HNSW_SQ", "HNSW_PQ", "HNSW_PRQ"):
  207. params["M"] = self.hnsw_m
  208. params["efConstruction"] = self.hnsw_ef_construction
  209. # HNSW_SQ specific parameters
  210. if self.index_type == "HNSW_SQ":
  211. params["sq_type"] = self.sq_type
  212. if self.sq_refine:
  213. params["refine"] = True
  214. params["refine_type"] = self.sq_refine_type
  215. # IVF series indexes
  216. elif self.index_type in ("IVF_FLAT", "IVF_SQ8", "IVF_PQ"):
  217. params["nlist"] = self.ivf_nlist
  218. # DISKANN / SCANN have no additional params
  219. index_params.add_index(
  220. field_name=field_name,
  221. index_type=self.index_type,
  222. metric_type=self.metric_type,
  223. params=params,
  224. )
  225. logger.info(
  226. f"Milvus index configured: type={self.index_type}, "
  227. f"metric={self.metric_type}, params={params}"
  228. )
  229. return index_params
  230. def build_search_params(self) -> Dict[str, Any]:
  231. """
  232. Build search parameters
  233. Returns:
  234. Search parameters dictionary
  235. """
  236. search_params: Dict[str, Any] = {}
  237. if self.index_type in ("HNSW", "HNSW_SQ", "HNSW_PQ", "HNSW_PRQ"):
  238. search_params["ef"] = self.hnsw_ef
  239. if self.index_type == "HNSW_SQ" and self.sq_refine:
  240. search_params["refine_k"] = self.sq_refine_k
  241. elif self.index_type in ("IVF_FLAT", "IVF_SQ8", "IVF_PQ"):
  242. search_params["nprobe"] = self.ivf_nprobe
  243. return {"params": search_params} if search_params else {}
  244. @classmethod
  245. def get_config_field_names(cls) -> set:
  246. """Get all configuration field names from the dataclass.
  247. This method provides a single source of truth for configuration parameter names,
  248. eliminating the need to maintain duplicate hardcoded lists elsewhere.
  249. Returns:
  250. Set of field names that can be used to extract configuration from kwargs
  251. """
  252. return {f.name for f in fields(cls)}
  253. def to_dict(self) -> Dict[str, Any]:
  254. """Export configuration as dictionary (for logging/debugging)"""
  255. return {
  256. "index_type": self.index_type,
  257. "metric_type": self.metric_type,
  258. "hnsw_m": self.hnsw_m,
  259. "hnsw_ef_construction": self.hnsw_ef_construction,
  260. "hnsw_ef": self.hnsw_ef,
  261. "sq_type": self.sq_type if self.index_type == "HNSW_SQ" else None,
  262. "sq_refine": self.sq_refine if self.index_type == "HNSW_SQ" else None,
  263. "sq_refine_type": (
  264. self.sq_refine_type
  265. if self.index_type == "HNSW_SQ" and self.sq_refine
  266. else None
  267. ),
  268. "sq_refine_k": (
  269. self.sq_refine_k
  270. if self.index_type == "HNSW_SQ" and self.sq_refine
  271. else None
  272. ),
  273. "ivf_nlist": (
  274. self.ivf_nlist if self.index_type.startswith("IVF") else None
  275. ),
  276. "ivf_nprobe": (
  277. self.ivf_nprobe if self.index_type.startswith("IVF") else None
  278. ),
  279. }
  280. @final
  281. @dataclass
  282. class MilvusVectorDBStorage(BaseVectorStorage):
  283. def _get_milvus_connection_kwargs(self, include_db_name: bool = True) -> dict:
  284. """Build Milvus connection kwargs from env/config."""
  285. connection_kwargs = {
  286. "uri": os.environ.get(
  287. "MILVUS_URI",
  288. config.get(
  289. "milvus",
  290. "uri",
  291. fallback=os.path.join(
  292. self.global_config["working_dir"], "milvus_lite.db"
  293. ),
  294. ),
  295. ),
  296. "user": os.environ.get(
  297. "MILVUS_USER", config.get("milvus", "user", fallback=None)
  298. ),
  299. "password": os.environ.get(
  300. "MILVUS_PASSWORD",
  301. config.get("milvus", "password", fallback=None),
  302. ),
  303. "token": os.environ.get(
  304. "MILVUS_TOKEN", config.get("milvus", "token", fallback=None)
  305. ),
  306. }
  307. db_name = os.environ.get(
  308. "MILVUS_DB_NAME",
  309. config.get("milvus", "db_name", fallback=None),
  310. )
  311. if include_db_name and db_name:
  312. connection_kwargs["db_name"] = db_name
  313. return connection_kwargs
  314. def _get_milvus_db_name(self) -> Optional[str]:
  315. """Return the configured Milvus database name, if any."""
  316. db_name = self._get_milvus_connection_kwargs(include_db_name=True).get(
  317. "db_name"
  318. )
  319. if db_name is None:
  320. return None
  321. normalized_name = str(db_name).strip()
  322. return normalized_name or None
  323. def _create_milvus_client(self) -> MilvusClient:
  324. """Create a Milvus client and ensure the configured database exists."""
  325. client = MilvusClient(
  326. **self._get_milvus_connection_kwargs(include_db_name=False)
  327. )
  328. db_name = self._get_milvus_db_name()
  329. if not db_name:
  330. return client
  331. existing_databases = set(client.list_databases())
  332. if db_name not in existing_databases:
  333. logger.warning(
  334. f"[{self.workspace}] Milvus database '{db_name}' not found, creating it"
  335. )
  336. client.create_database(db_name)
  337. use_database = getattr(client, "use_database", None) or getattr(
  338. client, "using_database", None
  339. )
  340. if callable(use_database):
  341. use_database(db_name)
  342. logger.debug(
  343. f"[{self.workspace}] Using Milvus database '{db_name}' for namespace '{self.namespace}'"
  344. )
  345. return client
  346. return MilvusClient(**self._get_milvus_connection_kwargs(include_db_name=True))
  347. def _create_schema_for_namespace(self) -> CollectionSchema:
  348. """Create schema based on the current instance's namespace"""
  349. # Get vector dimension from embedding_func
  350. dimension = self.embedding_func.embedding_dim
  351. # Base fields (common to all collections)
  352. base_fields = [
  353. FieldSchema(
  354. name="id", dtype=DataType.VARCHAR, max_length=64, is_primary=True
  355. ),
  356. FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dimension),
  357. FieldSchema(name="created_at", dtype=DataType.INT64),
  358. ]
  359. # Determine specific fields based on namespace
  360. if self.namespace.endswith("entities"):
  361. specific_fields = [
  362. FieldSchema(
  363. name="entity_name",
  364. dtype=DataType.VARCHAR,
  365. max_length=512,
  366. nullable=True,
  367. ),
  368. FieldSchema(
  369. name="file_path",
  370. dtype=DataType.VARCHAR,
  371. max_length=DEFAULT_MAX_FILE_PATH_LENGTH,
  372. nullable=True,
  373. ),
  374. ]
  375. description = "LightRAG entities vector storage"
  376. elif self.namespace.endswith("relationships"):
  377. specific_fields = [
  378. FieldSchema(
  379. name="src_id", dtype=DataType.VARCHAR, max_length=512, nullable=True
  380. ),
  381. FieldSchema(
  382. name="tgt_id", dtype=DataType.VARCHAR, max_length=512, nullable=True
  383. ),
  384. FieldSchema(
  385. name="file_path",
  386. dtype=DataType.VARCHAR,
  387. max_length=DEFAULT_MAX_FILE_PATH_LENGTH,
  388. nullable=True,
  389. ),
  390. ]
  391. description = "LightRAG relationships vector storage"
  392. elif self.namespace.endswith("chunks"):
  393. specific_fields = [
  394. FieldSchema(
  395. name="full_doc_id",
  396. dtype=DataType.VARCHAR,
  397. max_length=64,
  398. nullable=True,
  399. ),
  400. FieldSchema(
  401. name="file_path",
  402. dtype=DataType.VARCHAR,
  403. max_length=DEFAULT_MAX_FILE_PATH_LENGTH,
  404. nullable=True,
  405. ),
  406. ]
  407. description = "LightRAG chunks vector storage"
  408. else:
  409. # Default generic schema (backward compatibility)
  410. specific_fields = [
  411. FieldSchema(
  412. name="file_path",
  413. dtype=DataType.VARCHAR,
  414. max_length=DEFAULT_MAX_FILE_PATH_LENGTH,
  415. nullable=True,
  416. ),
  417. ]
  418. description = "LightRAG generic vector storage"
  419. # Merge all fields
  420. all_fields = base_fields + specific_fields
  421. return CollectionSchema(
  422. fields=all_fields,
  423. description=description,
  424. enable_dynamic_field=True, # Support dynamic fields
  425. )
  426. def _get_index_params(self):
  427. """Get IndexParams in a version-compatible way"""
  428. try:
  429. # Try to use client's prepare_index_params method (most common)
  430. if hasattr(self._client, "prepare_index_params"):
  431. return self._client.prepare_index_params()
  432. except Exception:
  433. pass
  434. try:
  435. # Try to import IndexParams from different possible locations
  436. from pymilvus.client.prepare import IndexParams # type: ignore
  437. return IndexParams()
  438. except ImportError:
  439. pass
  440. try:
  441. from pymilvus.client.types import IndexParams # type: ignore
  442. return IndexParams()
  443. except ImportError:
  444. pass
  445. try:
  446. from pymilvus import IndexParams # type: ignore
  447. return IndexParams()
  448. except ImportError:
  449. pass
  450. # If all else fails, return None to use fallback method
  451. return None
  452. def _create_scalar_index_fallback(self, field_name: str, index_type: str):
  453. """Fallback method to create scalar index using direct API"""
  454. # Skip unsupported index types
  455. if index_type == "SORTED":
  456. logger.info(
  457. f"[{self.workspace}] Skipping SORTED index for {field_name} (not supported in this Milvus version)"
  458. )
  459. return
  460. try:
  461. self._client.create_index(
  462. collection_name=self.final_namespace,
  463. field_name=field_name,
  464. index_params={"index_type": index_type},
  465. )
  466. logger.debug(
  467. f"[{self.workspace}] Created {field_name} index using fallback method"
  468. )
  469. except Exception as e:
  470. logger.info(
  471. f"[{self.workspace}] Could not create {field_name} index using fallback method: {e}"
  472. )
  473. def _create_indexes_after_collection(self):
  474. """Create indexes after collection is created"""
  475. # Build vector index using index configuration
  476. # Use compatibility helper to get IndexParams
  477. index_params_for_vector = self._get_index_params()
  478. vector_index_params = self.index_config.build_index_params(
  479. index_params_for_vector, field_name="vector"
  480. )
  481. # Re-raise exceptions to surface vector index creation failures
  482. if isinstance(vector_index_params, dict):
  483. self._client.create_index(
  484. collection_name=self.final_namespace,
  485. field_name=vector_index_params["field_name"],
  486. index_params={
  487. "index_type": vector_index_params["index_type"],
  488. "metric_type": vector_index_params["metric_type"],
  489. "params": vector_index_params["params"],
  490. },
  491. )
  492. else:
  493. self._client.create_index(
  494. collection_name=self.final_namespace,
  495. index_params=vector_index_params,
  496. )
  497. logger.debug(
  498. f"[{self.workspace}] Created vector index with config: {self.index_config.to_dict()}"
  499. )
  500. # Create scalar indexes based on namespace
  501. # Wrap scalar index creation in try-except to allow graceful degradation
  502. try:
  503. # Try to get IndexParams in a version-compatible way
  504. scalar_index_params = self._get_index_params()
  505. if scalar_index_params is not None:
  506. # Create scalar indexes based on namespace
  507. if self.namespace.endswith("entities"):
  508. # Create indexes for entity fields
  509. try:
  510. entity_name_index = self._get_index_params()
  511. entity_name_index.add_index(
  512. field_name="entity_name", index_type="INVERTED"
  513. )
  514. self._client.create_index(
  515. collection_name=self.final_namespace,
  516. index_params=entity_name_index,
  517. )
  518. except Exception as e:
  519. logger.debug(
  520. f"[{self.workspace}] IndexParams method failed for entity_name: {e}"
  521. )
  522. self._create_scalar_index_fallback("entity_name", "INVERTED")
  523. elif self.namespace.endswith("relationships"):
  524. # Create indexes for relationship fields
  525. try:
  526. src_id_index = self._get_index_params()
  527. src_id_index.add_index(
  528. field_name="src_id", index_type="INVERTED"
  529. )
  530. self._client.create_index(
  531. collection_name=self.final_namespace,
  532. index_params=src_id_index,
  533. )
  534. except Exception as e:
  535. logger.debug(
  536. f"[{self.workspace}] IndexParams method failed for src_id: {e}"
  537. )
  538. self._create_scalar_index_fallback("src_id", "INVERTED")
  539. try:
  540. tgt_id_index = self._get_index_params()
  541. tgt_id_index.add_index(
  542. field_name="tgt_id", index_type="INVERTED"
  543. )
  544. self._client.create_index(
  545. collection_name=self.final_namespace,
  546. index_params=tgt_id_index,
  547. )
  548. except Exception as e:
  549. logger.debug(
  550. f"[{self.workspace}] IndexParams method failed for tgt_id: {e}"
  551. )
  552. self._create_scalar_index_fallback("tgt_id", "INVERTED")
  553. elif self.namespace.endswith("chunks"):
  554. # Create indexes for chunk fields
  555. try:
  556. doc_id_index = self._get_index_params()
  557. doc_id_index.add_index(
  558. field_name="full_doc_id", index_type="INVERTED"
  559. )
  560. self._client.create_index(
  561. collection_name=self.final_namespace,
  562. index_params=doc_id_index,
  563. )
  564. except Exception as e:
  565. logger.debug(
  566. f"[{self.workspace}] IndexParams method failed for full_doc_id: {e}"
  567. )
  568. self._create_scalar_index_fallback("full_doc_id", "INVERTED")
  569. else:
  570. # Fallback to direct API calls if IndexParams is not available
  571. logger.info(
  572. f"[{self.workspace}] IndexParams not available, using fallback methods for {self.namespace}"
  573. )
  574. # Create scalar indexes using fallback
  575. if self.namespace.endswith("entities"):
  576. self._create_scalar_index_fallback("entity_name", "INVERTED")
  577. elif self.namespace.endswith("relationships"):
  578. self._create_scalar_index_fallback("src_id", "INVERTED")
  579. self._create_scalar_index_fallback("tgt_id", "INVERTED")
  580. elif self.namespace.endswith("chunks"):
  581. self._create_scalar_index_fallback("full_doc_id", "INVERTED")
  582. logger.info(
  583. f"[{self.workspace}] Created indexes for collection: {self.namespace}"
  584. )
  585. except Exception as e:
  586. # Scalar index failures are logged as warnings (not critical)
  587. logger.warning(
  588. f"[{self.workspace}] Failed to create some scalar indexes for {self.namespace}: {e}"
  589. )
  590. def _get_required_fields_for_namespace(self) -> dict:
  591. """Get required core field definitions for current namespace"""
  592. # Base fields (common to all types)
  593. base_fields = {
  594. "id": {"type": "VarChar", "is_primary": True},
  595. "vector": {"type": "FloatVector"},
  596. "created_at": {"type": "Int64"},
  597. }
  598. # Add specific fields based on namespace
  599. if self.namespace.endswith("entities"):
  600. specific_fields = {
  601. "entity_name": {"type": "VarChar"},
  602. "file_path": {"type": "VarChar"},
  603. }
  604. elif self.namespace.endswith("relationships"):
  605. specific_fields = {
  606. "src_id": {"type": "VarChar"},
  607. "tgt_id": {"type": "VarChar"},
  608. "file_path": {"type": "VarChar"},
  609. }
  610. elif self.namespace.endswith("chunks"):
  611. specific_fields = {
  612. "full_doc_id": {"type": "VarChar"},
  613. "file_path": {"type": "VarChar"},
  614. }
  615. else:
  616. specific_fields = {
  617. "file_path": {"type": "VarChar"},
  618. }
  619. return {**base_fields, **specific_fields}
  620. def _is_field_compatible(self, existing_field: dict, expected_config: dict) -> bool:
  621. """Check compatibility of a single field"""
  622. field_name = existing_field.get("name", "unknown")
  623. existing_type = existing_field.get("type")
  624. expected_type = expected_config.get("type")
  625. logger.debug(
  626. f"[{self.workspace}] Checking field '{field_name}': existing_type={existing_type} (type={type(existing_type)}), expected_type={expected_type}"
  627. )
  628. # Convert DataType enum values to string names if needed
  629. original_existing_type = existing_type
  630. if hasattr(existing_type, "name"):
  631. existing_type = existing_type.name
  632. logger.debug(
  633. f"[{self.workspace}] Converted enum to name: {original_existing_type} -> {existing_type}"
  634. )
  635. elif isinstance(existing_type, int):
  636. # Map common Milvus internal type codes to type names for backward compatibility
  637. type_mapping = {
  638. 21: "VarChar",
  639. 101: "FloatVector",
  640. 5: "Int64",
  641. 9: "Double",
  642. }
  643. mapped_type = type_mapping.get(existing_type, str(existing_type))
  644. logger.debug(
  645. f"[{self.workspace}] Mapped numeric type: {existing_type} -> {mapped_type}"
  646. )
  647. existing_type = mapped_type
  648. # Normalize type names for comparison
  649. type_aliases = {
  650. "VARCHAR": "VarChar",
  651. "String": "VarChar",
  652. "FLOAT_VECTOR": "FloatVector",
  653. "INT64": "Int64",
  654. "BigInt": "Int64",
  655. "DOUBLE": "Double",
  656. "Float": "Double",
  657. }
  658. original_existing = existing_type
  659. original_expected = expected_type
  660. existing_type = type_aliases.get(existing_type, existing_type)
  661. expected_type = type_aliases.get(expected_type, expected_type)
  662. if original_existing != existing_type or original_expected != expected_type:
  663. logger.debug(
  664. f"[{self.workspace}] Applied aliases: {original_existing} -> {existing_type}, {original_expected} -> {expected_type}"
  665. )
  666. # Basic type compatibility check
  667. type_compatible = existing_type == expected_type
  668. logger.debug(
  669. f"[{self.workspace}] Type compatibility for '{field_name}': {existing_type} == {expected_type} -> {type_compatible}"
  670. )
  671. if not type_compatible:
  672. logger.warning(
  673. f"[{self.workspace}] Type mismatch for field '{field_name}': expected {expected_type}, got {existing_type}"
  674. )
  675. return False
  676. # Primary key check - be more flexible about primary key detection
  677. if expected_config.get("is_primary"):
  678. # Check multiple possible field names for primary key status
  679. is_primary = (
  680. existing_field.get("is_primary_key", False)
  681. or existing_field.get("is_primary", False)
  682. or existing_field.get("primary_key", False)
  683. )
  684. logger.debug(
  685. f"[{self.workspace}] Primary key check for '{field_name}': expected=True, actual={is_primary}"
  686. )
  687. logger.debug(
  688. f"[{self.workspace}] Raw field data for '{field_name}': {existing_field}"
  689. )
  690. # For ID field, be more lenient - if it's the ID field, assume it should be primary
  691. if field_name == "id" and not is_primary:
  692. logger.info(
  693. f"[{self.workspace}] ID field '{field_name}' not marked as primary in existing collection, but treating as compatible"
  694. )
  695. # Don't fail for ID field primary key mismatch
  696. elif not is_primary:
  697. logger.warning(
  698. f"[{self.workspace}] Primary key mismatch for field '{field_name}': expected primary key, but field is not primary"
  699. )
  700. return False
  701. logger.debug(f"[{self.workspace}] Field '{field_name}' is compatible")
  702. return True
  703. def _check_vector_dimension(self, collection_info: dict):
  704. """Check vector dimension compatibility"""
  705. current_dimension = self.embedding_func.embedding_dim
  706. # Find vector field dimension
  707. for field in collection_info.get("fields", []):
  708. if field.get("name") == "vector":
  709. field_type = field.get("type")
  710. # Extract type name from DataType enum or string
  711. type_name = None
  712. if hasattr(field_type, "name"):
  713. type_name = field_type.name
  714. elif isinstance(field_type, str):
  715. type_name = field_type
  716. else:
  717. type_name = str(field_type)
  718. # Check if it's a vector type (supports multiple formats)
  719. if type_name in ["FloatVector", "FLOAT_VECTOR"]:
  720. existing_dimension = field.get("params", {}).get("dim")
  721. # Convert both to int for comparison to handle type mismatches
  722. # (Milvus API may return string "1024" vs int 1024)
  723. try:
  724. existing_dim_int = (
  725. int(existing_dimension)
  726. if existing_dimension is not None
  727. else None
  728. )
  729. current_dim_int = (
  730. int(current_dimension)
  731. if current_dimension is not None
  732. else None
  733. )
  734. except (TypeError, ValueError) as e:
  735. logger.error(
  736. f"[{self.workspace}] Failed to parse dimensions: existing={existing_dimension} (type={type(existing_dimension)}), "
  737. f"current={current_dimension} (type={type(current_dimension)}), error={e}"
  738. )
  739. raise ValueError(
  740. f"Invalid dimension values for collection '{self.final_namespace}': "
  741. f"existing={existing_dimension}, current={current_dimension}"
  742. ) from e
  743. if existing_dim_int != current_dim_int:
  744. raise ValueError(
  745. f"Vector dimension mismatch for collection '{self.final_namespace}': "
  746. f"existing={existing_dim_int}, current={current_dim_int}"
  747. )
  748. logger.debug(
  749. f"[{self.workspace}] Vector dimension check passed: {current_dim_int}"
  750. )
  751. return
  752. # If no vector field found, this might be an old collection created with simple schema
  753. logger.warning(
  754. f"[{self.workspace}] Vector field not found in collection '{self.namespace}'. This might be an old collection created with simple schema."
  755. )
  756. logger.warning(
  757. f"[{self.workspace}] Consider recreating the collection for optimal performance."
  758. )
  759. return
  760. def _check_file_path_length_restriction(self, collection_info: dict) -> bool:
  761. """Check if collection has file_path length restrictions that need migration
  762. Returns:
  763. bool: True if migration is needed, False otherwise
  764. """
  765. existing_fields = {
  766. field["name"]: field for field in collection_info.get("fields", [])
  767. }
  768. # Check if file_path field exists and has length restrictions
  769. if "file_path" in existing_fields:
  770. file_path_field = existing_fields["file_path"]
  771. # Get max_length from field params
  772. max_length = file_path_field.get("params", {}).get("max_length")
  773. if max_length and max_length < DEFAULT_MAX_FILE_PATH_LENGTH:
  774. logger.info(
  775. f"[{self.workspace}] Collection {self.namespace} has file_path max_length={max_length}, "
  776. f"needs migration to {DEFAULT_MAX_FILE_PATH_LENGTH}"
  777. )
  778. return True
  779. return False
  780. def _check_schema_compatibility(self, collection_info: dict):
  781. """Check schema field compatibility and detect migration needs"""
  782. existing_fields = {
  783. field["name"]: field for field in collection_info.get("fields", [])
  784. }
  785. # Check if this is an old collection created with simple schema
  786. has_vector_field = any(
  787. field.get("name") == "vector" for field in collection_info.get("fields", [])
  788. )
  789. if not has_vector_field:
  790. logger.warning(
  791. f"[{self.workspace}] Collection {self.namespace} appears to be created with old simple schema (no vector field)"
  792. )
  793. logger.warning(
  794. f"[{self.workspace}] This collection will work but may have suboptimal performance"
  795. )
  796. logger.warning(
  797. f"[{self.workspace}] Consider recreating the collection for optimal performance"
  798. )
  799. return
  800. # Check if migration is needed for file_path length restrictions
  801. if self._check_file_path_length_restriction(collection_info):
  802. logger.info(
  803. f"[{self.workspace}] Starting automatic migration for collection {self.namespace}"
  804. )
  805. self._migrate_collection_schema()
  806. return
  807. # For collections with vector field, check basic compatibility
  808. # Only check for critical incompatibilities, not missing optional fields
  809. critical_fields = {"id": {"type": "VarChar", "is_primary": True}}
  810. incompatible_fields = []
  811. for field_name, expected_config in critical_fields.items():
  812. if field_name in existing_fields:
  813. existing_field = existing_fields[field_name]
  814. if not self._is_field_compatible(existing_field, expected_config):
  815. incompatible_fields.append(
  816. f"{field_name}: expected {expected_config['type']}, "
  817. f"got {existing_field.get('type')}"
  818. )
  819. if incompatible_fields:
  820. raise ValueError(
  821. f"Critical schema incompatibility in collection '{self.final_namespace}': {incompatible_fields}"
  822. )
  823. # Get all expected fields for informational purposes
  824. expected_fields = self._get_required_fields_for_namespace()
  825. missing_fields = [
  826. field for field in expected_fields if field not in existing_fields
  827. ]
  828. if missing_fields:
  829. logger.info(
  830. f"[{self.workspace}] Collection {self.namespace} missing optional fields: {missing_fields}"
  831. )
  832. logger.info(
  833. "These fields would be available in a newly created collection for better performance"
  834. )
  835. logger.debug(
  836. f"[{self.workspace}] Schema compatibility check passed for {self.namespace}"
  837. )
  838. def _migrate_collection_schema(self):
  839. """Migrate collection schema using query_iterator - completely solves query window limitations"""
  840. original_collection_name = self.final_namespace
  841. temp_collection_name = f"{self.final_namespace}_temp"
  842. iterator = None
  843. try:
  844. logger.info(
  845. f"[{self.workspace}] Starting iterator-based schema migration for {self.namespace}"
  846. )
  847. # Step 1: Create temporary collection with new schema
  848. logger.info(
  849. f"[{self.workspace}] Step 1: Creating temporary collection: {temp_collection_name}"
  850. )
  851. # Temporarily update final_namespace for index creation
  852. self.final_namespace = temp_collection_name
  853. new_schema = self._create_schema_for_namespace()
  854. self._client.create_collection(
  855. collection_name=temp_collection_name, schema=new_schema
  856. )
  857. try:
  858. self._create_indexes_after_collection()
  859. except Exception as index_error:
  860. logger.warning(
  861. f"[{self.workspace}] Failed to create indexes for new collection: {index_error}"
  862. )
  863. # Continue with migration even if index creation fails
  864. # Load the new collection
  865. self._client.load_collection(temp_collection_name)
  866. # Step 2: Copy data using query_iterator (solves query window limitation)
  867. logger.info(
  868. f"[{self.workspace}] Step 2: Copying data using query_iterator from: {original_collection_name}"
  869. )
  870. # Create query iterator
  871. try:
  872. iterator = self._client.query_iterator(
  873. collection_name=original_collection_name,
  874. batch_size=2000, # Adjustable batch size for optimal performance
  875. output_fields=["*"], # Get all fields
  876. )
  877. logger.debug(f"[{self.workspace}] Query iterator created successfully")
  878. except Exception as iterator_error:
  879. logger.error(
  880. f"[{self.workspace}] Failed to create query iterator: {iterator_error}"
  881. )
  882. raise
  883. # Iterate through all data
  884. total_migrated = 0
  885. batch_number = 1
  886. while True:
  887. try:
  888. batch_data = iterator.next()
  889. if not batch_data:
  890. # No more data available
  891. break
  892. # Insert batch data to new collection
  893. try:
  894. self._client.insert(
  895. collection_name=temp_collection_name, data=batch_data
  896. )
  897. total_migrated += len(batch_data)
  898. logger.info(
  899. f"[{self.workspace}] Iterator batch {batch_number}: "
  900. f"processed {len(batch_data)} records, total migrated: {total_migrated}"
  901. )
  902. batch_number += 1
  903. except Exception as batch_error:
  904. logger.error(
  905. f"[{self.workspace}] Failed to insert iterator batch {batch_number}: {batch_error}"
  906. )
  907. raise
  908. except Exception as next_error:
  909. logger.error(
  910. f"[{self.workspace}] Iterator next() failed at batch {batch_number}: {next_error}"
  911. )
  912. raise
  913. if total_migrated > 0:
  914. logger.info(
  915. f"[{self.workspace}] Successfully migrated {total_migrated} records using iterator"
  916. )
  917. else:
  918. logger.info(
  919. f"[{self.workspace}] No data found in original collection, migration completed"
  920. )
  921. # Step 3: Rename origin collection (keep for safety)
  922. logger.info(
  923. f"[{self.workspace}] Step 3: Rename origin collection to {original_collection_name}_old"
  924. )
  925. try:
  926. self._client.rename_collection(
  927. original_collection_name, f"{original_collection_name}_old"
  928. )
  929. except Exception as rename_error:
  930. try:
  931. logger.warning(
  932. f"[{self.workspace}] Try to drop origin collection instead"
  933. )
  934. self._client.drop_collection(original_collection_name)
  935. except Exception as e:
  936. logger.error(
  937. f"[{self.workspace}] Rename operation failed: {rename_error}"
  938. )
  939. raise e
  940. # Step 4: Rename temporary collection to original name
  941. logger.info(
  942. f"[{self.workspace}] Step 4: Renaming collection {temp_collection_name} -> {original_collection_name}"
  943. )
  944. try:
  945. self._client.rename_collection(
  946. temp_collection_name, original_collection_name
  947. )
  948. logger.info(f"[{self.workspace}] Rename operation completed")
  949. except Exception as rename_error:
  950. logger.error(
  951. f"[{self.workspace}] Rename operation failed: {rename_error}"
  952. )
  953. raise RuntimeError(
  954. f"Failed to rename collection: {rename_error}"
  955. ) from rename_error
  956. # Restore final_namespace
  957. self.final_namespace = original_collection_name
  958. except Exception as e:
  959. logger.error(
  960. f"[{self.workspace}] Iterator-based migration failed for {self.namespace}: {e}"
  961. )
  962. # Attempt cleanup of temporary collection if it exists
  963. try:
  964. if self._client and self._client.has_collection(temp_collection_name):
  965. logger.info(
  966. f"[{self.workspace}] Cleaning up failed migration temporary collection"
  967. )
  968. self._client.drop_collection(temp_collection_name)
  969. except Exception as cleanup_error:
  970. logger.warning(
  971. f"[{self.workspace}] Failed to cleanup temporary collection: {cleanup_error}"
  972. )
  973. # Re-raise the original error
  974. raise RuntimeError(
  975. f"Iterator-based migration failed for collection {self.namespace}: {e}"
  976. ) from e
  977. finally:
  978. # Ensure iterator is properly closed
  979. if iterator:
  980. try:
  981. iterator.close()
  982. logger.debug(
  983. f"[{self.workspace}] Query iterator closed successfully"
  984. )
  985. except Exception as close_error:
  986. logger.warning(
  987. f"[{self.workspace}] Failed to close query iterator: {close_error}"
  988. )
  989. def _validate_collection_compatibility(self):
  990. """Validate existing collection's dimension and schema compatibility"""
  991. try:
  992. collection_info = self._client.describe_collection(self.final_namespace)
  993. # 1. Check vector dimension
  994. self._check_vector_dimension(collection_info)
  995. # 2. Check schema compatibility
  996. self._check_schema_compatibility(collection_info)
  997. logger.info(
  998. f"[{self.workspace}] VectorDB Collection '{self.namespace}' compatibility validation passed"
  999. )
  1000. except Exception as e:
  1001. logger.error(
  1002. f"[{self.workspace}] Collection compatibility validation failed for {self.namespace}: {e}"
  1003. )
  1004. raise
  1005. @staticmethod
  1006. def _is_missing_vector_index_error(error: Exception) -> bool:
  1007. """Return True when the error indicates the collection lacks a vector index."""
  1008. error_message = str(error).lower()
  1009. return (
  1010. "no vector index" in error_message
  1011. or "please create index firstly" in error_message
  1012. )
  1013. def _repair_missing_vector_index(self):
  1014. """Create indexes for an existing collection that is missing its vector index."""
  1015. logger.warning(
  1016. f"[{self.workspace}] Collection '{self.namespace}' is missing a vector index, attempting repair"
  1017. )
  1018. self._create_indexes_after_collection()
  1019. def _ensure_collection_loaded(self):
  1020. """Ensure the collection is loaded into memory for search operations"""
  1021. try:
  1022. # Check if collection exists first
  1023. if not self._client.has_collection(self.final_namespace):
  1024. logger.error(
  1025. f"[{self.workspace}] Collection {self.namespace} does not exist"
  1026. )
  1027. raise ValueError(f"Collection {self.final_namespace} does not exist")
  1028. # Load the collection if it's not already loaded
  1029. # In Milvus, collections need to be loaded before they can be searched
  1030. self._client.load_collection(self.final_namespace)
  1031. # logger.debug(f"[{self.workspace}] Collection {self.namespace} loaded successfully")
  1032. except Exception as e:
  1033. logger.error(
  1034. f"[{self.workspace}] Failed to load collection {self.namespace}: {e}"
  1035. )
  1036. raise
  1037. def _create_collection_if_not_exist(self):
  1038. """Create collection if not exists and check existing collection compatibility"""
  1039. try:
  1040. # Check if our specific collection exists
  1041. collection_exists = self._client.has_collection(self.final_namespace)
  1042. logger.info(
  1043. f"[{self.workspace}] VectorDB collection '{self.namespace}' exists check: {collection_exists}"
  1044. )
  1045. if collection_exists:
  1046. # Double-check by trying to describe the collection
  1047. try:
  1048. self._client.describe_collection(self.final_namespace)
  1049. self._validate_collection_compatibility()
  1050. try:
  1051. # Ensure the collection is loaded after validation
  1052. self._ensure_collection_loaded()
  1053. return
  1054. except Exception as load_error:
  1055. if not self._is_missing_vector_index_error(load_error):
  1056. raise
  1057. try:
  1058. self._repair_missing_vector_index()
  1059. self._ensure_collection_loaded()
  1060. logger.info(
  1061. f"[{self.workspace}] Repaired missing vector index for existing collection '{self.namespace}'"
  1062. )
  1063. return
  1064. except Exception as repair_error:
  1065. raise RuntimeError(
  1066. f"Index repair failed for collection '{self.final_namespace}'. "
  1067. f"Original error: {repair_error}"
  1068. ) from repair_error
  1069. except Exception as validation_error:
  1070. # CRITICAL: Collection exists but validation failed
  1071. # This indicates potential data migration failure or incompatible schema
  1072. # Stop execution to prevent data loss and require manual intervention
  1073. logger.error(
  1074. f"[{self.workspace}] CRITICAL ERROR: Collection '{self.namespace}' exists but validation failed!"
  1075. )
  1076. logger.error(
  1077. f"[{self.workspace}] This indicates potential data migration failure or schema incompatibility."
  1078. )
  1079. logger.error(
  1080. f"[{self.workspace}] Validation error: {validation_error}"
  1081. )
  1082. logger.error(f"[{self.workspace}] MANUAL INTERVENTION REQUIRED:")
  1083. logger.error(
  1084. f"[{self.workspace}] 1. Check the existing collection schema and data integrity"
  1085. )
  1086. logger.error(
  1087. f"[{self.workspace}] 2. Backup existing data if needed"
  1088. )
  1089. logger.error(
  1090. f"[{self.workspace}] 3. Manually resolve schema compatibility issues"
  1091. )
  1092. logger.error(
  1093. f"[{self.workspace}] 4. Consider dropping and recreating the collection if data is not critical"
  1094. )
  1095. logger.error(
  1096. f"[{self.workspace}] Program execution stopped to prevent potential data loss."
  1097. )
  1098. # Raise a specific exception to stop execution
  1099. raise RuntimeError(
  1100. f"Collection validation failed for '{self.final_namespace}'. "
  1101. f"Data migration failure detected. Manual intervention required to prevent data loss. "
  1102. f"Original error: {validation_error}"
  1103. )
  1104. # Collection doesn't exist, create new collection
  1105. logger.info(f"[{self.workspace}] Creating new collection: {self.namespace}")
  1106. schema = self._create_schema_for_namespace()
  1107. # Create collection with schema only first
  1108. self._client.create_collection(
  1109. collection_name=self.final_namespace, schema=schema
  1110. )
  1111. # Then create indexes
  1112. self._create_indexes_after_collection()
  1113. # Load the newly created collection
  1114. self._ensure_collection_loaded()
  1115. logger.info(
  1116. f"[{self.workspace}] Successfully created Milvus collection: {self.namespace}"
  1117. )
  1118. except RuntimeError:
  1119. # Re-raise RuntimeError (validation failures) without modification
  1120. # These are critical errors that should stop execution
  1121. raise
  1122. except Exception as e:
  1123. logger.error(
  1124. f"[{self.workspace}] Error in _create_collection_if_not_exist for {self.namespace}: {e}"
  1125. )
  1126. # If there's any error (other than validation failure), try to force create the collection
  1127. logger.info(
  1128. f"[{self.workspace}] Attempting to force create collection {self.namespace}..."
  1129. )
  1130. try:
  1131. # Try to drop the collection first if it exists in a bad state
  1132. try:
  1133. if self._client.has_collection(self.final_namespace):
  1134. logger.info(
  1135. f"[{self.workspace}] Dropping potentially corrupted collection {self.namespace}"
  1136. )
  1137. self._client.drop_collection(self.final_namespace)
  1138. except Exception as drop_error:
  1139. logger.warning(
  1140. f"[{self.workspace}] Could not drop collection {self.namespace}: {drop_error}"
  1141. )
  1142. # Create fresh collection
  1143. schema = self._create_schema_for_namespace()
  1144. self._client.create_collection(
  1145. collection_name=self.final_namespace, schema=schema
  1146. )
  1147. self._create_indexes_after_collection()
  1148. # Load the newly created collection
  1149. self._ensure_collection_loaded()
  1150. logger.info(
  1151. f"[{self.workspace}] Successfully force-created collection {self.namespace}"
  1152. )
  1153. except Exception as create_error:
  1154. logger.error(
  1155. f"[{self.workspace}] Failed to force-create collection {self.namespace}: {create_error}"
  1156. )
  1157. raise
  1158. def __post_init__(self):
  1159. self._validate_embedding_func()
  1160. # Extract MilvusIndexConfig parameters from vector_db_storage_cls_kwargs
  1161. #
  1162. # IMPORTANT: This approach allows Milvus index configuration via vector_db_storage_cls_kwargs,
  1163. # which is the RECOMMENDED method for framework integration (e.g., RAGAnything).
  1164. #
  1165. # All 11 index configuration parameters can be passed through vector_db_storage_cls_kwargs:
  1166. # - index_type, metric_type
  1167. # - hnsw_m, hnsw_ef_construction, hnsw_ef
  1168. # - sq_type, sq_refine, sq_refine_type, sq_refine_k
  1169. # - ivf_nlist, ivf_nprobe
  1170. #
  1171. # Example:
  1172. # LightRAG(
  1173. # vector_storage="MilvusVectorDBStorage",
  1174. # vector_db_storage_cls_kwargs={
  1175. # "cosine_better_than_threshold": 0.2,
  1176. # "index_type": "HNSW",
  1177. # "metric_type": "COSINE",
  1178. # "hnsw_m": 32,
  1179. # "hnsw_ef_construction": 256,
  1180. # }
  1181. # )
  1182. #
  1183. # Use MilvusIndexConfig.get_config_field_names() to dynamically extract valid parameters.
  1184. # This ensures we always stay in sync with the MilvusIndexConfig dataclass definition.
  1185. kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
  1186. index_config_keys = MilvusIndexConfig.get_config_field_names()
  1187. index_config_params = {
  1188. k: v for k, v in kwargs.items() if k in index_config_keys
  1189. }
  1190. # Initialize index configuration (if not already set)
  1191. # Configuration priority: init params from kwargs > environment variables > defaults
  1192. if not hasattr(self, "index_config") or self.index_config is None:
  1193. self.index_config = MilvusIndexConfig(**index_config_params)
  1194. # Check for MILVUS_WORKSPACE environment variable first (higher priority)
  1195. # This allows administrators to force a specific workspace for all Milvus storage instances
  1196. milvus_workspace = os.environ.get("MILVUS_WORKSPACE")
  1197. if milvus_workspace and milvus_workspace.strip():
  1198. # Use environment variable value, overriding the passed workspace parameter
  1199. effective_workspace = milvus_workspace.strip()
  1200. logger.info(
  1201. f"Using MILVUS_WORKSPACE environment variable: '{effective_workspace}' (overriding '{self.workspace}/{self.namespace}')"
  1202. )
  1203. else:
  1204. # Use the workspace parameter passed during initialization
  1205. effective_workspace = self.workspace
  1206. if effective_workspace:
  1207. logger.debug(
  1208. f"Using passed workspace parameter: '{effective_workspace}'"
  1209. )
  1210. # Build final_namespace with workspace prefix for data isolation
  1211. # Keep original namespace unchanged for type detection logic
  1212. if effective_workspace:
  1213. self.final_namespace = f"{effective_workspace}_{self.namespace}"
  1214. logger.debug(
  1215. f"Final namespace with workspace prefix: '{self.final_namespace}'"
  1216. )
  1217. else:
  1218. # When workspace is empty, final_namespace equals original namespace
  1219. self.final_namespace = self.namespace
  1220. self.workspace = ""
  1221. logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'")
  1222. cosine_threshold = kwargs.get("cosine_better_than_threshold")
  1223. if cosine_threshold is None:
  1224. raise ValueError(
  1225. "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"
  1226. )
  1227. self.cosine_better_than_threshold = cosine_threshold
  1228. # Ensure created_at is in meta_fields
  1229. if "created_at" not in self.meta_fields:
  1230. self.meta_fields.add("created_at")
  1231. # Initialize client as None - will be created in initialize() method
  1232. self._client = None
  1233. self._max_batch_size = self.global_config["embedding_batch_num"]
  1234. self._initialized = False
  1235. # Deferred-embedding buffers and the per-namespace flush lock.
  1236. # The lock keys on final_namespace so two instances pointing at the
  1237. # same Milvus collection (e.g. when MILVUS_WORKSPACE env override is
  1238. # used) share a single writer lock. We construct it here in
  1239. # __post_init__ — not in initialize() — so any code path that
  1240. # touches the buffer before initialize() still has a valid lock.
  1241. self._pending_vector_docs: dict[str, _PendingVectorDoc] = {}
  1242. self._pending_vector_deletes: set[str] = set()
  1243. self._flush_lock = get_namespace_lock(
  1244. namespace=self.final_namespace, workspace=""
  1245. )
  1246. async def initialize(self):
  1247. """Initialize Milvus collection"""
  1248. async with get_data_init_lock():
  1249. if self._initialized:
  1250. return
  1251. try:
  1252. # Create MilvusClient if not already created
  1253. if self._client is None:
  1254. self._client = self._create_milvus_client()
  1255. logger.debug(
  1256. f"[{self.workspace}] MilvusClient created successfully"
  1257. )
  1258. # Validate Milvus version compatibility with configured index
  1259. if self.index_config.index_type in INDEX_VERSION_REQUIREMENTS:
  1260. try:
  1261. server_version = self._client.get_server_version()
  1262. self.index_config.validate_milvus_version(server_version)
  1263. except Exception as version_error:
  1264. logger.error(
  1265. f"[{self.workspace}] Milvus version validation failed: {version_error}"
  1266. )
  1267. raise
  1268. # Create collection and check compatibility
  1269. self._create_collection_if_not_exist()
  1270. self._initialized = True
  1271. logger.info(
  1272. f"[{self.workspace}] Milvus collection '{self.namespace}' initialized successfully"
  1273. )
  1274. except Exception as e:
  1275. logger.error(
  1276. f"[{self.workspace}] Failed to initialize Milvus collection '{self.namespace}': {e}"
  1277. )
  1278. raise
  1279. async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
  1280. """Buffer vector docs for embedding and batched flush.
  1281. Embedding deliberately does NOT happen here: repeated upserts of the
  1282. same id, or many small batches, collapse into a single flush-time
  1283. embedding pass. Reads (`get_by_id`/`get_by_ids`/`get_vectors_by_ids`)
  1284. observe pending docs via the same lock for read-your-writes.
  1285. """
  1286. if not data:
  1287. return
  1288. import time
  1289. current_time = int(time.time())
  1290. pending_docs: list[tuple[str, _PendingVectorDoc]] = []
  1291. for i, (k, v) in enumerate(data.items(), start=1):
  1292. source = {
  1293. "id": k,
  1294. "created_at": current_time,
  1295. **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
  1296. }
  1297. pending_docs.append(
  1298. (
  1299. k,
  1300. _PendingVectorDoc(source=source, content=v["content"]),
  1301. )
  1302. )
  1303. await _cooperative_yield(i)
  1304. # An upsert overrides any pending delete on the same id; installing
  1305. # a fresh _PendingVectorDoc instance invalidates any vector cached
  1306. # by a prior get_vectors_by_ids() call on a stale revision.
  1307. async with self._flush_lock:
  1308. for doc_id, pdoc in pending_docs:
  1309. self._pending_vector_deletes.discard(doc_id)
  1310. self._pending_vector_docs[doc_id] = pdoc
  1311. async def query(
  1312. self, query: str, top_k: int, query_embedding: list[float] = None
  1313. ) -> list[dict[str, Any]]:
  1314. """Similarity search against the persisted Milvus collection.
  1315. Note: buffered-but-unflushed upserts are NOT visible to this method —
  1316. they exist only in `_pending_vector_docs` until `index_done_callback()`
  1317. embeds and writes them. Callers that need read-after-write visibility
  1318. for similarity search must run an explicit flush first.
  1319. """
  1320. # Ensure collection is loaded before querying
  1321. self._ensure_collection_loaded()
  1322. # Use provided embedding or compute it
  1323. if query_embedding is not None:
  1324. embedding = [query_embedding] # Milvus expects a list of embeddings
  1325. else:
  1326. embedding = await self.embedding_func(
  1327. [query], context="query", _priority=5
  1328. ) # higher priority for query
  1329. # Include all meta_fields (created_at is now always included)
  1330. output_fields = list(self.meta_fields)
  1331. # Build search params from index config
  1332. search_params_base = self.index_config.build_search_params()
  1333. # Merge with metric type and radius threshold
  1334. search_params = {
  1335. "metric_type": self.index_config.metric_type,
  1336. "params": {
  1337. **search_params_base.get("params", {}),
  1338. "radius": self.cosine_better_than_threshold,
  1339. },
  1340. }
  1341. results = self._client.search(
  1342. collection_name=self.final_namespace,
  1343. data=embedding,
  1344. limit=top_k,
  1345. output_fields=output_fields,
  1346. search_params=search_params,
  1347. )
  1348. return [
  1349. {
  1350. **dp["entity"],
  1351. "id": dp["id"],
  1352. "distance": dp["distance"],
  1353. "created_at": dp.get("created_at"),
  1354. }
  1355. for dp in results[0]
  1356. ]
  1357. async def index_done_callback(self) -> None:
  1358. """Flush all buffered vector ops to Milvus before returning.
  1359. Contract: on a successful return, every previously buffered upsert
  1360. has been embedded and committed to the collection, and every buffered
  1361. delete has been issued — i.e. all pending vectors are durable in
  1362. Milvus (which persists automatically once written). On any embed-
  1363. or server-side failure this method raises and leaves both buffers
  1364. intact for the next callback to retry; the caller MUST NOT assume
  1365. clean persistence in that case.
  1366. """
  1367. await self._flush_pending_vector_ops()
  async def _flush_pending_vector_ops(self) -> None:
      """Write buffered vector upserts and deletes through to Milvus.

      Everything happens while holding ``self._flush_lock``, including the
      deferred embedding of buffered docs whose ``vector`` is still ``None``.
      The flush is therefore atomic with respect to other methods that take
      the same lock (e.g. ``delete``, ``delete_entity``, ``drop``). Upserts
      are sent before deletes.

      Returns without doing anything when both buffers are empty, or when
      there is no Milvus client (buffers are left untouched in that case).

      Failure handling: if embedding or either Milvus call raises, the error
      is re-raised and neither buffer is cleared, so the next
      ``index_done_callback`` retries. Embedding and server errors are logged
      first; a count mismatch raises ``RuntimeError`` directly. Vectors that
      were already computed stay cached on their pending docs.
      """
      async with self._flush_lock:
          if not (self._pending_vector_docs or self._pending_vector_deletes):
              return
          if self._client is None:
              return
          # Milvus requires the collection to be loaded before upsert/delete.
          self._ensure_collection_loaded()
          docs_buf = self._pending_vector_docs
          deletes_buf = self._pending_vector_deletes

          # Phase 1: embed buffered docs that have no cached vector yet.
          unembedded: list[tuple[str, _PendingVectorDoc]] = []
          for key, pdoc in docs_buf.items():
              if pdoc.vector is None:
                  unembedded.append((key, pdoc))

          if unembedded:
              step = self._max_batch_size
              texts = [pdoc.content for _, pdoc in unembedded]
              chunks = [texts[lo : lo + step] for lo in range(0, len(texts), step)]
              logger.info(
                  f"[{self.workspace}] {self.namespace} flush: embedding "
                  f"{len(unembedded)} vectors in {len(chunks)} batch(es) "
                  f"(batch_num={self._max_batch_size})"
              )
              try:
                  per_batch = await asyncio.gather(
                      *[self.embedding_func(chunk, context="document") for chunk in chunks]
                  )
              except Exception as err:
                  logger.error(
                      f"[{self.workspace}] Error embedding pending vector ops "
                      f"(upserts={len(unembedded)}): {err}"
                  )
                  raise
              stacked = np.concatenate(per_batch)
              if len(stacked) != len(unembedded):
                  raise RuntimeError(
                      f"[{self.workspace}] Embedding count mismatch: expected "
                      f"{len(unembedded)}, got {len(stacked)}"
                  )
              for position, (pair, row) in enumerate(zip(unembedded, stacked), start=1):
                  pair[1].vector = row.tolist()
                  await _cooperative_yield(position)

          # Phase 2: build the upsert payload. Every pending doc now carries a
          # vector (the count check above guarantees it).
          committed_ids = list(docs_buf)
          payload: list[dict[str, Any]] = []
          for key in committed_ids:
              pdoc = docs_buf[key]
              payload.append({**pdoc.source, "vector": pdoc.vector})

          # Phase 3: write to Milvus. Any failure leaves both buffers intact.
          try:
              if payload:
                  self._client.upsert(
                      collection_name=self.final_namespace, data=payload
                  )
              if deletes_buf:
                  self._client.delete(
                      collection_name=self.final_namespace,
                      pks=list(deletes_buf),
                  )
          except Exception as err:
              logger.error(
                  f"[{self.workspace}] Error flushing vector ops "
                  f"(upserts={len(docs_buf)}, "
                  f"deletes={len(deletes_buf)}): {err}"
              )
              raise

          # Success: clear in place (no rebinding) so every other holder of
          # these container objects sees them emptied.
          for key in committed_ids:
              docs_buf.pop(key, None)
          deletes_buf.clear()
  async def delete_entity(self, entity_name: str) -> None:
      """Queue deletion of a single entity's vector.

      The vector id is derived with ``compute_mdhash_id(entity_name,
      prefix="ent-")``. While holding ``self._flush_lock``, any buffered
      upsert for that id is discarded and the id is added to the
      pending-delete set. The server-side delete happens on the next flush.
      """
      target_id = compute_mdhash_id(entity_name, prefix="ent-")
      async with self._flush_lock:
          pending_docs = self._pending_vector_docs
          pending_docs.pop(target_id, None)
          self._pending_vector_deletes.add(target_id)
      logger.debug(
          f"[{self.workspace}] Buffered delete for entity {entity_name} (id={target_id})"
      )
  async def delete_entity_relation(self, entity_name: str) -> None:
      """Delete all relation vectors where entity appears as src or tgt.

      The whole method runs under ``_flush_lock`` so the server-side query
      + delete cannot interleave with an in-flight bulk upsert.
      Server-side failures are re-raised (no log-and-swallow): the caller
      decides whether to retry.

      Filter quoting: ``entity_name`` is embedded in the Milvus boolean
      filter as an escaped double-quoted string literal (backslash, double
      quote, CR and LF are escaped). Interpolating it raw caused two
      problems:
      - A name containing ``"`` produced a malformed expression, which made
        the entity's relations undeletable.
      - A crafted name such as ``x" or src_id != "`` widened the match to
        every relation in the collection.

      Buffer semantics — post-prune with caller short-circuit contract:
          Matching pending upserts in ``_pending_vector_docs`` are
          pruned **only after** the server-side query + delete
          succeeds. On failure the pending buffer stays intact and
          the exception propagates so the caller (``adelete_by_entity``
          in ``utils_graph.py``) can short-circuit before
          ``_persist_graph_updates`` flushes a half-cleaned buffer.

      Semantic note (deferred-buffer ↔ persisted divergence): pruning only
      consults the *current* buffered ``src_id`` / ``tgt_id`` view; we do
      not re-read the persisted row a buffered upsert is about to
      overwrite. So if a pending upsert is rewriting an already-persisted
      ``rel-X-Y`` so that its new ``src_id`` / ``tgt_id`` matches
      ``entity_name`` while the persisted row's do not (or vice versa),
      the persisted row will not be deleted by the server-side filter and
      the pending overwrite is dropped — i.e. the final state can diverge
      from the eager-flush ordering (upsert → flush → delete). Callers
      that require eager-equivalent semantics should call
      ``index_done_callback()`` before ``delete_entity_relation``.
      """

      def _filter_literal(value: str) -> str:
          # Render ``value`` as a double-quoted Milvus filter string literal.
          # Backslashes are escaped first so the escapes added for the other
          # characters are not themselves doubled. Line breaks are escaped
          # defensively because a string literal must stay on one line.
          escaped = (
              value.replace("\\", "\\\\")
              .replace('"', '\\"')
              .replace("\n", "\\n")
              .replace("\r", "\\r")
          )
          return f'"{escaped}"'

      def _prune_pending() -> None:
          # Drop buffered upserts whose src_id/tgt_id references the entity
          # so a later flush cannot re-create a deleted relation.
          for doc_id in [
              k
              for k, v in self._pending_vector_docs.items()
              if v.source.get("src_id") == entity_name
              or v.source.get("tgt_id") == entity_name
          ]:
              self._pending_vector_docs.pop(doc_id, None)

      async with self._flush_lock:
          if self._client is None:
              # No server state to mutate; buffer prune is the only
              # delete intent we can record.
              _prune_pending()
              return
          self._ensure_collection_loaded()
          literal = _filter_literal(entity_name)
          expr = f"src_id == {literal} or tgt_id == {literal}"
          results = self._client.query(
              collection_name=self.final_namespace,
              filter=expr,
              output_fields=["id"],
          )
          if not results:
              # No server rows to delete — still safe to prune any
              # pending upserts so they can't re-create the relation.
              _prune_pending()
              logger.debug(
                  f"[{self.workspace}] No relations found for entity {entity_name}"
              )
              return
          relation_ids = [item["id"] for item in results]
          self._client.delete(collection_name=self.final_namespace, pks=relation_ids)
          # Server-side delete succeeded — safe to prune the pending
          # buffer so subsequent flushes don't re-upsert the deleted
          # relations.
          _prune_pending()
          logger.debug(
              f"[{self.workspace}] Deleted {len(relation_ids)} relations for {entity_name}"
          )
  async def delete(self, ids: list[str]) -> None:
      """Queue vector deletions; they reach Milvus on the next flush.

      While holding ``self._flush_lock``, each id is removed from the
      pending-upsert buffer (so a buffered write cannot resurrect it) and
      added to the pending-delete set. An empty ``ids`` is a no-op; a ``set``
      is accepted and converted to a list first.
      """
      if not ids:
          return
      targets = list(ids) if isinstance(ids, set) else ids
      async with self._flush_lock:
          for target in targets:
              self._pending_vector_docs.pop(target, None)
              self._pending_vector_deletes.add(target)
      logger.debug(
          f"[{self.workspace}] Buffered delete for {len(targets)} vectors in {self.namespace}"
      )
  1540. async def get_by_id(self, id: str) -> dict[str, Any] | None:
  1541. """Get vector data by its ID, with read-your-writes against the buffer."""
  1542. async with self._flush_lock:
  1543. if id in self._pending_vector_deletes:
  1544. return None
  1545. pending = self._pending_vector_docs.get(id)
  1546. if pending is not None:
  1547. doc = dict(pending.source)
  1548. doc["id"] = id
  1549. return doc
  1550. try:
  1551. # Ensure collection is loaded before querying
  1552. self._ensure_collection_loaded()
  1553. # Include all meta_fields (created_at is now always included) plus id
  1554. output_fields = list(self.meta_fields) + ["id"]
  1555. result = self._client.query(
  1556. collection_name=self.final_namespace,
  1557. filter=f'id == "{id}"',
  1558. output_fields=output_fields,
  1559. )
  1560. if not result or len(result) == 0:
  1561. return None
  1562. return result[0]
  1563. except Exception as e:
  1564. logger.error(
  1565. f"[{self.workspace}] Error retrieving vector data for ID {id}: {e}"
  1566. )
  1567. return None
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
      """Fetch several vector records, aligned with ``ids`` (read-your-writes).

      Under ``self._flush_lock`` each id is resolved against the buffers:
      - A buffered delete maps the id to ``None``.
      - A buffered upsert maps it to a copy of its ``source`` with ``"id"`` set.
      - Any other id is collected for a single Milvus ``id in [...]`` query,
        which fetches ``meta_fields`` plus ``"id"``.

      The result has one entry per input id, in input order, with ``None``
      for ids that were deleted or not found.

      NOTE(review): if the Milvus query raises, the error is logged and
      ``[]`` is returned. That list is not aligned with ``ids`` and also
      drops any buffered hits.
      """
      if not ids:
          return []

      from_buffer: dict[str, dict[str, Any] | None] = {}
      to_query: list[str] = []
      async with self._flush_lock:
          for key in ids:
              if key in self._pending_vector_deletes:
                  from_buffer[key] = None
              elif (staged := self._pending_vector_docs.get(key)) is not None:
                  from_buffer[key] = {**staged.source, "id": key}
              else:
                  to_query.append(key)

      fetched: dict[str, dict[str, Any]] = {}
      if to_query:
          try:
              # The collection must be loaded before Milvus accepts a query.
              self._ensure_collection_loaded()
              fields = [*self.meta_fields, "id"]
              joined = '", "'.join(to_query)
              rows = self._client.query(
                  collection_name=self.final_namespace,
                  filter=f'id in ["{joined}"]',
                  output_fields=fields,
              )
              for row in rows or []:
                  if not row:
                      continue
                  row_id = row.get("id")
                  if row_id is not None:
                      fetched[str(row_id)] = row
          except Exception as exc:
              logger.error(
                  f"[{self.workspace}] Error retrieving vector data for IDs {to_query}: {exc}"
              )
              return []

      return [
          from_buffer[key] if key in from_buffer else fetched.get(str(key))
          for key in ids
      ]
  async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
      """Return ``{id: vector}`` for the requested ids, with read-your-writes.

      Resolution, under ``self._flush_lock``:
      - Ids with a buffered delete are omitted.
      - Buffered upserts with a cached vector contribute it directly.
      - Buffered upserts whose ``vector`` is ``None`` are embedded lazily
        (still inside the lock). The result is cached on the pending doc so
        the next flush does not embed the same content again.
      - Remaining ids are fetched from Milvus afterwards, outside the lock.

      Server ids are stringified, and ``numpy`` vectors are converted to lists.

      Error handling:
      - Embedding errors are logged and re-raised.
      - An embedding count mismatch raises ``RuntimeError``.
      - Milvus query errors are logged, and the vectors gathered so far are
        returned.
      """
      if not ids:
          return {}

      vectors: dict[str, list[float]] = {}
      to_query: list[str] = []
      async with self._flush_lock:
          needs_embedding: list[tuple[str, _PendingVectorDoc]] = []
          for key in ids:
              if key in self._pending_vector_deletes:
                  continue
              staged = self._pending_vector_docs.get(key)
              if staged is None:
                  to_query.append(key)
              elif staged.vector is None:
                  needs_embedding.append((key, staged))
              else:
                  vectors[key] = staged.vector

          if needs_embedding:
              step = self._max_batch_size
              texts = [pdoc.content for _, pdoc in needs_embedding]
              chunks = [texts[lo : lo + step] for lo in range(0, len(texts), step)]
              try:
                  per_batch = await asyncio.gather(
                      *[self.embedding_func(chunk, context="document") for chunk in chunks]
                  )
              except Exception as exc:
                  logger.error(
                      f"[{self.workspace}] Error lazily embedding pending vectors "
                      f"(upserts={len(needs_embedding)}): {exc}"
                  )
                  raise
              stacked = np.concatenate(per_batch)
              if len(stacked) != len(needs_embedding):
                  raise RuntimeError(
                      f"[{self.workspace}] Embedding count mismatch: expected "
                      f"{len(needs_embedding)}, got {len(stacked)}"
                  )
              for position, ((key, pdoc), row) in enumerate(
                  zip(needs_embedding, stacked), start=1
              ):
                  pdoc.vector = row.tolist()
                  vectors[key] = pdoc.vector
                  await _cooperative_yield(position)

      if not to_query:
          return vectors

      try:
          # The collection must be loaded before Milvus accepts a query.
          self._ensure_collection_loaded()
          joined = '", "'.join(to_query)
          rows = self._client.query(
              collection_name=self.final_namespace,
              filter=f'id in ["{joined}"]',
              output_fields=["id", "vector"],
          )
          for row in rows or []:
              if not (row and "vector" in row and "id" in row):
                  continue
              vec = row["vector"]
              if isinstance(vec, np.ndarray):
                  vec = vec.tolist()
              # Stringify the server id (as get_by_ids does) so callers can
              # index the dict by the id they originally requested.
              vectors[str(row["id"])] = vec
          return vectors
      except Exception as exc:
          logger.error(
              f"[{self.workspace}] Error retrieving vectors by IDs from {self.namespace}: {exc}"
          )
          return vectors
  async def finalize(self):
      """Flush buffered vector ops at shutdown and fail loudly on leftovers.

      The MilvusClient holds nothing this storage layer must release, so the
      only job here is a final flush. A flush exception is captured first.
      Then the residual buffer sizes are read under ``self._flush_lock``, so
      a mutator that slipped in after the flush is still counted. Finally:

      * if the flush raised, raise ``RuntimeError`` chained to that error;
      * else if either buffer is non-empty, raise ``RuntimeError``;
      * otherwise return ``None``.
      """
      try:
          await self._flush_pending_vector_ops()
      except Exception as exc:
          flush_error: Exception | None = exc
      else:
          flush_error = None

      async with self._flush_lock:
          pending_docs, pending_deletes = (
              len(self._pending_vector_docs),
              len(self._pending_vector_deletes),
          )

      if flush_error is not None:
          raise RuntimeError(
              f"[{self.workspace}] MilvusVectorDBStorage.finalize() flush raised; "
              f"{pending_docs} pending upserts and {pending_deletes} pending "
              f"deletes were left buffered (data lost)"
          ) from flush_error
      if pending_docs or pending_deletes:
          raise RuntimeError(
              f"[{self.workspace}] MilvusVectorDBStorage.finalize() left "
              f"{pending_docs} pending upserts and {pending_deletes} pending "
              f"deletes buffered after final flush attempt (these writes have been lost)"
          )
  async def drop(self) -> dict[str, str]:
      """Drop all data from the Milvus collection and recreate it. Destructive.

      MUST only be called when ``pipeline_status`` is idle (see the
      Pipeline concurrency contract in ``AGENTS.md``); the only
      in-tree caller ``clear_documents`` enforces this.

      Under ``self._flush_lock`` this instance's pending upserts and deletes
      are discarded first. The collection is then dropped (if it exists) and
      recreated via ``_create_collection_if_not_exist``.

      Caveat — only this instance's buffers are cleared. Other
      ``MilvusVectorDBStorage`` instances aliased onto the same
      ``final_namespace`` (multi-worker processes, or distinct workspaces
      collapsed by ``MILVUS_WORKSPACE``) keep their own buffers. A sibling
      whose prior flush failed and left its buffers intact will, on its next
      flush, upsert those stale rows into the freshly recreated collection.
      Direct callers bypassing the idle precondition MUST flush every
      aliased instance first.

      Returns:
          dict[str, str]: ``{"status": "success"|"error", "message": str}``.
          Any exception is logged and reported as an ``"error"`` status
          rather than raised.
      """
      try:
          async with self._flush_lock:
              # Throw away buffered writes before the collection disappears;
              # a racing flush would otherwise write them back into it.
              self._pending_vector_docs.clear()
              self._pending_vector_deletes.clear()
              client = self._client
              if client.has_collection(self.final_namespace):
                  client.drop_collection(self.final_namespace)
              self._create_collection_if_not_exist()
          logger.info(
              f"[{self.workspace}] Process {os.getpid()} drop Milvus collection {self.namespace}"
          )
          return {"status": "success", "message": "data dropped"}
      except Exception as exc:
          logger.error(
              f"[{self.workspace}] Error dropping Milvus collection {self.namespace}: {exc}"
          )
          return {"status": "error", "message": str(exc)}