#!/usr/bin/env python3
"""
Qdrant Legacy Data Preparation Tool for LightRAG

This tool copies data from new collections to legacy collections for testing
the data migration logic in the setup_collection function.

New Collections (with workspace_id):
    - lightrag_vdb_chunks
    - lightrag_vdb_entities
    - lightrag_vdb_relationships

Legacy Collections (without workspace_id, dynamically named as {workspace}_{suffix}):
    - {workspace}_chunks (e.g., space1_chunks)
    - {workspace}_entities (e.g., space1_entities)
    - {workspace}_relationships (e.g., space1_relationships)

The tool:
    1. Filters source data by workspace_id
    2. Verifies workspace data exists before creating legacy collections
    3. Removes the workspace_id field to simulate the legacy data format
    4. Copies only the specified workspace's data to legacy collections

Usage:
    python -m lightrag.tools.prepare_qdrant_legacy_data
    # or
    python lightrag/tools/prepare_qdrant_legacy_data.py

    # Specify custom workspace
    python -m lightrag.tools.prepare_qdrant_legacy_data --workspace space1

    # Process specific collection types only
    python -m lightrag.tools.prepare_qdrant_legacy_data --types chunks,entities

    # Dry run (preview only, no actual changes)
    python -m lightrag.tools.prepare_qdrant_legacy_data --dry-run
"""
import argparse
import asyncio
import configparser
import hashlib
import os
import sys
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
  38. import pipmaster as pm
  39. from dotenv import load_dotenv
  40. from qdrant_client import QdrantClient, models # type: ignore
  41. # Add project root to path for imports
  42. sys.path.insert(
  43. 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  44. )
  45. # Load environment variables
  46. load_dotenv(dotenv_path=".env", override=False)
  47. # Ensure qdrant-client is installed
  48. if not pm.is_installed("qdrant-client"):
  49. pm.install("qdrant-client")
  50. # Collection namespace mapping: new collection pattern -> legacy suffix
  51. # Legacy collection will be named as: {workspace}_{suffix}
  52. COLLECTION_NAMESPACES = {
  53. "chunks": {
  54. "new": "lightrag_vdb_chunks",
  55. "suffix": "chunks",
  56. },
  57. "entities": {
  58. "new": "lightrag_vdb_entities",
  59. "suffix": "entities",
  60. },
  61. "relationships": {
  62. "new": "lightrag_vdb_relationships",
  63. "suffix": "relationships",
  64. },
  65. }
  66. # Default batch size for copy operations
  67. DEFAULT_BATCH_SIZE = 500
  68. # Field to remove from legacy data
  69. WORKSPACE_ID_FIELD = "workspace_id"
  70. # ANSI color codes for terminal output
  71. BOLD_CYAN = "\033[1;36m"
  72. BOLD_GREEN = "\033[1;32m"
  73. BOLD_YELLOW = "\033[1;33m"
  74. BOLD_RED = "\033[1;31m"
  75. RESET = "\033[0m"
  76. @dataclass
  77. class CopyStats:
  78. """Copy operation statistics"""
  79. collection_type: str
  80. source_collection: str
  81. target_collection: str
  82. total_records: int = 0
  83. copied_records: int = 0
  84. failed_records: int = 0
  85. errors: List[Dict[str, Any]] = field(default_factory=list)
  86. elapsed_time: float = 0.0
  87. def add_error(self, batch_idx: int, error: Exception, batch_size: int):
  88. """Record batch error"""
  89. self.errors.append(
  90. {
  91. "batch": batch_idx,
  92. "error_type": type(error).__name__,
  93. "error_msg": str(error),
  94. "records_lost": batch_size,
  95. "timestamp": time.time(),
  96. }
  97. )
  98. self.failed_records += batch_size
  99. class QdrantLegacyDataPreparationTool:
  100. """Tool for preparing legacy data in Qdrant for migration testing"""
  101. def __init__(
  102. self,
  103. workspace: str = "space1",
  104. batch_size: int = DEFAULT_BATCH_SIZE,
  105. dry_run: bool = False,
  106. clear_target: bool = False,
  107. ):
  108. """
  109. Initialize the tool.
  110. Args:
  111. workspace: Workspace to use for filtering new collection data
  112. batch_size: Number of records to process per batch
  113. dry_run: If True, only preview operations without making changes
  114. clear_target: If True, delete target collection before copying data
  115. """
  116. self.workspace = workspace
  117. self.batch_size = batch_size
  118. self.dry_run = dry_run
  119. self.clear_target = clear_target
  120. self._client: Optional[QdrantClient] = None
  121. def _get_client(self) -> QdrantClient:
  122. """Get or create QdrantClient instance"""
  123. if self._client is None:
  124. config = configparser.ConfigParser()
  125. config.read("config.ini", "utf-8")
  126. self._client = QdrantClient(
  127. url=os.environ.get(
  128. "QDRANT_URL", config.get("qdrant", "uri", fallback=None)
  129. ),
  130. api_key=os.environ.get(
  131. "QDRANT_API_KEY",
  132. config.get("qdrant", "apikey", fallback=None),
  133. ),
  134. )
  135. return self._client
  136. def print_header(self):
  137. """Print tool header"""
  138. print("\n" + "=" * 60)
  139. print("Qdrant Legacy Data Preparation Tool - LightRAG")
  140. print("=" * 60)
  141. if self.dry_run:
  142. print(f"{BOLD_YELLOW}⚠️ DRY RUN MODE - No changes will be made{RESET}")
  143. if self.clear_target:
  144. print(
  145. f"{BOLD_RED}⚠️ CLEAR TARGET MODE - Target collections will be deleted first{RESET}"
  146. )
  147. print(f"Workspace: {BOLD_CYAN}{self.workspace}{RESET}")
  148. print(f"Batch Size: {self.batch_size}")
  149. print("=" * 60)
  150. def check_connection(self) -> bool:
  151. """Check Qdrant connection"""
  152. try:
  153. client = self._get_client()
  154. # Try to list collections to verify connection
  155. client.get_collections()
  156. print(f"{BOLD_GREEN}✓{RESET} Qdrant connection successful")
  157. return True
  158. except Exception as e:
  159. print(f"{BOLD_RED}✗{RESET} Qdrant connection failed: {e}")
  160. return False
  161. def get_collection_info(self, collection_name: str) -> Optional[Dict[str, Any]]:
  162. """
  163. Get collection information.
  164. Args:
  165. collection_name: Name of the collection
  166. Returns:
  167. Dictionary with collection info (vector_size, count) or None if not exists
  168. """
  169. client = self._get_client()
  170. if not client.collection_exists(collection_name):
  171. return None
  172. info = client.get_collection(collection_name)
  173. count = client.count(collection_name=collection_name, exact=True).count
  174. # Handle both object and dict formats for vectors config
  175. vectors_config = info.config.params.vectors
  176. if isinstance(vectors_config, dict):
  177. # Named vectors format or dict format
  178. if vectors_config:
  179. first_key = next(iter(vectors_config.keys()), None)
  180. if first_key and hasattr(vectors_config[first_key], "size"):
  181. vector_size = vectors_config[first_key].size
  182. distance = vectors_config[first_key].distance
  183. else:
  184. # Try to get from dict values
  185. first_val = next(iter(vectors_config.values()), {})
  186. vector_size = (
  187. first_val.get("size")
  188. if isinstance(first_val, dict)
  189. else getattr(first_val, "size", None)
  190. )
  191. distance = (
  192. first_val.get("distance")
  193. if isinstance(first_val, dict)
  194. else getattr(first_val, "distance", None)
  195. )
  196. else:
  197. vector_size = None
  198. distance = None
  199. else:
  200. # Standard single vector format
  201. vector_size = vectors_config.size
  202. distance = vectors_config.distance
  203. return {
  204. "name": collection_name,
  205. "vector_size": vector_size,
  206. "count": count,
  207. "distance": distance,
  208. }
  209. def delete_collection(self, collection_name: str) -> bool:
  210. """
  211. Delete a collection if it exists.
  212. Args:
  213. collection_name: Name of the collection to delete
  214. Returns:
  215. True if deleted or doesn't exist
  216. """
  217. client = self._get_client()
  218. if not client.collection_exists(collection_name):
  219. return True
  220. if self.dry_run:
  221. target_info = self.get_collection_info(collection_name)
  222. count = target_info["count"] if target_info else 0
  223. print(
  224. f" {BOLD_YELLOW}[DRY RUN]{RESET} Would delete collection '{collection_name}' ({count:,} records)"
  225. )
  226. return True
  227. try:
  228. target_info = self.get_collection_info(collection_name)
  229. count = target_info["count"] if target_info else 0
  230. client.delete_collection(collection_name=collection_name)
  231. print(
  232. f" {BOLD_RED}✗{RESET} Deleted collection '{collection_name}' ({count:,} records)"
  233. )
  234. return True
  235. except Exception as e:
  236. print(f" {BOLD_RED}✗{RESET} Failed to delete collection: {e}")
  237. return False
  238. def create_legacy_collection(
  239. self, collection_name: str, vector_size: int, distance: models.Distance
  240. ) -> bool:
  241. """
  242. Create legacy collection if it doesn't exist.
  243. Args:
  244. collection_name: Name of the collection to create
  245. vector_size: Dimension of vectors
  246. distance: Distance metric
  247. Returns:
  248. True if created or already exists
  249. """
  250. client = self._get_client()
  251. if client.collection_exists(collection_name):
  252. print(f" Collection '{collection_name}' already exists")
  253. return True
  254. if self.dry_run:
  255. print(
  256. f" {BOLD_YELLOW}[DRY RUN]{RESET} Would create collection '{collection_name}' with {vector_size}d vectors"
  257. )
  258. return True
  259. try:
  260. client.create_collection(
  261. collection_name=collection_name,
  262. vectors_config=models.VectorParams(
  263. size=vector_size,
  264. distance=distance,
  265. ),
  266. hnsw_config=models.HnswConfigDiff(
  267. payload_m=16,
  268. m=0,
  269. ),
  270. )
  271. print(
  272. f" {BOLD_GREEN}✓{RESET} Created collection '{collection_name}' with {vector_size}d vectors"
  273. )
  274. return True
  275. except Exception as e:
  276. print(f" {BOLD_RED}✗{RESET} Failed to create collection: {e}")
  277. return False
  278. def _get_workspace_filter(self) -> models.Filter:
  279. """Create workspace filter for Qdrant queries"""
  280. return models.Filter(
  281. must=[
  282. models.FieldCondition(
  283. key=WORKSPACE_ID_FIELD,
  284. match=models.MatchValue(value=self.workspace),
  285. )
  286. ]
  287. )
  288. def get_workspace_count(self, collection_name: str) -> int:
  289. """
  290. Get count of records for the current workspace in a collection.
  291. Args:
  292. collection_name: Name of the collection
  293. Returns:
  294. Count of records for the workspace
  295. """
  296. client = self._get_client()
  297. return client.count(
  298. collection_name=collection_name,
  299. count_filter=self._get_workspace_filter(),
  300. exact=True,
  301. ).count
  302. def copy_collection_data(
  303. self,
  304. source_collection: str,
  305. target_collection: str,
  306. collection_type: str,
  307. workspace_count: int,
  308. ) -> CopyStats:
  309. """
  310. Copy data from source to target collection.
  311. This filters by workspace_id and removes it from payload to simulate legacy data format.
  312. Args:
  313. source_collection: Source collection name
  314. target_collection: Target collection name
  315. collection_type: Type of collection (chunks, entities, relationships)
  316. workspace_count: Pre-computed count of workspace records
  317. Returns:
  318. CopyStats with operation results
  319. """
  320. client = self._get_client()
  321. stats = CopyStats(
  322. collection_type=collection_type,
  323. source_collection=source_collection,
  324. target_collection=target_collection,
  325. )
  326. start_time = time.time()
  327. stats.total_records = workspace_count
  328. if workspace_count == 0:
  329. print(f" No records for workspace '{self.workspace}', skipping")
  330. stats.elapsed_time = time.time() - start_time
  331. return stats
  332. print(f" Workspace records: {workspace_count:,}")
  333. if self.dry_run:
  334. print(
  335. f" {BOLD_YELLOW}[DRY RUN]{RESET} Would copy {workspace_count:,} records to '{target_collection}'"
  336. )
  337. stats.copied_records = workspace_count
  338. stats.elapsed_time = time.time() - start_time
  339. return stats
  340. # Batch copy using scroll with workspace filter
  341. workspace_filter = self._get_workspace_filter()
  342. offset = None
  343. batch_idx = 0
  344. while True:
  345. # Scroll source collection with workspace filter
  346. result = client.scroll(
  347. collection_name=source_collection,
  348. scroll_filter=workspace_filter,
  349. limit=self.batch_size,
  350. offset=offset,
  351. with_vectors=True,
  352. with_payload=True,
  353. )
  354. points, next_offset = result
  355. if not points:
  356. break
  357. batch_idx += 1
  358. # Transform points: remove workspace_id from payload
  359. new_points = []
  360. for point in points:
  361. new_payload = dict(point.payload or {})
  362. # Remove workspace_id to simulate legacy format
  363. new_payload.pop(WORKSPACE_ID_FIELD, None)
  364. # Use original id from payload if available, otherwise use point.id
  365. original_id = new_payload.get("id")
  366. if original_id:
  367. # Generate a simple deterministic id for legacy format
  368. # Use original id directly (legacy format didn't have workspace prefix)
  369. import hashlib
  370. import uuid
  371. hashed = hashlib.sha256(original_id.encode("utf-8")).digest()
  372. point_id = uuid.UUID(bytes=hashed[:16], version=4).hex
  373. else:
  374. point_id = str(point.id)
  375. new_points.append(
  376. models.PointStruct(
  377. id=point_id,
  378. vector=point.vector,
  379. payload=new_payload,
  380. )
  381. )
  382. try:
  383. # Upsert to target collection
  384. client.upsert(
  385. collection_name=target_collection, points=new_points, wait=True
  386. )
  387. stats.copied_records += len(new_points)
  388. # Progress bar
  389. progress = (stats.copied_records / workspace_count) * 100
  390. bar_length = 30
  391. filled = int(bar_length * stats.copied_records // workspace_count)
  392. bar = "█" * filled + "░" * (bar_length - filled)
  393. print(
  394. f"\r Copying: {bar} {stats.copied_records:,}/{workspace_count:,} ({progress:.1f}%) ",
  395. end="",
  396. flush=True,
  397. )
  398. except Exception as e:
  399. stats.add_error(batch_idx, e, len(new_points))
  400. print(
  401. f"\n {BOLD_RED}✗{RESET} Batch {batch_idx} failed: {type(e).__name__}: {e}"
  402. )
  403. if next_offset is None:
  404. break
  405. offset = next_offset
  406. print() # New line after progress bar
  407. stats.elapsed_time = time.time() - start_time
  408. return stats
  409. def process_collection_type(self, collection_type: str) -> Optional[CopyStats]:
  410. """
  411. Process a single collection type.
  412. Args:
  413. collection_type: Type of collection (chunks, entities, relationships)
  414. Returns:
  415. CopyStats or None if error
  416. """
  417. namespace_config = COLLECTION_NAMESPACES.get(collection_type)
  418. if not namespace_config:
  419. print(f"{BOLD_RED}✗{RESET} Unknown collection type: {collection_type}")
  420. return None
  421. source = namespace_config["new"]
  422. # Generate legacy collection name dynamically: {workspace}_{suffix}
  423. target = f"{self.workspace}_{namespace_config['suffix']}"
  424. print(f"\n{'=' * 50}")
  425. print(f"Processing: {BOLD_CYAN}{collection_type}{RESET}")
  426. print(f"{'=' * 50}")
  427. print(f" Source: {source}")
  428. print(f" Target: {target}")
  429. # Check source collection
  430. source_info = self.get_collection_info(source)
  431. if source_info is None:
  432. print(
  433. f" {BOLD_YELLOW}⚠{RESET} Source collection '{source}' does not exist, skipping"
  434. )
  435. return None
  436. print(f" Source vector dimension: {source_info['vector_size']}d")
  437. print(f" Source distance metric: {source_info['distance']}")
  438. print(f" Source total records: {source_info['count']:,}")
  439. # Check workspace data exists BEFORE creating legacy collection
  440. workspace_count = self.get_workspace_count(source)
  441. print(f" Workspace '{self.workspace}' records: {workspace_count:,}")
  442. if workspace_count == 0:
  443. print(
  444. f" {BOLD_YELLOW}⚠{RESET} No data found for workspace '{self.workspace}' in '{source}', skipping"
  445. )
  446. return None
  447. # Clear target collection if requested
  448. if self.clear_target:
  449. if not self.delete_collection(target):
  450. return None
  451. # Create target collection only after confirming workspace data exists
  452. if not self.create_legacy_collection(
  453. target, source_info["vector_size"], source_info["distance"]
  454. ):
  455. return None
  456. # Copy data with workspace filter
  457. stats = self.copy_collection_data(
  458. source, target, collection_type, workspace_count
  459. )
  460. # Print result
  461. if stats.failed_records == 0:
  462. print(
  463. f" {BOLD_GREEN}✓{RESET} Copied {stats.copied_records:,} records in {stats.elapsed_time:.2f}s"
  464. )
  465. else:
  466. print(
  467. f" {BOLD_YELLOW}⚠{RESET} Copied {stats.copied_records:,} records, "
  468. f"{BOLD_RED}{stats.failed_records:,} failed{RESET} in {stats.elapsed_time:.2f}s"
  469. )
  470. return stats
  471. def print_summary(self, all_stats: List[CopyStats]):
  472. """Print summary of all operations"""
  473. print("\n" + "=" * 60)
  474. print("Summary")
  475. print("=" * 60)
  476. total_copied = sum(s.copied_records for s in all_stats)
  477. total_failed = sum(s.failed_records for s in all_stats)
  478. total_time = sum(s.elapsed_time for s in all_stats)
  479. for stats in all_stats:
  480. status = (
  481. f"{BOLD_GREEN}✓{RESET}"
  482. if stats.failed_records == 0
  483. else f"{BOLD_YELLOW}⚠{RESET}"
  484. )
  485. print(
  486. f" {status} {stats.collection_type}: {stats.copied_records:,}/{stats.total_records:,} "
  487. f"({stats.source_collection} → {stats.target_collection})"
  488. )
  489. print("-" * 60)
  490. print(f" Total records copied: {BOLD_CYAN}{total_copied:,}{RESET}")
  491. if total_failed > 0:
  492. print(f" Total records failed: {BOLD_RED}{total_failed:,}{RESET}")
  493. print(f" Total time: {total_time:.2f}s")
  494. if self.dry_run:
  495. print(f"\n{BOLD_YELLOW}⚠️ DRY RUN - No actual changes were made{RESET}")
  496. # Print error details if any
  497. all_errors = []
  498. for stats in all_stats:
  499. all_errors.extend(stats.errors)
  500. if all_errors:
  501. print(f"\n{BOLD_RED}Errors ({len(all_errors)}){RESET}")
  502. for i, error in enumerate(all_errors[:5], 1):
  503. print(
  504. f" {i}. Batch {error['batch']}: {error['error_type']}: {error['error_msg']}"
  505. )
  506. if len(all_errors) > 5:
  507. print(f" ... and {len(all_errors) - 5} more errors")
  508. print("=" * 60)
  509. async def run(self, collection_types: Optional[List[str]] = None):
  510. """
  511. Run the data preparation tool.
  512. Args:
  513. collection_types: List of collection types to process (default: all)
  514. """
  515. self.print_header()
  516. # Check connection
  517. if not self.check_connection():
  518. return
  519. # Determine which collection types to process
  520. if collection_types:
  521. types_to_process = [t.strip() for t in collection_types]
  522. invalid_types = [
  523. t for t in types_to_process if t not in COLLECTION_NAMESPACES
  524. ]
  525. if invalid_types:
  526. print(
  527. f"{BOLD_RED}✗{RESET} Invalid collection types: {', '.join(invalid_types)}"
  528. )
  529. print(f" Valid types: {', '.join(COLLECTION_NAMESPACES.keys())}")
  530. return
  531. else:
  532. types_to_process = list(COLLECTION_NAMESPACES.keys())
  533. print(f"\nCollection types to process: {', '.join(types_to_process)}")
  534. # Process each collection type
  535. all_stats = []
  536. for ctype in types_to_process:
  537. stats = self.process_collection_type(ctype)
  538. if stats:
  539. all_stats.append(stats)
  540. # Print summary
  541. if all_stats:
  542. self.print_summary(all_stats)
  543. else:
  544. print(f"\n{BOLD_YELLOW}⚠{RESET} No collections were processed")
  545. def parse_args():
  546. """Parse command line arguments"""
  547. parser = argparse.ArgumentParser(
  548. description="Prepare legacy data in Qdrant for migration testing",
  549. formatter_class=argparse.RawDescriptionHelpFormatter,
  550. epilog="""
  551. Examples:
  552. python -m lightrag.tools.prepare_qdrant_legacy_data
  553. python -m lightrag.tools.prepare_qdrant_legacy_data --workspace space1
  554. python -m lightrag.tools.prepare_qdrant_legacy_data --types chunks,entities
  555. python -m lightrag.tools.prepare_qdrant_legacy_data --dry-run
  556. """,
  557. )
  558. parser.add_argument(
  559. "--workspace",
  560. type=str,
  561. default="space1",
  562. help="Workspace name (default: space1)",
  563. )
  564. parser.add_argument(
  565. "--types",
  566. type=str,
  567. default=None,
  568. help="Comma-separated list of collection types (chunks, entities, relationships)",
  569. )
  570. parser.add_argument(
  571. "--batch-size",
  572. type=int,
  573. default=DEFAULT_BATCH_SIZE,
  574. help=f"Batch size for copy operations (default: {DEFAULT_BATCH_SIZE})",
  575. )
  576. parser.add_argument(
  577. "--dry-run",
  578. action="store_true",
  579. help="Preview operations without making changes",
  580. )
  581. parser.add_argument(
  582. "--clear-target",
  583. action="store_true",
  584. help="Delete target collections before copying (for clean test environment)",
  585. )
  586. return parser.parse_args()
  587. async def main():
  588. """Main entry point"""
  589. args = parse_args()
  590. collection_types = None
  591. if args.types:
  592. collection_types = [t.strip() for t in args.types.split(",")]
  593. tool = QdrantLegacyDataPreparationTool(
  594. workspace=args.workspace,
  595. batch_size=args.batch_size,
  596. dry_run=args.dry_run,
  597. clear_target=args.clear_target,
  598. )
  599. await tool.run(collection_types=collection_types)
  600. if __name__ == "__main__":
  601. asyncio.run(main())