# test_write_json_optimization.py
"""
Test suite for the write_json optimization.

This suite verifies that:
1. The fast path works for clean data (no sanitization).
2. The slow path applies sanitization to dirty data.
3. Sanitization is done during encoding (memory-efficient).
4. Reloading updates shared memory with the cleaned data.
"""
  9. import os
  10. import json
  11. import tempfile
  12. import pytest
  13. from lightrag.utils import (
  14. write_json,
  15. load_json,
  16. SanitizingJSONEncoder,
  17. sanitize_text_for_encoding,
  18. )
  19. @pytest.mark.offline
  20. class TestWriteJsonOptimization:
  21. """Test write_json optimization with two-stage approach"""
  22. def test_fast_path_clean_data(self):
  23. """Test that clean data takes the fast path without sanitization"""
  24. clean_data = {
  25. "name": "John Doe",
  26. "age": 30,
  27. "items": ["apple", "banana", "cherry"],
  28. "nested": {"key": "value", "number": 42},
  29. }
  30. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  31. temp_file = f.name
  32. try:
  33. # Write clean data - should return False (no sanitization)
  34. needs_reload = write_json(clean_data, temp_file)
  35. assert not needs_reload, "Clean data should not require sanitization"
  36. # Verify data was written correctly
  37. loaded_data = load_json(temp_file)
  38. assert loaded_data == clean_data, "Loaded data should match original"
  39. finally:
  40. os.unlink(temp_file)
  41. def test_slow_path_dirty_data(self):
  42. """Test that dirty data triggers sanitization"""
  43. # Create data with surrogate characters (U+D800 to U+DFFF)
  44. dirty_string = "Hello\ud800World" # Contains surrogate character
  45. dirty_data = {"text": dirty_string, "number": 123}
  46. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  47. temp_file = f.name
  48. try:
  49. # Write dirty data - should return True (sanitization applied)
  50. needs_reload = write_json(dirty_data, temp_file)
  51. assert needs_reload, "Dirty data should trigger sanitization"
  52. # Verify data was written and sanitized
  53. loaded_data = load_json(temp_file)
  54. assert loaded_data is not None, "Data should be written"
  55. assert loaded_data["number"] == 123, "Clean fields should remain unchanged"
  56. # Surrogate character should be removed
  57. assert (
  58. "\ud800" not in loaded_data["text"]
  59. ), "Surrogate character should be removed"
  60. finally:
  61. os.unlink(temp_file)
  62. def test_sanitizing_encoder_removes_surrogates(self):
  63. """Test that SanitizingJSONEncoder removes surrogate characters"""
  64. data_with_surrogates = {
  65. "text": "Hello\ud800\udc00World", # Contains surrogate pair
  66. "clean": "Clean text",
  67. "nested": {"dirty_key\ud801": "value", "clean_key": "clean\ud802value"},
  68. }
  69. # Encode using custom encoder
  70. encoded = json.dumps(
  71. data_with_surrogates, cls=SanitizingJSONEncoder, ensure_ascii=False
  72. )
  73. # Verify no surrogate characters in output
  74. assert "\ud800" not in encoded, "Surrogate U+D800 should be removed"
  75. assert "\udc00" not in encoded, "Surrogate U+DC00 should be removed"
  76. assert "\ud801" not in encoded, "Surrogate U+D801 should be removed"
  77. assert "\ud802" not in encoded, "Surrogate U+D802 should be removed"
  78. # Verify clean parts remain
  79. assert "Clean text" in encoded, "Clean text should remain"
  80. assert "clean_key" in encoded, "Clean keys should remain"
  81. def test_nested_structure_sanitization(self):
  82. """Test sanitization of deeply nested structures"""
  83. nested_data = {
  84. "level1": {
  85. "level2": {
  86. "level3": {"dirty": "text\ud800here", "clean": "normal text"},
  87. "list": ["item1", "item\ud801dirty", "item3"],
  88. }
  89. }
  90. }
  91. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  92. temp_file = f.name
  93. try:
  94. needs_reload = write_json(nested_data, temp_file)
  95. assert needs_reload, "Nested dirty data should trigger sanitization"
  96. # Verify nested structure is preserved
  97. loaded_data = load_json(temp_file)
  98. assert "level1" in loaded_data
  99. assert "level2" in loaded_data["level1"]
  100. assert "level3" in loaded_data["level1"]["level2"]
  101. # Verify surrogates are removed
  102. dirty_text = loaded_data["level1"]["level2"]["level3"]["dirty"]
  103. assert "\ud800" not in dirty_text, "Nested surrogate should be removed"
  104. # Verify list items are sanitized
  105. list_items = loaded_data["level1"]["level2"]["list"]
  106. assert (
  107. "\ud801" not in list_items[1]
  108. ), "List item surrogates should be removed"
  109. finally:
  110. os.unlink(temp_file)
  111. def test_unicode_non_characters_removed(self):
  112. """Test that Unicode non-characters (U+FFFE, U+FFFF) don't cause encoding errors
  113. Note: U+FFFE and U+FFFF are valid UTF-8 characters (though discouraged),
  114. so they don't trigger sanitization. They only get removed when explicitly
  115. using the SanitizingJSONEncoder.
  116. """
  117. data_with_nonchars = {"text1": "Hello\ufffeWorld", "text2": "Test\uffffString"}
  118. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  119. temp_file = f.name
  120. try:
  121. # These characters are valid UTF-8, so they take the fast path
  122. needs_reload = write_json(data_with_nonchars, temp_file)
  123. assert not needs_reload, "U+FFFE/U+FFFF are valid UTF-8 characters"
  124. loaded_data = load_json(temp_file)
  125. # They're written as-is in the fast path
  126. assert loaded_data == data_with_nonchars
  127. finally:
  128. os.unlink(temp_file)
  129. def test_mixed_clean_dirty_data(self):
  130. """Test data with both clean and dirty fields"""
  131. mixed_data = {
  132. "clean_field": "This is perfectly fine",
  133. "dirty_field": "This has\ud800issues",
  134. "number": 42,
  135. "boolean": True,
  136. "null_value": None,
  137. "clean_list": [1, 2, 3],
  138. "dirty_list": ["clean", "dirty\ud801item"],
  139. }
  140. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  141. temp_file = f.name
  142. try:
  143. needs_reload = write_json(mixed_data, temp_file)
  144. assert (
  145. needs_reload
  146. ), "Mixed data with dirty fields should trigger sanitization"
  147. loaded_data = load_json(temp_file)
  148. # Clean fields should remain unchanged
  149. assert loaded_data["clean_field"] == "This is perfectly fine"
  150. assert loaded_data["number"] == 42
  151. assert loaded_data["boolean"]
  152. assert loaded_data["null_value"] is None
  153. assert loaded_data["clean_list"] == [1, 2, 3]
  154. # Dirty fields should be sanitized
  155. assert "\ud800" not in loaded_data["dirty_field"]
  156. assert "\ud801" not in loaded_data["dirty_list"][1]
  157. finally:
  158. os.unlink(temp_file)
  159. def test_empty_and_none_strings(self):
  160. """Test handling of empty and None values"""
  161. data = {
  162. "empty": "",
  163. "none": None,
  164. "zero": 0,
  165. "false": False,
  166. "empty_list": [],
  167. "empty_dict": {},
  168. }
  169. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  170. temp_file = f.name
  171. try:
  172. needs_reload = write_json(data, temp_file)
  173. assert (
  174. not needs_reload
  175. ), "Clean empty values should not trigger sanitization"
  176. loaded_data = load_json(temp_file)
  177. assert loaded_data == data, "Empty/None values should be preserved"
  178. finally:
  179. os.unlink(temp_file)
  180. def test_specific_surrogate_udc9a(self):
  181. """Test specific surrogate character \\udc9a mentioned in the issue"""
  182. # Test the exact surrogate character from the error message:
  183. # UnicodeEncodeError: 'utf-8' codec can't encode character '\\udc9a'
  184. data_with_udc9a = {
  185. "text": "Some text with surrogate\udc9acharacter",
  186. "position": 201, # As mentioned in the error
  187. "clean_field": "Normal text",
  188. }
  189. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  190. temp_file = f.name
  191. try:
  192. # Write data - should trigger sanitization
  193. needs_reload = write_json(data_with_udc9a, temp_file)
  194. assert needs_reload, "Data with \\udc9a should trigger sanitization"
  195. # Verify surrogate was removed
  196. loaded_data = load_json(temp_file)
  197. assert loaded_data is not None
  198. assert "\udc9a" not in loaded_data["text"], "\\udc9a should be removed"
  199. assert (
  200. loaded_data["clean_field"] == "Normal text"
  201. ), "Clean fields should remain"
  202. finally:
  203. os.unlink(temp_file)
  204. def test_migration_with_surrogate_sanitization(self):
  205. """Test that migration process handles surrogate characters correctly
  206. This test simulates the scenario where legacy cache contains surrogate
  207. characters and ensures they are cleaned during migration.
  208. """
  209. # Simulate legacy cache data with surrogate characters
  210. legacy_data_with_surrogates = {
  211. "cache_entry_1": {
  212. "return": "Result with\ud800surrogate",
  213. "cache_type": "extract",
  214. "original_prompt": "Some\udc9aprompt",
  215. },
  216. "cache_entry_2": {
  217. "return": "Clean result",
  218. "cache_type": "query",
  219. "original_prompt": "Clean prompt",
  220. },
  221. }
  222. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  223. temp_file = f.name
  224. try:
  225. # First write the dirty data directly (simulating legacy cache file)
  226. # Use custom encoder to force write even with surrogates
  227. with open(temp_file, "w", encoding="utf-8") as f:
  228. json.dump(
  229. legacy_data_with_surrogates,
  230. f,
  231. cls=SanitizingJSONEncoder,
  232. ensure_ascii=False,
  233. )
  234. # Load and verify surrogates were cleaned during initial write
  235. loaded_data = load_json(temp_file)
  236. assert loaded_data is not None
  237. # The data should be sanitized
  238. assert (
  239. "\ud800" not in loaded_data["cache_entry_1"]["return"]
  240. ), "Surrogate in return should be removed"
  241. assert (
  242. "\udc9a" not in loaded_data["cache_entry_1"]["original_prompt"]
  243. ), "Surrogate in prompt should be removed"
  244. # Clean data should remain unchanged
  245. assert (
  246. loaded_data["cache_entry_2"]["return"] == "Clean result"
  247. ), "Clean data should remain"
  248. finally:
  249. os.unlink(temp_file)
  250. def test_empty_values_after_sanitization(self):
  251. """Test that data with empty values after sanitization is properly handled
  252. Critical edge case: When sanitization results in data with empty string values,
  253. we must use 'if cleaned_data is not None' instead of 'if cleaned_data' to ensure
  254. proper reload, since truthy check on dict depends on content, not just existence.
  255. """
  256. # Create data where ALL values are only surrogate characters
  257. all_dirty_data = {
  258. "key1": "\ud800\udc00\ud801",
  259. "key2": "\ud802\ud803",
  260. }
  261. with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
  262. temp_file = f.name
  263. try:
  264. # Write dirty data - should trigger sanitization
  265. needs_reload = write_json(all_dirty_data, temp_file)
  266. assert needs_reload, "All-dirty data should trigger sanitization"
  267. # Load the sanitized data
  268. cleaned_data = load_json(temp_file)
  269. # Critical assertions for the edge case
  270. assert cleaned_data is not None, "Cleaned data should not be None"
  271. # Sanitization removes surrogates but preserves keys with empty values
  272. assert cleaned_data == {
  273. "key1": "",
  274. "key2": "",
  275. }, "Surrogates should be removed, keys preserved"
  276. # This dict is truthy because it has keys (even with empty values)
  277. assert cleaned_data, "Dict with keys is truthy"
  278. # Test the actual edge case: empty dict
  279. empty_data = {}
  280. needs_reload2 = write_json(empty_data, temp_file)
  281. assert not needs_reload2, "Empty dict is clean"
  282. reloaded_empty = load_json(temp_file)
  283. assert reloaded_empty is not None, "Empty dict should not be None"
  284. assert reloaded_empty == {}, "Empty dict should remain empty"
  285. assert (
  286. not reloaded_empty
  287. ), "Empty dict evaluates to False (the critical check)"
  288. finally:
  289. os.unlink(temp_file)
  290. @pytest.mark.offline
  291. class TestSanitizeTextForEncoding:
  292. """Direct unit tests for sanitize_text_for_encoding function."""
  293. def test_empty_string_returns_empty(self):
  294. assert sanitize_text_for_encoding("") == ""
  295. def test_none_like_falsy_returns_as_is(self):
  296. # The function checks `if not text`, so empty string returns early
  297. assert sanitize_text_for_encoding("") == ""
  298. def test_whitespace_only_returns_empty(self):
  299. assert sanitize_text_for_encoding(" ") == ""
  300. def test_clean_text_unchanged(self):
  301. assert sanitize_text_for_encoding("hello world") == "hello world"
  302. def test_strips_leading_trailing_whitespace(self):
  303. assert sanitize_text_for_encoding(" hello ") == "hello"
  304. def test_lone_surrogate_removed(self):
  305. assert sanitize_text_for_encoding("hello\ud800world") == "helloworld"
  306. def test_lone_surrogate_with_replacement_char(self):
  307. assert (
  308. sanitize_text_for_encoding("hello\ud800world", replacement_char="?")
  309. == "hello?world"
  310. )
  311. def test_surrogate_range_boundaries(self):
  312. # U+D800 and U+DFFF are the surrogate range boundaries
  313. assert "\ud800" not in sanitize_text_for_encoding("\ud800")
  314. assert "\udfff" not in sanitize_text_for_encoding("\udfff")
  315. def test_non_characters_fffe_ffff_removed(self):
  316. # U+FFFE and U+FFFF are included in _SURROGATE_PATTERN
  317. assert sanitize_text_for_encoding("a\ufffeb") == "ab"
  318. assert sanitize_text_for_encoding("a\uffffb") == "ab"
  319. def test_html_entities_unescaped(self):
  320. assert sanitize_text_for_encoding("&") == "&"
  321. assert sanitize_text_for_encoding("&lt;p&gt;") == "<p>"
  322. assert sanitize_text_for_encoding("&quot;hello&quot;") == '"hello"'
  323. def test_html_entity_that_becomes_surrogate_is_removed(self):
  324. # &#xD800; — Python's html.unescape follows HTML5 spec and maps surrogate code
  325. # points to U+FFFD (replacement character), so \uD800 never appears in output.
  326. # Either way the result must not contain an actual lone surrogate.
  327. result = sanitize_text_for_encoding("&#xD800;")
  328. assert "\ud800" not in result
  329. def test_control_chars_removed(self):
  330. # C0 control characters (excluding \t \n \r)
  331. assert sanitize_text_for_encoding("\x01hello\x1fworld") == "helloworld"
  332. assert sanitize_text_for_encoding("\x00null") == "null"
  333. assert sanitize_text_for_encoding("del\x7f") == "del"
  334. def test_control_chars_with_replacement_char(self):
  335. # replacement_char must apply to control chars, not just surrogates.
  336. # Note: \x1f is treated as Unicode whitespace by Python's str.strip(),
  337. # so place control chars in the middle to avoid them being stripped first.
  338. result = sanitize_text_for_encoding("a\x01b\x08c", replacement_char="?")
  339. assert result == "a?b?c"
  340. def test_common_whitespace_preserved(self):
  341. # \t, \n, \r must NOT be removed (excluded from control char pattern)
  342. assert sanitize_text_for_encoding("line1\nline2") == "line1\nline2"
  343. assert sanitize_text_for_encoding("col1\tcol2") == "col1\tcol2"
  344. assert sanitize_text_for_encoding("line1\rline2") == "line1\rline2"
  345. def test_c1_control_chars_not_removed(self):
  346. # \x80-\x9F range must NOT be removed (restored original behavior).
  347. # These are valid in Latin-1 encoded European language text.
  348. result = sanitize_text_for_encoding("caf\x85e")
  349. assert "\x85" in result
  350. def test_replacement_char_default_is_deletion(self):
  351. # Default replacement_char="" means characters are deleted, not replaced
  352. assert sanitize_text_for_encoding("\ud800hello\x01") == "hello"
  353. def test_mixed_issues_in_one_string(self):
  354. # Surrogate + control char + HTML entity + clean text
  355. text = "\ud800&amp;\x01clean"
  356. result = sanitize_text_for_encoding(text)
  357. assert result == "&clean"
  358. def test_large_text_with_scattered_surrogates(self):
  359. # Regression guard: regex must handle large inputs correctly
  360. clean_segment = "a" * 10000
  361. text = f"prefix\ud800{clean_segment}\udfffsuffix"
  362. result = sanitize_text_for_encoding(text)
  363. assert "\ud800" not in result
  364. assert "\udfff" not in result
  365. assert clean_segment in result
  366. if __name__ == "__main__":
  367. # Run tests
  368. test = TestWriteJsonOptimization()
  369. print("Running test_fast_path_clean_data...")
  370. test.test_fast_path_clean_data()
  371. print("✓ Passed")
  372. print("Running test_slow_path_dirty_data...")
  373. test.test_slow_path_dirty_data()
  374. print("✓ Passed")
  375. print("Running test_sanitizing_encoder_removes_surrogates...")
  376. test.test_sanitizing_encoder_removes_surrogates()
  377. print("✓ Passed")
  378. print("Running test_nested_structure_sanitization...")
  379. test.test_nested_structure_sanitization()
  380. print("✓ Passed")
  381. print("Running test_unicode_non_characters_removed...")
  382. test.test_unicode_non_characters_removed()
  383. print("✓ Passed")
  384. print("Running test_mixed_clean_dirty_data...")
  385. test.test_mixed_clean_dirty_data()
  386. print("✓ Passed")
  387. print("Running test_empty_and_none_strings...")
  388. test.test_empty_and_none_strings()
  389. print("✓ Passed")
  390. print("Running test_specific_surrogate_udc9a...")
  391. test.test_specific_surrogate_udc9a()
  392. print("✓ Passed")
  393. print("Running test_migration_with_surrogate_sanitization...")
  394. test.test_migration_with_surrogate_sanitization()
  395. print("✓ Passed")
  396. print("Running test_empty_values_after_sanitization...")
  397. test.test_empty_values_after_sanitization()
  398. print("✓ Passed")
  399. print("\n✅ All tests passed!")