utils_graph.py 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774
  1. from __future__ import annotations
  2. import time
  3. import asyncio
  4. from typing import Any, cast
  5. from .base import DeletionResult
  6. from .kg.shared_storage import get_storage_keyed_lock
  7. from .constants import GRAPH_FIELD_SEP
  8. from .utils import compute_mdhash_id, logger, make_relation_vdb_ids
  9. from .base import StorageNameSpace
  10. def _require_non_empty_description(
  11. description: Any, *, operation: str, object_type: str
  12. ) -> None:
  13. if description is None or not str(description).strip():
  14. raise ValueError(
  15. f"{object_type.capitalize()} description cannot be empty for {operation} operation"
  16. )
  17. async def _persist_graph_updates(
  18. entities_vdb=None,
  19. relationships_vdb=None,
  20. chunk_entity_relation_graph=None,
  21. entity_chunks_storage=None,
  22. relation_chunks_storage=None,
  23. ) -> None:
  24. """Unified callback to persist updates after graph operations.
  25. Ensures all relevant storage instances are properly persisted after
  26. operations like delete, edit, create, or merge.
  27. Args:
  28. entities_vdb: Entity vector database storage (optional)
  29. relationships_vdb: Relationship vector database storage (optional)
  30. chunk_entity_relation_graph: Graph storage instance (optional)
  31. entity_chunks_storage: Entity-chunk tracking storage (optional)
  32. relation_chunks_storage: Relation-chunk tracking storage (optional)
  33. """
  34. storages = []
  35. # Collect all non-None storage instances
  36. if entities_vdb is not None:
  37. storages.append(entities_vdb)
  38. if relationships_vdb is not None:
  39. storages.append(relationships_vdb)
  40. if chunk_entity_relation_graph is not None:
  41. storages.append(chunk_entity_relation_graph)
  42. if entity_chunks_storage is not None:
  43. storages.append(entity_chunks_storage)
  44. if relation_chunks_storage is not None:
  45. storages.append(relation_chunks_storage)
  46. # Persist all storage instances in parallel
  47. if storages:
  48. await asyncio.gather(
  49. *[
  50. cast(StorageNameSpace, storage_inst).index_done_callback()
  51. for storage_inst in storages # type: ignore
  52. ]
  53. )
  54. async def adelete_by_entity(
  55. chunk_entity_relation_graph,
  56. entities_vdb,
  57. relationships_vdb,
  58. entity_name: str,
  59. entity_chunks_storage=None,
  60. relation_chunks_storage=None,
  61. ) -> DeletionResult:
  62. """Asynchronously delete an entity and all its relationships.
  63. Also cleans up entity_chunks_storage and relation_chunks_storage to remove chunk tracking.
  64. Args:
  65. chunk_entity_relation_graph: Graph storage instance
  66. entities_vdb: Vector database storage for entities
  67. relationships_vdb: Vector database storage for relationships
  68. entity_name: Name of the entity to delete
  69. entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
  70. relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
  71. """
  72. # Use keyed lock for entity to ensure atomic graph and vector db operations.
  73. # The doc-ingest pipeline locks edges under sorted([src, tgt]) in this same
  74. # namespace, so [entity_name] already mutually excludes any concurrent edge
  75. # write that touches this entity — get_storage_keyed_lock acquires one
  76. # mutex per key, and identical key strings share the same mutex.
  77. workspace = entities_vdb.global_config.get("workspace", "")
  78. namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
  79. async with get_storage_keyed_lock(
  80. [entity_name], namespace=namespace, enable_logging=False
  81. ):
  82. try:
  83. # Check if the entity exists
  84. if not await chunk_entity_relation_graph.has_node(entity_name):
  85. logger.warning(f"Entity '{entity_name}' not found.")
  86. return DeletionResult(
  87. status="not_found",
  88. doc_id=entity_name,
  89. message=f"Entity '{entity_name}' not found.",
  90. status_code=404,
  91. )
  92. # Retrieve related relationships before deleting the node
  93. edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
  94. related_relations_count = len(edges) if edges else 0
  95. # Clean up chunk tracking storages before deletion
  96. if entity_chunks_storage is not None:
  97. # Delete entity's entry from entity_chunks_storage
  98. await entity_chunks_storage.delete([entity_name])
  99. logger.info(
  100. f"Entity Delete: removed chunk tracking for `{entity_name}`"
  101. )
  102. if relation_chunks_storage is not None and edges:
  103. # Delete all related relationships from relation_chunks_storage
  104. from .utils import make_relation_chunk_key
  105. relation_keys_to_delete = []
  106. for src, tgt in edges:
  107. # Normalize entity order for consistent key generation
  108. normalized_src, normalized_tgt = sorted([src, tgt])
  109. storage_key = make_relation_chunk_key(
  110. normalized_src, normalized_tgt
  111. )
  112. relation_keys_to_delete.append(storage_key)
  113. if relation_keys_to_delete:
  114. await relation_chunks_storage.delete(relation_keys_to_delete)
  115. logger.info(
  116. f"Entity Delete: removed chunk tracking for {len(relation_keys_to_delete)} relations"
  117. )
  118. await entities_vdb.delete_entity(entity_name)
  119. await relationships_vdb.delete_entity_relation(entity_name)
  120. await chunk_entity_relation_graph.delete_node(entity_name)
  121. message = f"Entity Delete: remove '{entity_name}' and its {related_relations_count} relations"
  122. logger.info(message)
  123. await _persist_graph_updates(
  124. entities_vdb=entities_vdb,
  125. relationships_vdb=relationships_vdb,
  126. chunk_entity_relation_graph=chunk_entity_relation_graph,
  127. entity_chunks_storage=entity_chunks_storage,
  128. relation_chunks_storage=relation_chunks_storage,
  129. )
  130. return DeletionResult(
  131. status="success",
  132. doc_id=entity_name,
  133. message=message,
  134. status_code=200,
  135. )
  136. except Exception as e:
  137. error_message = f"Error while deleting entity '{entity_name}': {e}"
  138. logger.error(error_message)
  139. return DeletionResult(
  140. status="fail",
  141. doc_id=entity_name,
  142. message=error_message,
  143. status_code=500,
  144. )
  145. async def adelete_by_relation(
  146. chunk_entity_relation_graph,
  147. relationships_vdb,
  148. source_entity: str,
  149. target_entity: str,
  150. relation_chunks_storage=None,
  151. ) -> DeletionResult:
  152. """Asynchronously delete a relation between two entities.
  153. Also cleans up relation_chunks_storage to remove chunk tracking.
  154. Args:
  155. chunk_entity_relation_graph: Graph storage instance
  156. relationships_vdb: Vector database storage for relationships
  157. source_entity: Name of the source entity
  158. target_entity: Name of the target entity
  159. relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
  160. """
  161. relation_str = f"{source_entity} -> {target_entity}"
  162. # Normalize entity order for undirected graph (ensures consistent key generation)
  163. if source_entity > target_entity:
  164. source_entity, target_entity = target_entity, source_entity
  165. # Use keyed lock for relation to ensure atomic graph and vector db operations
  166. workspace = relationships_vdb.global_config.get("workspace", "")
  167. namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
  168. sorted_edge_key = sorted([source_entity, target_entity])
  169. async with get_storage_keyed_lock(
  170. sorted_edge_key, namespace=namespace, enable_logging=False
  171. ):
  172. try:
  173. # Check if the relation exists
  174. edge_exists = await chunk_entity_relation_graph.has_edge(
  175. source_entity, target_entity
  176. )
  177. if not edge_exists:
  178. message = f"Relation from '{source_entity}' to '{target_entity}' does not exist"
  179. logger.warning(message)
  180. return DeletionResult(
  181. status="not_found",
  182. doc_id=relation_str,
  183. message=message,
  184. status_code=404,
  185. )
  186. # Clean up chunk tracking storage before deletion
  187. if relation_chunks_storage is not None:
  188. from .utils import make_relation_chunk_key
  189. # Normalize entity order for consistent key generation
  190. normalized_src, normalized_tgt = sorted([source_entity, target_entity])
  191. storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
  192. await relation_chunks_storage.delete([storage_key])
  193. logger.info(
  194. f"Relation Delete: removed chunk tracking for `{source_entity}`~`{target_entity}`"
  195. )
  196. # Delete relation from vector database
  197. rel_ids_to_delete = [
  198. compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
  199. compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
  200. ]
  201. await relationships_vdb.delete(rel_ids_to_delete)
  202. # Delete relation from knowledge graph
  203. await chunk_entity_relation_graph.remove_edges(
  204. [(source_entity, target_entity)]
  205. )
  206. message = f"Relation Delete: `{source_entity}`~`{target_entity}` deleted successfully"
  207. logger.info(message)
  208. await _persist_graph_updates(
  209. relationships_vdb=relationships_vdb,
  210. chunk_entity_relation_graph=chunk_entity_relation_graph,
  211. relation_chunks_storage=relation_chunks_storage,
  212. )
  213. return DeletionResult(
  214. status="success",
  215. doc_id=relation_str,
  216. message=message,
  217. status_code=200,
  218. )
  219. except Exception as e:
  220. error_message = f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
  221. logger.error(error_message)
  222. return DeletionResult(
  223. status="fail",
  224. doc_id=relation_str,
  225. message=error_message,
  226. status_code=500,
  227. )
