| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574 |
- #!/usr/bin/env python
- """
- General-purpose graph storage test program.
- This program selects the graph storage type to use based on the LIGHTRAG_GRAPH_STORAGE configuration in .env,
- and tests its basic and advanced operations.
- Supported graph storage types include:
- - NetworkXStorage
- - Neo4JStorage
- - MongoDBStorage
- - PGGraphStorage
- - MemgraphStorage
- """
- import asyncio
- import os
- import sys
- import importlib
- import numpy as np
- import pytest
- from dotenv import load_dotenv
- from ascii_colors import ASCIIColors
- # Add the project root directory to the front of the Python path so this
- # script always exercises the checked-out source tree, not a stale installed
- # lightrag package from the active virtualenv.
- sys.path.insert(
- 0,
- os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
- )
- from lightrag.types import KnowledgeGraph
- from lightrag.kg import (
- STORAGE_IMPLEMENTATIONS,
- STORAGE_ENV_REQUIREMENTS,
- STORAGES,
- verify_storage_implementation,
- )
- from lightrag.kg.shared_storage import initialize_share_data
- from lightrag.constants import GRAPH_FIELD_SEP
# Stand-in embedding function: the graph tests only need vectors of the right
# shape, not meaningful embeddings.
async def mock_embedding_func(texts):
    """
    Return one random 10-dimensional vector per input text.

    Args:
        texts: A sized collection of texts; only its length is used.

    Returns:
        A NumPy array of shape (len(texts), 10) with values drawn uniformly
        from [0, 1).
    """
    embedding_dim = 10
    batch_size = len(texts)
    return np.random.rand(batch_size, embedding_dim)
def check_env_file():
    """
    Confirm that a .env file is present in the current working directory.

    When the file is missing a warning is printed. If stdin is an interactive
    terminal the user is then asked whether to proceed, and only the answer
    "yes" (case-insensitive) continues. In a non-interactive session
    execution continues without prompting.

    Returns:
        bool: True to continue execution, False if the user declined.
    """
    if os.path.exists(".env"):
        return True

    ASCIIColors.yellow(
        "Warning: .env file not found in the current directory. This may affect storage configuration loading."
    )

    # Without a terminal nobody can answer the prompt, so carry on.
    if not sys.stdin.isatty():
        return True

    answer = input("Do you want to continue? (yes/NO): ")
    if answer.lower() == "yes":
        return True

    ASCIIColors.red("Test program cancelled.")
    return False
async def initialize_graph_storage():
    """
    Create and initialize the graph storage backend named by LIGHTRAG_GRAPH_STORAGE.

    The backend name defaults to "NetworkXStorage". The name is validated, the
    environment variables listed for it in STORAGE_ENV_REQUIREMENTS are
    checked, its module is imported dynamically, and an instance is built and
    initialized. Failures are reported on the console instead of being raised.

    Returns:
        The initialized storage instance, or None if any step failed.
    """
    graph_storage_type = os.getenv("LIGHTRAG_GRAPH_STORAGE", "NetworkXStorage")

    # Reject names that are not registered graph storage implementations.
    try:
        verify_storage_implementation("GRAPH_STORAGE", graph_storage_type)
    except ValueError as e:
        ASCIIColors.red(f"Error: {str(e)}")
        ASCIIColors.yellow(
            f"Supported graph storage types: {', '.join(STORAGE_IMPLEMENTATIONS['GRAPH_STORAGE']['implementations'])}"
        )
        return None

    # Every variable the backend declares as required must be set and non-empty.
    missing_env_vars = [
        name
        for name in STORAGE_ENV_REQUIREMENTS.get(graph_storage_type, [])
        if not os.getenv(name)
    ]
    if missing_env_vars:
        ASCIIColors.red(
            f"Error: {graph_storage_type} requires the following environment variables, but they are not set: {', '.join(missing_env_vars)}"
        )
        return None

    # Resolve the implementing module and pull the class of the same name from it.
    import_path = STORAGES.get(graph_storage_type)
    if not import_path:
        ASCIIColors.red(f"Error: Module path for {graph_storage_type} not found.")
        return None

    try:
        backend_module = importlib.import_module(import_path, package="lightrag")
        backend_cls = getattr(backend_module, graph_storage_type)
    except (ImportError, AttributeError) as e:
        ASCIIColors.red(f"Error: Failed to import {graph_storage_type}: {str(e)}")
        return None

    working_dir = os.environ.get("WORKING_DIR", "./rag_storage")
    config = {
        "embedding_batch_num": 10,  # Batch size
        "vector_db_storage_cls_kwargs": {
            "cosine_better_than_threshold": 0.5  # Cosine similarity threshold
        },
        "working_dir": working_dir,
    }

    # Shared data must exist before any storage is built (required for locks);
    # the default call uses single-process mode.
    initialize_share_data()

    try:
        instance = backend_cls(
            namespace="test_graph",
            workspace="test_workspace",
            global_config=config,
            embedding_func=mock_embedding_func,
        )
        # Open the connection / load the backing data.
        await instance.initialize()
    except Exception as e:
        ASCIIColors.red(f"Error: Failed to initialize {graph_storage_type}: {str(e)}")
        return None
    else:
        return instance
@pytest.fixture
async def storage():
    """
    Provide an initialized graph storage backend with an empty graph.

    Loads .env without overriding variables that are already set, and skips
    the requesting test when the environment check is declined or the backend
    cannot be initialized. The graph is dropped before the test runs; on
    teardown it is dropped again and the backend is finalized.
    """
    load_dotenv(dotenv_path=".env", override=False)

    env_ok = check_env_file()
    if not env_ok:
        pytest.skip(".env file not available for graph storage integration tests")

    backend = await initialize_graph_storage()
    if backend is None:
        pytest.skip("Graph storage backend is not configured for integration tests")

    try:
        # Start from a clean graph so leftovers from earlier runs cannot
        # influence the assertions.
        await backend.drop()
        yield backend
    finally:
        try:
            await backend.drop()
        except Exception as exc:
            # A failed cleanup is only reported; finalize() must still run.
            ASCIIColors.yellow(f"Warning: failed to drop test graph data: {exc}")
        finally:
            await backend.finalize()
