| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- import pytest
- from lightrag.constants import SOURCE_IDS_LIMIT_METHOD_KEEP
- from lightrag.constants import GRAPH_FIELD_SEP
- from lightrag.operate import (
- _handle_single_entity_extraction,
- _merge_nodes_then_upsert,
- _normalize_text_extraction_record_attributes,
- _handle_single_relationship_extraction,
- )
- from lightrag import utils_graph
- class DummyGraphStorage:
- def __init__(self, node=None):
- self.node = node
- self.upserted_nodes = []
- async def get_node(self, node_id):
- return self.node
- async def upsert_node(self, node_id, node_data):
- self.upserted_nodes.append((node_id, node_data))
- self.node = dict(node_data)
- class DummyVectorStorage:
- def __init__(self):
- self.global_config = {"workspace": "test"}
- self.upserts = []
- self.deletes = []
- async def upsert(self, data):
- self.upserts.append(data)
- return None
- async def delete(self, ids):
- self.deletes.append(ids)
- return None
- async def get_by_id(self, id_):
- return None
- async def index_done_callback(self):
- return True
- class DummyAsyncContext:
- async def __aenter__(self):
- return None
- async def __aexit__(self, exc_type, exc, tb):
- return False
- class DummyMergeGraphStorage:
- def __init__(self):
- self.nodes = {
- "Canonical": {
- "entity_id": "Canonical",
- "description": "canonical desc",
- "entity_type": "ORG",
- "source_id": "chunk-1",
- "file_path": "canonical.md",
- },
- "Alias": {
- "entity_id": "Alias",
- "description": "alias desc",
- "entity_type": "ORG",
- "source_id": "chunk-2",
- "file_path": "alias.md",
- },
- "Neighbor": {
- "entity_id": "Neighbor",
- "description": "neighbor desc",
- "entity_type": "ORG",
- "source_id": "chunk-3",
- "file_path": "neighbor.md",
- },
- }
- self.edges = {
- ("Alias", "Neighbor"): {
- "description": "rel desc",
- "keywords": "alias",
- "source_id": "chunk-rel",
- "weight": 1.0,
- "file_path": "rel.md",
- }
- }
- async def has_node(self, node_id):
- return node_id in self.nodes
- async def get_node(self, node_id):
- return self.nodes[node_id]
- async def upsert_node(self, node_id, node_data):
- self.nodes[node_id] = dict(node_data)
- async def get_node_edges(self, node_id):
- results = []
- for src, tgt in self.edges:
- if src == node_id or tgt == node_id:
- results.append((src, tgt))
- return results
- async def get_edge(self, src, tgt):
- return self.edges.get((src, tgt)) or self.edges.get((tgt, src))
- async def upsert_edge(self, src, tgt, edge_data):
- self.edges[(src, tgt)] = dict(edge_data)
- async def delete_node(self, node_id):
- self.nodes.pop(node_id, None)
- self.edges = {
- (src, tgt): data
- for (src, tgt), data in self.edges.items()
- if src != node_id and tgt != node_id
- }
- async def index_done_callback(self):
- return True
- @pytest.mark.asyncio
- async def test_merge_nodes_then_upsert_handles_missing_legacy_description():
- graph = DummyGraphStorage(node={"source_id": "chunk-1"})
- global_config = {
- "source_ids_limit_method": SOURCE_IDS_LIMIT_METHOD_KEEP,
- "max_source_ids_per_entity": 20,
- }
- result = await _merge_nodes_then_upsert(
- entity_name="LegacyEntity",
- nodes_data=[],
- knowledge_graph_inst=graph,
- entity_vdb=None,
- global_config=global_config,
- )
- assert result["description"] == "Entity LegacyEntity"
- assert graph.upserted_nodes[-1][1]["description"] == "Entity LegacyEntity"
- @pytest.mark.asyncio
- async def test_acreate_entity_rejects_empty_description():
- with pytest.raises(ValueError, match="description cannot be empty"):
- await utils_graph.acreate_entity(
- chunk_entity_relation_graph=None,
- entities_vdb=None,
- relationships_vdb=None,
- entity_name="EntityA",
- entity_data={"description": " "},
- )
- @pytest.mark.asyncio
- async def test_acreate_relation_rejects_empty_description():
- with pytest.raises(ValueError, match="description cannot be empty"):
- await utils_graph.acreate_relation(
- chunk_entity_relation_graph=None,
- entities_vdb=None,
- relationships_vdb=None,
- source_entity="A",
- target_entity="B",
- relation_data={"description": ""},
- )
- @pytest.mark.asyncio
- async def test_aedit_entity_rejects_empty_description():
- with pytest.raises(ValueError, match="description cannot be empty"):
- await utils_graph.aedit_entity(
- chunk_entity_relation_graph=None,
- entities_vdb=None,
- relationships_vdb=None,
- entity_name="EntityA",
- updated_data={"description": None},
- )
- @pytest.mark.asyncio
- async def test_aedit_relation_rejects_empty_description():
- with pytest.raises(ValueError, match="description cannot be empty"):
- await utils_graph.aedit_relation(
- chunk_entity_relation_graph=None,
- entities_vdb=None,
- relationships_vdb=None,
- source_entity="A",
- target_entity="B",
- updated_data={"description": " "},
- )
- @pytest.mark.asyncio
- async def test_aedit_entity_allows_updates_without_description(monkeypatch):
- async def fake_edit_impl(*args, **kwargs):
- return {"entity_name": "EntityA", "description": "kept", "source_id": "chunk-1"}
- monkeypatch.setattr(
- utils_graph, "get_storage_keyed_lock", lambda *a, **k: DummyAsyncContext()
- )
- monkeypatch.setattr(utils_graph, "_edit_entity_impl", fake_edit_impl)
- result = await utils_graph.aedit_entity(
- chunk_entity_relation_graph=None,
- entities_vdb=DummyVectorStorage(),
- relationships_vdb=DummyVectorStorage(),
- entity_name="EntityA",
- updated_data={"entity_type": "ORG"},
- )
- assert result["operation_summary"]["operation_status"] == "success"
- def test_handle_single_relationship_extraction_ignores_empty_description():
- relation = _handle_single_relationship_extraction(
- ["relation", "Alice", "Bob", "works_with", " "],
- chunk_key="chunk-1",
- timestamp=1,
- )
- assert relation is None
- def test_mis_prefixed_relation_row_is_recovered():
- record = _normalize_text_extraction_record_attributes(
- ["entity", "Alice", "Acme Corp", "founded", "Alice founded Acme Corp."],
- chunk_key="chunk-1",
- )
- relation = _handle_single_relationship_extraction(
- record,
- chunk_key="chunk-1",
- timestamp=1,
- )
- assert relation is not None
- assert relation["src_id"] == "Alice"
- assert relation["tgt_id"] == "Acme Corp"
- def test_four_part_entity_row_remains_entity():
- record = _normalize_text_extraction_record_attributes(
- ["entity", "Alice", "Person", "Alice is the founder of Acme Corp."],
- chunk_key="chunk-1",
- )
- entity = _handle_single_entity_extraction(
- record,
- chunk_key="chunk-1",
- timestamp=1,
- )
- assert entity is not None
- assert entity["entity_name"] == "Alice"
- def test_malformed_recovered_relation_still_fails():
- record = _normalize_text_extraction_record_attributes(
- ["entity", "Alice", "Acme Corp", "founded", " "],
- chunk_key="chunk-1",
- )
- relation = _handle_single_relationship_extraction(
- record,
- chunk_key="chunk-1",
- timestamp=1,
- )
- assert relation is None
- def test_unrelated_five_part_prefix_remains_invalid():
- record = _normalize_text_extraction_record_attributes(
- ["edge", "Alice", "Acme Corp", "founded", "Alice founded Acme Corp."],
- chunk_key="chunk-1",
- )
- relation = _handle_single_relationship_extraction(
- record,
- chunk_key="chunk-1",
- timestamp=1,
- )
- assert relation is None
- @pytest.mark.asyncio
- async def test_merge_entities_preserves_file_path_in_vector_updates(monkeypatch):
- graph = DummyMergeGraphStorage()
- entities_vdb = DummyVectorStorage()
- relationships_vdb = DummyVectorStorage()
- async def fake_get_entity_info(*args, **kwargs):
- return {"entity_name": "Canonical"}
- monkeypatch.setattr(utils_graph, "get_entity_info", fake_get_entity_info)
- await utils_graph._merge_entities_impl(
- chunk_entity_relation_graph=graph,
- entities_vdb=entities_vdb,
- relationships_vdb=relationships_vdb,
- source_entities=["Alias", "Canonical"],
- target_entity="Canonical",
- )
- relationship_payload = relationships_vdb.upserts[-1]
- entity_payload = entities_vdb.upserts[-1]
- assert next(iter(relationship_payload.values()))["file_path"] == "rel.md"
- assert set(
- next(iter(entity_payload.values()))["file_path"].split(GRAPH_FIELD_SEP)
- ) == {
- "alias.md",
- "canonical.md",
- }
|