| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import sys
- import pytest
- sys.argv = sys.argv[:1]
- from lightrag.api.routers.document_routes import ( # noqa: E402
- DocStatusResponse,
- normalize_file_path,
- pipeline_index_texts,
- )
- from lightrag.base import DocStatus # noqa: E402
- from lightrag.constants import PROCESS_OPTION_CHUNK_FIXED # noqa: E402
- from lightrag.pipeline import _PipelineMixin # noqa: E402
class DummyRAG:
    """Stand-in RAG object that records enqueue calls instead of indexing.

    Attributes:
        enqueued_calls: One dict per ``apipeline_enqueue_documents`` call,
            holding the arguments it was given.
        processed: Flipped to ``True`` once
            ``apipeline_process_enqueue_documents`` has been awaited.
        addon_params: Empty mapping; _resolve_text_chunking reads it, and an
            empty dict yields the default chunker config.
    """

    def __init__(self):
        # _resolve_text_chunking reads addon_params; {} -> default chunker config.
        self.addon_params = {}
        self.processed = False
        self.enqueued_calls = []

    async def apipeline_enqueue_documents(
        self,
        input,
        file_paths=None,
        track_id=None,
        process_options=None,
        chunk_options=None,
    ):
        """Record the call's arguments verbatim; nothing is actually enqueued."""
        recorded_call = dict(
            input=input,
            file_paths=file_paths,
            track_id=track_id,
            process_options=process_options,
            chunk_options=chunk_options,
        )
        self.enqueued_calls.append(recorded_call)

    async def apipeline_process_enqueue_documents(self):
        """Mark that queue processing was requested."""
        self.processed = True
class CaptureDocStatus:
    """Doc-status storage stub that only remembers what was upserted."""

    def __init__(self):
        # Payloads are kept in call order so tests can inspect them afterwards.
        self.upserts = []

    async def upsert(self, data):
        """Store ``data`` as-is at the end of ``upserts``."""
        captured = self.upserts
        captured.append(data)
class DummyPipeline(_PipelineMixin):
    """Bare ``_PipelineMixin`` subclass wired to a recording doc-status store.

    Only ``doc_status`` is set up here; the mixin's own initializer is not
    invoked.
    """

    def __init__(self):
        status_recorder = CaptureDocStatus()
        self.doc_status = status_recorder
class CaptureKV:
    """Key-value storage stub that records upserts and filters nothing out."""

    def __init__(self):
        # Payloads are kept in call order so tests can inspect them afterwards.
        self.upserts = []

    async def filter_keys(self, keys):
        """Return every given key as a set."""
        return {*keys}

    async def upsert(self, data):
        """Store ``data`` as-is at the end of ``upserts``."""
        captured = self.upserts
        captured.append(data)
@pytest.mark.asyncio
async def test_pipeline_index_texts_rejects_missing_file_sources():
    """A ``None`` file source raises ValueError and leaves the RAG stub untouched."""
    stub = DummyRAG()
    request = {
        "texts": ["alpha"],
        "file_sources": [None],
        "track_id": "track-1",
    }

    with pytest.raises(ValueError, match="valid file source"):
        await pipeline_index_texts(stub, **request)

    # The rejection must happen before anything is enqueued or processed.
    assert stub.enqueued_calls == []
    assert stub.processed is False
@pytest.mark.asyncio
async def test_pipeline_index_texts_normalizes_file_sources_to_basename():
    """An absolute file source reaches the enqueue call as its basename only."""
    stub = DummyRAG()

    await pipeline_index_texts(
        stub,
        texts=["alpha"],
        file_sources=["/tmp/source/alpha.txt"],
        track_id="track-1",
    )

    assert len(stub.enqueued_calls) == 1
    recorded = stub.enqueued_calls[0]
    assert recorded["input"] == ["alpha"]
    assert recorded["file_paths"] == ["alpha.txt"]
    assert recorded["track_id"] == "track-1"
    assert recorded["process_options"] == PROCESS_OPTION_CHUNK_FIXED

    # No chunking config was supplied, so the default F snapshot derived from
    # addon_params is expected here.
    chunk_options = recorded["chunk_options"]
    assert isinstance(chunk_options, dict)
    assert "fixed_token" in chunk_options

    assert stub.processed is True
def test_doc_status_response_uses_non_null_unknown_source():
    """``normalize_file_path(None)`` gives the response the "unknown_source" path."""
    timestamp = "2026-03-19T00:00:00+00:00"
    fields = {
        "id": "doc-1",
        "content_summary": "summary",
        "content_length": 5,
        "status": DocStatus.PENDING,
        "created_at": timestamp,
        "updated_at": timestamp,
        "file_path": normalize_file_path(None),
    }

    response = DocStatusResponse(**fields)

    assert response.file_path == "unknown_source"
@pytest.mark.asyncio
async def test_error_document_enqueue_canonicalizes_file_path_before_upsert():
    """The error-document record is upserted with file_path "report.pdf"."""
    pipeline = DummyPipeline()
    failed_document = {
        "file_path": "/tmp/uploads/report.[native-Fi].pdf",
        "error_description": "bad file",
        "original_error": "parse failed",
    }

    await pipeline.apipeline_enqueue_error_documents(
        [failed_document],
        track_id="track-1",
    )

    # Inspect the first record of the first upsert batch captured by the stub.
    first_batch = pipeline.doc_status.upserts[0]
    saved = next(iter(first_batch.values()))
    assert saved["file_path"] == "report.pdf"
@pytest.mark.asyncio
async def test_custom_chunks_use_canonical_unknown_source_before_upsert():
    """Custom chunks inserted without a file path are stored as "unknown_source"."""
    from lightrag import LightRAG

    async def _process_extract_entities(chunks):
        return []

    async def _insert_done():
        return None

    # Build the instance without running LightRAG.__init__, then attach only
    # the collaborators this test sets up.
    rag = LightRAG.__new__(LightRAG)
    rag.full_docs = CaptureKV()
    rag.text_chunks = CaptureKV()
    rag.chunks_vdb = CaptureKV()
    tokenizer_type = type("Tokenizer", (), {"encode": lambda self, text: [text]})
    rag.tokenizer = tokenizer_type()
    rag._process_extract_entities = _process_extract_entities
    rag._insert_done = _insert_done

    await rag.ainsert_custom_chunks("full text", ["chunk text"], doc_id="doc-1")

    stored_doc = rag.full_docs.upserts[0]["doc-1"]
    assert stored_doc["file_path"] == "unknown_source"

    first_chunk_batch = rag.text_chunks.upserts[0]
    chunk = next(iter(first_chunk_batch.values()))
    assert chunk["file_path"] == "unknown_source"
|