@pytest.mark.integration
@pytest.mark.requires_db
async def test_graph_basic(storage):
    """
    Test basic graph database operations:
    1. Use upsert_node to insert two nodes.
    2. Use upsert_edge to insert an edge connecting the two nodes.
    3. Use get_node to read a node.
    4. Use get_edge to read the edge in both directions and check that both
       reads return the same properties (undirected graph behaviour).

    Args:
        storage: Initialized graph storage instance (the `storage` fixture).

    Returns:
        True when every check passes.

    Raises:
        AssertionError: If a read returns nothing or a value read back differs
            from what was written.
        Exception: Any other error raised by the storage backend; it is logged
            and re-raised.
    """
    try:
        # 1. Insert the first node
        node1_id = "Artificial Intelligence"
        node1_data = {
            "entity_id": node1_id,
            "description": "Artificial intelligence is a branch of computer science that aims to understand the essence of intelligence and produce a new kind of intelligent machine that can react in a manner similar to human intelligence.",
            "keywords": "AI,Machine Learning,Deep Learning",
            "entity_type": "Technology Field",
        }
        print(f"Inserting node 1: {node1_id}")
        await storage.upsert_node(node1_id, node1_data)

        # 2. Insert the second node
        node2_id = "Machine Learning"
        node2_data = {
            "entity_id": node2_id,
            "description": "Machine learning is a branch of artificial intelligence that uses statistical methods to enable computer systems to learn without being explicitly programmed.",
            "keywords": "Supervised Learning,Unsupervised Learning,Reinforcement Learning",
            "entity_type": "Technology Field",
        }
        print(f"Inserting node 2: {node2_id}")
        await storage.upsert_node(node2_id, node2_data)

        # 3. Insert the connecting edge
        edge_data = {
            "relationship": "includes",
            "weight": 1.0,
            "description": "The field of artificial intelligence includes the subfield of machine learning.",
        }
        print(f"Inserting edge: {node1_id} -> {node2_id}")
        await storage.upsert_edge(node1_id, node2_id, edge_data)

        # 4. Read node properties
        print(f"Reading node properties: {node1_id}")
        node1_props = await storage.get_node(node1_id)
        if not node1_props:
            print(f"Failed to read node properties: {node1_id}")
            # Raised explicitly: `assert False` would vanish under `python -O`.
            raise AssertionError(f"Failed to read node properties: {node1_id}")

        print(f"Successfully read node properties: {node1_id}")
        print(f"Node description: {node1_props.get('description', 'No description')}")
        print(f"Node type: {node1_props.get('entity_type', 'No type')}")
        print(f"Node keywords: {node1_props.get('keywords', 'No keywords')}")
        # Verify that the returned properties are correct
        assert (
            node1_props.get("entity_id") == node1_id
        ), f"Node ID mismatch: expected {node1_id}, got {node1_props.get('entity_id')}"
        assert (
            node1_props.get("description") == node1_data["description"]
        ), "Node description mismatch"
        assert (
            node1_props.get("entity_type") == node1_data["entity_type"]
        ), "Node type mismatch"

        # 5. Read edge properties
        print(f"Reading edge properties: {node1_id} -> {node2_id}")
        edge_props = await storage.get_edge(node1_id, node2_id)
        if not edge_props:
            print(f"Failed to read edge properties: {node1_id} -> {node2_id}")
            raise AssertionError(
                f"Failed to read edge properties: {node1_id} -> {node2_id}"
            )

        print(f"Successfully read edge properties: {node1_id} -> {node2_id}")
        print(
            f"Edge relationship: {edge_props.get('relationship', 'No relationship')}"
        )
        print(f"Edge description: {edge_props.get('description', 'No description')}")
        print(f"Edge weight: {edge_props.get('weight', 'No weight')}")
        # Verify that the returned properties are correct
        assert (
            edge_props.get("relationship") == edge_data["relationship"]
        ), "Edge relationship mismatch"
        assert (
            edge_props.get("description") == edge_data["description"]
        ), "Edge description mismatch"
        assert edge_props.get("weight") == edge_data["weight"], "Edge weight mismatch"

        # 5.1 Verify undirected graph property - read reverse edge properties
        print(f"Reading reverse edge properties: {node2_id} -> {node1_id}")
        reverse_edge_props = await storage.get_edge(node2_id, node1_id)
        if not reverse_edge_props:
            print(f"Failed to read reverse edge properties: {node2_id} -> {node1_id}")
            raise AssertionError(
                f"Failed to read reverse edge properties: {node2_id} -> {node1_id}, undirected graph property verification failed"
            )

        print(f"Successfully read reverse edge properties: {node2_id} -> {node1_id}")
        print(
            f"Reverse edge relationship: {reverse_edge_props.get('relationship', 'No relationship')}"
        )
        print(
            f"Reverse edge description: {reverse_edge_props.get('description', 'No description')}"
        )
        print(f"Reverse edge weight: {reverse_edge_props.get('weight', 'No weight')}")
        # Verify that forward and reverse edge properties are the same
        assert (
            edge_props == reverse_edge_props
        ), "Forward and reverse edge properties are not consistent, undirected graph property verification failed"
        print(
            "Undirected graph property verification successful: forward and reverse edge properties are consistent"
        )

        print("Basic tests completed, data is preserved in the database.")
        return True
    except Exception as e:
        # Keep the coloured console message, but re-raise: AssertionError is an
        # Exception subclass, so returning False here made the test pass under
        # pytest no matter which check failed.
        ASCIIColors.red(f"An error occurred during the test: {str(e)}")
        raise
