# test_multimodal_content_truncation.py
  """Unit tests for ``trim_content_to_budget`` in ``multimodal_context``.

  Companion to ``test_multimodal_surrounding_context.py``. Uses the same
  1:1 character-token tokenizer so budgets in each scenario stay readable.
  """
  5. import json
  6. import re
  7. import pytest
  8. from lightrag.multimodal_context import trim_content_to_budget
  9. from lightrag.utils import Tokenizer, TokenizerInterface
  class _CharTokenizer(TokenizerInterface):
      """Tokenizer stub that maps every character to exactly one token.

      Each token is the character's Unicode code point, so the token count of
      a string equals ``len(string)`` and budgets in the tests can be read
      directly as character counts.
      """

      def encode(self, content: str):
          """Return one integer token (the code point) per character of *content*."""
          return [ord(ch) for ch in content]

      def decode(self, tokens):
          """Rebuild the string from code-point tokens produced by ``encode``."""
          return "".join(chr(t) for t in tokens)
  def _tokenizer() -> Tokenizer:
      """Build a ``Tokenizer`` backed by the 1:1 character tokenizer stub."""
      return Tokenizer(model_name="char", tokenizer=_CharTokenizer())
  17. _MARKER_RE = re.compile(
  18. r"<!-- content truncated from (\d+) to (\d+) tokens, head preserved -->"
  19. )
  @pytest.mark.offline
  def test_short_content_passes_through():
      """Content well under the budget comes back verbatim with no marker."""
      tok = _tokenizer()
      content = "<table><tr><td>cell</td></tr></table>"
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=10_000, tokenizer=tok
      )
      assert out == content
      assert was_trimmed is False
      assert _MARKER_RE.search(out) is None
  @pytest.mark.offline
  def test_table_html_row_trim_keeps_head():
      """An over-budget HTML table keeps its head rows and its wrapper.

      Ten rows (350 characters of row markup) are trimmed against a 200-token
      budget; the first row must survive, the last must not, and the marker
      must follow the closing ``</table>`` tag.
      """
      tok = _tokenizer()
      rows_html = "".join(f"<tr><td>r{i}c0</td><td>r{i}c1</td></tr>" for i in range(10))
      body = f"<tbody>{rows_html}</tbody>"
      content = f'<table id="t-html" format="html">{body}</table>'
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=200, tokenizer=tok
      )
      assert was_trimmed is True
      assert "<table " in out
      # Marker sits outside the </table> wrapper.
      table_close = out.rfind("</table>")
      # rfind returns -1 when the tag is absent, which would make the position
      # comparison below pass vacuously; require the closing tag explicitly.
      assert table_close != -1
      marker_match = _MARKER_RE.search(out)
      assert marker_match is not None
      assert marker_match.start() > table_close
      # Head rows preserved, tail rows dropped.
      assert "r0c0" in out
      assert "r9c0" not in out
      assert len(tok.encode(out)) <= 200
  @pytest.mark.offline
  def test_table_json_row_trim_keeps_head():
      """An over-budget JSON table keeps its first rows and its wrapper.

      Ten JSON-serialized rows are trimmed against a 150-token budget; the
      first row must survive, the last must not, and the marker must follow
      the closing ``</table>`` tag.
      """
      tok = _tokenizer()
      rows = [[f"r{i}c0", f"r{i}c1"] for i in range(10)]
      content = '<table id="t-json" format="json">' + json.dumps(rows) + "</table>"
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=150, tokenizer=tok
      )
      assert was_trimmed is True
      assert "<table " in out
      assert "</table>" in out
      # First row preserved, last row dropped.
      assert "r0c0" in out
      assert "r9c0" not in out
      # Marker present and outside </table>.
      table_close = out.rfind("</table>")
      marker_match = _MARKER_RE.search(out)
      assert marker_match is not None
      assert marker_match.start() > table_close
      assert len(tok.encode(out)) <= 150
  @pytest.mark.offline
  def test_table_char_fallback_when_single_row_oversized():
      """A table whose only row exceeds the budget still keeps its wrapper.

      The single JSON row holds a 400-character cell against a 120-token
      budget, so dropping whole rows cannot satisfy the budget on its own.
      """
      tok = _tokenizer()
      # A single huge JSON row whose serialized form alone exceeds budget.
      long_cell = "X" * 400
      content = (
          '<table id="t-big" format="json">'
          + json.dumps([[long_cell]], ensure_ascii=False)
          + "</table>"
      )
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=120, tokenizer=tok
      )
      assert was_trimmed is True
      # <table> wrapper must still be present even after char fallback.
      assert out.lstrip().startswith("<table ")
      assert "</table>" in out
      # Marker still appended outside the wrapper.
      assert _MARKER_RE.search(out) is not None
      assert len(tok.encode(out)) <= 120
  @pytest.mark.offline
  def test_equation_char_trim_keeps_head():
      """Over-budget equation content keeps its start and loses its end."""
      tok = _tokenizer()
      content = "HEAD_" + "A" * 500 + "_TAIL"
      out, was_trimmed = trim_content_to_budget(
          content, kind="equations", max_tokens=100, tokenizer=tok
      )
      assert was_trimmed is True
      assert out.startswith("HEAD_")
      # Tail must have been dropped.
      assert "_TAIL" not in out
      assert _MARKER_RE.search(out) is not None
      assert len(tok.encode(out)) <= 100
  @pytest.mark.offline
  def test_malformed_table_falls_back_to_char_trim():
      """A table missing its closing tag is still trimmed head-first within budget."""
      tok = _tokenizer()
      # Missing closing </table> tag — TABLE_TAG_RE will reject this, so the
      # generic char-trim path applies (no <table> wrapper reconstruction).
      content = "<table><tr><td>" + "Z" * 500 + "</td></tr>"
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=100, tokenizer=tok
      )
      assert was_trimmed is True
      assert out.startswith("<table>")
      assert _MARKER_RE.search(out) is not None
      assert len(tok.encode(out)) <= 100
  @pytest.mark.offline
  def test_zero_budget_returns_input_unchanged():
      """A budget of zero leaves the content untouched and untrimmed."""
      tok = _tokenizer()
      content = "x" * 5000
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=0, tokenizer=tok
      )
      assert out == content
      assert was_trimmed is False
  @pytest.mark.offline
  def test_negative_budget_returns_input_unchanged():
      """A negative budget leaves the content untouched and untrimmed."""
      tok = _tokenizer()
      content = "x" * 5000
      out, was_trimmed = trim_content_to_budget(
          content, kind="equations", max_tokens=-10, tokenizer=tok
      )
      assert out == content
      assert was_trimmed is False
  @pytest.mark.offline
  def test_tokenizer_none_returns_input_unchanged():
      """Passing ``tokenizer=None`` leaves the content untouched and untrimmed."""
      content = "x" * 5000
      out, was_trimmed = trim_content_to_budget(
          content, kind="tables", max_tokens=100, tokenizer=None
      )
      assert out == content
      assert was_trimmed is False
  @pytest.mark.offline
  def test_marker_reports_original_and_final_token_counts():
      """The marker's two numbers are the original size and a smaller final size.

      With 500 single-token characters and a 100-token budget, the first
      number in the marker must be 500 and the second must be below it.
      """
      tok = _tokenizer()
      content = "x" * 500
      out, was_trimmed = trim_content_to_budget(
          content, kind="equations", max_tokens=100, tokenizer=tok
      )
      assert was_trimmed is True
      match = _MARKER_RE.search(out)
      assert match is not None
      original_in_marker = int(match.group(1))
      final_in_marker = int(match.group(2))
      assert original_in_marker == 500
      # The reported final-token count is the inner-content size (before marker),
      # so it should be strictly less than the original.
      assert final_in_marker < original_in_marker
      assert len(tok.encode(out)) <= 100
  @pytest.mark.offline
  def test_empty_content_returns_unchanged():
      """An empty string comes back as an empty string, reported as not trimmed."""
      tok = _tokenizer()
      out, was_trimmed = trim_content_to_budget(
          "", kind="tables", max_tokens=100, tokenizer=tok
      )
      assert out == ""
      assert was_trimmed is False
  166. assert was_trimmed is False