async def _edit_entity_impl(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    entity_name: str,
    updated_data: dict[str, str],
    *,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> dict[str, Any]:
    """Edit (and optionally rename) an entity without acquiring storage locks.

    This function performs the actual storage mutations for an entity edit and
    does no lock management. It should only be called by public APIs that have
    already acquired the necessary locks.

    Steps:
        1. Verify the entity exists; when renaming, verify the new name is free.
        2. Build the new node attributes as ``{**node_data, **updated_data}``,
           set ``entity_id`` to the (new) name and drop any ``entity_name`` key.
        3. When renaming:
           - create the new node and copy each incident edge that has data onto it;
           - delete the old node and the old entity vector;
           - delete both concatenation orders of each old relation vector id;
           - re-upsert the relation vectors keyed by the sorted new name pair.
           Otherwise, upsert the node in place.
        4. Re-upsert the entity vector (content is ``name + "\\n" + description``).
        5. Sync ``entity_chunks_storage`` / ``relation_chunks_storage`` if given.
        6. Persist all touched storages and return the refreshed entity info.

    Args:
        chunk_entity_relation_graph: Graph storage instance
        entities_vdb: Vector database storage for entities
        relationships_vdb: Vector database storage for relationships
        entity_name: Name of the entity to edit
        updated_data: Dictionary containing updated attributes (including optional entity_name for renaming)
        entity_chunks_storage: Optional KV storage for tracking chunks
        relation_chunks_storage: Optional KV storage for tracking relation chunks

    Returns:
        The result of ``get_entity_info(..., include_vector_data=True)`` for
        the final (possibly renamed) entity.

    Raises:
        ValueError: If the entity does not exist, or if renaming and an entity
            with the new name already exists.

    Note:
        Caller must acquire appropriate locks before calling this function.
    """
    new_entity_name = updated_data.get("entity_name", entity_name)
    is_renaming = new_entity_name != entity_name
    # `entity_name` is reassigned to the new name once the graph/vector rename
    # below completes; keep the pre-rename name for chunk-tracking migration.
    original_entity_name = entity_name

    node_exists = await chunk_entity_relation_graph.has_node(entity_name)
    if not node_exists:
        raise ValueError(f"Entity '{entity_name}' does not exist")
    node_data = await chunk_entity_relation_graph.get_node(entity_name)

    if is_renaming:
        existing_node = await chunk_entity_relation_graph.has_node(new_entity_name)
        if existing_node:
            raise ValueError(
                f"Entity name '{new_entity_name}' already exists, cannot rename"
            )

    # Overlay the requested changes on the current node attributes.
    new_node_data = {**node_data, **updated_data}
    new_node_data["entity_id"] = new_entity_name

    if "entity_name" in new_node_data:
        del new_node_data[
            "entity_name"
        ]  # Node data should not contain entity_name field

    if is_renaming:
        logger.info(f"Entity Edit: renaming `{entity_name}` to `{new_entity_name}`")

        # Create the new node first so edges can be re-pointed at it.
        await chunk_entity_relation_graph.upsert_node(new_entity_name, new_node_data)

        # (src, tgt, edge_data) tuples under the NEW name, for vector re-upsert
        # and relation chunk-key migration below.
        relations_to_update = []
        # Old relation vector ids (both concatenation orders) to remove.
        relations_to_delete = []
        edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
        if edges:
            for source, target in edges:
                edge_data = await chunk_entity_relation_graph.get_edge(source, target)
                # NOTE(review): edges whose data is falsy (None or {}) are not
                # copied, so delete_node below drops them - confirm intended.
                if edge_data:
                    relations_to_delete.append(
                        compute_mdhash_id(source + target, prefix="rel-")
                    )
                    relations_to_delete.append(
                        compute_mdhash_id(target + source, prefix="rel-")
                    )
                    # NOTE(review): a self-loop (source == target == entity_name)
                    # takes this first branch and is re-pointed at the old name,
                    # which delete_node below removes - confirm self-loops
                    # cannot occur here.
                    if source == entity_name:
                        await chunk_entity_relation_graph.upsert_edge(
                            new_entity_name, target, edge_data
                        )
                        relations_to_update.append((new_entity_name, target, edge_data))
                    else:  # target == entity_name
                        await chunk_entity_relation_graph.upsert_edge(
                            source, new_entity_name, edge_data
                        )
                        relations_to_update.append((source, new_entity_name, edge_data))

        # Removes the old node from the graph.
        await chunk_entity_relation_graph.delete_node(entity_name)

        old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
        await entities_vdb.delete([old_entity_id])

        await relationships_vdb.delete(relations_to_delete)

        # Re-embed every moved relation, keyed by the sorted new name pair.
        for src, tgt, edge_data in relations_to_update:
            normalized_src, normalized_tgt = sorted([src, tgt])
            description = edge_data.get("description", "")
            keywords = edge_data.get("keywords", "")
            source_id = edge_data.get("source_id", "")
            weight = float(edge_data.get("weight", 1.0))

            content = f"{normalized_src}\t{normalized_tgt}\n{keywords}\n{description}"
            relation_id = compute_mdhash_id(
                normalized_src + normalized_tgt, prefix="rel-"
            )

            relation_data = {
                relation_id: {
                    "content": content,
                    "src_id": normalized_src,
                    "tgt_id": normalized_tgt,
                    "source_id": source_id,
                    "description": description,
                    "keywords": keywords,
                    "weight": weight,
                }
            }
            await relationships_vdb.upsert(relation_data)

        # From here on, `entity_name` refers to the renamed entity.
        entity_name = new_entity_name
    else:
        await chunk_entity_relation_graph.upsert_node(entity_name, new_node_data)

    # Re-embed the entity with its (possibly updated) attributes.
    description = new_node_data.get("description", "")
    source_id = new_node_data.get("source_id", "")
    entity_type = new_node_data.get("entity_type", "")
    content = entity_name + "\n" + description

    entity_id = compute_mdhash_id(entity_name, prefix="ent-")
    entity_data = {
        entity_id: {
            "content": content,
            "entity_name": entity_name,
            "source_id": source_id,
            "description": description,
            "entity_type": entity_type,
        }
    }
    await entities_vdb.upsert(entity_data)

    # Chunk-tracking synchronization (only if at least one tracker is provided).
    if entity_chunks_storage is not None or relation_chunks_storage is not None:
        from .utils import make_relation_chunk_key, compute_incremental_chunk_ids

        if entity_chunks_storage is not None:
            # Both branches resolve to the pre-rename name: the tracking entry
            # has not been migrated yet at this point.
            storage_key = original_entity_name if is_renaming else entity_name
            stored_data = await entity_chunks_storage.get_by_id(storage_key)
            has_stored_data = (
                stored_data
                and isinstance(stored_data, dict)
                and stored_data.get("chunk_ids")
            )

            # source_id holds chunk ids joined by GRAPH_FIELD_SEP.
            old_source_id = node_data.get("source_id", "")
            old_chunk_ids = [cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid]

            new_source_id = new_node_data.get("source_id", "")
            new_chunk_ids = [cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid]

            source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)

            # Rewrite tracking when chunks changed, when nothing is tracked yet,
            # or when the key must move to the new name.
            if source_id_changed or not has_stored_data or is_renaming:
                existing_full_chunk_ids = []
                if has_stored_data:
                    existing_full_chunk_ids = [
                        cid for cid in stored_data.get("chunk_ids", []) if cid
                    ]

                # Fall back to the node's own source_id when nothing is tracked.
                if not existing_full_chunk_ids:
                    existing_full_chunk_ids = old_chunk_ids.copy()

                # Presumably applies the old->new source_id delta to the full
                # tracked list - see compute_incremental_chunk_ids to confirm.
                updated_chunk_ids = compute_incremental_chunk_ids(
                    existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
                )

                if is_renaming:
                    # Move the tracking entry from the old name to the new one.
                    await entity_chunks_storage.delete([original_entity_name])
                    await entity_chunks_storage.upsert(
                        {
                            entity_name: {
                                "chunk_ids": updated_chunk_ids,
                                "count": len(updated_chunk_ids),
                            }
                        }
                    )
                else:
                    await entity_chunks_storage.upsert(
                        {
                            entity_name: {
                                "chunk_ids": updated_chunk_ids,
                                "count": len(updated_chunk_ids),
                            }
                        }
                    )

                logger.info(
                    f"Entity Edit: find {len(updated_chunk_ids)} chunks related to `{entity_name}`"
                )

        # On rename, move each relation's chunk-tracking entry from the key
        # built from the old name pair to the key built from the new pair.
        if is_renaming and relation_chunks_storage is not None and relations_to_update:
            for src, tgt, edge_data in relations_to_update:
                old_src = original_entity_name if src == entity_name else src
                old_tgt = original_entity_name if tgt == entity_name else tgt

                old_normalized_src, old_normalized_tgt = sorted([old_src, old_tgt])
                new_normalized_src, new_normalized_tgt = sorted([src, tgt])

                old_storage_key = make_relation_chunk_key(
                    old_normalized_src, old_normalized_tgt
                )
                new_storage_key = make_relation_chunk_key(
                    new_normalized_src, new_normalized_tgt
                )

                if old_storage_key != new_storage_key:
                    old_stored_data = await relation_chunks_storage.get_by_id(
                        old_storage_key
                    )
                    relation_chunk_ids = []

                    if old_stored_data and isinstance(old_stored_data, dict):
                        relation_chunk_ids = [
                            cid for cid in old_stored_data.get("chunk_ids", []) if cid
                        ]
                    else:
                        # No tracked entry: rebuild from the edge's own source_id.
                        relation_source_id = edge_data.get("source_id", "")
                        relation_chunk_ids = [
                            cid
                            for cid in relation_source_id.split(GRAPH_FIELD_SEP)
                            if cid
                        ]

                    await relation_chunks_storage.delete([old_storage_key])

                    if relation_chunk_ids:
                        await relation_chunks_storage.upsert(
                            {
                                new_storage_key: {
                                    "chunk_ids": relation_chunk_ids,
                                    "count": len(relation_chunk_ids),
                                }
                            }
                        )
            logger.info(
                f"Entity Edit: migrate {len(relations_to_update)} relations after rename"
            )

    await _persist_graph_updates(
        entities_vdb=entities_vdb,
        relationships_vdb=relationships_vdb,
        chunk_entity_relation_graph=chunk_entity_relation_graph,
        entity_chunks_storage=entity_chunks_storage,
        relation_chunks_storage=relation_chunks_storage,
    )

    logger.info(f"Entity Edit: `{entity_name}` successfully updated")
    return await get_entity_info(
        chunk_entity_relation_graph,
        entities_vdb,
        entity_name,
        include_vector_data=True,
    )