@pytest.mark.integration
@pytest.mark.requires_db
async def test_graph_advanced(storage):
    """
    Test advanced graph database operations:
    1. Insert three nodes and two edges forming a chain.
    2. Use node_degree to get the degree of each node.
    3. Use edge_degree to get the degree of an edge, in both directions.
    4. Use get_node_edges to get all edges of a node.
    5. Use get_all_labels to get all labels.
    6. Use get_knowledge_graph to get a knowledge graph.
    7. Use delete_node to delete a node.
    8. Use remove_edges to delete an edge and check the reverse edge is gone too.
    9. Use remove_nodes to delete multiple nodes.

    Args:
        storage: Initialized graph storage instance (the `storage` fixture).

    Returns:
        True when every check passes.

    Raises:
        AssertionError: If any value returned by the backend differs from the
            expected one.
        Exception: Any other error raised by the storage backend; it is logged
            and re-raised.
    """
    try:
        # 1. Insert test data
        # Insert node 1: Artificial Intelligence
        node1_id = "Artificial Intelligence"
        node1_data = {
            "entity_id": node1_id,
            "description": "Artificial intelligence is a branch of computer science that aims to understand the essence of intelligence and produce a new kind of intelligent machine that can react in a manner similar to human intelligence.",
            "keywords": "AI,Machine Learning,Deep Learning",
            "entity_type": "Technology Field",
        }
        print(f"Inserting node 1: {node1_id}")
        await storage.upsert_node(node1_id, node1_data)

        # Insert node 2: Machine Learning
        node2_id = "Machine Learning"
        node2_data = {
            "entity_id": node2_id,
            "description": "Machine learning is a branch of artificial intelligence that uses statistical methods to enable computer systems to learn without being explicitly programmed.",
            "keywords": "Supervised Learning,Unsupervised Learning,Reinforcement Learning",
            "entity_type": "Technology Field",
        }
        print(f"Inserting node 2: {node2_id}")
        await storage.upsert_node(node2_id, node2_data)

        # Insert node 3: Deep Learning
        node3_id = "Deep Learning"
        node3_data = {
            "entity_id": node3_id,
            "description": "Deep learning is a branch of machine learning that uses multi-layered neural networks to simulate the learning process of the human brain.",
            "keywords": "Neural Networks,CNN,RNN",
            "entity_type": "Technology Field",
        }
        print(f"Inserting node 3: {node3_id}")
        await storage.upsert_node(node3_id, node3_data)

        # Insert edge 1: Artificial Intelligence -> Machine Learning
        edge1_data = {
            "relationship": "includes",
            "weight": 1.0,
            "description": "The field of artificial intelligence includes the subfield of machine learning.",
        }
        print(f"Inserting edge 1: {node1_id} -> {node2_id}")
        await storage.upsert_edge(node1_id, node2_id, edge1_data)

        # Insert edge 2: Machine Learning -> Deep Learning
        edge2_data = {
            "relationship": "includes",
            "weight": 1.0,
            "description": "The field of machine learning includes the subfield of deep learning.",
        }
        print(f"Inserting edge 2: {node2_id} -> {node3_id}")
        await storage.upsert_edge(node2_id, node3_id, edge2_data)

        # 2. Test node_degree - get the degree of a node
        print(f"== Testing node_degree: {node1_id}")
        node1_degree = await storage.node_degree(node1_id)
        print(f"Degree of node {node1_id}: {node1_degree}")
        assert (
            node1_degree == 1
        ), f"Degree of node {node1_id} should be 1, but got {node1_degree}"

        # 2.1 Test degrees of all nodes
        print("== Testing degrees of all nodes")
        node2_degree = await storage.node_degree(node2_id)
        node3_degree = await storage.node_degree(node3_id)
        print(f"Degree of node {node2_id}: {node2_degree}")
        print(f"Degree of node {node3_id}: {node3_degree}")
        assert (
            node2_degree == 2
        ), f"Degree of node {node2_id} should be 2, but got {node2_degree}"
        assert (
            node3_degree == 1
        ), f"Degree of node {node3_id} should be 1, but got {node3_degree}"

        # 3. Test edge_degree - get the degree of an edge
        # Expected 3 = degree of node 1 (1) + degree of node 2 (2).
        print(f"== Testing edge_degree: {node1_id} -> {node2_id}")
        edge_degree = await storage.edge_degree(node1_id, node2_id)
        print(f"Degree of edge {node1_id} -> {node2_id}: {edge_degree}")
        assert (
            edge_degree == 3
        ), f"Degree of edge {node1_id} -> {node2_id} should be 3, but got {edge_degree}"

        # 3.1 Test reverse edge degree - verify undirected graph property
        print(f"== Testing reverse edge degree: {node2_id} -> {node1_id}")
        reverse_edge_degree = await storage.edge_degree(node2_id, node1_id)
        print(f"Degree of reverse edge {node2_id} -> {node1_id}: {reverse_edge_degree}")
        assert (
            edge_degree == reverse_edge_degree
        ), "Degrees of forward and reverse edges are not consistent, undirected graph property verification failed"
        print(
            "Undirected graph property verification successful: degrees of forward and reverse edges are consistent"
        )

        # 4. Test get_node_edges - get all edges of a node
        print(f"== Testing get_node_edges: {node2_id}")
        node2_edges = await storage.get_node_edges(node2_id)
        print(f"All edges of node {node2_id}: {node2_edges}")
        assert (
            len(node2_edges) == 2
        ), f"Node {node2_id} should have 2 edges, but got {len(node2_edges)}"

        # 4.1 Verify undirected graph property of node edges
        print("== Verifying undirected graph property of node edges")
        # Compare endpoints as sets so an edge matches regardless of direction.
        endpoint_sets = [{edge[0], edge[1]} for edge in node2_edges]
        assert (
            {node1_id, node2_id} in endpoint_sets
        ), f"Edge list of node {node2_id} should include a connection with {node1_id}"
        assert (
            {node2_id, node3_id} in endpoint_sets
        ), f"Edge list of node {node2_id} should include a connection with {node3_id}"
        print(
            f"Undirected graph property verification successful: edge list of node {node2_id} contains all relevant edges"
        )

        # 5. Test get_all_labels - get all labels
        print("== Testing get_all_labels")
        all_labels = await storage.get_all_labels()
        print(f"All labels: {all_labels}")
        assert len(all_labels) == 3, f"Should have 3 labels, but got {len(all_labels)}"
        assert node1_id in all_labels, f"{node1_id} should be in the label list"
        assert node2_id in all_labels, f"{node2_id} should be in the label list"
        assert node3_id in all_labels, f"{node3_id} should be in the label list"

        # 6. Test get_knowledge_graph - get a knowledge graph
        print("== Testing get_knowledge_graph")
        kg = await storage.get_knowledge_graph("*", max_depth=2, max_nodes=10)
        print(f"Number of nodes in knowledge graph: {len(kg.nodes)}")
        print(f"Number of edges in knowledge graph: {len(kg.edges)}")
        assert isinstance(
            kg, KnowledgeGraph
        ), "The returned result should be of type KnowledgeGraph"
        assert (
            len(kg.nodes) == 3
        ), f"The knowledge graph should have 3 nodes, but got {len(kg.nodes)}"
        assert (
            len(kg.edges) == 2
        ), f"The knowledge graph should have 2 edges, but got {len(kg.edges)}"

        # 7. Test delete_node - delete a node
        print(f"== Testing delete_node: {node3_id}")
        await storage.delete_node(node3_id)
        node3_props = await storage.get_node(node3_id)
        print(f"Querying node properties after deletion {node3_id}: {node3_props}")
        assert node3_props is None, f"Node {node3_id} should have been deleted"

        # Re-insert node 3 for subsequent tests
        await storage.upsert_node(node3_id, node3_data)
        await storage.upsert_edge(node2_id, node3_id, edge2_data)

        # 8. Test remove_edges - delete edges
        print(f"== Testing remove_edges: {node2_id} -> {node3_id}")
        await storage.remove_edges([(node2_id, node3_id)])
        edge_props = await storage.get_edge(node2_id, node3_id)
        print(
            f"Querying edge properties after deletion {node2_id} -> {node3_id}: {edge_props}"
        )
        assert (
            edge_props is None
        ), f"Edge {node2_id} -> {node3_id} should have been deleted"

        # 8.1 Verify undirected graph property of edge deletion
        print(
            f"== Verifying undirected graph property of edge deletion: {node3_id} -> {node2_id}"
        )
        reverse_edge_props = await storage.get_edge(node3_id, node2_id)
        print(
            f"Querying reverse edge properties after deletion {node3_id} -> {node2_id}: {reverse_edge_props}"
        )
        assert (
            reverse_edge_props is None
        ), f"Reverse edge {node3_id} -> {node2_id} should also be deleted, undirected graph property verification failed"
        print(
            "Undirected graph property verification successful: deleting an edge in one direction also deletes the reverse edge"
        )

        # 9. Test remove_nodes - delete multiple nodes
        print(f"== Testing remove_nodes: [{node2_id}, {node3_id}]")
        await storage.remove_nodes([node2_id, node3_id])
        node2_props = await storage.get_node(node2_id)
        node3_props = await storage.get_node(node3_id)
        print(f"Querying node properties after deletion {node2_id}: {node2_props}")
        print(f"Querying node properties after deletion {node3_id}: {node3_props}")
        assert node2_props is None, f"Node {node2_id} should have been deleted"
        assert node3_props is None, f"Node {node3_id} should have been deleted"

        print("\nAdvanced tests completed.")
        return True
    except Exception as e:
        # Keep the coloured console message, but re-raise: AssertionError is an
        # Exception subclass, so returning False here made the test pass under
        # pytest no matter which check failed.
        ASCIIColors.red(f"An error occurred during the test: {str(e)}")
        raise
