"""Unit tests for ``trim_content_to_budget`` in ``multimodal_context``. Companion to ``test_multimodal_surrounding_context.py``. Uses the same 1:1 character-token tokenizer so budgets in each scenario stay readable. """ import json import re import pytest from lightrag.multimodal_context import trim_content_to_budget from lightrag.utils import Tokenizer, TokenizerInterface class _CharTokenizer(TokenizerInterface): def encode(self, content: str): return [ord(ch) for ch in content] def decode(self, tokens): return "".join(chr(t) for t in tokens) def _tokenizer() -> Tokenizer: return Tokenizer(model_name="char", tokenizer=_CharTokenizer()) _MARKER_RE = re.compile( r"" ) @pytest.mark.offline def test_short_content_passes_through(): tok = _tokenizer() content = "
cell
" out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=10_000, tokenizer=tok ) assert out == content assert was_trimmed is False assert _MARKER_RE.search(out) is None @pytest.mark.offline def test_table_html_row_trim_keeps_head(): tok = _tokenizer() rows_html = "".join(f"r{i}c0r{i}c1" for i in range(10)) body = f"{rows_html}" content = f'{body}
' out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=200, tokenizer=tok ) assert was_trimmed is True assert " wrapper. table_close = out.rfind("
") marker_match = _MARKER_RE.search(out) assert marker_match is not None assert marker_match.start() > table_close # Head rows preserved, tail rows dropped. assert "r0c0" in out assert "r9c0" not in out assert len(tok.encode(out)) <= 200 @pytest.mark.offline def test_table_json_row_trim_keeps_head(): tok = _tokenizer() rows = [[f"r{i}c0", f"r{i}c1"] for i in range(10)] content = '' + json.dumps(rows) + "
" out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=150, tokenizer=tok ) assert was_trimmed is True assert "" in out # First row preserved, last row dropped. assert "r0c0" in out assert "r9c0" not in out # Marker present and outside
. table_close = out.rfind("") marker_match = _MARKER_RE.search(out) assert marker_match is not None assert marker_match.start() > table_close assert len(tok.encode(out)) <= 150 @pytest.mark.offline def test_table_char_fallback_when_single_row_oversized(): tok = _tokenizer() # A single huge JSON row whose serialized form alone exceeds budget. long_cell = "X" * 400 content = ( '' + json.dumps([[long_cell]], ensure_ascii=False) + "
" ) out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=120, tokenizer=tok ) assert was_trimmed is True # wrapper must still be present even after char fallback. assert out.lstrip().startswith("
" in out # Marker still appended outside the wrapper. assert _MARKER_RE.search(out) is not None assert len(tok.encode(out)) <= 120 @pytest.mark.offline def test_equation_char_trim_keeps_head(): tok = _tokenizer() content = "HEAD_" + "A" * 500 + "_TAIL" out, was_trimmed = trim_content_to_budget( content, kind="equations", max_tokens=100, tokenizer=tok ) assert was_trimmed is True assert out.startswith("HEAD_") # Tail must have been dropped. assert "_TAIL" not in out assert _MARKER_RE.search(out) is not None assert len(tok.encode(out)) <= 100 @pytest.mark.offline def test_malformed_table_falls_back_to_char_trim(): tok = _tokenizer() # Missing closing
tag — TABLE_TAG_RE will reject this, so the # generic char-trim path applies (no wrapper reconstruction). content = "
" out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=100, tokenizer=tok ) assert was_trimmed is True assert out.startswith("
" + "Z" * 500 + "
") assert _MARKER_RE.search(out) is not None assert len(tok.encode(out)) <= 100 @pytest.mark.offline def test_zero_budget_returns_input_unchanged(): tok = _tokenizer() content = "x" * 5000 out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=0, tokenizer=tok ) assert out == content assert was_trimmed is False @pytest.mark.offline def test_negative_budget_returns_input_unchanged(): tok = _tokenizer() content = "x" * 5000 out, was_trimmed = trim_content_to_budget( content, kind="equations", max_tokens=-10, tokenizer=tok ) assert out == content assert was_trimmed is False @pytest.mark.offline def test_tokenizer_none_returns_input_unchanged(): content = "x" * 5000 out, was_trimmed = trim_content_to_budget( content, kind="tables", max_tokens=100, tokenizer=None ) assert out == content assert was_trimmed is False @pytest.mark.offline def test_marker_reports_original_and_final_token_counts(): tok = _tokenizer() content = "x" * 500 out, was_trimmed = trim_content_to_budget( content, kind="equations", max_tokens=100, tokenizer=tok ) assert was_trimmed is True match = _MARKER_RE.search(out) assert match is not None original_in_marker = int(match.group(1)) final_in_marker = int(match.group(2)) assert original_in_marker == 500 # The reported final-token count is the inner-content size (before marker), # so it should be strictly less than the original. assert final_in_marker < original_in_marker assert len(tok.encode(out)) <= 100 @pytest.mark.offline def test_empty_content_returns_unchanged(): tok = _tokenizer() out, was_trimmed = trim_content_to_budget( "", kind="tables", max_tokens=100, tokenizer=tok ) assert out == "" assert was_trimmed is False