async def aedit_entity(
    chunk_entity_relation_graph,
    entities_vdb,
    relationships_vdb,
    entity_name: str,
    updated_data: dict[str, str],
    allow_rename: bool = True,
    allow_merge: bool = False,
    entity_chunks_storage=None,
    relation_chunks_storage=None,
) -> dict[str, Any]:
    """Asynchronously edit entity information.

    Updates entity information in the knowledge graph and re-embeds the entity
    in the vector database. Also synchronizes entity_chunks_storage and
    relation_chunks_storage to track chunk references. All work runs under the
    keyed graph lock for the entity name (and the new name when renaming).

    Three paths:
        * No rename: edit in place via ``_edit_entity_impl``.
        * Rename to an unused name: ``_edit_entity_impl`` performs the rename.
        * Rename to an existing name (requires ``allow_merge``):
          1. apply the non-name updates to the source entity;
          2. merge the source into the existing target via ``_merge_entities_impl``.
          A merge failure is reported in ``operation_summary`` instead of
          being raised.

    Args:
        chunk_entity_relation_graph: Graph storage instance
        entities_vdb: Vector database storage for entities
        relationships_vdb: Vector database storage for relationships
        entity_name: Name of the entity to edit
        updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
        allow_rename: Whether to allow entity renaming, defaults to True
        allow_merge: Whether to merge into an existing entity when renaming to an existing name, defaults to False
        entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
        relation_chunks_storage: Optional KV storage for tracking chunks that reference relations

    Returns:
        Dictionary containing updated entity information and operation summary with the following structure:
        {
            "entity_name": str,  # Name of the entity
            "description": str,  # Entity description
            "entity_type": str,  # Entity type
            "source_id": str,  # Source chunk IDs
            ...  # Other entity properties
            "operation_summary": {
                "merged": bool,  # Whether entity was merged
                "merge_status": str,  # "success" | "failed" | "not_attempted"
                "merge_error": str | None,  # Error message if merge failed
                "operation_status": str,  # "success" | "partial_success" | "failure"
                "target_entity": str | None,  # Target entity name if renaming/merging
                "final_entity": str,  # Final entity name after operation
                "renamed": bool  # Whether entity was renamed
            }
        }

        operation_status values:
            - "success": Operation completed successfully (update/rename/merge all succeeded)
            - "partial_success": Non-name updates succeeded but merge failed
            - "failure": Merge failed and no non-name updates were applied
              (the result is still returned, not raised)

        merge_status values:
            - "success": Entity successfully merged into target
            - "failed": Merge operation failed
            - "not_attempted": No merge was attempted (normal update/rename)

    Raises:
        ValueError: If ``updated_data`` carries an empty description; if a
            rename is requested with ``allow_rename=False``; or if the new
            name already exists and ``allow_merge=False``.
        Exception: Errors from ``_edit_entity_impl`` (including failed
            non-name updates before a merge) are logged and re-raised.
    """
    # Validate before taking any lock.
    if "description" in updated_data:
        _require_non_empty_description(
            updated_data.get("description"), operation="edit", object_type="entity"
        )

    new_entity_name = updated_data.get("entity_name", entity_name)
    is_renaming = new_entity_name != entity_name

    # Lock the (old, new) entity names. The doc-ingest pipeline acquires
    # edge locks as sorted([src, tgt]) in the same namespace, and
    # get_storage_keyed_lock takes one mutex per key - so locking the
    # entity name already mutually excludes any concurrent edge write that
    # touches it, no need to enumerate incident edges here.
    # The set removes duplicate keys; sorting gives a stable acquisition order.
    lock_keys = sorted({entity_name, new_entity_name}) if is_renaming else [entity_name]
    workspace = entities_vdb.global_config.get("workspace", "")
    namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"

    # Defaults describe a plain edit/rename; the merge path overwrites them.
    operation_summary: dict[str, Any] = {
        "merged": False,
        "merge_status": "not_attempted",
        "merge_error": None,
        "operation_status": "success",
        "target_entity": None,
        "final_entity": new_entity_name if is_renaming else entity_name,
        "renamed": is_renaming,
    }
    async with get_storage_keyed_lock(
        lock_keys, namespace=namespace, enable_logging=False
    ):
        try:
            if is_renaming and not allow_rename:
                raise ValueError(
                    "Entity renaming is not allowed. Set allow_rename=True to enable this feature"
                )

            if is_renaming:
                target_exists = await chunk_entity_relation_graph.has_node(
                    new_entity_name
                )
                if target_exists:
                    if not allow_merge:
                        raise ValueError(
                            f"Entity name '{new_entity_name}' already exists, cannot rename"
                        )

                    logger.info(
                        f"Entity Edit: `{entity_name}` will be merged into `{new_entity_name}`"
                    )

                    # Track whether non-name updates were applied
                    non_name_updates_applied = False
                    non_name_updates = {
                        key: value
                        for key, value in updated_data.items()
                        if key != "entity_name"
                    }

                    # Apply non-name updates first, to the source entity under
                    # its current name (no rename happens here).
                    if non_name_updates:
                        try:
                            logger.info(
                                "Entity Edit: applying non-name updates before merge"
                            )
                            await _edit_entity_impl(
                                chunk_entity_relation_graph,
                                entities_vdb,
                                relationships_vdb,
                                entity_name,
                                non_name_updates,
                                entity_chunks_storage=entity_chunks_storage,
                                relation_chunks_storage=relation_chunks_storage,
                            )
                            non_name_updates_applied = True
                        except Exception as update_error:
                            # If update fails, re-raise immediately; the outer
                            # handler below logs it again and re-raises.
                            logger.error(
                                f"Entity Edit: non-name updates failed: {update_error}"
                            )
                            raise

                    # Attempt to merge entities
                    try:
                        merge_result = await _merge_entities_impl(
                            chunk_entity_relation_graph,
                            entities_vdb,
                            relationships_vdb,
                            [entity_name],
                            new_entity_name,
                            merge_strategy=None,
                            target_entity_data=None,
                            entity_chunks_storage=entity_chunks_storage,
                            relation_chunks_storage=relation_chunks_storage,
                        )

                        # Merge succeeded
                        operation_summary.update(
                            {
                                "merged": True,
                                "merge_status": "success",
                                "merge_error": None,
                                "operation_status": "success",
                                "target_entity": new_entity_name,
                                "final_entity": new_entity_name,
                            }
                        )
                        return {**merge_result, "operation_summary": operation_summary}

                    except Exception as merge_error:
                        # Merge failed, but update may have succeeded; report
                        # the failure in the summary instead of raising.
                        logger.error(f"Entity Edit: merge failed: {merge_error}")

                        # Return partial success status (update succeeded but merge failed)
                        operation_summary.update(
                            {
                                "merged": False,
                                "merge_status": "failed",
                                "merge_error": str(merge_error),
                                "operation_status": "partial_success"
                                if non_name_updates_applied
                                else "failure",
                                "target_entity": new_entity_name,
                                "final_entity": entity_name,  # Keep source entity name
                            }
                        )

                        # Get current entity info (with applied updates if any)
                        entity_info = await get_entity_info(
                            chunk_entity_relation_graph,
                            entities_vdb,
                            entity_name,
                            include_vector_data=True,
                        )
                        return {**entity_info, "operation_summary": operation_summary}

            # Normal edit flow (no merge involved). A rename to an unused name
            # also lands here; _edit_entity_impl performs it.
            edit_result = await _edit_entity_impl(
                chunk_entity_relation_graph,
                entities_vdb,
                relationships_vdb,
                entity_name,
                updated_data,
                entity_chunks_storage=entity_chunks_storage,
                relation_chunks_storage=relation_chunks_storage,
            )

            operation_summary["operation_status"] = "success"
            return {**edit_result, "operation_summary": operation_summary}

        except Exception as e:
            # Log and propagate everything not already converted to a summary.
            logger.error(f"Error while editing entity '{entity_name}': {e}")
            raise
  async def aedit_relation(
      chunk_entity_relation_graph,
      entities_vdb,
      relationships_vdb,
      source_entity: str,
      target_entity: str,
      updated_data: dict[str, Any],
      relation_chunks_storage=None,
  ) -> dict[str, Any]:
      """Asynchronously edit relation information.

      Updates relation (edge) information in the knowledge graph and re-embeds the
      relation in the vector database. Also synchronizes relation_chunks_storage to
      track which chunks reference this relation.

      The entity pair is put in sorted order before any lookup, so ``(A, B)`` and
      ``(B, A)`` address the same relation.

      Args:
          chunk_entity_relation_graph: Graph storage instance
          entities_vdb: Vector database storage for entities (not used by this
              function; kept for API symmetry)
          relationships_vdb: Vector database storage for relationships
          source_entity: Name of the source entity
          target_entity: Name of the target entity
          updated_data: Attributes to overwrite on the edge, e.g.
              {"description": "new description", "keywords": "new keywords"}
          relation_chunks_storage: Optional KV storage for tracking chunks that
              reference this relation

      Returns:
          Updated relation information from
          ``get_relation_info(..., include_vector_data=True)``.

      Raises:
          ValueError: If the relation does not exist. A supplied empty description
              is presumably rejected by ``_require_non_empty_description``.
      """
      if "description" in updated_data:
          _require_non_empty_description(
              updated_data.get("description"), operation="edit", object_type="relation"
          )

      # Normalize entity order for the undirected graph (consistent key generation)
      if source_entity > target_entity:
          source_entity, target_entity = target_entity, source_entity

      # Keyed lock on the entity pair keeps graph and vector DB updates atomic
      workspace = relationships_vdb.global_config.get("workspace", "")
      namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
      sorted_edge_key = sorted([source_entity, target_entity])
      async with get_storage_keyed_lock(
          sorted_edge_key, namespace=namespace, enable_logging=False
      ):
          try:
              # 1. Get current relation information
              edge_exists = await chunk_entity_relation_graph.has_edge(
                  source_entity, target_entity
              )
              if not edge_exists:
                  raise ValueError(
                      f"Relation from '{source_entity}' to '{target_entity}' does not exist"
                  )
              edge_data = await chunk_entity_relation_graph.get_edge(
                  source_entity, target_entity
              )

              # Remove the old vector record first. Both permutations are deleted to
              # cover relations stored before order normalization existed.
              rel_ids_to_delete = [
                  compute_mdhash_id(source_entity + target_entity, prefix="rel-"),
                  compute_mdhash_id(target_entity + source_entity, prefix="rel-"),
              ]
              await relationships_vdb.delete(rel_ids_to_delete)
              logger.debug(
                  f"Relation Edit: delete vdb for `{source_entity}`~`{target_entity}`"
              )

              # 2. Update relation information in the graph
              new_edge_data = {**edge_data, **updated_data}
              await chunk_entity_relation_graph.upsert_edge(
                  source_entity, target_entity, new_edge_data
              )

              # 3. Re-embed the relation. The content layout matches acreate_relation
              #    and _merge_entities_impl so edited relations embed the same way.
              description = new_edge_data.get("description", "")
              keywords = new_edge_data.get("keywords", "")
              source_id = new_edge_data.get("source_id", "")
              weight = float(new_edge_data.get("weight", 1.0))
              content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
              relation_id = compute_mdhash_id(
                  source_entity + target_entity, prefix="rel-"
              )
              relation_data = {
                  relation_id: {
                      "content": content,
                      "src_id": source_entity,
                      "tgt_id": target_entity,
                      "source_id": source_id,
                      "description": description,
                      "keywords": keywords,
                      "weight": weight,
                      # Keep file_path in the vector record, as create/merge do
                      "file_path": new_edge_data.get("file_path", ""),
                  }
              }
              await relationships_vdb.upsert(relation_data)

              # 4. Sync relation_chunks_storage when:
              #    - source_id changed (edit scenario), or
              #    - no tracking data exists yet (migration/initialization scenario)
              if relation_chunks_storage is not None:
                  from .utils import (
                      make_relation_chunk_key,
                      compute_incremental_chunk_ids,
                  )

                  storage_key = make_relation_chunk_key(source_entity, target_entity)
                  stored_data = await relation_chunks_storage.get_by_id(storage_key)
                  has_stored_data = (
                      stored_data
                      and isinstance(stored_data, dict)
                      and stored_data.get("chunk_ids")
                  )

                  old_source_id = edge_data.get("source_id", "")
                  old_chunk_ids = [
                      cid for cid in old_source_id.split(GRAPH_FIELD_SEP) if cid
                  ]
                  new_source_id = new_edge_data.get("source_id", "")
                  new_chunk_ids = [
                      cid for cid in new_source_id.split(GRAPH_FIELD_SEP) if cid
                  ]
                  source_id_changed = set(new_chunk_ids) != set(old_chunk_ids)

                  if source_id_changed or not has_stored_data:
                      existing_full_chunk_ids = []
                      if has_stored_data:
                          existing_full_chunk_ids = [
                              cid for cid in stored_data.get("chunk_ids", []) if cid
                          ]
                      # Without stored data, the old source_id is the baseline
                      if not existing_full_chunk_ids:
                          existing_full_chunk_ids = old_chunk_ids.copy()

                      updated_chunk_ids = compute_incremental_chunk_ids(
                          existing_full_chunk_ids, old_chunk_ids, new_chunk_ids
                      )
                      # Written even when empty so stale tracking is cleared
                      await relation_chunks_storage.upsert(
                          {
                              storage_key: {
                                  "chunk_ids": updated_chunk_ids,
                                  "count": len(updated_chunk_ids),
                              }
                          }
                      )
                      logger.info(
                          f"Relation Edit: update chunk tracking for `{source_entity}`~`{target_entity}`"
                      )

              # 5. Save changes
              await _persist_graph_updates(
                  relationships_vdb=relationships_vdb,
                  chunk_entity_relation_graph=chunk_entity_relation_graph,
                  relation_chunks_storage=relation_chunks_storage,
              )
              logger.info(
                  f"Relation Edit: `{source_entity}`~`{target_entity}` successfully updated"
              )
              return await get_relation_info(
                  chunk_entity_relation_graph,
                  relationships_vdb,
                  source_entity,
                  target_entity,
                  include_vector_data=True,
              )
          except Exception as e:
              logger.error(
                  f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}"
              )
              raise
  async def acreate_entity(
      chunk_entity_relation_graph,
      entities_vdb,
      relationships_vdb,
      entity_name: str,
      entity_data: dict[str, Any],
      entity_chunks_storage=None,
      relation_chunks_storage=None,
  ) -> dict[str, Any]:
      """Create a brand-new entity in the knowledge graph and the entity vector DB.

      Missing attributes fall back to defaults: ``entity_type`` to "UNKNOWN", and
      ``source_id`` and ``file_path`` to "manual_creation". When
      ``entity_chunks_storage`` is provided, the chunk ids parsed from the entity's
      ``source_id`` are recorded there.

      Args:
          chunk_entity_relation_graph: Graph storage instance
          entities_vdb: Vector database storage for entities
          relationships_vdb: Vector database storage for relationships (only
              forwarded to the persistence step)
          entity_name: Name of the new entity
          entity_data: Entity attributes, e.g.
              {"description": "description", "entity_type": "type"}
          entity_chunks_storage: Optional KV storage tracking chunks per entity
          relation_chunks_storage: Optional KV storage tracking chunks per relation
              (only forwarded to the persistence step)

      Returns:
          Entity information from ``get_entity_info(..., include_vector_data=True)``.

      Raises:
          ValueError: If an entity with this name already exists.
      """
      _require_non_empty_description(
          entity_data.get("description"), operation="create", object_type="entity"
      )

      workspace = entities_vdb.global_config.get("workspace", "")
      lock_namespace = "GraphDB" if not workspace else f"{workspace}:GraphDB"

      # Serialize with any other writer touching this entity name
      async with get_storage_keyed_lock(
          [entity_name], namespace=lock_namespace, enable_logging=False
      ):
          try:
              if await chunk_entity_relation_graph.has_node(entity_name):
                  raise ValueError(f"Entity '{entity_name}' already exists")

              node_data = {
                  "entity_id": entity_name,
                  "entity_type": entity_data.get("entity_type", "UNKNOWN"),
                  "description": entity_data.get("description", ""),
                  "source_id": entity_data.get("source_id", "manual_creation"),
                  "file_path": entity_data.get("file_path", "manual_creation"),
                  "created_at": int(time.time()),
              }
              await chunk_entity_relation_graph.upsert_node(entity_name, node_data)

              # Mirror the node into the entity vector database
              description = node_data.get("description", "")
              source_id = node_data.get("source_id", "")
              vdb_record = {
                  "content": entity_name + "\n" + description,
                  "entity_name": entity_name,
                  "source_id": source_id,
                  "description": description,
                  "entity_type": node_data.get("entity_type", ""),
                  "file_path": entity_data.get("file_path", "manual_creation"),
              }
              await entities_vdb.upsert(
                  {compute_mdhash_id(entity_name, prefix="ent-"): vdb_record}
              )

              # Record which chunks reference this entity (skipped when none)
              if entity_chunks_storage is not None:
                  tracked_chunks = [
                      chunk for chunk in source_id.split(GRAPH_FIELD_SEP) if chunk
                  ]
                  if tracked_chunks:
                      await entity_chunks_storage.upsert(
                          {
                              entity_name: {
                                  "chunk_ids": tracked_chunks,
                                  "count": len(tracked_chunks),
                              }
                          }
                      )
                      logger.info(
                          f"Entity Create: tracked {len(tracked_chunks)} chunks for `{entity_name}`"
                      )

              await _persist_graph_updates(
                  entities_vdb=entities_vdb,
                  relationships_vdb=relationships_vdb,
                  chunk_entity_relation_graph=chunk_entity_relation_graph,
                  entity_chunks_storage=entity_chunks_storage,
                  relation_chunks_storage=relation_chunks_storage,
              )
              logger.info(f"Entity Create: '{entity_name}' successfully created")

              return await get_entity_info(
                  chunk_entity_relation_graph,
                  entities_vdb,
                  entity_name,
                  include_vector_data=True,
              )
          except Exception as e:
              logger.error(f"Error while creating entity '{entity_name}': {e}")
              raise
  async def acreate_relation(
      chunk_entity_relation_graph,
      entities_vdb,
      relationships_vdb,
      source_entity: str,
      target_entity: str,
      relation_data: dict[str, Any],
      relation_chunks_storage=None,
  ) -> dict[str, Any]:
      """Create a new relation (edge) between two existing entities.

      The edge is written to the graph in the caller's argument order. From then
      on, including the vector record, chunk tracking key, returned info and any
      error log, the pair is used in sorted order. When
      ``relation_chunks_storage`` is given, chunk ids parsed from the relation's
      ``source_id`` are recorded there.

      Args:
          chunk_entity_relation_graph: Graph storage instance
          entities_vdb: Vector database storage for entities (not used by this
              function)
          relationships_vdb: Vector database storage for relationships
          source_entity: Name of the source entity
          target_entity: Name of the target entity
          relation_data: Relation attributes, e.g.
              {"description": "description", "keywords": "keywords"}
          relation_chunks_storage: Optional KV storage tracking chunks per relation

      Returns:
          Relation information from ``get_relation_info(..., include_vector_data=True)``.

      Raises:
          ValueError: If either entity is missing or the relation already exists.
      """
      _require_non_empty_description(
          relation_data.get("description"), operation="create", object_type="relation"
      )

      workspace = relationships_vdb.global_config.get("workspace", "")
      lock_namespace = "GraphDB" if not workspace else f"{workspace}:GraphDB"
      pair_lock_key = sorted([source_entity, target_entity])

      async with get_storage_keyed_lock(
          pair_lock_key, namespace=lock_namespace, enable_logging=False
      ):
          try:
              graph = chunk_entity_relation_graph
              # Both endpoints are checked before either failure is reported
              source_found = await graph.has_node(source_entity)
              target_found = await graph.has_node(target_entity)
              if not source_found:
                  raise ValueError(f"Source entity '{source_entity}' does not exist")
              if not target_found:
                  raise ValueError(f"Target entity '{target_entity}' does not exist")
              if await graph.has_edge(source_entity, target_entity):
                  raise ValueError(
                      f"Relation from '{source_entity}' to '{target_entity}' already exists"
                  )

              edge_data = {
                  "description": relation_data.get("description", ""),
                  "keywords": relation_data.get("keywords", ""),
                  "source_id": relation_data.get("source_id", "manual_creation"),
                  "weight": float(relation_data.get("weight", 1.0)),
                  "file_path": relation_data.get("file_path", "manual_creation"),
                  "created_at": int(time.time()),
              }
              await graph.upsert_edge(source_entity, target_entity, edge_data)

              # Undirected relation: everything below uses the sorted pair
              source_entity, target_entity = sorted([source_entity, target_entity])

              description = edge_data.get("description", "")
              keywords = edge_data.get("keywords", "")
              source_id = edge_data.get("source_id", "")
              vdb_record = {
                  "content": f"{keywords}\t{source_entity}\n{target_entity}\n{description}",
                  "src_id": source_entity,
                  "tgt_id": target_entity,
                  "source_id": source_id,
                  "description": description,
                  "keywords": keywords,
                  "weight": edge_data.get("weight", 1.0),
                  "file_path": relation_data.get("file_path", "manual_creation"),
              }
              vdb_id = compute_mdhash_id(source_entity + target_entity, prefix="rel-")
              await relationships_vdb.upsert({vdb_id: vdb_record})

              # Record which chunks reference this relation (skipped when none)
              if relation_chunks_storage is not None:
                  from .utils import make_relation_chunk_key

                  tracking_key = make_relation_chunk_key(source_entity, target_entity)
                  tracked_chunks = [
                      chunk for chunk in source_id.split(GRAPH_FIELD_SEP) if chunk
                  ]
                  if tracked_chunks:
                      await relation_chunks_storage.upsert(
                          {
                              tracking_key: {
                                  "chunk_ids": tracked_chunks,
                                  "count": len(tracked_chunks),
                              }
                          }
                      )
                      logger.info(
                          f"Relation Create: tracked {len(tracked_chunks)} chunks for `{source_entity}`~`{target_entity}`"
                      )

              await _persist_graph_updates(
                  relationships_vdb=relationships_vdb,
                  chunk_entity_relation_graph=graph,
                  relation_chunks_storage=relation_chunks_storage,
              )
              logger.info(
                  f"Relation Create: `{source_entity}`~`{target_entity}` successfully created"
              )

              return await get_relation_info(
                  graph,
                  relationships_vdb,
                  source_entity,
                  target_entity,
                  include_vector_data=True,
              )
          except Exception as e:
              logger.error(
                  f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}"
              )
              raise
  async def _merge_entities_impl(
      chunk_entity_relation_graph,
      entities_vdb,
      relationships_vdb,
      source_entities: list[str],
      target_entity: str,
      *,
      merge_strategy: dict[str, str] = None,
      target_entity_data: dict[str, Any] = None,
      entity_chunks_storage=None,
      relation_chunks_storage=None,
  ) -> dict[str, Any]:
      """Internal helper that merges entities without acquiring storage locks.

      Steps:
      1. Validate the sources.
      2. Merge the node attributes (explicit ``target_entity_data`` wins).
      3. Re-point every relation of the sources, and of an existing target, to the
         target. Relations that collapse onto the same undirected pair are
         deduplicated, and relations that would become self-loops are dropped.
      4. Rebuild the relation/entity vector records and the chunk tracking.
      5. Delete the source nodes.

      Args:
          chunk_entity_relation_graph: Graph storage instance
          entities_vdb: Vector database storage for entities
          relationships_vdb: Vector database storage for relationships
          source_entities: Source entity names to merge. Repeated names are
              processed once (first occurrence order is kept).
          target_entity: Name of the target entity after merging
          merge_strategy: Deprecated. Per-field merge strategy overrides (optional)
          target_entity_data: Values to set on the target entity, overriding the
              merged values (optional)
          entity_chunks_storage: Optional KV storage for tracking chunks
          relation_chunks_storage: Optional KV storage for tracking relation chunks

      Returns:
          Merged entity information from
          ``get_entity_info(..., include_vector_data=True)``.

      Raises:
          ValueError: If any source entity does not exist.

      Note:
          Caller must acquire appropriate locks before calling this function.
          All source entities and the target entity should be locked together.
      """
      default_entity_merge_strategy = {
          "description": "concatenate",
          "entity_type": "keep_first",
          "source_id": "join_unique",
          "file_path": "join_unique",
      }
      effective_entity_merge_strategy = default_entity_merge_strategy
      if merge_strategy:
          logger.warning(
              "Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release."
          )
          effective_entity_merge_strategy = {
              **default_entity_merge_strategy,
              **merge_strategy,
          }
      relation_merge_strategy = {
          "description": "concatenate",
          "keywords": "join_unique_comma",
          "source_id": "join_unique",
          "file_path": "join_unique",
          "weight": "max",
      }
      target_entity_data = {} if target_entity_data is None else target_entity_data

      # Repeated names would collect the same edges twice (duplicating merged
      # attributes) and delete the same node twice; keep first-occurrence order.
      source_entities = list(dict.fromkeys(source_entities))

      if relation_chunks_storage is not None:
          from .utils import make_relation_chunk_key

      # 1. Check that all source entities exist
      source_entities_data = {}
      for entity_name in source_entities:
          if not await chunk_entity_relation_graph.has_node(entity_name):
              raise ValueError(f"Source entity '{entity_name}' does not exist")
          source_entities_data[entity_name] = (
              await chunk_entity_relation_graph.get_node(entity_name)
          )

      # 2. Check whether the target exists and fetch its data if so
      target_exists = await chunk_entity_relation_graph.has_node(target_entity)
      existing_target_entity_data = {}
      if target_exists:
          existing_target_entity_data = await chunk_entity_relation_graph.get_node(
              target_entity
          )

      # 3. Merge entity data; explicit target_entity_data overrides merged values
      entities_to_merge = list(source_entities_data.values())
      if target_exists:
          entities_to_merge.append(existing_target_entity_data)
      merged_entity_data = _merge_attributes(
          entities_to_merge,
          effective_entity_merge_strategy,
          filter_none_only=False,  # Entity behavior: filter falsy values
      )
      merged_entity_data.update(target_entity_data)

      # 4. Collect relations of the source entities and of the target (if it exists)
      entities_to_collect = list(source_entities)
      if target_exists and target_entity not in source_entities:
          entities_to_collect.append(target_entity)
      all_relations = []
      for entity_name in entities_to_collect:
          edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
          for src, tgt in edges or []:
              # Only keep edges reported with the current entity as src
              if src == entity_name:
                  edge_data = await chunk_entity_relation_graph.get_edge(src, tgt)
                  all_relations.append((src, tgt, edge_data))

      # 5. Create or update the target entity
      merged_entity_data["entity_id"] = target_entity
      await chunk_entity_relation_graph.upsert_node(target_entity, merged_entity_data)
      if target_exists:
          logger.info(f"Entity Merge: Updated target '{target_entity}'")
      else:
          logger.info(f"Entity Merge: created target '{target_entity}'")

      # 6. Re-point relations to the target in the KG and collect chunk tracking
      relation_updates = {}  # "src|tgt" (sorted) -> graph/normalized ends + data
      relations_to_delete = []  # vdb ids of every original relation (both orders)
      relation_chunk_tracking = {}  # new storage_key -> ordered unique chunk_ids
      old_relation_keys_to_delete = []
      for src, tgt, edge_data in all_relations:
          relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
          relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
          if relation_chunks_storage is not None:
              old_storage_key = make_relation_chunk_key(src, tgt)
              old_relation_keys_to_delete.append(old_storage_key)

          new_src = target_entity if src in source_entities else src
          new_tgt = target_entity if tgt in source_entities else tgt
          # Relations between merged entities would collapse into self-loops
          if new_src == new_tgt:
              logger.info(f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop")
              continue

          # Sorted order gives consistent duplicate detection (undirected)
          normalized_src, normalized_tgt = sorted([new_src, new_tgt])
          relation_key = f"{normalized_src}|{normalized_tgt}"

          if relation_chunks_storage is not None:
              storage_key = make_relation_chunk_key(normalized_src, normalized_tgt)
              stored = await relation_chunks_storage.get_by_id(old_storage_key)
              if isinstance(stored, dict):
                  chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
              else:
                  # Fall back to source_id from the graph
                  source_id = edge_data.get("source_id", "")
                  chunk_ids = [cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid]
              # Accumulate chunk_ids with ordered deduplication
              tracked = relation_chunk_tracking.setdefault(storage_key, [])
              seen_chunks = set(tracked)
              for chunk_id in chunk_ids:
                  if chunk_id not in seen_chunks:
                      seen_chunks.add(chunk_id)
                      tracked.append(chunk_id)

          if relation_key in relation_updates:
              relation_updates[relation_key]["data"] = _merge_attributes(
                  [relation_updates[relation_key]["data"], edge_data],
                  relation_merge_strategy,
                  filter_none_only=True,  # Relation behavior: only filter None
              )
              logger.debug(
                  f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`"
              )
          else:
              relation_updates[relation_key] = {
                  "graph_src": new_src,
                  "graph_tgt": new_tgt,
                  "norm_src": normalized_src,
                  "norm_tgt": normalized_tgt,
                  "data": edge_data.copy(),
              }

      logger.info(f"Entity Merge: updating {len(relation_updates)} relations")
      for rel_data in relation_updates.values():
          await chunk_entity_relation_graph.upsert_edge(
              rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"]
          )
          logger.info(
              f"Entity Merge: updating relation `{rel_data['graph_src']}`~`{rel_data['graph_tgt']}`"
          )

      # Old keys are deleted before the upsert because a new key may equal an old one
      if relation_chunks_storage is not None and all_relations:
          if old_relation_keys_to_delete:
              await relation_chunks_storage.delete(old_relation_keys_to_delete)
          if relation_chunk_tracking:
              updates = {
                  storage_key: {"chunk_ids": chunk_ids, "count": len(chunk_ids)}
                  for storage_key, chunk_ids in relation_chunk_tracking.items()
              }
              await relation_chunks_storage.upsert(updates)
              logger.info(
                  f"Entity Merge: {len(updates)} relation chunk tracking records updated"
              )

      # 7. Update relationship vector representations
      logger.debug(
          f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
      )
      await relationships_vdb.delete(relations_to_delete)
      for rel_data in relation_updates.values():
          edge_data = rel_data["data"]
          normalized_src = rel_data["norm_src"]
          normalized_tgt = rel_data["norm_tgt"]
          description = edge_data.get("description", "")
          keywords = edge_data.get("keywords", "")
          # Normalized order for content and relation ID
          relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-")
          relation_data_for_vdb = {
              relation_id: {
                  "content": f"{keywords}\t{normalized_src}\n{normalized_tgt}\n{description}",
                  "src_id": normalized_src,
                  "tgt_id": normalized_tgt,
                  "source_id": edge_data.get("source_id", ""),
                  "description": description,
                  "keywords": keywords,
                  "weight": float(edge_data.get("weight", 1.0)),
                  "file_path": edge_data.get("file_path", ""),
              }
          }
          await relationships_vdb.upsert(relation_data_for_vdb)
          logger.debug(
              f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
          )
      logger.info(f"Entity Merge: {len(relation_updates)} relations in vdb updated")

      # 8. Update entity vector representation
      description = merged_entity_data.get("description", "")
      entity_data_for_vdb = {
          compute_mdhash_id(target_entity, prefix="ent-"): {
              "content": target_entity + "\n" + description,
              "entity_name": target_entity,
              "source_id": merged_entity_data.get("source_id", ""),
              "description": description,
              "entity_type": merged_entity_data.get("entity_type", ""),
              "file_path": merged_entity_data.get("file_path", ""),
          }
      }
      await entities_vdb.upsert(entity_data_for_vdb)
      logger.info(f"Entity Merge: updating vdb `{target_entity}`")

      # 9. Merge entity chunk tracking (source entities first, then the target)
      if entity_chunks_storage is not None:
          entity_keys_to_delete = [e for e in source_entities if e != target_entity]
          entities_to_process = list(entity_keys_to_delete)
          if target_exists:
              entities_to_process.append(target_entity)

          # Ordered deduplication: earlier entities' chunks come first
          merged_chunk_ids = []
          seen = set()
          for entity_name in entities_to_process:
              stored = await entity_chunks_storage.get_by_id(entity_name)
              if not (stored and isinstance(stored, dict)):
                  continue
              for chunk_id in stored.get("chunk_ids", []):
                  if chunk_id and chunk_id not in seen:
                      seen.add(chunk_id)
                      merged_chunk_ids.append(chunk_id)

          if entity_keys_to_delete:
              await entity_chunks_storage.delete(entity_keys_to_delete)
          if merged_chunk_ids:
              await entity_chunks_storage.upsert(
                  {
                      target_entity: {
                          "chunk_ids": merged_chunk_ids,
                          "count": len(merged_chunk_ids),
                      }
                  }
              )
          logger.info(
              f"Entity Merge: found {len(merged_chunk_ids)} chunks related to '{target_entity}'"
          )

      # 10. Delete source entities from the KG (node and its edges) and the vdb
      for entity_name in source_entities:
          if entity_name == target_entity:
              logger.warning(
                  f"Entity Merge: source entity '{entity_name}' is same as target entity"
              )
              continue
          logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb")
          await chunk_entity_relation_graph.delete_node(entity_name)
          await entities_vdb.delete([compute_mdhash_id(entity_name, prefix="ent-")])

      # 11. Save changes
      await _persist_graph_updates(
          entities_vdb=entities_vdb,
          relationships_vdb=relationships_vdb,
          chunk_entity_relation_graph=chunk_entity_relation_graph,
          entity_chunks_storage=entity_chunks_storage,
          relation_chunks_storage=relation_chunks_storage,
      )
      logger.info(
          f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'"
      )
      return await get_entity_info(
          chunk_entity_relation_graph,
          entities_vdb,
          target_entity,
          include_vector_data=True,
      )
  1345. async def amerge_entities(
  1346. chunk_entity_relation_graph,
  1347. entities_vdb,
  1348. relationships_vdb,
  1349. source_entities: list[str],
  1350. target_entity: str,
  1351. merge_strategy: dict[str, str] = None,
  1352. target_entity_data: dict[str, Any] = None,
  1353. entity_chunks_storage=None,
  1354. relation_chunks_storage=None,
  1355. ) -> dict[str, Any]:
  1356. """Asynchronously merge multiple entities into one entity.
  1357. Merges multiple source entities into a target entity, handling all relationships,
  1358. and updating both the knowledge graph and vector database.
  1359. Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
  1360. Args:
  1361. chunk_entity_relation_graph: Graph storage instance
  1362. entities_vdb: Vector database storage for entities
  1363. relationships_vdb: Vector database storage for relationships
  1364. source_entities: List of source entity names to merge
  1365. target_entity: Name of the target entity after merging
  1366. merge_strategy: Deprecated (Each field uses its own default strategy). If provided,
  1367. customizations are applied but a warning is logged.
  1368. target_entity_data: Dictionary of specific values to set for the target entity,
  1369. overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
  1370. entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
  1371. relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
  1372. Returns:
  1373. Dictionary containing the merged entity information
  1374. """
  1375. # Collect all entities involved (source + target) and lock them all in sorted order
  1376. all_entities = set(source_entities)
  1377. all_entities.add(target_entity)
  1378. lock_keys = sorted(all_entities)
  1379. workspace = entities_vdb.global_config.get("workspace", "")
  1380. namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
  1381. async with get_storage_keyed_lock(
  1382. lock_keys, namespace=namespace, enable_logging=False
  1383. ):
  1384. try:
  1385. return await _merge_entities_impl(
  1386. chunk_entity_relation_graph,
  1387. entities_vdb,
  1388. relationships_vdb,
  1389. source_entities,
  1390. target_entity,
  1391. merge_strategy=merge_strategy,
  1392. target_entity_data=target_entity_data,
  1393. entity_chunks_storage=entity_chunks_storage,
  1394. relation_chunks_storage=relation_chunks_storage,
  1395. )
  1396. except Exception as e:
  1397. logger.error(f"Error merging entities: {e}")
  1398. raise
  1399. def _merge_attributes(
  1400. data_list: list[dict[str, Any]],
  1401. merge_strategy: dict[str, str],
  1402. filter_none_only: bool = False,
  1403. ) -> dict[str, Any]:
  1404. """Merge attributes from multiple entities or relationships.
  1405. This unified function handles merging of both entity and relationship attributes,
  1406. applying different merge strategies per field.
  1407. Args:
  1408. data_list: List of dictionaries containing entity or relationship data
  1409. merge_strategy: Merge strategy for each field. Supported strategies:
  1410. - "concatenate": Join all values with GRAPH_FIELD_SEP
  1411. - "keep_first": Keep the first non-empty value
  1412. - "keep_last": Keep the last non-empty value
  1413. - "join_unique": Join unique items separated by GRAPH_FIELD_SEP
  1414. - "join_unique_comma": Join unique items separated by comma and space
  1415. - "max": Keep the maximum numeric value (for numeric fields)
  1416. filter_none_only: If True, only filter None values (keep empty strings, 0, etc.).
  1417. If False, filter all falsy values. Default is False for backward compatibility.
  1418. Returns:
  1419. Dictionary containing merged data
  1420. """
  1421. merged_data = {}
  1422. # Collect all possible keys
  1423. all_keys = set()
  1424. for data in data_list:
  1425. all_keys.update(data.keys())
  1426. # Merge values for each key
  1427. for key in all_keys:
  1428. # Get all values for this key based on filtering mode
  1429. if filter_none_only:
  1430. values = [data.get(key) for data in data_list if data.get(key) is not None]
  1431. else:
  1432. values = [data.get(key) for data in data_list if data.get(key)]
  1433. if not values:
  1434. continue
  1435. # Merge values according to strategy
  1436. strategy = merge_strategy.get(key, "keep_first")
  1437. if strategy == "concatenate":
  1438. # Convert all values to strings and join with GRAPH_FIELD_SEP
  1439. merged_data[key] = GRAPH_FIELD_SEP.join(str(v) for v in values)
  1440. elif strategy == "keep_first":
  1441. merged_data[key] = values[0]
  1442. elif strategy == "keep_last":
  1443. merged_data[key] = values[-1]
  1444. elif strategy == "join_unique":
  1445. # Handle fields separated by GRAPH_FIELD_SEP
  1446. unique_items = set()
  1447. for value in values:
  1448. items = str(value).split(GRAPH_FIELD_SEP)
  1449. unique_items.update(items)
  1450. merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
  1451. elif strategy == "join_unique_comma":
  1452. # Handle fields separated by comma, join unique items with comma
  1453. unique_items = set()
  1454. for value in values:
  1455. items = str(value).split(",")
  1456. unique_items.update(item.strip() for item in items if item.strip())
  1457. merged_data[key] = ",".join(sorted(unique_items))
  1458. elif strategy == "max":
  1459. # For numeric fields like weight
  1460. try:
  1461. merged_data[key] = max(float(v) for v in values)
  1462. except (ValueError, TypeError):
  1463. # Fallback to first value if conversion fails
  1464. merged_data[key] = values[0]
  1465. else:
  1466. # Default strategy: keep first value
  1467. merged_data[key] = values[0]
  1468. return merged_data
  1469. async def get_entity_info(
  1470. chunk_entity_relation_graph,
  1471. entities_vdb,
  1472. entity_name: str,
  1473. include_vector_data: bool = False,
  1474. ) -> dict[str, str | None | dict[str, str]]:
  1475. """Get detailed information of an entity"""
  1476. # Get information from the graph
  1477. node_data = await chunk_entity_relation_graph.get_node(entity_name)
  1478. source_id = node_data.get("source_id") if node_data else None
  1479. result: dict[str, str | None | dict[str, str]] = {
  1480. "entity_name": entity_name,
  1481. "source_id": source_id,
  1482. "graph_data": node_data,
  1483. }
  1484. # Optional: Get vector database information
  1485. if include_vector_data:
  1486. entity_id = compute_mdhash_id(entity_name, prefix="ent-")
  1487. vector_data = await entities_vdb.get_by_id(entity_id)
  1488. result["vector_data"] = vector_data
  1489. return result
  1490. async def get_relation_info(
  1491. chunk_entity_relation_graph,
  1492. relationships_vdb,
  1493. src_entity: str,
  1494. tgt_entity: str,
  1495. include_vector_data: bool = False,
  1496. ) -> dict[str, str | None | dict[str, str]]:
  1497. """
  1498. Get detailed information of a relationship between two entities.
  1499. Relationship is unidirectional, swap src_entity and tgt_entity does not change the relationship.
  1500. Args:
  1501. src_entity: Source entity name
  1502. tgt_entity: Target entity name
  1503. include_vector_data: Whether to include vector database information
  1504. Returns:
  1505. Dictionary containing relationship information
  1506. """
  1507. # Get information from the graph
  1508. edge_data = await chunk_entity_relation_graph.get_edge(src_entity, tgt_entity)
  1509. source_id = edge_data.get("source_id") if edge_data else None
  1510. result: dict[str, str | None | dict[str, str]] = {
  1511. "src_entity": src_entity,
  1512. "tgt_entity": tgt_entity,
  1513. "source_id": source_id,
  1514. "graph_data": edge_data,
  1515. }
  1516. # Optional: Get vector database information
  1517. if include_vector_data:
  1518. vector_data = None
  1519. for rel_id in make_relation_vdb_ids(src_entity, tgt_entity):
  1520. vector_data = await relationships_vdb.get_by_id(rel_id)
  1521. if vector_data is not None:
  1522. break
  1523. result["vector_data"] = vector_data
  1524. return result