- @pytest.mark.integration
- @pytest.mark.requires_db
- async def test_graph_batch_operations(storage):
- """
- Test batch operations of the graph database:
- 1. Use get_nodes_batch to get properties of multiple nodes in batch.
- 2. Use node_degrees_batch to get degrees of multiple nodes in batch.
- 3. Use edge_degrees_batch to get degrees of multiple edges in batch.
- 4. Use get_edges_batch to get properties of multiple edges in batch.
- 5. Use get_nodes_edges_batch to get all edges of multiple nodes in batch.
- """
- try:
- chunk1_id = "1"
- chunk2_id = "2"
- chunk3_id = "3"
- # 1. Insert test data
- # Insert node 1: Artificial Intelligence
- node1_id = "Artificial Intelligence"
- node1_data = {
- "entity_id": node1_id,
- "description": "Artificial intelligence is a branch of computer science that aims to understand the essence of intelligence and produce a new kind of intelligent machine that can react in a manner similar to human intelligence.",
- "keywords": "AI,Machine Learning,Deep Learning",
- "entity_type": "Technology Field",
- "source_id": GRAPH_FIELD_SEP.join([chunk1_id, chunk2_id]),
- }
- print(f"Inserting node 1: {node1_id}")
- await storage.upsert_node(node1_id, node1_data)
- # Insert node 2: Machine Learning
- node2_id = "Machine Learning"
- node2_data = {
- "entity_id": node2_id,
- "description": "Machine learning is a branch of artificial intelligence that uses statistical methods to enable computer systems to learn without being explicitly programmed.",
- "keywords": "Supervised Learning,Unsupervised Learning,Reinforcement Learning",
- "entity_type": "Technology Field",
- "source_id": GRAPH_FIELD_SEP.join([chunk2_id, chunk3_id]),
- }
- print(f"Inserting node 2: {node2_id}")
- await storage.upsert_node(node2_id, node2_data)
- # Insert node 3: Deep Learning
- node3_id = "Deep Learning"
- node3_data = {
- "entity_id": node3_id,
- "description": "Deep learning is a branch of machine learning that uses multi-layered neural networks to simulate the learning process of the human brain.",
- "keywords": "Neural Networks,CNN,RNN",
- "entity_type": "Technology Field",
- "source_id": GRAPH_FIELD_SEP.join([chunk3_id]),
- }
- print(f"Inserting node 3: {node3_id}")
- await storage.upsert_node(node3_id, node3_data)
- # Insert node 4: Natural Language Processing
- node4_id = "Natural Language Processing"
- node4_data = {
- "entity_id": node4_id,
- "description": "Natural language processing is a branch of artificial intelligence that focuses on enabling computers to understand and process human language.",
- "keywords": "NLP,Text Analysis,Language Models",
- "entity_type": "Technology Field",
- }
- print(f"Inserting node 4: {node4_id}")
- await storage.upsert_node(node4_id, node4_data)
- # Insert node 5: Computer Vision
- node5_id = "Computer Vision"
- node5_data = {
- "entity_id": node5_id,
- "description": "Computer vision is a branch of artificial intelligence that focuses on enabling computers to gain information from images or videos.",
- "keywords": "CV,Image Recognition,Object Detection",
- "entity_type": "Technology Field",
- }
- print(f"Inserting node 5: {node5_id}")
- await storage.upsert_node(node5_id, node5_data)
- # Insert edge 1: Artificial Intelligence -> Machine Learning
- edge1_data = {
- "relationship": "includes",
- "weight": 1.0,
- "description": "The field of artificial intelligence includes the subfield of machine learning.",
- "source_id": GRAPH_FIELD_SEP.join([chunk1_id, chunk2_id]),
- }
- print(f"Inserting edge 1: {node1_id} -> {node2_id}")
- await storage.upsert_edge(node1_id, node2_id, edge1_data)
- # Insert edge 2: Machine Learning -> Deep Learning
- edge2_data = {
- "relationship": "includes",
- "weight": 1.0,
- "description": "The field of machine learning includes the subfield of deep learning.",
- "source_id": GRAPH_FIELD_SEP.join([chunk2_id, chunk3_id]),
- }
- print(f"Inserting edge 2: {node2_id} -> {node3_id}")
- await storage.upsert_edge(node2_id, node3_id, edge2_data)
- # Insert edge 3: Artificial Intelligence -> Natural Language Processing
- edge3_data = {
- "relationship": "includes",
- "weight": 1.0,
- "description": "The field of artificial intelligence includes the subfield of natural language processing.",
- "source_id": GRAPH_FIELD_SEP.join([chunk3_id]),
- }
- print(f"Inserting edge 3: {node1_id} -> {node4_id}")
- await storage.upsert_edge(node1_id, node4_id, edge3_data)
- # Insert edge 4: Artificial Intelligence -> Computer Vision
- edge4_data = {
- "relationship": "includes",
- "weight": 1.0,
- "description": "The field of artificial intelligence includes the subfield of computer vision.",
- }
- print(f"Inserting edge 4: {node1_id} -> {node5_id}")
- await storage.upsert_edge(node1_id, node5_id, edge4_data)
- # Insert edge 5: Deep Learning -> Natural Language Processing
- edge5_data = {
- "relationship": "applied to",
- "weight": 0.8,
- "description": "Deep learning techniques are applied in the field of natural language processing.",
- }
- print(f"Inserting edge 5: {node3_id} -> {node4_id}")
- await storage.upsert_edge(node3_id, node4_id, edge5_data)
- # Insert edge 6: Deep Learning -> Computer Vision
- edge6_data = {
- "relationship": "applied to",
- "weight": 0.8,
- "description": "Deep learning techniques are applied in the field of computer vision.",
- }
- print(f"Inserting edge 6: {node3_id} -> {node5_id}")
- await storage.upsert_edge(node3_id, node5_id, edge6_data)
- # 2. Test get_nodes_batch - batch get properties of multiple nodes
- print("== Testing get_nodes_batch")
- node_ids = [node1_id, node2_id, node3_id]
- nodes_dict = await storage.get_nodes_batch(node_ids)
- print(f"Batch get node properties result: {nodes_dict.keys()}")
- assert len(nodes_dict) == 3, f"Should return 3 nodes, but got {len(nodes_dict)}"
- assert node1_id in nodes_dict, f"{node1_id} should be in the result"
- assert node2_id in nodes_dict, f"{node2_id} should be in the result"
- assert node3_id in nodes_dict, f"{node3_id} should be in the result"
- assert (
- nodes_dict[node1_id]["description"] == node1_data["description"]
- ), f"{node1_id} description mismatch"
- assert (
- nodes_dict[node2_id]["description"] == node2_data["description"]
- ), f"{node2_id} description mismatch"
- assert (
- nodes_dict[node3_id]["description"] == node3_data["description"]
- ), f"{node3_id} description mismatch"
- # 3. Test node_degrees_batch - batch get degrees of multiple nodes
- print("== Testing node_degrees_batch")
- node_degrees = await storage.node_degrees_batch(node_ids)
- print(f"Batch get node degrees result: {node_degrees}")
- assert (
- len(node_degrees) == 3
- ), f"Should return degrees of 3 nodes, but got {len(node_degrees)}"
- assert node1_id in node_degrees, f"{node1_id} should be in the result"
- assert node2_id in node_degrees, f"{node2_id} should be in the result"
- assert node3_id in node_degrees, f"{node3_id} should be in the result"
- assert (
- node_degrees[node1_id] == 3
- ), f"Degree of {node1_id} should be 3, but got {node_degrees[node1_id]}"
- assert (
- node_degrees[node2_id] == 2
- ), f"Degree of {node2_id} should be 2, but got {node_degrees[node2_id]}"
- assert (
- node_degrees[node3_id] == 3
- ), f"Degree of {node3_id} should be 3, but got {node_degrees[node3_id]}"
- # 4. Test edge_degrees_batch - batch get degrees of multiple edges
- print("== Testing edge_degrees_batch")
- edges = [(node1_id, node2_id), (node2_id, node3_id), (node3_id, node4_id)]
- edge_degrees = await storage.edge_degrees_batch(edges)
- print(f"Batch get edge degrees result: {edge_degrees}")
- assert (
- len(edge_degrees) == 3
- ), f"Should return degrees of 3 edges, but got {len(edge_degrees)}"
- assert (
- node1_id,
- node2_id,
- ) in edge_degrees, f"Edge {node1_id} -> {node2_id} should be in the result"
- assert (
- node2_id,
- node3_id,
- ) in edge_degrees, f"Edge {node2_id} -> {node3_id} should be in the result"
- assert (
- node3_id,
- node4_id,
- ) in edge_degrees, f"Edge {node3_id} -> {node4_id} should be in the result"
- # Verify edge degrees (sum of source and target node degrees)
- assert (
- edge_degrees[(node1_id, node2_id)] == 5
- ), f"Degree of edge {node1_id} -> {node2_id} should be 5, but got {edge_degrees[(node1_id, node2_id)]}"
- assert (
- edge_degrees[(node2_id, node3_id)] == 5
- ), f"Degree of edge {node2_id} -> {node3_id} should be 5, but got {edge_degrees[(node2_id, node3_id)]}"
- assert (
- edge_degrees[(node3_id, node4_id)] == 5
- ), f"Degree of edge {node3_id} -> {node4_id} should be 5, but got {edge_degrees[(node3_id, node4_id)]}"
- # 5. Test get_edges_batch - batch get properties of multiple edges
- print("== Testing get_edges_batch")
- # Convert list of tuples to list of dicts for Neo4j style
- edge_dicts = [{"src": src, "tgt": tgt} for src, tgt in edges]
- edges_dict = await storage.get_edges_batch(edge_dicts)
- print(f"Batch get edge properties result: {edges_dict.keys()}")
- assert (
- len(edges_dict) == 3
- ), f"Should return properties of 3 edges, but got {len(edges_dict)}"
- assert (
- node1_id,
- node2_id,
- ) in edges_dict, f"Edge {node1_id} -> {node2_id} should be in the result"
- assert (
- node2_id,
- node3_id,
- ) in edges_dict, f"Edge {node2_id} -> {node3_id} should be in the result"
- assert (
- node3_id,
- node4_id,
- ) in edges_dict, f"Edge {node3_id} -> {node4_id} should be in the result"
- assert (
- edges_dict[(node1_id, node2_id)]["relationship"]
- == edge1_data["relationship"]
- ), f"Edge {node1_id} -> {node2_id} relationship mismatch"
- assert (
- edges_dict[(node2_id, node3_id)]["relationship"]
- == edge2_data["relationship"]
- ), f"Edge {node2_id} -> {node3_id} relationship mismatch"
- assert (
- edges_dict[(node3_id, node4_id)]["relationship"]
- == edge5_data["relationship"]
- ), f"Edge {node3_id} -> {node4_id} relationship mismatch"
- # 5.1 Test batch get of reverse edges - verify undirected property
- print("== Testing batch get of reverse edges")
- # Create list of dicts for reverse edges
- reverse_edge_dicts = [{"src": tgt, "tgt": src} for src, tgt in edges]
- reverse_edges_dict = await storage.get_edges_batch(reverse_edge_dicts)
- print(f"Batch get reverse edge properties result: {reverse_edges_dict.keys()}")
- assert (
- len(reverse_edges_dict) == 3
- ), f"Should return properties of 3 reverse edges, but got {len(reverse_edges_dict)}"
- # Verify that properties of forward and reverse edges are consistent
- for (src, tgt), props in edges_dict.items():
- assert (
- (
- tgt,
- src,
- )
- in reverse_edges_dict
- ), f"Reverse edge {tgt} -> {src} should be in the result"
- assert (
- props == reverse_edges_dict[(tgt, src)]
- ), f"Properties of edge {src} -> {tgt} and reverse edge {tgt} -> {src} are inconsistent"
- print(
- "Undirected graph property verification successful: properties of batch-retrieved forward and reverse edges are consistent"
- )
- # 6. Test get_nodes_edges_batch - batch get all edges of multiple nodes
- print("== Testing get_nodes_edges_batch")
- nodes_edges = await storage.get_nodes_edges_batch([node1_id, node3_id])
- print(f"Batch get node edges result: {nodes_edges.keys()}")
- assert (
- len(nodes_edges) == 2
- ), f"Should return edges for 2 nodes, but got {len(nodes_edges)}"
- assert node1_id in nodes_edges, f"{node1_id} should be in the result"
- assert node3_id in nodes_edges, f"{node3_id} should be in the result"
- assert (
- len(nodes_edges[node1_id]) == 3
- ), f"{node1_id} should have 3 edges, but has {len(nodes_edges[node1_id])}"
- assert (
- len(nodes_edges[node3_id]) == 3
- ), f"{node3_id} should have 3 edges, but has {len(nodes_edges[node3_id])}"
- # 6.1 Verify undirected property of batch-retrieved node edges
- print("== Verifying undirected property of batch-retrieved node edges")
- # Check if node 1's edges include all relevant edges (regardless of direction)
- node1_outgoing_edges = [
- (src, tgt) for src, tgt in nodes_edges[node1_id] if src == node1_id
- ]
- node1_incoming_edges = [
- (src, tgt) for src, tgt in nodes_edges[node1_id] if tgt == node1_id
- ]
- print(f"Outgoing edges of node {node1_id}: {node1_outgoing_edges}")
- print(f"Incoming edges of node {node1_id}: {node1_incoming_edges}")
- # Check for edges to Machine Learning, Natural Language Processing, and Computer Vision
- has_edge_to_node2 = any(tgt == node2_id for _, tgt in node1_outgoing_edges)
- has_edge_to_node4 = any(tgt == node4_id for _, tgt in node1_outgoing_edges)
- has_edge_to_node5 = any(tgt == node5_id for _, tgt in node1_outgoing_edges)
- assert (
- has_edge_to_node2
- ), f"Edge list of node {node1_id} should include an edge to {node2_id}"
- assert (
- has_edge_to_node4
- ), f"Edge list of node {node1_id} should include an edge to {node4_id}"
- assert (
- has_edge_to_node5
- ), f"Edge list of node {node1_id} should include an edge to {node5_id}"
- # Check if node 3's edges include all relevant edges (regardless of direction)
- node3_outgoing_edges = [
- (src, tgt) for src, tgt in nodes_edges[node3_id] if src == node3_id
- ]
- node3_incoming_edges = [
- (src, tgt) for src, tgt in nodes_edges[node3_id] if tgt == node3_id
- ]
- print(f"Outgoing edges of node {node3_id}: {node3_outgoing_edges}")
- print(f"Incoming edges of node {node3_id}: {node3_incoming_edges}")
- # Check for connections with Machine Learning, Natural Language Processing, and Computer Vision (ignoring direction)
- has_connection_with_node2 = any(
- (src == node2_id and tgt == node3_id)
- or (src == node3_id and tgt == node2_id)
- for src, tgt in nodes_edges[node3_id]
- )
- has_connection_with_node4 = any(
- (src == node3_id and tgt == node4_id)
- or (src == node4_id and tgt == node3_id)
- for src, tgt in nodes_edges[node3_id]
- )
- has_connection_with_node5 = any(
- (src == node3_id and tgt == node5_id)
- or (src == node5_id and tgt == node3_id)
- for src, tgt in nodes_edges[node3_id]
- )
- assert (
- has_connection_with_node2
- ), f"Edge list of node {node3_id} should include a connection with {node2_id}"
- assert (
- has_connection_with_node4
- ), f"Edge list of node {node3_id} should include a connection with {node4_id}"
- assert (
- has_connection_with_node5
- ), f"Edge list of node {node3_id} should include a connection with {node5_id}"
- print(
- "Undirected graph property verification successful: batch-retrieved node edges include all relevant edges (regardless of direction)"
- )
- print("\nBatch operations tests completed.")
- return True
- except Exception as e:
- ASCIIColors.red(f"An error occurred during the test: {str(e)}")
- return False
@pytest.mark.integration
@pytest.mark.requires_db
async def test_graph_special_characters(storage):
    """
    Test the graph database's handling of special characters:
    1. Test node names and descriptions containing single quotes, double quotes, and backslashes.
    2. Test edge descriptions containing single quotes, double quotes, and backslashes.
    3. Verify that special characters are saved and retrieved correctly.

    Args:
        storage: Graph storage instance under test. Its ``upsert_node``,
            ``upsert_edge``, ``get_node`` and ``get_edge`` coroutines are awaited.

    Returns:
        True when every verification passes (``main()`` checks this value
        before starting the next test).

    Raises:
        Exception: Any assertion failure or storage error is printed in red and
            then re-raised so the failure is not silently swallowed.
    """
    try:
        # 1. Test special characters in node name
        node1_id = "Node with 'single quotes'"
        node1_data = {
            "entity_id": node1_id,
            "description": "This description contains 'single quotes', \"double quotes\", and \\backslashes",
            "keywords": "special characters,quotes,escaping",
            "entity_type": "Test Node",
        }
        print(f"Inserting node with special characters 1: {node1_id}")
        await storage.upsert_node(node1_id, node1_data)

        # 2. Test double quotes in node name
        node2_id = 'Node with "double quotes"'
        node2_data = {
            "entity_id": node2_id,
            "description": "This description contains both 'single quotes' and \"double quotes\" and \\a\\path",
            "keywords": "special characters,quotes,JSON",
            "entity_type": "Test Node",
        }
        print(f"Inserting node with special characters 2: {node2_id}")
        await storage.upsert_node(node2_id, node2_data)

        # 3. Test backslashes in node name
        node3_id = "Node with \\backslashes\\"
        node3_data = {
            "entity_id": node3_id,
            "description": "This description contains a Windows path C:\\Program Files\\ and escape characters \\n\\t",
            "keywords": "backslashes,paths,escaping",
            "entity_type": "Test Node",
        }
        print(f"Inserting node with special characters 3: {node3_id}")
        await storage.upsert_node(node3_id, node3_data)

        # 4. Test special characters in edge description
        edge1_data = {
            "relationship": "special 'relationship'",
            "weight": 1.0,
            "description": "This edge description contains 'single quotes', \"double quotes\", and \\backslashes",
        }
        print(f"Inserting edge with special characters: {node1_id} -> {node2_id}")
        await storage.upsert_edge(node1_id, node2_id, edge1_data)

        # 5. Test more complex combination of special characters in edge description
        edge2_data = {
            "relationship": 'complex "relationship"\\type',
            "weight": 0.8,
            "description": "Contains SQL injection attempt: SELECT * FROM users WHERE name='admin'--",
        }
        print(
            f"Inserting edge with complex special characters: {node2_id} -> {node3_id}"
        )
        await storage.upsert_edge(node2_id, node3_id, edge2_data)

        # 6. Verify that node special characters are saved correctly
        print("\n== Verifying node special characters")
        for node_id, original_data in [
            (node1_id, node1_data),
            (node2_id, node2_data),
            (node3_id, node3_data),
        ]:
            node_props = await storage.get_node(node_id)
            if not node_props:
                print(f"Failed to read node properties: {node_id}")
                # Raised explicitly: `assert False` disappears under `python -O`.
                raise AssertionError(f"Failed to read node properties: {node_id}")

            print(f"Successfully read node: {node_id}")
            print(
                f"Node description: {node_props.get('description', 'No description')}"
            )
            # Verify node ID is saved correctly
            assert (
                node_props.get("entity_id") == node_id
            ), f"Node ID mismatch: expected {node_id}, got {node_props.get('entity_id')}"
            # Verify description is saved correctly
            assert (
                node_props.get("description") == original_data["description"]
            ), f"Node description mismatch: expected {original_data['description']}, got {node_props.get('description')}"
            print(f"Node {node_id} special character verification successful")

        # 7. Verify that edge special characters are saved correctly
        print("\n== Verifying edge special characters")
        for src_id, tgt_id, expected in [
            (node1_id, node2_id, edge1_data),
            (node2_id, node3_id, edge2_data),
        ]:
            edge_props = await storage.get_edge(src_id, tgt_id)
            if not edge_props:
                print(f"Failed to read edge properties: {src_id} -> {tgt_id}")
                raise AssertionError(
                    f"Failed to read edge properties: {src_id} -> {tgt_id}"
                )

            print(f"Successfully read edge: {src_id} -> {tgt_id}")
            print(
                f"Edge relationship: {edge_props.get('relationship', 'No relationship')}"
            )
            print(
                f"Edge description: {edge_props.get('description', 'No description')}"
            )
            # Verify edge relationship is saved correctly
            assert (
                edge_props.get("relationship") == expected["relationship"]
            ), f"Edge relationship mismatch: expected {expected['relationship']}, got {edge_props.get('relationship')}"
            # Verify edge description is saved correctly
            assert (
                edge_props.get("description") == expected["description"]
            ), f"Edge description mismatch: expected {expected['description']}, got {edge_props.get('description')}"
            print(
                f"Edge {src_id} -> {tgt_id} special character verification successful"
            )

        print("\nSpecial character tests completed, data is preserved in the database.")
    except Exception as e:
        ASCIIColors.red(f"An error occurred during the test: {str(e)}")
        # Re-raise: returning False here would hide assertion failures from
        # whoever is running the test.
        raise
    return True
