test_postgres_upsert.py 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719
"""
Unit tests for the PostgreSQL storage upsert paths (PR #2742 fixes).

Verifies:
1. Each PGKVStorage namespace builds tuples whose ordering matches the SQL
   positional params.
2. _run_with_retry is used (not the removed PostgreSQLDB.executemany wrapper),
   and KV, doc-status and vector-flush writes pass a descriptive timing_label.
3. Sub-batching splits data when len(data) > _max_batch_size.
4. Unknown namespace raises ValueError.
5. Empty data returns without any DB call.
6. The full-doc and doc-status ON CONFLICT clauses guard pipeline-derived
   columns with COALESCE/NULLIF so partial writes do not clobber metadata.
"""
  10. import asyncio
  11. import json
  12. import pytest
  13. import numpy as np
  14. from unittest.mock import AsyncMock, MagicMock
  15. from lightrag.kg.postgres_impl import PGDocStatusStorage, PGKVStorage, PGVectorStorage
  16. from lightrag.namespace import NameSpace
  17. from lightrag.utils import EmbeddingFunc
  18. # ---------------------------------------------------------------------------
  19. # Helpers
  20. # ---------------------------------------------------------------------------
  21. GLOBAL_CONFIG = {"embedding_batch_num": 10}
  22. def make_storage(namespace: str) -> PGKVStorage:
  23. """Construct a PGKVStorage instance with a mocked db."""
  24. db = MagicMock()
  25. captured: list[tuple] = []
  26. retry_kwargs: list[dict] = []
  27. async def fake_run_with_retry(operation, **kwargs):
  28. """Call the closure with a mock connection to capture executemany args."""
  29. retry_kwargs.append(kwargs)
  30. mock_conn = AsyncMock()
  31. await operation(mock_conn)
  32. # Store (sql, data) from each executemany call
  33. for call in mock_conn.executemany.call_args_list:
  34. captured.append((call.args[0], call.args[1]))
  35. db._run_with_retry = AsyncMock(side_effect=fake_run_with_retry)
  36. db.workspace = "test_ws"
  37. storage = PGKVStorage.__new__(PGKVStorage)
  38. storage.namespace = namespace
  39. storage.workspace = "test_ws"
  40. storage.global_config = GLOBAL_CONFIG
  41. storage.db = db
  42. storage.__post_init__()
  43. storage._captured = captured
  44. storage._retry_kwargs = retry_kwargs
  45. return storage
  46. def make_doc_status_storage() -> PGDocStatusStorage:
  47. """Construct a PGDocStatusStorage instance with a mocked db."""
  48. db = MagicMock()
  49. captured: list[tuple] = []
  50. retry_kwargs: list[dict] = []
  51. async def fake_run_with_retry(operation, **kwargs):
  52. retry_kwargs.append(kwargs)
  53. mock_conn = AsyncMock()
  54. tx = AsyncMock()
  55. tx.__aenter__.return_value = tx
  56. tx.__aexit__.return_value = False
  57. mock_conn.transaction = MagicMock(return_value=tx)
  58. await operation(mock_conn)
  59. for call in mock_conn.executemany.call_args_list:
  60. captured.append((call.args[0], call.args[1]))
  61. db._run_with_retry = AsyncMock(side_effect=fake_run_with_retry)
  62. db.workspace = "test_ws"
  63. storage = PGDocStatusStorage.__new__(PGDocStatusStorage)
  64. storage.namespace = NameSpace.DOC_STATUS
  65. storage.workspace = "test_ws"
  66. storage.global_config = GLOBAL_CONFIG
  67. storage.db = db
  68. storage._captured = captured
  69. storage._retry_kwargs = retry_kwargs
  70. return storage
  71. def make_vector_storage(namespace: str) -> PGVectorStorage:
  72. """Construct a PGVectorStorage instance with a mocked db and embedding func."""
  73. db = MagicMock()
  74. captured: list[tuple] = []
  75. retry_kwargs: list[dict] = []
  76. async def fake_run_with_retry(operation, **kwargs):
  77. retry_kwargs.append(kwargs)
  78. mock_conn = AsyncMock()
  79. # connection.transaction() is a synchronous factory in asyncpg that
  80. # returns an async context manager. AsyncMock would return a coroutine,
  81. # so swap in a MagicMock that returns an AsyncMock-backed context.
  82. tx_cm = AsyncMock()
  83. tx_cm.__aenter__.return_value = None
  84. tx_cm.__aexit__.return_value = None
  85. mock_conn.transaction = MagicMock(return_value=tx_cm)
  86. await operation(mock_conn)
  87. for call in mock_conn.executemany.call_args_list:
  88. captured.append((call.args[0], call.args[1]))
  89. # Also capture connection.execute calls (delete SQL inside flush).
  90. for call in mock_conn.execute.call_args_list:
  91. captured.append((call.args[0], call.args[1:]))
  92. db._run_with_retry = AsyncMock(side_effect=fake_run_with_retry)
  93. db.workspace = "test_ws"
  94. async def embed_func(texts, **kwargs):
  95. return np.array([[0.1, 0.2, 0.3] for _ in texts], dtype=np.float32)
  96. embedding = EmbeddingFunc(
  97. embedding_dim=3,
  98. func=embed_func,
  99. model_name="test_model",
  100. )
  101. storage = PGVectorStorage(
  102. namespace=namespace,
  103. workspace="test_ws",
  104. global_config={
  105. "embedding_batch_num": 10,
  106. "vector_db_storage_cls_kwargs": {
  107. "cosine_better_than_threshold": 0.5,
  108. },
  109. },
  110. embedding_func=embedding,
  111. )
  112. storage.db = db
  113. storage._flush_lock = asyncio.Lock()
  114. storage._captured = captured
  115. storage._retry_kwargs = retry_kwargs
  116. return storage
  117. # ---------------------------------------------------------------------------
  118. # 1. _max_batch_size is always 200 (not embedding_batch_num)
  119. # ---------------------------------------------------------------------------
  120. def test_max_batch_size_is_constant():
  121. storage = make_storage(NameSpace.KV_STORE_TEXT_CHUNKS)
  122. assert storage._max_batch_size == 200
  123. # ---------------------------------------------------------------------------
  124. # 2. Namespace: TEXT_CHUNKS
  125. # ---------------------------------------------------------------------------
  126. @pytest.mark.asyncio
  127. async def test_upsert_text_chunks_tuple_order():
  128. storage = make_storage(NameSpace.KV_STORE_TEXT_CHUNKS)
  129. data = {
  130. "chunk-1": {
  131. "tokens": 42,
  132. "chunk_order_index": 0,
  133. "full_doc_id": "doc-1",
  134. "content": "hello world",
  135. "file_path": "/a/b.[native-Fi].txt",
  136. "llm_cache_list": ["cache-key"],
  137. "heading": {"level": 2, "text": "Section A"},
  138. "sidecar": {"type": "drawing", "id": "img-1", "refs": []},
  139. }
  140. }
  141. await storage.upsert(data)
  142. assert len(storage._captured) == 1
  143. sql, rows = storage._captured[0]
  144. assert "LIGHTRAG_DOC_CHUNKS" in sql
  145. assert len(rows) == 1
  146. row = rows[0]
  147. # SQL: (workspace, id, tokens, chunk_order_index, full_doc_id,
  148. # content, file_path, llm_cache_list, heading, sidecar,
  149. # create_time, update_time)
  150. assert row[0] == "test_ws" # workspace
  151. assert row[1] == "chunk-1" # id
  152. assert row[2] == 42 # tokens
  153. assert row[3] == 0 # chunk_order_index
  154. assert row[4] == "doc-1" # full_doc_id
  155. assert row[5] == "hello world" # content
  156. assert row[6] == "/a/b.[native-Fi].txt" # file_path
  157. assert json.loads(row[7]) == ["cache-key"] # llm_cache_list
  158. assert json.loads(row[8]) == {"level": 2, "text": "Section A"} # heading
  159. assert json.loads(row[9]) == {
  160. "type": "drawing",
  161. "id": "img-1",
  162. "refs": [],
  163. } # sidecar
  164. @pytest.mark.asyncio
  165. async def test_upsert_text_chunks_missing_heading_sidecar_defaults_to_empty_dict():
  166. """Plain-text chunks without heading/sidecar should serialize to '{}'."""
  167. storage = make_storage(NameSpace.KV_STORE_TEXT_CHUNKS)
  168. data = {
  169. "chunk-1": {
  170. "tokens": 10,
  171. "chunk_order_index": 0,
  172. "full_doc_id": "doc-1",
  173. "content": "plain text",
  174. "file_path": "/a/b.txt",
  175. }
  176. }
  177. await storage.upsert(data)
  178. _, rows = storage._captured[0]
  179. row = rows[0]
  180. assert json.loads(row[8]) == {} # heading
  181. assert json.loads(row[9]) == {} # sidecar
  182. @pytest.mark.asyncio
  183. async def test_upsert_text_chunks_none_heading_sidecar_defaults_to_empty_dict():
  184. """Explicit None values should be coerced to '{}' to avoid type errors."""
  185. storage = make_storage(NameSpace.KV_STORE_TEXT_CHUNKS)
  186. data = {
  187. "chunk-1": {
  188. "tokens": 10,
  189. "chunk_order_index": 0,
  190. "full_doc_id": "doc-1",
  191. "content": "plain text",
  192. "file_path": "/a/b.txt",
  193. "heading": None,
  194. "sidecar": None,
  195. }
  196. }
  197. await storage.upsert(data)
  198. _, rows = storage._captured[0]
  199. row = rows[0]
  200. assert json.loads(row[8]) == {}
  201. assert json.loads(row[9]) == {}
  202. # ---------------------------------------------------------------------------
  203. # 3. Namespace: FULL_DOCS
  204. # ---------------------------------------------------------------------------
  205. @pytest.mark.asyncio
  206. async def test_upsert_full_docs_tuple_order():
  207. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  208. data = {
  209. "doc-1": {
  210. "content": "full text",
  211. "file_path": "/path/doc.[mineru-Fi].pdf",
  212. "sidecar_location": "lightrag://sidecar/doc-1",
  213. "parse_format": "lightrag",
  214. "content_hash": "deadbeef",
  215. "process_options": "Fi",
  216. "chunk_options": {"chunk_token_size": 1200, "chunk_overlap": 100},
  217. "parse_engine": "mineru",
  218. }
  219. }
  220. await storage.upsert(data)
  221. assert len(storage._captured) == 1
  222. _, rows = storage._captured[0]
  223. row = rows[0]
  224. # SQL: (id, content, doc_name, workspace, sidecar_location, parse_format,
  225. # content_hash, process_options, chunk_options, parse_engine)
  226. assert row[0] == "doc-1"
  227. assert row[1] == "full text"
  228. assert row[2] == "/path/doc.[mineru-Fi].pdf"
  229. assert row[3] == "test_ws"
  230. assert row[4] == "lightrag://sidecar/doc-1"
  231. assert row[5] == "lightrag"
  232. assert row[6] == "deadbeef"
  233. assert row[7] == "Fi"
  234. assert json.loads(row[8]) == {"chunk_token_size": 1200, "chunk_overlap": 100}
  235. assert row[9] == "mineru"
  236. @pytest.mark.asyncio
  237. async def test_upsert_full_docs_missing_pipeline_fields_pass_through_as_none():
  238. """Missing pipeline-derived fields must serialize as None at the Python
  239. layer so the SQL-level COALESCE guard can distinguish "caller did not
  240. supply" from "caller supplied a real value".
  241. The 'raw' default for parse_format is provided by the column DDL on
  242. initial insert; the Python layer must NOT inject it, otherwise the
  243. COALESCE guard never triggers on subsequent partial writes (a follow-up
  244. upsert with no parse_format would re-stamp the column with 'raw' and
  245. blow away a previously-set 'lightrag').
  246. """
  247. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  248. data = {"doc-1": {"content": "full text", "file_path": "/path/doc.pdf"}}
  249. await storage.upsert(data)
  250. _, rows = storage._captured[0]
  251. row = rows[0]
  252. assert row[4] is None # sidecar_location
  253. assert row[5] is None # parse_format — DDL supplies 'raw' default on insert
  254. assert row[6] is None # content_hash
  255. assert row[7] is None # process_options
  256. assert json.loads(row[8]) == {} # chunk_options default
  257. assert row[9] is None # parse_engine
  258. @pytest.mark.asyncio
  259. async def test_upsert_full_docs_none_chunk_options_defaults_to_empty_dict():
  260. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  261. data = {
  262. "doc-1": {
  263. "content": "full text",
  264. "file_path": "/path/doc.pdf",
  265. "chunk_options": None,
  266. }
  267. }
  268. await storage.upsert(data)
  269. _, rows = storage._captured[0]
  270. assert json.loads(rows[0][8]) == {}
  271. @pytest.mark.asyncio
  272. async def test_upsert_full_docs_sql_protects_partial_writes():
  273. """The ON CONFLICT clause must COALESCE+NULLIF every pipeline-derived
  274. column so a follow-up upsert that only carries ``content`` + ``doc_name``
  275. does not silently overwrite previously-recorded metadata back to defaults.
  276. We assert this at the SQL-template level since the actual COALESCE
  277. behavior is executed by Postgres. The presence of the protective
  278. expression in the SQL is the single source of truth for the guarantee.
  279. """
  280. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  281. await storage.upsert(
  282. {"doc-1": {"content": "full text", "file_path": "/path/doc.pdf"}}
  283. )
  284. sql, _ = storage._captured[0]
  285. normalized = " ".join(sql.split()).lower()
  286. # Each pipeline-derived string column must be COALESCE/NULLIF-guarded
  287. for col in (
  288. "sidecar_location",
  289. "parse_format",
  290. "content_hash",
  291. "process_options",
  292. "parse_engine",
  293. ):
  294. assert (
  295. f"coalesce( nullif(excluded.{col}, '')" in normalized
  296. ), f"upsert_doc_full must guard {col} via COALESCE+NULLIF"
  297. assert (
  298. f"lightrag_doc_full.{col}" in normalized
  299. ), f"upsert_doc_full must preserve existing {col} on partial write"
  300. # chunk_options (JSONB) is guarded via CASE on NULL/empty-object literal
  301. assert "excluded.chunk_options is null" in normalized
  302. assert "excluded.chunk_options = '{}'::jsonb" in normalized
  303. assert "lightrag_doc_full.chunk_options" in normalized
  304. # content / doc_name remain straight overwrites — they ARE the payload
  305. assert "content = excluded.content" in normalized
  306. assert "doc_name = excluded.doc_name" in normalized
  307. # ---------------------------------------------------------------------------
  308. # 4. Namespace: LLM_RESPONSE_CACHE
  309. # ---------------------------------------------------------------------------
  310. @pytest.mark.asyncio
  311. async def test_upsert_llm_cache_tuple_order():
  312. storage = make_storage(NameSpace.KV_STORE_LLM_RESPONSE_CACHE)
  313. data = {
  314. "key-1": {
  315. "original_prompt": "what is X?",
  316. "return": "X is Y",
  317. "chunk_id": "chunk-1",
  318. "cache_type": "query",
  319. "queryparam": {"mode": "hybrid"},
  320. }
  321. }
  322. await storage.upsert(data)
  323. assert len(storage._captured) == 1
  324. _, rows = storage._captured[0]
  325. row = rows[0]
  326. # SQL: (workspace, id, original_prompt, return_value, chunk_id, cache_type, queryparam)
  327. assert row[0] == "test_ws"
  328. assert row[1] == "key-1"
  329. assert row[2] == "what is X?"
  330. assert row[3] == "X is Y"
  331. assert row[4] == "chunk-1"
  332. assert row[5] == "query"
  333. assert json.loads(row[6]) == {"mode": "hybrid"}
  334. @pytest.mark.asyncio
  335. async def test_upsert_llm_cache_null_queryparam():
  336. storage = make_storage(NameSpace.KV_STORE_LLM_RESPONSE_CACHE)
  337. data = {
  338. "key-2": {
  339. "original_prompt": "prompt",
  340. "return": "answer",
  341. "cache_type": "extract",
  342. }
  343. }
  344. await storage.upsert(data)
  345. _, rows = storage._captured[0]
  346. assert rows[0][6] is None # queryparam should be None
  347. # ---------------------------------------------------------------------------
  348. # 5. Namespace: FULL_ENTITIES
  349. # ---------------------------------------------------------------------------
  350. @pytest.mark.asyncio
  351. async def test_upsert_full_entities_tuple_order():
  352. storage = make_storage(NameSpace.KV_STORE_FULL_ENTITIES)
  353. data = {"ent-1": {"entity_names": ["EntityA", "EntityB"], "count": 2}}
  354. await storage.upsert(data)
  355. _, rows = storage._captured[0]
  356. row = rows[0]
  357. # SQL: (workspace, id, entity_names, count, create_time, update_time)
  358. assert row[0] == "test_ws"
  359. assert row[1] == "ent-1"
  360. assert json.loads(row[2]) == ["EntityA", "EntityB"]
  361. assert row[3] == 2
  362. # ---------------------------------------------------------------------------
  363. # 6. Namespace: FULL_RELATIONS
  364. # ---------------------------------------------------------------------------
  365. @pytest.mark.asyncio
  366. async def test_upsert_full_relations_tuple_order():
  367. storage = make_storage(NameSpace.KV_STORE_FULL_RELATIONS)
  368. data = {"rel-1": {"relation_pairs": [["A", "B"]], "count": 1}}
  369. await storage.upsert(data)
  370. _, rows = storage._captured[0]
  371. row = rows[0]
  372. # SQL: (workspace, id, relation_pairs, count, create_time, update_time)
  373. assert row[0] == "test_ws"
  374. assert row[1] == "rel-1"
  375. assert json.loads(row[2]) == [["A", "B"]]
  376. assert row[3] == 1
  377. # ---------------------------------------------------------------------------
  378. # 7. Namespace: ENTITY_CHUNKS / RELATION_CHUNKS
  379. # ---------------------------------------------------------------------------
  380. @pytest.mark.asyncio
  381. async def test_upsert_entity_chunks_tuple_order():
  382. storage = make_storage(NameSpace.KV_STORE_ENTITY_CHUNKS)
  383. data = {"ec-1": {"chunk_ids": ["c1", "c2"], "count": 2}}
  384. await storage.upsert(data)
  385. _, rows = storage._captured[0]
  386. row = rows[0]
  387. # SQL: (workspace, id, chunk_ids, count, create_time, update_time)
  388. assert row[0] == "test_ws"
  389. assert row[1] == "ec-1"
  390. assert json.loads(row[2]) == ["c1", "c2"]
  391. assert row[3] == 2
  392. @pytest.mark.asyncio
  393. async def test_upsert_relation_chunks_tuple_order():
  394. storage = make_storage(NameSpace.KV_STORE_RELATION_CHUNKS)
  395. data = {"rc-1": {"chunk_ids": ["c3"], "count": 1}}
  396. await storage.upsert(data)
  397. _, rows = storage._captured[0]
  398. row = rows[0]
  399. assert row[0] == "test_ws"
  400. assert row[1] == "rc-1"
  401. assert json.loads(row[2]) == ["c3"]
  402. assert row[3] == 1
  403. # ---------------------------------------------------------------------------
  404. # 8. Sub-batching: data > _max_batch_size splits into multiple _run_with_retry calls
  405. # ---------------------------------------------------------------------------
  406. @pytest.mark.asyncio
  407. async def test_sub_batching_splits_correctly():
  408. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  409. storage._max_batch_size = 3 # Override to small value for testing
  410. data = {f"doc-{i}": {"content": f"text {i}", "file_path": ""} for i in range(7)}
  411. await storage.upsert(data)
  412. # 7 records / batch_size 3 => 3 batches (3 + 3 + 1)
  413. assert len(storage._captured) == 3
  414. assert len(storage._captured[0][1]) == 3
  415. assert len(storage._captured[1][1]) == 3
  416. assert len(storage._captured[2][1]) == 1
  417. @pytest.mark.asyncio
  418. async def test_sub_batching_exact_multiple():
  419. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  420. storage._max_batch_size = 3
  421. data = {f"doc-{i}": {"content": f"text {i}", "file_path": ""} for i in range(6)}
  422. await storage.upsert(data)
  423. # 6 / 3 => exactly 2 batches
  424. assert len(storage._captured) == 2
  425. assert len(storage._captured[0][1]) == 3
  426. assert len(storage._captured[1][1]) == 3
  427. # ---------------------------------------------------------------------------
  428. # 9. Empty data: no DB call
  429. # ---------------------------------------------------------------------------
  430. @pytest.mark.asyncio
  431. async def test_upsert_empty_data_no_db_call():
  432. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  433. await storage.upsert({})
  434. assert len(storage._captured) == 0
  435. storage.db._run_with_retry.assert_not_called()
  436. # ---------------------------------------------------------------------------
  437. # 10. Unknown namespace raises ValueError
  438. # ---------------------------------------------------------------------------
  439. @pytest.mark.asyncio
  440. async def test_upsert_unknown_namespace_raises():
  441. storage = make_storage("unknown_namespace")
  442. with pytest.raises(ValueError, match="Unknown namespace"):
  443. await storage.upsert({"k": {"v": 1}})
  444. # ---------------------------------------------------------------------------
  445. # 11. Multiple records go into one batch when within limit
  446. # ---------------------------------------------------------------------------
  447. @pytest.mark.asyncio
  448. async def test_multiple_records_single_batch():
  449. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  450. data = {
  451. "doc-1": {"content": "text 1", "file_path": "/a"},
  452. "doc-2": {"content": "text 2", "file_path": "/b"},
  453. "doc-3": {"content": "text 3", "file_path": "/c"},
  454. }
  455. await storage.upsert(data)
  456. # All 3 fit within default batch size of 200
  457. assert len(storage._captured) == 1
  458. _, rows = storage._captured[0]
  459. assert len(rows) == 3
  460. ids = {row[0] for row in rows} # id is $1 for FULL_DOCS
  461. assert ids == {"doc-1", "doc-2", "doc-3"}
  462. @pytest.mark.asyncio
  463. async def test_kv_upsert_passes_timing_label():
  464. storage = make_storage(NameSpace.KV_STORE_FULL_DOCS)
  465. await storage.upsert({"doc-1": {"content": "text 1", "file_path": "/a"}})
  466. assert storage._retry_kwargs[0]["timing_label"] == (
  467. f"test_ws PGKVStorage.upsert[{NameSpace.KV_STORE_FULL_DOCS}]"
  468. )
  469. @pytest.mark.asyncio
  470. async def test_doc_status_upsert_passes_timing_label():
  471. storage = make_doc_status_storage()
  472. await storage.upsert(
  473. {
  474. "doc-1": {
  475. "content_summary": "summary",
  476. "content_length": 12,
  477. "chunks_count": 1,
  478. "status": "processed",
  479. "file_path": "/a.txt",
  480. "chunks_list": ["chunk-1"],
  481. "metadata": {"source": "test"},
  482. "created_at": "2024-01-01T00:00:00+00:00",
  483. "updated_at": "2024-01-01T00:00:00+00:00",
  484. }
  485. }
  486. )
  487. assert storage._retry_kwargs[0]["timing_label"] == (
  488. "test_ws PGDocStatusStorage.upsert"
  489. )
  490. # ---------------------------------------------------------------------------
  491. # doc_status: content_hash tuple + COALESCE SQL guard
  492. # ---------------------------------------------------------------------------
  493. @pytest.mark.asyncio
  494. async def test_doc_status_upsert_includes_content_hash():
  495. storage = make_doc_status_storage()
  496. await storage.upsert(
  497. {
  498. "doc-1": {
  499. "content_summary": "summary",
  500. "content_length": 12,
  501. "chunks_count": 1,
  502. "status": "processed",
  503. "file_path": "/uploads/a.[native-Fi].txt",
  504. "chunks_list": ["chunk-1"],
  505. "metadata": {"source": "test"},
  506. "content_hash": "abc123",
  507. "created_at": "2024-01-01T00:00:00+00:00",
  508. "updated_at": "2024-01-01T00:00:00+00:00",
  509. }
  510. }
  511. )
  512. sql, rows = storage._captured[0]
  513. # content_hash should be present in the INSERT column list and tuple
  514. assert "content_hash" in sql
  515. row = rows[0]
  516. # Tuple layout: workspace, id, content_summary, content_length, chunks_count,
  517. # status, file_path, chunks_list, track_id, metadata, error_msg,
  518. # content_hash, created_at, updated_at
  519. assert row[6] == "/uploads/a.[native-Fi].txt"
  520. assert row[11] == "abc123"
  521. @pytest.mark.asyncio
  522. async def test_doc_status_upsert_missing_content_hash_is_none():
  523. """Existing callers that do not pass content_hash still produce valid tuples."""
  524. storage = make_doc_status_storage()
  525. await storage.upsert(
  526. {
  527. "doc-1": {
  528. "content_summary": "summary",
  529. "content_length": 12,
  530. "chunks_count": 1,
  531. "status": "processed",
  532. "file_path": "/a.txt",
  533. "chunks_list": ["chunk-1"],
  534. "metadata": {"source": "test"},
  535. "created_at": "2024-01-01T00:00:00+00:00",
  536. "updated_at": "2024-01-01T00:00:00+00:00",
  537. }
  538. }
  539. )
  540. _, rows = storage._captured[0]
  541. assert rows[0][11] is None
  542. @pytest.mark.asyncio
  543. async def test_doc_status_upsert_sql_protects_existing_content_hash():
  544. """The ON CONFLICT clause must COALESCE+NULLIF to preserve a previously
  545. set content_hash when a subsequent state-transition upsert carries no
  546. hash (None) or an empty string.
  547. We assert this at the SQL-template level since the actual COALESCE
  548. behavior is executed by Postgres. The presence of the protective
  549. expression in the SQL is the single source of truth for the guarantee.
  550. """
  551. storage = make_doc_status_storage()
  552. await storage.upsert(
  553. {
  554. "doc-1": {
  555. "content_summary": "summary",
  556. "content_length": 12,
  557. "chunks_count": 1,
  558. "status": "processed",
  559. "file_path": "/a.txt",
  560. "chunks_list": [],
  561. "metadata": {},
  562. "created_at": "2024-01-01T00:00:00+00:00",
  563. "updated_at": "2024-01-01T00:00:00+00:00",
  564. }
  565. }
  566. )
  567. sql, _ = storage._captured[0]
  568. normalized = " ".join(sql.split()).lower()
  569. assert "coalesce(" in normalized
  570. assert "nullif(excluded.content_hash, '')" in normalized
  571. assert "lightrag_doc_status.content_hash" in normalized
  572. @pytest.mark.asyncio
  573. async def test_vector_flush_passes_timing_label():
  574. """upsert() now buffers; the timing_label is emitted by the flush path
  575. that runs from index_done_callback() / finalize().
  576. """
  577. storage = make_vector_storage(NameSpace.VECTOR_STORE_CHUNKS)
  578. await storage.upsert(
  579. {
  580. "chunk-1": {
  581. "tokens": 42,
  582. "chunk_order_index": 0,
  583. "full_doc_id": "doc-1",
  584. "content": "hello world",
  585. "file_path": "/a/b.txt",
  586. }
  587. }
  588. )
  589. # No retry call until flush.
  590. assert storage._retry_kwargs == []
  591. await storage.index_done_callback()
  592. assert storage._retry_kwargs[0]["timing_label"] == (
  593. f"test_ws PGVectorStorage.flush[{NameSpace.VECTOR_STORE_CHUNKS}]"
  594. )