test_document_file_path_normalization.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import sys
  2. import pytest
  3. sys.argv = sys.argv[:1]
  4. from lightrag.api.routers.document_routes import ( # noqa: E402
  5. DocStatusResponse,
  6. normalize_file_path,
  7. pipeline_index_texts,
  8. )
  9. from lightrag.base import DocStatus # noqa: E402
  10. from lightrag.constants import PROCESS_OPTION_CHUNK_FIXED # noqa: E402
  11. from lightrag.pipeline import _PipelineMixin # noqa: E402
  12. class DummyRAG:
  13. def __init__(self):
  14. self.enqueued_calls = []
  15. self.processed = False
  16. # _resolve_text_chunking reads addon_params; {} -> default chunker config.
  17. self.addon_params = {}
  18. async def apipeline_enqueue_documents(
  19. self,
  20. input,
  21. file_paths=None,
  22. track_id=None,
  23. process_options=None,
  24. chunk_options=None,
  25. ):
  26. self.enqueued_calls.append(
  27. {
  28. "input": input,
  29. "file_paths": file_paths,
  30. "track_id": track_id,
  31. "process_options": process_options,
  32. "chunk_options": chunk_options,
  33. }
  34. )
  35. async def apipeline_process_enqueue_documents(self):
  36. self.processed = True
  37. class CaptureDocStatus:
  38. def __init__(self):
  39. self.upserts = []
  40. async def upsert(self, data):
  41. self.upserts.append(data)
  42. class DummyPipeline(_PipelineMixin):
  43. def __init__(self):
  44. self.doc_status = CaptureDocStatus()
  45. class CaptureKV:
  46. def __init__(self):
  47. self.upserts = []
  48. async def filter_keys(self, keys):
  49. return set(keys)
  50. async def upsert(self, data):
  51. self.upserts.append(data)
  52. @pytest.mark.asyncio
  53. async def test_pipeline_index_texts_rejects_missing_file_sources():
  54. rag = DummyRAG()
  55. with pytest.raises(ValueError, match="valid file source"):
  56. await pipeline_index_texts(
  57. rag,
  58. texts=["alpha"],
  59. file_sources=[None],
  60. track_id="track-1",
  61. )
  62. assert rag.enqueued_calls == []
  63. assert rag.processed is False
  64. @pytest.mark.asyncio
  65. async def test_pipeline_index_texts_normalizes_file_sources_to_basename():
  66. rag = DummyRAG()
  67. await pipeline_index_texts(
  68. rag,
  69. texts=["alpha"],
  70. file_sources=["/tmp/source/alpha.txt"],
  71. track_id="track-1",
  72. )
  73. assert len(rag.enqueued_calls) == 1
  74. call = rag.enqueued_calls[0]
  75. assert call["input"] == ["alpha"]
  76. assert call["file_paths"] == ["alpha.txt"]
  77. assert call["track_id"] == "track-1"
  78. assert call["process_options"] == PROCESS_OPTION_CHUNK_FIXED
  79. # No chunking config supplied -> default F snapshot from addon_params.
  80. assert isinstance(call["chunk_options"], dict)
  81. assert "fixed_token" in call["chunk_options"]
  82. assert rag.processed is True
  83. def test_doc_status_response_uses_non_null_unknown_source():
  84. response = DocStatusResponse(
  85. id="doc-1",
  86. content_summary="summary",
  87. content_length=5,
  88. status=DocStatus.PENDING,
  89. created_at="2026-03-19T00:00:00+00:00",
  90. updated_at="2026-03-19T00:00:00+00:00",
  91. file_path=normalize_file_path(None),
  92. )
  93. assert response.file_path == "unknown_source"
  94. @pytest.mark.asyncio
  95. async def test_error_document_enqueue_canonicalizes_file_path_before_upsert():
  96. rag = DummyPipeline()
  97. await rag.apipeline_enqueue_error_documents(
  98. [
  99. {
  100. "file_path": "/tmp/uploads/report.[native-Fi].pdf",
  101. "error_description": "bad file",
  102. "original_error": "parse failed",
  103. }
  104. ],
  105. track_id="track-1",
  106. )
  107. saved = next(iter(rag.doc_status.upserts[0].values()))
  108. assert saved["file_path"] == "report.pdf"
  109. @pytest.mark.asyncio
  110. async def test_custom_chunks_use_canonical_unknown_source_before_upsert():
  111. from lightrag import LightRAG
  112. rag = LightRAG.__new__(LightRAG)
  113. rag.full_docs = CaptureKV()
  114. rag.text_chunks = CaptureKV()
  115. rag.chunks_vdb = CaptureKV()
  116. rag.tokenizer = type("Tokenizer", (), {"encode": lambda self, text: [text]})()
  117. async def _process_extract_entities(chunks):
  118. return []
  119. async def _insert_done():
  120. return None
  121. rag._process_extract_entities = _process_extract_entities
  122. rag._insert_done = _insert_done
  123. await rag.ainsert_custom_chunks("full text", ["chunk text"], doc_id="doc-1")
  124. assert rag.full_docs.upserts[0]["doc-1"]["file_path"] == "unknown_source"
  125. chunk = next(iter(rag.text_chunks.upserts[0].values()))
  126. assert chunk["file_path"] == "unknown_source"