@pytest.mark.integration
@pytest.mark.requires_db
async def test_graph_string_escaping_regressions(storage):
    """
    Regression test for entity IDs and property values that need escaping.

    Four nodes whose IDs contain double quotes, backslashes, a mix of both, or
    single quotes are stored together with three edges that all start at one
    central node. The data is then read back through single-node, batch-node,
    degree, node-edge and edge lookups (forward and reverse), and finally
    removed through remove_edges, delete_node and remove_nodes.
    """
    center_id = 'Danh mục "bài toán lớn"'
    backslash_id = r"C:\Program Files\LightRAG"
    mixed_id = 'Path "C:\\RAG\\docs"'
    single_quote_id = "Node with 'single quotes'"

    def node_record(entity_id, description, keywords):
        # Every node in this test shares the same entity type.
        return {
            "entity_id": entity_id,
            "description": description,
            "keywords": keywords,
            "entity_type": "Regression Node",
        }

    def edge_record(relationship, weight, description):
        return {
            "relationship": relationship,
            "weight": weight,
            "description": description,
        }

    def linked(edge_list, a, b):
        # The graph is undirected, so (a, b) and (b, a) both count as a match.
        for src, tgt in edge_list:
            if (src, tgt) in ((a, b), (b, a)):
                return True
        return False

    # --- Write nodes, then edges ---
    nodes = {
        center_id: node_record(
            center_id,
            'Quoted entity with JSON-ish payload {"path": "C:\\\\temp"}',
            'quotes,"double quotes",unicode',
        ),
        backslash_id: node_record(
            backslash_id,
            r"Windows path C:\Program Files\LightRAG\bin",
            r"paths,C:\temp,backslashes",
        ),
        mixed_id: node_record(
            mixed_id,
            'Mixed quotes "and" slashes \\ in one entity id',
            r'mixed,"quoted",C:\RAG\docs',
        ),
        single_quote_id: node_record(
            single_quote_id,
            "Single quotes stay literal in entity identifiers",
            "single quotes,escaping",
        ),
    }
    for entity_id, props in nodes.items():
        await storage.upsert_node(entity_id, props)

    links = {
        (center_id, backslash_id): edge_record(
            r'contains "path"\edge',
            1.0,
            r'Links "quoted" title to C:\Program Files\LightRAG',
        ),
        (center_id, mixed_id): edge_record(
            'references "docs"',
            0.8,
            r'Contains both "quotes" and \\backslashes\\',
        ),
        (center_id, single_quote_id): edge_record(
            "mentions 'alias'",
            0.6,
            'Single quote entity linked to "quoted" center node',
        ),
    }
    for endpoints, props in links.items():
        await storage.upsert_edge(*endpoints, props)

    # --- Single-node reads ---
    for entity_id, props in nodes.items():
        stored = await storage.get_node(entity_id)
        assert stored is not None, f"Expected node {entity_id!r} to round-trip"
        assert stored["entity_id"] == entity_id
        assert stored["description"] == props["description"]

    # --- Batch node reads ---
    fetched = await storage.get_nodes_batch([*nodes])
    assert set(fetched) == set(nodes)
    for entity_id, props in nodes.items():
        record = fetched[entity_id]
        assert record["entity_id"] == entity_id
        assert record["description"] == props["description"]

    # --- Degrees: the center touches all three edges, the others one each ---
    degree_of = await storage.node_degrees_batch([*nodes])
    for entity_id, expected_degree in (
        (center_id, 3),
        (backslash_id, 1),
        (mixed_id, 1),
        (single_quote_id, 1),
    ):
        assert degree_of[entity_id] == expected_degree

    # --- Edge lists of a single node ---
    center_edges = await storage.get_node_edges(center_id)
    assert center_edges is not None
    for neighbour in (backslash_id, mixed_id, single_quote_id):
        assert linked(
            center_edges, center_id, neighbour
        ), f"center_edges should contain connection to {neighbour}"

    # --- Edge lists of several nodes at once ---
    requested = [center_id, mixed_id, backslash_id, single_quote_id]
    batch_edges = await storage.get_nodes_edges_batch(requested)
    assert set(batch_edges) == set(requested)
    for owner, neighbour in (
        (center_id, backslash_id),
        (center_id, mixed_id),
        (center_id, single_quote_id),
        (mixed_id, mixed_id),
        (backslash_id, backslash_id),
        (single_quote_id, single_quote_id),
    ):
        assert linked(batch_edges[owner], center_id, neighbour)

    # --- Undirected property: get_edge in both directions ---
    print("\n== Verifying undirected property: get_edge forward and reverse")
    for (head, tail), props in links.items():
        forward = await storage.get_edge(head, tail)
        backward = await storage.get_edge(tail, head)
        assert (
            forward is not None
        ), f"get_edge({head!r}, {tail!r}) returned None after insertion"
        assert backward is not None, (
            f"get_edge({tail!r}, {head!r}) returned None — "
            f"storage is not treating the edge as undirected"
        )
        assert forward["relationship"] == props["relationship"]
        assert forward["description"] == props["description"]
        assert backward["relationship"] == forward["relationship"], (
            f"Reverse get_edge returned different relationship for "
            f"({head!r}, {tail!r})"
        )
        assert backward["description"] == forward["description"], (
            f"Reverse get_edge returned different description for "
            f"({head!r}, {tail!r})"
        )
    print(
        "Undirected property verification successful: "
        "get_edge returns consistent data in both directions"
    )

    # --- Undirected property: has_edge in both directions ---
    print("\n== Verifying undirected property: has_edge forward and reverse")
    for head, tail in links:
        assert await storage.has_edge(
            head, tail
        ), f"has_edge({head!r}, {tail!r}) returned False after insertion"
        assert await storage.has_edge(tail, head), (
            f"has_edge({tail!r}, {head!r}) returned False — "
            f"storage is not treating the edge as undirected"
        )
    print(
        "Undirected property verification successful: "
        "has_edge returns True in both directions"
    )

    # --- Undirected property: get_edges_batch forward and reverse ---
    print("\n== Verifying undirected property: get_edges_batch forward and reverse")
    forward_query = [{"src": head, "tgt": tail} for head, tail in links]
    forward_edges = await storage.get_edges_batch(forward_query)
    reverse_query = [{"src": tail, "tgt": head} for head, tail in links]
    reverse_edges = await storage.get_edges_batch(reverse_query)
    assert set(forward_edges) == set(links)
    for pair, props in links.items():
        assert forward_edges[pair]["relationship"] == props["relationship"]
        assert forward_edges[pair]["description"] == props["description"]
        flipped = pair[::-1]
        assert (
            flipped in reverse_edges
        ), f"get_edges_batch did not return reverse pair {flipped!r}"
        assert reverse_edges[flipped]["relationship"] == props["relationship"]
        assert reverse_edges[flipped]["description"] == props["description"]
    print(
        "Undirected property verification successful: "
        "get_edges_batch returns consistent data in both directions"
    )

    # --- Undirected property: edge deletion removes both directions ---
    print("\n== Verifying undirected property: edge deletion removes both directions")
    await storage.remove_edges([(center_id, mixed_id)])
    assert (
        await storage.get_edge(center_id, mixed_id) is None
    ), f"Forward edge ({center_id!r} -> {mixed_id!r}) should be deleted"
    assert await storage.get_edge(mixed_id, center_id) is None, (
        f"Reverse edge ({mixed_id!r} -> {center_id!r}) should also be deleted "
        f"— storage is not treating deletion as undirected"
    )
    edges_after_removal = await storage.get_node_edges(center_id)
    assert edges_after_removal is not None
    assert not linked(
        edges_after_removal, center_id, mixed_id
    ), "Edge between center and mixed_id should have been removed"
    print(
        "Undirected property verification successful: "
        "deleting an edge removes it in both directions"
    )

    # --- Node deletion: single delete, then batch remove; mixed_id must survive ---
    await storage.delete_node(single_quote_id)
    assert await storage.get_node(single_quote_id) is None
    await storage.remove_nodes([center_id, backslash_id])
    assert await storage.get_node(center_id) is None
    assert await storage.get_node(backslash_id) is None
    assert await storage.get_node(mixed_id) is not None
@pytest.mark.integration
@pytest.mark.requires_db
async def test_graph_undirected_property(storage):
    """
    Specifically test the undirected graph property of the storage:
    1. Verify that after inserting an edge in one direction, a reverse query can retrieve the same result.
    2. Verify that edge properties are consistent in forward and reverse queries.
    3. Verify that after deleting an edge in one direction, the edge in the other direction is also deleted.
    4. Verify the undirected property in batch operations.

    Args:
        storage: Graph storage instance under test. Its ``upsert_node``,
            ``upsert_edge``, ``get_edge``, ``edge_degree``, ``remove_edges``,
            ``get_edges_batch`` and ``get_nodes_edges_batch`` coroutines are awaited.

    Returns:
        True when every verification passes (``main()`` checks this value
        before starting the next test).

    Raises:
        Exception: Any assertion failure or storage error is printed in red and
            then re-raised so the failure is not silently swallowed.
    """
    try:
        # 1. Insert test data
        # Insert node 1: Computer Science
        node1_id = "Computer Science"
        node1_data = {
            "entity_id": node1_id,
            "description": "Computer science is the study of computers and their applications.",
            "keywords": "computer,science,technology",
            "entity_type": "Discipline",
        }
        print(f"Inserting node 1: {node1_id}")
        await storage.upsert_node(node1_id, node1_data)

        # Insert node 2: Data Structures
        node2_id = "Data Structures"
        node2_data = {
            "entity_id": node2_id,
            "description": "A data structure is a fundamental concept in computer science used to organize and store data.",
            "keywords": "data,structure,organization",
            "entity_type": "Concept",
        }
        print(f"Inserting node 2: {node2_id}")
        await storage.upsert_node(node2_id, node2_data)

        # Insert node 3: Algorithms
        node3_id = "Algorithms"
        node3_data = {
            "entity_id": node3_id,
            "description": "An algorithm is a set of steps and methods for solving problems.",
            "keywords": "algorithm,steps,methods",
            "entity_type": "Concept",
        }
        print(f"Inserting node 3: {node3_id}")
        await storage.upsert_node(node3_id, node3_data)

        # 2. Test undirected property after edge insertion
        print("\n== Testing undirected property after edge insertion")
        # Insert edge 1: Computer Science -> Data Structures
        edge1_data = {
            "relationship": "includes",
            "weight": 1.0,
            "description": "Computer science includes the concept of data structures.",
        }
        print(f"Inserting edge 1: {node1_id} -> {node2_id}")
        await storage.upsert_edge(node1_id, node2_id, edge1_data)

        # Verify forward query
        forward_edge = await storage.get_edge(node1_id, node2_id)
        print(f"Forward edge properties: {forward_edge}")
        assert (
            forward_edge is not None
        ), f"Failed to read forward edge properties: {node1_id} -> {node2_id}"

        # Verify reverse query
        reverse_edge = await storage.get_edge(node2_id, node1_id)
        print(f"Reverse edge properties: {reverse_edge}")
        assert (
            reverse_edge is not None
        ), f"Failed to read reverse edge properties: {node2_id} -> {node1_id}"

        # Verify that forward and reverse edge properties are consistent
        assert (
            forward_edge == reverse_edge
        ), "Forward and reverse edge properties are inconsistent, undirected property verification failed"
        print(
            "Undirected property verification successful: forward and reverse edge properties are consistent"
        )

        # 3. Test undirected property of edge degree
        print("\n== Testing undirected property of edge degree")
        # Insert edge 2: Computer Science -> Algorithms
        edge2_data = {
            "relationship": "includes",
            "weight": 1.0,
            "description": "Computer science includes the concept of algorithms.",
        }
        print(f"Inserting edge 2: {node1_id} -> {node3_id}")
        await storage.upsert_edge(node1_id, node3_id, edge2_data)

        # Verify degrees of forward and reverse edges
        forward_degree = await storage.edge_degree(node1_id, node2_id)
        reverse_degree = await storage.edge_degree(node2_id, node1_id)
        print(f"Degree of forward edge {node1_id} -> {node2_id}: {forward_degree}")
        print(f"Degree of reverse edge {node2_id} -> {node1_id}: {reverse_degree}")
        assert (
            forward_degree == reverse_degree
        ), "Degrees of forward and reverse edges are inconsistent, undirected property verification failed"
        print(
            "Undirected property verification successful: degrees of forward and reverse edges are consistent"
        )

        # 4. Test undirected property of edge deletion
        print("\n== Testing undirected property of edge deletion")
        # Delete forward edge
        print(f"Deleting edge: {node1_id} -> {node2_id}")
        await storage.remove_edges([(node1_id, node2_id)])

        # Verify forward edge is deleted
        forward_edge = await storage.get_edge(node1_id, node2_id)
        print(
            f"Querying forward edge properties after deletion {node1_id} -> {node2_id}: {forward_edge}"
        )
        assert (
            forward_edge is None
        ), f"Edge {node1_id} -> {node2_id} should have been deleted"

        # Verify reverse edge is also deleted
        reverse_edge = await storage.get_edge(node2_id, node1_id)
        print(
            f"Querying reverse edge properties after deletion {node2_id} -> {node1_id}: {reverse_edge}"
        )
        assert (
            reverse_edge is None
        ), f"Reverse edge {node2_id} -> {node1_id} should also be deleted, undirected property verification failed"
        print(
            "Undirected property verification successful: deleting an edge in one direction also deletes the reverse edge"
        )

        # 5. Test undirected property in batch operations
        print("\n== Testing undirected property in batch operations")
        # Re-insert the edge removed in step 4 so both edges exist again
        await storage.upsert_edge(node1_id, node2_id, edge1_data)

        # Batch get edge properties
        edge_dicts = [
            {"src": node1_id, "tgt": node2_id},
            {"src": node1_id, "tgt": node3_id},
        ]
        reverse_edge_dicts = [
            {"src": node2_id, "tgt": node1_id},
            {"src": node3_id, "tgt": node1_id},
        ]
        edges_dict = await storage.get_edges_batch(edge_dicts)
        reverse_edges_dict = await storage.get_edges_batch(reverse_edge_dicts)
        print(f"Batch get forward edge properties result: {edges_dict.keys()}")
        print(f"Batch get reverse edge properties result: {reverse_edges_dict.keys()}")

        # Verify that properties of forward and reverse edges are consistent
        for (src, tgt), props in edges_dict.items():
            assert (
                tgt,
                src,
            ) in reverse_edges_dict, f"Reverse edge {tgt} -> {src} should be in the result"
            assert (
                props == reverse_edges_dict[(tgt, src)]
            ), f"Properties of edge {src} -> {tgt} and reverse edge {tgt} -> {src} are inconsistent"
        print(
            "Undirected property verification successful: properties of batch-retrieved forward and reverse edges are consistent"
        )

        # 6. Test undirected property of batch-retrieved node edges
        print("\n== Testing undirected property of batch-retrieved node edges")
        nodes_edges = await storage.get_nodes_edges_batch([node1_id, node2_id])
        print(f"Batch get node edges result: {nodes_edges.keys()}")
        node1_edges = nodes_edges[node1_id]
        node2_edges = nodes_edges[node2_id]

        # Check if node 1 has edges to node 2 and node 3
        has_edge_to_node2 = any(
            (src == node1_id and tgt == node2_id) for src, tgt in node1_edges
        )
        has_edge_to_node3 = any(
            (src == node1_id and tgt == node3_id) for src, tgt in node1_edges
        )
        assert (
            has_edge_to_node2
        ), f"Edge list of node {node1_id} should include an edge to {node2_id}"
        assert (
            has_edge_to_node3
        ), f"Edge list of node {node1_id} should include an edge to {node3_id}"

        # Check if node 2 has a connection with node 1 (either direction)
        has_edge_to_node1 = any(
            (src == node2_id and tgt == node1_id)
            or (src == node1_id and tgt == node2_id)
            for src, tgt in node2_edges
        )
        assert (
            has_edge_to_node1
        ), f"Edge list of node {node2_id} should include a connection with {node1_id}"
        print(
            "Undirected property verification successful: batch-retrieved node edges include all relevant edges (regardless of direction)"
        )

        print("\nUndirected property tests completed.")
    except Exception as e:
        ASCIIColors.red(f"An error occurred during the test: {str(e)}")
        # Re-raise: returning False here would hide assertion failures from
        # whoever is running the test.
        raise
    return True
async def main():
    """Interactive entry point for the graph storage test program.

    Checks for the .env file, loads environment variables, initializes the
    configured graph storage, shows a menu and runs the selected test (or all
    tests in sequence). Stored data is dropped before each run, and the storage
    is finalized when the function exits.

    In "All Tests" mode the tests run in menu order and the sequence stops as
    soon as one of them returns a falsy result.
    """
    # Display program title
    ASCIIColors.cyan("""
    ╔══════════════════════════════════════════════════════════════╗
    ║              General Graph Storage Test Program              ║
    ╚══════════════════════════════════════════════════════════════╝
    """)

    # Check for .env file
    if not check_env_file():
        return

    # Load environment variables
    load_dotenv(dotenv_path=".env", override=False)

    # Get graph storage type
    graph_storage_type = os.getenv("LIGHTRAG_GRAPH_STORAGE", "NetworkXStorage")
    ASCIIColors.magenta(
        f"\nCurrently configured graph storage type: {graph_storage_type}"
    )
    ASCIIColors.white(
        f"Supported graph storage types: {', '.join(STORAGE_IMPLEMENTATIONS['GRAPH_STORAGE']['implementations'])}"
    )

    # Initialize storage instance
    storage = await initialize_graph_storage()
    if not storage:
        ASCIIColors.red("Failed to initialize storage instance, exiting test program.")
        return

    # Menu key -> (title, test coroutine). Insertion order is also the order
    # used by the "All Tests" option.
    tests = {
        "1": ("Basic Test", test_graph_basic),
        "2": ("Advanced Test", test_graph_advanced),
        "3": ("Batch Operations Test", test_graph_batch_operations),
        "4": ("Undirected Property Test", test_graph_undirected_property),
        "5": ("Special Characters Test", test_graph_special_characters),
        "6": (
            "String Escaping Regression Test",
            test_graph_string_escaping_regressions,
        ),
    }
    run_all_key = "7"

    try:

        async def reset_storage(test_name: str) -> None:
            """Drop everything in the storage so the next test starts clean."""
            ASCIIColors.yellow(f"\nCleaning data before {test_name}...")
            await storage.drop()
            ASCIIColors.green("Data cleanup complete\n")

        # Display test options
        ASCIIColors.yellow("\nPlease select a test type:")
        ASCIIColors.white("1. Basic Test (Node and edge insertion, reading)")
        ASCIIColors.white(
            "2. Advanced Test (Degree, labels, knowledge graph, deletion, etc.)"
        )
        ASCIIColors.white(
            "3. Batch Operations Test (Batch get node/edge properties, degrees, etc.)"
        )
        ASCIIColors.white(
            "4. Undirected Property Test (Verify undirected properties of the storage)"
        )
        ASCIIColors.white(
            "5. Special Characters Test (Verify handling of single/double quotes, backslashes, etc.)"
        )
        ASCIIColors.white(
            "6. String Escaping Regression Test (Quoted and escaped entity IDs across graph operations)"
        )
        ASCIIColors.white("7. All Tests")

        # Strip so that stray whitespace around the digit is not rejected.
        choice = input("\nEnter your choice (1/2/3/4/5/6/7): ").strip()

        if choice in tests:
            # Clean data before running the single selected test
            await reset_storage("running tests")
            _, selected_test = tests[choice]
            await selected_test(storage)
        elif choice == run_all_key:
            await reset_storage("running tests")
            for title, test_func in tests.values():
                ASCIIColors.cyan(f"\n=== Starting {title} ===")
                await reset_storage(title)
                # Later tests only run while the earlier ones report success.
                if not await test_func(storage):
                    break
        else:
            ASCIIColors.red("Invalid choice")
    finally:
        # Close connection
        if storage:
            await storage.finalize()
            ASCIIColors.green("\nStorage connection closed.")


if __name__ == "__main__":
    asyncio.run(main())
|