| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914 |
- #!/usr/bin/env python3
- """
- ABOUTME: Parses DOCX documents into text blocks using python-docx
- ABOUTME: Extracts automatic numbering, splits by headings, converts tables to JSON
- """
- import json
- import sys
- try:
- from docx import Document
- except ImportError:
- print(
- "Error: python-docx not installed. Run: pip install python-docx",
- file=sys.stderr,
- )
- sys.exit(1)
- from .numbering_resolver import NumberingResolver
- from .table_extractor import TableExtractor
- from .utils import estimate_tokens
- from .drawing_image_extractor import (
- DrawingExtractionContext,
- extract_drawing_placeholder_from_element,
- extract_vml_image_placeholder_from_element,
- )
# --- Content validation limits (measured in characters; UI/display driven) ---
# Longest heading that is accepted/displayed.
MAX_HEADING_LENGTH = 200
# Longest paragraph that may serve as a candidate anchor when splitting.
MAX_ANCHOR_CANDIDATE_LENGTH = 100

# --- Block splitting limits (measured in tokens; LLM context driven) ---
# Size a block should aim for so that splits come out balanced.
IDEAL_BLOCK_CONTENT_TOKENS = 6000
# Hard ceiling for the content of a single block.
MAX_BLOCK_CONTENT_TOKENS = 8000
# Trailing blocks smaller than this may be absorbed by the preceding block
# (half of the headroom between ideal and max: 1000 tokens).
SMALL_TAIL_THRESHOLD = (MAX_BLOCK_CONTENT_TOKENS - IDEAL_BLOCK_CONTENT_TOKENS) // 2

# --- Table splitting limits (measured in tokens) ---
# Size each table chunk should aim for.
TABLE_IDEAL_TOKENS = 3000
# A table larger than this gets split; must be smaller than
# IDEAL_BLOCK_CONTENT_TOKENS.
TABLE_MAX_TOKENS = 5000
# A last chunk below this size is considered a tiny fragment worth merging back.
TABLE_MIN_LAST_CHUNK_TOKENS = int((TABLE_MAX_TOKENS - TABLE_IDEAL_TOKENS) * 0.8)
# Label used in the heading suffix of split table chunks.
TABLE_CHUNK_SUFFIX_LABEL = "表格片段"

# OOXML tags for tracked changes and comments. Their whole subtree is dropped
# so that only the *final* revised text is surfaced. w:ins / w:moveTo are not
# listed: they are handled by the default recursion, which keeps inserted and
# moved-in content.
_SKIP_REVISION_TAGS = frozenset(("del", "moveFrom"))
_SKIP_COMMENT_TAGS = frozenset(
    ("commentRangeStart", "commentRangeEnd", "commentReference", "annotationRef")
)
_SKIP_PARAGRAPH_TAGS = _SKIP_REVISION_TAGS.union(_SKIP_COMMENT_TAGS)
def print_error(title: str, details: str, solution: str):
    """
    Write a banner-framed error report to stderr.

    Args:
        title: Short error title, shown after "ERROR:"
        details: Longer description of what went wrong
        solution: Suggested steps to resolve the problem
    """
    rule = "=" * 80
    report_lines = (
        "\n" + rule,
        f"ERROR: {title}",
        rule,
        f"\n{details}",
        "\nSOLUTION:",
        solution,
        "\n" + rule + "\n",
    )
    # One print per line keeps the stream output identical to a sequence of
    # individual print calls.
    for line in report_lines:
        print(line, file=sys.stderr)
def truncate_heading(heading_text: str, para_id: str = None) -> str:
    """
    Shorten a heading that is longer than MAX_HEADING_LENGTH.

    Args:
        heading_text: Heading text to check
        para_id: Optional paragraph ID, included in the warning when given

    Returns:
        str: The heading unchanged when it fits; otherwise its leading
        characters followed by "..." so the result is exactly
        MAX_HEADING_LENGTH characters long. A warning is written to stderr
        whenever truncation happens.
    """
    original_length = len(heading_text)
    if original_length <= MAX_HEADING_LENGTH:
        return heading_text

    # Reserve three characters for the ellipsis.
    shortened = heading_text[: MAX_HEADING_LENGTH - 3] + "..."
    location = f" (para_id: {para_id})" if para_id else ""
    print(
        f"Warning: Heading truncated (length {original_length} > max {MAX_HEADING_LENGTH}){location}: "
        f'"{shortened}"',
        file=sys.stderr,
    )
    return shortened
def validate_heading_length(heading_text: str, para_id: str):
    """
    Abort the run when a heading is longer than MAX_HEADING_LENGTH.

    Args:
        heading_text: Heading text to validate
        para_id: Paragraph ID, reported in the error message

    Exits:
        sys.exit(1) after printing a formatted error when the heading is too long
    """
    heading_length = len(heading_text)
    if heading_length <= MAX_HEADING_LENGTH:
        return

    # Show at most the first 100 characters of the offending heading.
    if heading_length > 100:
        preview = heading_text[:100] + "..."
    else:
        preview = heading_text

    title = f"Heading too long ({heading_length} characters, max {MAX_HEADING_LENGTH})"
    details = (
        f'The following heading exceeds the maximum allowed length:\n\n "{preview}"\n\n'
        f"Location: Paragraph ID {para_id}\n"
        f"Actual length: {heading_length} characters"
    )
    solution = (
        " 1. Open the document in Microsoft Word\n"
        f" 2. Shorten this heading to {MAX_HEADING_LENGTH} characters or less\n"
        " 3. Re-upload it to LightRAG"
    )
    print_error(title, details, solution)
    sys.exit(1)
def validate_table_tokens(table_json: str, block_heading: str):
    """
    Abort the run when a table's JSON is larger than MAX_BLOCK_CONTENT_TOKENS.

    Args:
        table_json: JSON representation of the table
        block_heading: Heading of the block that contains the table

    Exits:
        sys.exit(1) after printing a formatted error when the estimated token
        count exceeds the limit
    """
    table_tokens = estimate_tokens(table_json)
    if table_tokens <= MAX_BLOCK_CONTENT_TOKENS:
        return

    title = f"Table too large (~{table_tokens} tokens, max {MAX_BLOCK_CONTENT_TOKENS})"
    details = (
        f"A table in the document is too large for LLM processing.\n\n"
        f'Location: Under heading "{block_heading}"\n'
        f"Table size: ~{table_tokens} tokens ({len(table_json)} characters)\n\n"
        "Large tables can cause issues with file chunking."
    )
    solution = (
        " 1. Open the document in Microsoft Word\n"
        f' 2. Locate the table under heading "{block_heading}"\n'
        " 3. Split the table into smaller tables, or\n"
        " 4. Simplify the table content\n"
        " 5. Re-upload it to LightRAG"
    )
    print_error(title, details, solution)
    sys.exit(1)
- def find_first_valid_para_id(para_ids: list) -> str | None:
- """
- Find the first valid paraId in a 2D array of paraIds.
- Args:
- para_ids: 2D list of paraIds from table cells
- Returns:
- First non-None paraId found, or None when every cell lacks a paraId.
- Callers must tolerate ``None`` and treat it as a tracking gap rather
- than a fatal error (legacy / non-Word docx authors omit ``w14:paraId``
- attributes and we want to keep parsing).
- """
- for row in para_ids:
- for para_id in row:
- if para_id:
- return para_id
- return None
- def find_last_valid_para_id(para_ids: list) -> str | None:
- """
- Find the last valid paraId in a 2D array of paraIds.
- Returns the last non-None paraId, falling back to the first valid one
- when reverse-iteration does not yield anything (single-paraId tables),
- and finally ``None`` when every cell lacks a paraId.
- """
- for row in reversed(para_ids):
- for para_id in reversed(row):
- if para_id:
- return para_id
- return find_first_valid_para_id(para_ids)
def _table_has_any_paraid(para_ids: list) -> bool:
    """Report whether any cell of the 2D paraId grid carries an id."""
    first_id = find_first_valid_para_id(para_ids)
    if first_id is None:
        return False
    return True
def split_table(
    table_rows: list,
    para_ids: list,
    para_ids_end: list,
    header_indices: list,
    debug: bool = False,
) -> list:
    """
    Split a large table into chunks at row boundaries.

    Splitting Strategy:
    1. Only split if the table JSON exceeds TABLE_MAX_TOKENS
    2. Derive the target number of chunks from TABLE_IDEAL_TOKENS
    3. Split at row boundaries, giving each chunk an equal share of *rows*
       (row count, not token count, drives the boundaries)
    4. Avoid a very small last chunk: a remainder of fewer than 30% of the
       per-chunk row target is folded into the current chunk, and a last
       chunk below TABLE_MIN_LAST_CHUNK_TOKENS is merged into the previous
       chunk when the combined size stays within TABLE_MAX_TOKENS
    5. Use the first valid paraId of each chunk as its uuid

    Args:
        table_rows: 2D array of table content
        para_ids: 2D array of paraIds - first paraId in each cell (for uuid)
        para_ids_end: 2D array of paraIds - last paraId in each cell (for uuid_end)
        header_indices: List of row indices that are table headers
            (accepted for interface compatibility; not read by this function)
        debug: If True, report a merge of the small last chunk on stderr

    Returns:
        List of chunk dicts: [{
            'rows': 2D array subset,
            'para_ids': 2D array subset,
            'para_ids_end': 2D array subset,
            'uuid': first valid paraId in chunk (None if the chunk has none),
            'is_first': True if first chunk,
            'is_last': True if last chunk
        }, ...]
    """
    import math

    total_json = json.dumps(table_rows, ensure_ascii=False)
    total_tokens = estimate_tokens(total_json)

    if total_tokens <= TABLE_MAX_TOKENS:
        # Small enough: the whole table is a single chunk.
        return [
            {
                "rows": table_rows,
                "para_ids": para_ids,
                "para_ids_end": para_ids_end,
                "uuid": find_first_valid_para_id(para_ids),
                "is_first": True,
                "is_last": True,
            }
        ]

    # Aim for ideal-sized chunks, but never fewer than the hard limit requires.
    target_chunks = max(
        math.ceil(total_tokens / TABLE_IDEAL_TOKENS),
        math.ceil(total_tokens / TABLE_MAX_TOKENS),
    )

    chunks = []
    num_rows = len(table_rows)
    target_rows_per_chunk = num_rows / target_chunks
    start_row = 0

    for i in range(target_chunks):
        if i == target_chunks - 1:
            # Last chunk gets all remaining rows.
            end_row = num_rows
        else:
            end_row = min(int((i + 1) * target_rows_per_chunk), num_rows)
            # If only a sliver of rows would be left over, take them now.
            rows_remaining = num_rows - end_row
            if 0 < rows_remaining < target_rows_per_chunk * 0.3:
                end_row = num_rows

        chunk_rows = table_rows[start_row:end_row]
        chunk_para_ids = para_ids[start_row:end_row]
        chunk_para_ids_end = para_ids_end[start_row:end_row]

        # With fewer rows than target chunks, early slices can be empty;
        # those are skipped.
        if chunk_rows:
            chunks.append(
                {
                    "rows": chunk_rows,
                    "para_ids": chunk_para_ids,
                    "para_ids_end": chunk_para_ids_end,
                    "uuid": find_first_valid_para_id(chunk_para_ids),
                    # start_row stays 0 until the first non-empty chunk is
                    # emitted, so this flags the first *emitted* chunk even
                    # when earlier loop iterations produced empty slices.
                    "is_first": start_row == 0,
                    "is_last": end_row >= num_rows,
                }
            )

        start_row = end_row
        if start_row >= num_rows:
            break

    # Post-processing: fold a very small last chunk into its predecessor.
    if len(chunks) >= 2:
        last_chunk = chunks[-1]
        last_chunk_tokens = estimate_tokens(
            json.dumps(last_chunk["rows"], ensure_ascii=False)
        )

        if last_chunk_tokens < TABLE_MIN_LAST_CHUNK_TOKENS:
            prev_chunk = chunks[-2]
            combined_rows = prev_chunk["rows"] + last_chunk["rows"]
            combined_tokens = estimate_tokens(
                json.dumps(combined_rows, ensure_ascii=False)
            )

            # Only merge when the result still respects the hard limit.
            if combined_tokens <= TABLE_MAX_TOKENS:
                chunks[-2] = {
                    "rows": combined_rows,
                    "para_ids": prev_chunk["para_ids"] + last_chunk["para_ids"],
                    "para_ids_end": (
                        prev_chunk["para_ids_end"] + last_chunk["para_ids_end"]
                    ),
                    "uuid": prev_chunk["uuid"],  # uuid of the earlier chunk
                    "is_first": prev_chunk["is_first"],
                    "is_last": True,  # the merged chunk is now the last one
                }
                chunks.pop()

                if debug:
                    print(
                        f"[DEBUG] Merged small last chunk (~{last_chunk_tokens} tokens) with previous chunk",
                        file=sys.stderr,
                    )
                    print(
                        f" Combined size: ~{combined_tokens} tokens", file=sys.stderr
                    )

    return chunks
def split_table_with_heading(
    table_rows: list,
    para_ids: list,
    para_ids_end: list,
    header_indices: list,
    current_heading: str,
    start_suffix: int = 0,
    debug: bool = False,
) -> list:
    """
    Run split_table() and tag every chunk with its heading suffix number.

    Sequential numbering is supported for several split tables in one block
    through ``start_suffix``.

    Args:
        table_rows: 2D array of table content
        para_ids: 2D array of paraIds - first paraId in each cell (for uuid)
        para_ids_end: 2D array of paraIds - last paraId in each cell (for uuid_end)
        header_indices: List of row indices that are table headers
        current_heading: Heading of the current block; used to render chunk
            headings in the debug output
        start_suffix: Offset added to the chunk position for non-first chunks
            (default: 0), e.g. [1], [2] for a first table and [3], [4] for
            the next one
        debug: If True, list the resulting chunks with their headings on stderr

    Returns:
        The chunk list from split_table(); each chunk additionally carries
        'suffix_number' (None for the first chunk, start_suffix + position
        for the others)
    """
    # split_table's own debug output is suppressed; this wrapper reports instead.
    chunks = split_table(
        table_rows, para_ids, para_ids_end, header_indices, debug=False
    )

    for position, chunk in enumerate(chunks):
        chunk["suffix_number"] = None if position == 0 else start_suffix + position

    if not debug or len(chunks) <= 1:
        return chunks

    print(
        f"\n[DEBUG] Table split into {len(chunks)} chunks (final)", file=sys.stderr
    )
    for i, chunk in enumerate(chunks):
        chunk_json = json.dumps(chunk["rows"], ensure_ascii=False)
        suffix = chunk["suffix_number"]
        # The first chunk keeps the plain heading; later chunks get a suffix.
        if suffix is None:
            chunk_heading = current_heading
        else:
            chunk_heading = f"{current_heading} [{TABLE_CHUNK_SUFFIX_LABEL}{chunk['suffix_number']}]"
        print(
            f" Chunk {i+1}: heading=\"{chunk_heading}\", {len(chunk['rows'])} rows, {len(chunk_json)} chars",
            file=sys.stderr,
        )
    return chunks
def merge_small_blocks(blocks: list, debug: bool = False) -> tuple:
    """
    Merge blocks below IDEAL_BLOCK_CONTENT_TOKENS, bottom-up and level-aware.

    Strategy:
    1. Process heading levels from deepest (largest number) to shallowest.
    2. For each level, repeat until nothing changes:
       - Phase A (same level): merge a small block with its next / previous
         neighbour of the same level. A block at or above IDEAL additionally
         absorbs the run of same-level blocks that follows it when that run
         totals fewer than SMALL_TAIL_THRESHOLD tokens ("tail absorption").
       - Phase B (cross level): a small block of the current level absorbs a
         following deeper block, or is absorbed by a preceding shallower
         block that is itself below IDEAL.
    3. Table chunk roles ('table_chunk_role') restrict merging:
       - 'middle': never merged
       - 'first': is never appended to a previous block
       - 'last': in Phase A only appended to a previous block; in Phase B it
         may also take a following deeper block
       - 'none': no restrictions
    4. A merge is rejected when the combined content would exceed
       MAX_BLOCK_CONTENT_TOKENS.
    5. The merged block keeps the heading, uuid, parent headings and level of
       the earlier block and always gets role 'none' and type 'text'.

    After merging, blocks whose content still exceeds
    MAX_BLOCK_CONTENT_TOKENS are reported as a warning on stderr.

    Args:
        blocks: List of block dicts. Each needs 'content', 'uuid', 'heading'
            and 'parent_headings'; 'level' (default 1), 'table_chunk_role'
            (default 'none'), 'uuid_end' and 'table_headers' are optional.
        debug: If True, print progress information to stderr

    Returns:
        Tuple of (merged_blocks, merge_count). With zero or one input block
        the input list itself is returned together with 0.
    """
    if len(blocks) <= 1:
        return blocks, 0

    # Roles allowed on each side of a merge.
    backward_roles = ("none", "last")  # block may be appended to its predecessor
    forward_roles_same_level = ("none", "first")
    forward_roles_cross_level = ("none", "first", "last")

    merged_count = 0
    result = blocks.copy()

    # Deepest level first.
    levels = sorted({block.get("level", 1) for block in result}, reverse=True)

    if debug:
        print(
            f"\n[DEBUG] merge_small_blocks: Processing {len(result)} blocks across levels {levels}",
            file=sys.stderr,
        )

    for current_level in levels:
        if debug:
            print(f"[DEBUG] Processing level {current_level}", file=sys.stderr)

        # ---------------- Phase A: same-level merging ----------------
        changed = True
        iteration = 0
        while changed:
            iteration += 1
            changed = False
            i = 0
            new_result = []

            while i < len(result):
                current_block = result[i]
                current_tokens = estimate_tokens(current_block["content"])
                current_role = current_block.get("table_chunk_role", "none")
                is_current_level = current_block.get("level", 1) == current_level
                # Empty blocks (0 tokens) are never merge candidates.
                is_below_ideal = 0 < current_tokens < IDEAL_BLOCK_CONTENT_TOKENS

                if is_below_ideal and is_current_level:
                    # Try to take the next same-level block.
                    if current_role in forward_roles_same_level and i + 1 < len(
                        result
                    ):
                        next_block = result[i + 1]
                        if (
                            next_block.get("level", 1) == current_level
                            and next_block.get("table_chunk_role", "none")
                            in backward_roles
                        ):
                            merged_content = (
                                current_block["content"]
                                + "\n\n"
                                + next_block["content"]
                            )
                            if (
                                estimate_tokens(merged_content)
                                <= MAX_BLOCK_CONTENT_TOKENS
                            ):
                                new_result.append(
                                    _build_merged_block(
                                        current_block,
                                        next_block,
                                        merged_content,
                                        current_level,
                                    )
                                )
                                merged_count += 1
                                changed = True
                                i += 2
                                continue

                    # Otherwise try to join the previous same-level block,
                    # provided that block is itself still below IDEAL.
                    if current_role in backward_roles and new_result:
                        prev_block = new_result[-1]
                        prev_tokens = estimate_tokens(prev_block["content"])
                        if (
                            prev_block.get("level", 1) == current_level
                            and prev_block.get("table_chunk_role", "none")
                            in forward_roles_same_level
                            and prev_tokens < IDEAL_BLOCK_CONTENT_TOKENS
                        ):
                            merged_content = (
                                prev_block["content"]
                                + "\n\n"
                                + current_block["content"]
                            )
                            if (
                                estimate_tokens(merged_content)
                                <= MAX_BLOCK_CONTENT_TOKENS
                            ):
                                new_result[-1] = _build_merged_block(
                                    prev_block,
                                    current_block,
                                    merged_content,
                                    current_level,
                                )
                                merged_count += 1
                                changed = True
                                i += 1
                                continue

                    # No merge possible: keep the block unchanged.
                    new_result.append(current_block)
                    i += 1
                    continue

                # Tail absorption: a block at/above IDEAL swallows a small run
                # of following same-level blocks.
                if is_current_level and current_tokens >= IDEAL_BLOCK_CONTENT_TOKENS:
                    remaining_same_level_tokens = 0
                    remaining_end_idx = i + 1
                    for j in range(i + 1, len(result)):
                        next_block = result[j]
                        # The run ends at the first block of another level...
                        if next_block.get("level", 1) != current_level:
                            break
                        # ...or at a middle table chunk, which is never merged.
                        if next_block.get("table_chunk_role", "none") == "middle":
                            break
                        remaining_same_level_tokens += estimate_tokens(
                            next_block["content"]
                        )
                        remaining_end_idx = j + 1

                    combined_tokens = current_tokens + remaining_same_level_tokens
                    if (
                        0 < remaining_same_level_tokens < SMALL_TAIL_THRESHOLD
                        and combined_tokens <= MAX_BLOCK_CONTENT_TOKENS
                    ):
                        absorbed_content = current_block["content"]
                        last_uuid_end = current_block.get(
                            "uuid_end", current_block["uuid"]
                        )
                        combined_headers = list(
                            current_block.get("table_headers", [])
                        )
                        for j in range(i + 1, remaining_end_idx):
                            next_block = result[j]
                            absorbed_content += "\n\n" + next_block["content"]
                            last_uuid_end = next_block.get(
                                "uuid_end", next_block["uuid"]
                            )
                            combined_headers.extend(
                                next_block.get("table_headers", [])
                            )

                        merged_block = {
                            "uuid": current_block["uuid"],
                            "uuid_end": last_uuid_end,
                            "heading": current_block["heading"],
                            "content": absorbed_content,
                            "type": "text",
                            "parent_headings": current_block["parent_headings"],
                            "level": current_level,
                            "table_chunk_role": "none",
                        }
                        if combined_headers:
                            merged_block["table_headers"] = combined_headers

                        # Count the absorbed blocks before advancing i, so the
                        # figure used below is the real number of blocks.
                        num_absorbed = remaining_end_idx - i - 1
                        new_result.append(merged_block)
                        merged_count += num_absorbed
                        changed = True
                        i = remaining_end_idx

                        if debug:
                            print(
                                f" Tail absorption: block at IDEAL ({current_tokens} tokens) absorbed {num_absorbed} small tail blocks ({remaining_same_level_tokens} tokens)",
                                file=sys.stderr,
                            )
                        continue

                # Other level, empty, or nothing to absorb: keep as-is.
                new_result.append(current_block)
                i += 1

            result = new_result

            if debug and changed:
                print(
                    f" Phase A iteration {iteration}: {merged_count} total merges",
                    file=sys.stderr,
                )

        # ---------------- Phase B: cross-level absorption ----------------
        changed = True
        iteration = 0
        while changed:
            iteration += 1
            changed = False
            i = 0
            new_result = []

            while i < len(result):
                current_block = result[i]
                current_tokens = estimate_tokens(current_block["content"])
                current_role = current_block.get("table_chunk_role", "none")
                is_current_level = current_block.get("level", 1) == current_level
                is_below_ideal = 0 < current_tokens < IDEAL_BLOCK_CONTENT_TOKENS

                if is_below_ideal and is_current_level:
                    # The current block absorbs a following deeper block.
                    if current_role in forward_roles_cross_level and i + 1 < len(
                        result
                    ):
                        next_block = result[i + 1]
                        if (
                            next_block.get("level", 1) > current_level
                            and next_block.get("table_chunk_role", "none")
                            in backward_roles
                        ):
                            merged_content = (
                                current_block["content"]
                                + "\n\n"
                                + next_block["content"]
                            )
                            if (
                                estimate_tokens(merged_content)
                                <= MAX_BLOCK_CONTENT_TOKENS
                            ):
                                new_result.append(
                                    _build_merged_block(
                                        current_block,
                                        next_block,
                                        merged_content,
                                        current_level,
                                    )
                                )
                                merged_count += 1
                                changed = True
                                i += 2
                                continue

                    # A preceding shallower block (still below IDEAL) absorbs
                    # the current block; the result keeps the shallower level.
                    if current_role in backward_roles and new_result:
                        prev_block = new_result[-1]
                        prev_level = prev_block.get("level", 1)
                        prev_tokens = estimate_tokens(prev_block["content"])
                        if (
                            prev_level < current_level
                            and prev_block.get("table_chunk_role", "none")
                            in forward_roles_cross_level
                            and prev_tokens < IDEAL_BLOCK_CONTENT_TOKENS
                        ):
                            merged_content = (
                                prev_block["content"]
                                + "\n\n"
                                + current_block["content"]
                            )
                            if (
                                estimate_tokens(merged_content)
                                <= MAX_BLOCK_CONTENT_TOKENS
                            ):
                                new_result[-1] = _build_merged_block(
                                    prev_block,
                                    current_block,
                                    merged_content,
                                    prev_level,
                                )
                                merged_count += 1
                                changed = True
                                i += 1
                                continue

                new_result.append(current_block)
                i += 1

            result = new_result

            if debug and changed:
                print(
                    f" Phase B iteration {iteration}: {merged_count} total merges",
                    file=sys.stderr,
                )

    if debug:
        print(
            f"[DEBUG] merge_small_blocks complete: {len(result)} blocks, {merged_count} total merges",
            file=sys.stderr,
        )

    # Report blocks that are still above the hard limit after merging.
    oversized_blocks = []
    for idx, block in enumerate(result):
        block_tokens = estimate_tokens(block["content"])
        if block_tokens > MAX_BLOCK_CONTENT_TOKENS:
            oversized_blocks.append(
                {
                    "index": idx,
                    "heading": block.get("heading", "(no heading)"),
                    "level": block.get("level", "N/A"),
                    "tokens": block_tokens,
                }
            )

    if oversized_blocks:
        print(
            f"\n[WARNING] Found {len(oversized_blocks)} oversized blocks after merging:",
            file=sys.stderr,
        )
        for info in oversized_blocks:
            print(
                f" Block #{info['index']}: level={info['level']}, tokens={info['tokens']}, heading=\"{info['heading']}\"",
                file=sys.stderr,
            )

    return result, merged_count


def _build_merged_block(head: dict, tail: dict, content: str, level: int) -> dict:
    """Build the block that results from appending ``tail`` to ``head``.

    The result keeps ``head``'s uuid, heading and parent headings, ends at
    ``tail``'s 'uuid_end' (falling back to its 'uuid'), carries ``content``
    and ``level`` as given, and concatenates both 'table_headers' lists (the
    key is omitted when the combined list is empty).
    """
    merged_block = {
        "uuid": head["uuid"],
        "uuid_end": tail.get("uuid_end", tail["uuid"]),
        "heading": head["heading"],
        "content": content,
        "type": "text",
        "parent_headings": head["parent_headings"],
        "level": level,
        "table_chunk_role": "none",
    }
    combined_headers = head.get("table_headers", []) + tail.get("table_headers", [])
    if combined_headers:
        merged_block["table_headers"] = combined_headers
    return merged_block
def _build_anchor_split_block(
    block_paragraphs: list, heading: str, parent_headings: list
) -> dict:
    """Build one intermediate split block from a non-empty paragraph slice.

    The returned dict keeps the source paragraphs under ``_paragraphs`` so the
    caller can re-split the block if it is still oversized.
    """
    last_para = block_paragraphs[-1]
    block = {
        # UUID from the first paragraph in the content
        "uuid": block_paragraphs[0]["para_id"],
        # A table paragraph carries para_id_end; plain text falls back to para_id
        "uuid_end": last_para.get("para_id_end") or last_para.get("para_id"),
        "heading": heading,
        "content": "\n".join(p["text"] for p in block_paragraphs),
        "type": "text",
        "parent_headings": parent_headings,
        "_paragraphs": block_paragraphs,
    }
    table_headers = _collect_table_headers(block_paragraphs)
    if table_headers:
        block["table_headers"] = table_headers
    return block


def split_long_block(
    block_heading: str,
    paragraphs: list,
    parent_headings: list,
    block_level: int,
    debug: bool = False,
) -> list:
    """
    Split a long text block into smaller blocks using anchor paragraphs.

    Strategy:
    1. Calculate the target number of blocks from IDEAL_BLOCK_CONTENT_TOKENS,
       raised if needed so every block can stay under MAX_BLOCK_CONTENT_TOKENS.
    2. Collect candidate anchors: non-table paragraphs whose text is non-empty
       and at most MAX_ANCHOR_CANDIDATE_LENGTH characters long.
    3. Pick the candidates closest to the ideal (evenly spaced) split positions.
    4. Cut the paragraph list at each selected anchor; the anchor becomes the
       heading and the first paragraph of the block that follows it.
    5. Recursively re-split any resulting block that is still oversized.

    The first paragraph (index 0) is never used as an anchor: cutting there
    leaves the paragraph list unchanged, and when it was the only candidate the
    oversized remainder was re-submitted with the same paragraphs forever.
    With index 0 excluded every cut removes at least one paragraph, so the
    recursion always terminates.

    Table paragraphs (``is_table`` truthy) are never used as anchors and are
    never cut internally by this function.

    Args:
        block_heading: Original heading text
        paragraphs: List of dicts with 'text', 'para_id', and optionally
            'is_table', 'para_id_end', '_chunk_heading', '_table_header'
        parent_headings: Parent heading stack
        block_level: Heading level stored in each returned block's 'level'
        debug: If True, print split statistics to stderr when a split occurs

    Returns:
        List of block dictionaries (one or more), each with a 'level' field

    Exits:
        sys.exit(1) if the content exceeds the limit and no usable anchor
        paragraph exists
    """
    import math

    # A block that starts with a split table chunk carries its own heading.
    # NOTE(review): this heading is only applied on the within-limit path; the
    # split path below starts from block_heading, as before.
    effective_heading = block_heading
    if paragraphs and paragraphs[0].get("_chunk_heading"):
        effective_heading = paragraphs[0]["_chunk_heading"]

    total_content = "\n".join(p["text"] for p in paragraphs)
    total_tokens = estimate_tokens(total_content)

    if total_tokens <= MAX_BLOCK_CONTENT_TOKENS:
        # Within limit: return everything as a single block.
        last_para = paragraphs[-1] if paragraphs else {}
        block = {
            "uuid": paragraphs[0]["para_id"] if paragraphs else None,
            "uuid_end": last_para.get("para_id_end") or last_para.get("para_id"),
            "heading": effective_heading,
            "content": total_content,
            "type": "text",
            "parent_headings": parent_headings,
            "level": block_level,
        }
        # Per-table header list, aligned with <table> tag order in the content
        table_headers = _collect_table_headers(paragraphs)
        if table_headers:
            block["table_headers"] = table_headers
        return [block]

    # Content exceeds the limit: work out how many blocks to aim for.
    target_blocks = max(
        math.ceil(total_tokens / IDEAL_BLOCK_CONTENT_TOKENS),
        math.ceil(total_tokens / MAX_BLOCK_CONTENT_TOKENS),
    )
    target_size = total_tokens / target_blocks

    # Candidate anchors are selected by character length; their position is
    # the token count of everything that precedes them.
    candidates = []
    cumulative_tokens = 0
    for idx, para in enumerate(paragraphs):
        para_text = para["text"]
        if (
            idx > 0  # a cut before the first paragraph would split nothing
            and not para.get("is_table", False)
            and 0 < len(para_text) <= MAX_ANCHOR_CANDIDATE_LENGTH
        ):
            candidates.append(
                {
                    "index": idx,
                    "text": para_text,
                    "para_id": para["para_id"],
                    "position": cumulative_tokens,
                }
            )
        cumulative_tokens += estimate_tokens(para_text)

    if not candidates:
        # No suitable anchor found
        preview = (
            block_heading[:80] + "..." if len(block_heading) > 80 else block_heading
        )
        print_error(
            "Cannot split long block (no suitable anchor paragraphs found)",
            f"A text block is too long (~{total_tokens} tokens, max {MAX_BLOCK_CONTENT_TOKENS})\n"
            f"but no paragraphs <= {MAX_ANCHOR_CANDIDATE_LENGTH} characters were found to use as split points.\n\n"
            f'Location: Under heading "{preview}"\n'
            f"Block size: ~{total_tokens} tokens ({len(total_content)} characters)\n"
            f"Number of paragraphs: {len(paragraphs)}\n"
            f"Calculated target blocks: {target_blocks}",
            " 1. Open the document in Microsoft Word\n"
            f' 2. Locate the section under heading "{preview}"\n'
            f" 3. Add short headings or paragraph breaks (≤{MAX_ANCHOR_CANDIDATE_LENGTH} chars) to divide the content\n"
            " 4. Re-upload it to LightRAG",
        )
        sys.exit(1)

    # Select up to target_blocks - 1 split points, each the unused candidate
    # closest to the i-th ideal position.
    selected_anchors = []
    remaining_candidates = candidates.copy()
    for i in range(1, target_blocks):
        if not remaining_candidates:
            break
        ideal_position = i * target_size
        best_candidate = min(
            remaining_candidates, key=lambda c: abs(c["position"] - ideal_position)
        )
        selected_anchors.append(best_candidate)
        remaining_candidates.remove(best_candidate)

    # Restore document order before cutting
    selected_anchors.sort(key=lambda a: a["index"])

    result_blocks = []
    prev_idx = 0
    current_parent_headings = parent_headings
    current_block_heading = block_heading
    for anchor in selected_anchors:
        split_idx = anchor["index"]
        # Block covers prev_idx up to (not including) the anchor
        block_paragraphs = paragraphs[prev_idx:split_idx]
        if block_paragraphs:
            result_blocks.append(
                _build_anchor_split_block(
                    block_paragraphs, current_block_heading, current_parent_headings
                )
            )

        # The anchor becomes the heading of the next block
        validate_heading_length(anchor["text"], anchor["para_id"])
        current_block_heading = anchor["text"]
        # The original heading becomes a parent, unless it is the placeholder
        if block_heading != "Preface/Uncategorized":
            current_parent_headings = parent_headings + [block_heading]
        # Do not skip the anchor: it is the first paragraph of the next block
        prev_idx = split_idx

    final_paragraphs = paragraphs[prev_idx:]
    if final_paragraphs:
        result_blocks.append(
            _build_anchor_split_block(
                final_paragraphs, current_block_heading, current_parent_headings
            )
        )

    # Post-split validation: sparse anchors can leave a block over the limit;
    # re-split it from its preserved paragraphs.
    validated_blocks = []
    for block in result_blocks:
        block_tokens = estimate_tokens(block["content"])
        if block_tokens > MAX_BLOCK_CONTENT_TOKENS:
            block_paragraphs = block.get("_paragraphs", [])
            if not block_paragraphs:
                # Fallback: shouldn't happen, but fail with a clear message
                preview = (
                    block["heading"][:80] + "..."
                    if len(block["heading"]) > 80
                    else block["heading"]
                )
                print_error(
                    "Cannot re-split oversized block (internal error)",
                    f"A block exceeded MAX_BLOCK_CONTENT_TOKENS but paragraph metadata was lost.\n\n"
                    f"Location: Under heading \"{preview}\"\n"
                    f"Block size: ~{block_tokens} tokens ({len(block['content'])} characters)",
                    "This is an internal error. Please report this issue.",
                )
                sys.exit(1)

            # The recursive call either finds more anchors or exits with an error
            validated_blocks.extend(
                split_long_block(
                    block["heading"],
                    block_paragraphs,
                    block["parent_headings"],
                    block_level,
                    debug,
                )
            )
        else:
            # Drop the internal _paragraphs field before returning the block
            block.pop("_paragraphs", None)
            validated_blocks.append(block)

    for block in validated_blocks:
        block["level"] = block_level

    if debug and len(validated_blocks) > 1:
        print(f'\n[DEBUG] Block split: "{block_heading}"', file=sys.stderr)
        print(
            f" Original size: ~{total_tokens} tokens ({len(total_content)} characters)",
            file=sys.stderr,
        )
        block_tokens = [estimate_tokens(block["content"]) for block in validated_blocks]
        print(
            f" Final result: {len(validated_blocks)} blocks: ~{block_tokens} tokens",
            file=sys.stderr,
        )

    return validated_blocks
def extract_para_id(para_element) -> str:
    """
    Read the ``w14:paraId`` attribute of a paragraph element.

    Args:
        para_element: paragraph element exposing ``.get(attribute_name)``

    Returns:
        The attribute value, or ``None`` when the paragraph has no
        ``w14:paraId`` attribute. Nothing is validated or raised here; the
        caller decides how to treat a missing id.
    """
    w14_namespace = "http://schemas.microsoft.com/office/word/2010/wordml"
    qualified_name = f"{{{w14_namespace}}}paraId"
    return para_element.get(qualified_name)
def parse_styles_outline_levels(docx_path: str) -> dict:
    """
    Parse styles.xml to extract the outlineLvl of each style, following the
    style inheritance chain (basedOn).

    Parsing is best-effort: an unreadable archive or unparsable XML yields
    whatever was collected so far (normally an empty dict) instead of raising.
    A style whose ``w:outlineLvl`` value is missing or non-numeric is treated
    as having no direct level, so one malformed style does not discard the
    levels of all the others.

    Args:
        docx_path: Path to DOCX file

    Returns:
        dict: styleId -> outlineLvl as an int, either defined directly on the
        style or inherited from the nearest ancestor that defines one. Styles
        with no resolvable level are omitted.
    """
    import zipfile

    try:
        from defusedxml import ElementTree as ET
    except ImportError:
        from xml.etree import ElementTree as ET

    ns = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    styles_outline = {}  # styleId -> outlineLvl
    style_based_on = {}  # styleId -> parent styleId

    try:
        with zipfile.ZipFile(docx_path, "r") as zf:
            if "word/styles.xml" not in zf.namelist():
                return styles_outline
            # Close the member handle deterministically once parsing is done
            with zf.open("word/styles.xml") as styles_file:
                root = ET.parse(styles_file).getroot()

        # First pass: collect direct outlineLvl and basedOn for every style
        for style in root.findall(f".//{{{ns}}}style"):
            style_id = style.get(f"{{{ns}}}styleId")
            if not style_id:
                continue

            based_on = style.find(f"{{{ns}}}basedOn")
            if based_on is not None:
                parent_id = based_on.get(f"{{{ns}}}val")
                if parent_id:
                    style_based_on[style_id] = parent_id

            pPr = style.find(f"{{{ns}}}pPr")
            if pPr is None:
                continue
            outline_lvl_elem = pPr.find(f"{{{ns}}}outlineLvl")
            if outline_lvl_elem is None:
                continue
            try:
                styles_outline[style_id] = int(outline_lvl_elem.get(f"{{{ns}}}val"))
            except (TypeError, ValueError):
                # Malformed value: leave this style without a direct level
                continue

        # Second pass: styles without a direct level inherit from the nearest
        # ancestor that has one. The walk is iterative and tracks visited ids
        # so circular basedOn chains stop instead of looping.
        for style_id in list(style_based_on):
            if style_id in styles_outline:
                continue
            visited = {style_id}
            ancestor = style_based_on[style_id]
            while ancestor not in visited:
                if ancestor in styles_outline:
                    styles_outline[style_id] = styles_outline[ancestor]
                    break
                visited.add(ancestor)
                if ancestor not in style_based_on:
                    break
                ancestor = style_based_on[ancestor]
    except Exception:
        # Deliberate best-effort boundary: callers work from whatever levels
        # were collected rather than failing on a damaged styles part.
        pass

    return styles_outline
def get_heading_level(para_element, styles_outline_map: dict) -> int:
    """
    Get heading level from a paragraph, checking direct format and then style.

    Priority: the paragraph's own outlineLvl wins over the style's outlineLvl.
    A direct outlineLvl whose ``w:val`` is missing or non-numeric is treated as
    absent, so the style lookup still gets a chance instead of the call
    raising on a malformed document.

    Args:
        para_element: paragraph element exposing ``find``; its children expose ``get``
        styles_outline_map: dict of styleId -> outlineLvl from styles.xml

    Returns:
        int: the outline level when it is below 9 (0 = level 1, 1 = level 2, ...);
        None when the paragraph is not a heading (no pPr, no level found, or
        level 9 and above, which is body text)
    """
    w_ns = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

    pPr = para_element.find(f"{w_ns}pPr")
    if pPr is None:
        # Without paragraph properties there is neither a direct level nor a style
        return None

    # 1. Direct paragraph formatting
    outline_elem = pPr.find(f"{w_ns}outlineLvl")
    if outline_elem is not None:
        try:
            direct_level = int(outline_elem.get(f"{w_ns}val"))
        except (TypeError, ValueError):
            direct_level = None  # malformed: fall through to the style lookup
        if direct_level is not None:
            # A valid direct level is decisive, even when it means body text
            return direct_level if direct_level < 9 else None

    # 2. Level defined by (or inherited through) the paragraph style
    pStyle_elem = pPr.find(f"{w_ns}pStyle")
    if pStyle_elem is not None:
        style_id = pStyle_elem.get(f"{w_ns}val")
        if style_id and style_id in styles_outline_map:
            style_level = styles_outline_map[style_id]
            if style_level is not None and style_level < 9:
                return style_level
    return None
def extract_text_from_run(
    run,
    ns: dict,
    drawing_context: DrawingExtractionContext = None,
) -> str:
    """
    Turn one run element into text, marking superscript/subscript runs.

    Child elements are handled by local tag name, in order:
    - ``t``: its text is appended when non-empty
    - ``tab``: a tab character
    - ``br``: a newline when the break has no type or type ``textWrapping``;
      other break types (page/column) contribute nothing
    - ``drawing``: the placeholder returned by
      ``extract_drawing_placeholder_from_element``
    - ``pict`` / ``object``: the placeholder returned by
      ``extract_vml_image_placeholder_from_element``

    Args:
        run: run element (w:r)
        ns: XML namespace dictionary used to look up ``w:rPr`` / ``w:vertAlign``
        drawing_context: passed through unchanged to the placeholder helpers

    Returns:
        The collected text, wrapped in ``<sup>...</sup>`` or ``<sub>...</sub>``
        when the run's vertAlign is ``superscript`` / ``subscript`` and the
        text is non-empty; otherwise the text as-is
    """
    w_prefix = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

    # Vertical alignment lives in the run properties, if there are any
    alignment = None
    run_props = run.find("w:rPr", ns)
    if run_props is not None:
        align_node = run_props.find("w:vertAlign", ns)
        if align_node is not None:
            alignment = align_node.get(f"{w_prefix}val")

    pieces = []
    for node in run:
        local_name = node.tag.split("}")[-1]  # strip the namespace part
        if local_name == "t":
            if node.text:
                pieces.append(node.text)
        elif local_name == "tab":
            pieces.append("\t")
        elif local_name == "br":
            # Only soft line breaks become text; layout breaks are dropped
            if node.get(f"{w_prefix}type") in (None, "textWrapping"):
                pieces.append("\n")
        elif local_name == "drawing":
            pieces.append(
                extract_drawing_placeholder_from_element(
                    node,
                    context=drawing_context,
                    include_extended_attrs=True,
                )
            )
        elif local_name in ("pict", "object"):
            pieces.append(
                extract_vml_image_placeholder_from_element(
                    node,
                    context=drawing_context,
                    include_extended_attrs=True,
                )
            )

    text = "".join(pieces)
    if not text:
        return text
    if alignment == "superscript":
        return f"<sup>{text}</sup>"
    if alignment == "subscript":
        return f"<sub>{text}</sub>"
    return text
def extract_paragraph_content(
    element,
    ns,
    drawing_context: DrawingExtractionContext = None,
) -> str:
    """
    Collect the text and equations of a paragraph element in document order.

    The element tree is walked depth-first. For each node, by local tag name:
    - tags listed in ``_SKIP_PARAGRAPH_TAGS`` are dropped together with their
      whole subtree
    - ``r`` is converted by ``extract_text_from_run``
    - ``oMath`` is converted by ``convert_omml_to_latex`` and, when the result
      is non-empty, emitted as ``<equation>...</equation>``
    - ``oMathPara`` emits one such equation per direct ``oMath`` child
    - any other node is treated as a container and its children are visited

    Args:
        element: paragraph element (w:p)
        ns: XML namespace dictionary, forwarded to ``extract_text_from_run``
        drawing_context: forwarded to ``extract_text_from_run``

    Returns:
        The concatenated fragments as one string
    """
    fragments = []

    # Explicit stack; children are pushed reversed so they pop in document order
    pending = list(element)[::-1]
    while pending:
        node = pending.pop()
        local_name = node.tag.split("}")[-1]

        if local_name in _SKIP_PARAGRAPH_TAGS:
            continue

        if local_name == "r":
            fragments.append(
                extract_text_from_run(node, ns, drawing_context=drawing_context)
            )
        elif local_name == "oMath":
            from .omml import convert_omml_to_latex

            formula = convert_omml_to_latex(node)
            if formula:
                fragments.append(f"<equation>{formula}</equation>")
        elif local_name == "oMathPara":
            from .omml import convert_omml_to_latex

            for math_node in node:
                if math_node.tag.split("}")[-1] != "oMath":
                    continue
                formula = convert_omml_to_latex(math_node)
                if formula:
                    fragments.append(f"<equation>{formula}</equation>")
        else:
            # Container element: descend so nested runs are not lost
            pending.extend(list(node)[::-1])

    return "".join(fragments)
- def _is_table_empty(rows: list) -> bool:
- """Return True iff every cell in ``rows`` is whitespace-only."""
- return all(not (cell or "").strip() for row in rows for cell in row)
- def _collect_table_headers(paragraphs: list) -> list:
- """Collect per-table cross-page header rows from ``is_table`` paragraphs.
- The returned list is aligned 1:1 with the order of ``<table>`` placeholder
- tags emitted into the block's content; entries are either the list of
- header rows captured from ``w:tblHeader`` or ``None`` when the table has
- no cross-page repeating header.
- """
- return [p.get("_table_header") for p in paragraphs if p.get("is_table")]
def _build_unsplit_block(
    heading: str, paragraphs: list, parent_headings: list, level: int
) -> dict:
    """Assemble all paragraphs into one block, with no size-based splitting.

    ``paragraphs`` must be non-empty. The block's ``uuid`` comes from the first
    paragraph; ``uuid_end`` is the last paragraph's ``para_id_end`` when that
    is set, otherwise its ``para_id``. ``table_headers`` is added only when the
    paragraphs include at least one table.
    """
    closing_para = paragraphs[-1]
    end_id = closing_para.get("para_id_end")
    if not end_id:
        end_id = closing_para.get("para_id")

    joined_text = "\n".join(entry["text"] for entry in paragraphs)

    result = {
        "uuid": paragraphs[0]["para_id"],
        "uuid_end": end_id,
        "heading": heading,
        "content": joined_text,
        "type": "text",
        "parent_headings": parent_headings,
        "level": level,
    }

    headers = _collect_table_headers(paragraphs)
    if headers:
        result["table_headers"] = headers
    return result
- def _flush_current_block(
- blocks: list,
- heading: str,
- paragraphs: list,
- parent_headings: list,
- level: int,
- fixlevel: int,
- debug: bool,
- ) -> None:
- """
- Flush accumulated paragraphs into blocks, respecting fixlevel mode.
- In default mode (fixlevel is None), runs split_long_block for token-based splitting.
- In fixlevel mode, emits a single unsplit block and warns when size exceeds the limit.
- """
- if not paragraphs:
- return
- if fixlevel is None:
- blocks.extend(
- split_long_block(heading, paragraphs, parent_headings, level, debug)
- )
- return
- block = _build_unsplit_block(heading, paragraphs, parent_headings, level)
- block_tokens = estimate_tokens(block["content"])
- if block_tokens > MAX_BLOCK_CONTENT_TOKENS:
- preview = heading[:80] + "..." if len(heading) > 80 else heading
- print(
- f"Warning: fixlevel block exceeds {MAX_BLOCK_CONTENT_TOKENS} tokens "
- f'(~{block_tokens} tokens) under heading "{preview}". '
- f"Consider increasing --fixlevel=N or removing --fixlevel for automatic splitting.",
- file=sys.stderr,
- )
- blocks.append(block)
- def extract_docx_blocks(
- file_path: str,
- debug: bool = False,
- fixlevel: int = None,
- drawing_context: DrawingExtractionContext = None,
- parse_warnings: dict | None = None,
- parse_metadata: dict | None = None,
- ) -> list:
- """
- Extract text blocks (chunks) from a DOCX file for chunking later.
- Uses python-docx with custom numbering resolver to:
- 1. Capture automatic numbering (list labels)
- 2. Split document by headings
- 3. Convert tables to JSON (2D array)
- 4. Validate heading lengths and table sizes
- 5. Split long blocks using anchor paragraphs
- 6. Preserve superscript/subscript formatting with <sup>/<sub> markup
- Args:
- file_path: Path to the DOCX file
- debug: If True, output debug information when splitting blocks
- fixlevel: If specified, disable smart splitting/merging and only split at heading levels <= fixlevel
- (0 = split at all heading levels, 1 = Heading 1 only, 2 = Heading 1-2, etc.)
- parse_warnings: Optional out-dict that this function mutates with
- non-fatal warnings observed during parsing. Currently used for
- ``missing_paraid_count`` — incremented once per body-level
- paragraph (heading or text) that lacks a ``w14:paraId`` and once
- per table whose every cell lacks one. Callers (the LightRAG
- adapter / debug CLI) read this to surface a one-line warning per
- document instead of crashing.
- parse_metadata: Optional out-dict that this function mutates with
- document-level metadata derived during parsing. Currently used
- for ``first_heading`` — the text of the first heading encountered
- in document order (regardless of level). Used by the LightRAG
- adapter to populate ``meta.doc_title`` in ``.blocks.jsonl``.
- Returns:
- List of block dictionaries with heading, content, type, and metadata
- """
- doc = Document(file_path)
- resolver = NumberingResolver(file_path)
- styles_outline = parse_styles_outline_levels(file_path)
- blocks = []
- current_heading = "Preface/Uncategorized"
- current_heading_level = 1 # Default level for "Preface/Uncategorized"
- current_heading_stack = {} # {level: heading_text} - Use dict to correctly track heading hierarchy
- current_parent_headings = [] # Parent headings for current block
- current_paragraphs = [] # Track paragraphs with metadata for splitting
- has_body_content = (
- False # Track if current block has body content (non-heading paragraphs/tables)
- )
- matched_fixlevel_heading = False # Track whether --fixlevel matched any heading
- table_split_counter = (
- 0 # Track cumulative table split suffix numbers within current block
- )
- first_heading_recorded = (
- False # Track whether the document's first heading has been captured
- )
- # Iterate through document body elements (paragraphs and tables)
- body = doc._element.body
- for element in body:
- tag = element.tag.split("}")[-1] # Remove namespace
- if tag == "sectPr": # Document-level section break
- resolver.reset_tracking_state()
- continue
- if tag == "p": # Paragraph
- # Get paragraph text with superscript/subscript markup and equations
- para_text = ""
- ns = {
- "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
- "wp": "http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing",
- "m": "http://schemas.openxmlformats.org/officeDocument/2006/math",
- }
- para_text = extract_paragraph_content(
- element,
- ns,
- drawing_context=drawing_context,
- )
- para_text = para_text.strip()
- if not para_text:
- continue
- # Get numbering label using our resolver
- label = resolver.get_label(element)
- full_text = f"{label} {para_text}".strip() if label else para_text
- # Check if this is a heading using the new function
- outline_level = get_heading_level(element, styles_outline)
- if outline_level is not None:
- # This is a heading (outline level 0-8)
- # Convert 0-based to 1-based level
- level = outline_level + 1
- # In fixlevel mode, check if this heading should trigger a block split
- should_split = True
- if fixlevel is not None and fixlevel > 0:
- # If fixlevel is specified and > 0, only split at levels <= fixlevel
- should_split = level <= fixlevel
- # Extract paraId for this heading
- heading_para_id = extract_para_id(element)
- if parse_warnings is not None and not heading_para_id:
- parse_warnings["missing_paraid_count"] = (
- parse_warnings.get("missing_paraid_count", 0) + 1
- )
- # Validate heading length
- validate_heading_length(full_text, heading_para_id)
- # Truncate heading if needed before storing
- truncated_text = truncate_heading(full_text, heading_para_id)
- # Record the document's first heading (any level) for meta.doc_title.
- if not first_heading_recorded:
- if parse_metadata is not None:
- parse_metadata["first_heading"] = truncated_text
- first_heading_recorded = True
- if should_split:
- if fixlevel is not None and fixlevel > 0:
- matched_fixlevel_heading = True
- # This heading triggers a block split
- # Only save previous block if it has body content
- if has_body_content and current_paragraphs:
- _flush_current_block(
- blocks,
- current_heading,
- current_paragraphs,
- current_parent_headings,
- current_heading_level,
- fixlevel,
- debug,
- )
- # Reset for new block
- current_paragraphs = []
- has_body_content = False
- table_split_counter = (
- 0 # Reset table split counter for new heading
- )
- # Add heading to current_paragraphs
- current_paragraphs.append(
- {
- "text": truncated_text,
- "para_id": heading_para_id,
- "is_table": False,
- }
- )
- # Update current_heading and parent_headings for the FIRST heading in a block
- # (when current_paragraphs just had this heading added as its first element)
- if len(current_paragraphs) == 1:
- current_heading = truncated_text
- current_heading_level = (
- level # Only set level when setting heading
- )
- # Parent headings = all headings from levels strictly less than current level
- # Sort by level to maintain hierarchy order
- current_parent_headings = [
- current_heading_stack[lvl]
- for lvl in sorted(current_heading_stack.keys())
- if lvl < level
- ]
- # Update heading stack: remove current level and all lower levels, then add current
- current_heading_stack = {
- k: v for k, v in current_heading_stack.items() if k < level
- }
- current_heading_stack[level] = truncated_text
- else:
- # This heading doesn't trigger split - treat as regular paragraph
- para_id = heading_para_id
- # Store as regular paragraph with metadata
- current_paragraphs.append(
- {"text": truncated_text, "para_id": para_id, "is_table": False}
- )
- # Mark that we have body content
- has_body_content = True
- else:
- # Regular paragraph content
- para_id = extract_para_id(element)
- if parse_warnings is not None and not para_id:
- parse_warnings["missing_paraid_count"] = (
- parse_warnings.get("missing_paraid_count", 0) + 1
- )
- # Store paragraph with metadata for potential splitting
- current_paragraphs.append(
- {"text": full_text, "para_id": para_id, "is_table": False}
- )
- # Mark that we have body content
- has_body_content = True
- # Check for paragraph-level section break (after processing paragraph)
- # sectPr in pPr means this paragraph ends a section
- pPr = element.find(
- "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}pPr"
- )
- if pPr is not None:
- sectPr = pPr.find(
- "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}sectPr"
- )
- if sectPr is not None:
- # Section break after this paragraph - reset tracking
- resolver.reset_tracking_state()
- elif tag == "tbl": # Table
- # Reset numbering tracking before table (table start boundary)
- resolver.reset_tracking_state()
- # Directly create Table object from XML element to avoid index mismatch
- # (doc.tables may have different order due to nested tables)
- from docx.table import Table
- table = Table(element, doc)
- table_metadata = TableExtractor.extract_with_metadata(
- table,
- numbering_resolver=resolver,
- drawing_context=drawing_context,
- )
- table_rows = table_metadata["rows"]
- para_ids = table_metadata["para_ids"]
- para_ids_end = table_metadata["para_ids_end"] # Last paraId in each cell
- header_indices = table_metadata["header_indices"]
- # Skip tables whose every cell is whitespace-only — otherwise an
- # empty `<table>[[""]]</table>` placeholder would leak into block
- # content and a useless IRTable would appear in tables.json.
- if _is_table_empty(table_rows):
- resolver.reset_tracking_state()
- continue
- # Count tables whose cells carry no w14:paraId. Legacy / non-Word
- # docx authors omit these attributes; we no longer fail-fast, but
- # the adapter surfaces a single warning so the user knows the edit
- # range hints will be missing for these tables.
- if parse_warnings is not None and not _table_has_any_paraid(para_ids):
- parse_warnings["missing_paraid_count"] = (
- parse_warnings.get("missing_paraid_count", 0) + 1
- )
- # Convert table to JSON and estimate token count
- table_json = json.dumps(table_rows, ensure_ascii=False)
- table_tokens = estimate_tokens(table_json)
- # Extract cross-page repeating header rows (w:tblHeader) once per
- # table so both split and unsplit branches can surface them to the
- # sidecar via the block-level ``table_headers`` list.
- header_rows = []
- if header_indices:
- header_rows = [
- table_rows[idx] for idx in header_indices if idx < len(table_rows)
- ]
- header_rows_or_none = header_rows if header_rows else None
- # Check if table needs splitting (disabled in fixlevel mode)
- if fixlevel is None and table_tokens > TABLE_MAX_TOKENS:
- # Table exceeds limit - split it
- # Pass table_split_counter to ensure sequential numbering across multiple tables
- table_chunks = split_table_with_heading(
- table_rows,
- para_ids,
- para_ids_end,
- header_indices,
- current_heading,
- table_split_counter,
- debug,
- )
- for chunk_idx, chunk in enumerate(table_chunks):
- chunk_json = json.dumps(chunk["rows"], ensure_ascii=False)
- # Get uuid_end from last valid paraId in chunk (use para_ids_end for last cell's last paragraph)
- chunk_para_id_end = find_last_valid_para_id(chunk["para_ids_end"])
- if chunk["is_first"]:
- # First chunk: add to current_paragraphs (will merge with preceding content)
- current_paragraphs.append(
- {
- "text": f"<table>{chunk_json}</table>",
- "para_id": chunk["uuid"],
- "para_id_end": chunk_para_id_end, # Store end paraId for uuid_end calculation
- "is_table": True,
- "_table_header": header_rows_or_none,
- }
- )
- has_body_content = True
- else:
- # Middle or last chunk: save current block first
- if current_paragraphs:
- _flush_current_block(
- blocks,
- current_heading,
- current_paragraphs,
- current_parent_headings,
- current_heading_level,
- fixlevel,
- debug,
- )
- current_paragraphs = []
- has_body_content = False
- # Generate heading using suffix_number from chunk
- if chunk["suffix_number"] is not None:
- chunk_heading = f"{current_heading} [{TABLE_CHUNK_SUFFIX_LABEL}{chunk['suffix_number']}]"
- else:
- chunk_heading = current_heading
- # Build block for this table chunk
- # Get uuid_end from last valid paraId in chunk (use para_ids_end for last cell's last paragraph)
- chunk_uuid_end = find_last_valid_para_id(chunk["para_ids_end"])
- # Determine table_chunk_role based on chunk position
- if chunk["is_first"] and chunk["is_last"]:
- table_chunk_role = "none" # Not split
- elif chunk["is_first"]:
- table_chunk_role = "first"
- elif chunk["is_last"]:
- table_chunk_role = "last"
- else:
- table_chunk_role = "middle"
- chunk_block = {
- "uuid": chunk["uuid"],
- "uuid_end": chunk_uuid_end,
- "heading": chunk_heading,
- "content": f"<table>{chunk_json}</table>",
- "type": "text",
- "parent_headings": current_parent_headings,
- "level": current_heading_level,
- "table_chunk_role": table_chunk_role,
- }
- # Always emit a per-table headers list (aligned with the
- # single <table> placeholder in this standalone block);
- # the entry is None when the table has no cross-page
- # repeating header so downstream counters stay aligned.
- chunk_block["table_headers"] = [header_rows_or_none]
- if chunk["is_last"]:
- # Last chunk: add to current_paragraphs for merging with following content
- current_paragraphs.append(
- {
- "text": f"<table>{chunk_json}</table>",
- "para_id": chunk["uuid"],
- "para_id_end": chunk_para_id_end, # Store end paraId for uuid_end calculation
- "is_table": True,
- "_chunk_heading": chunk_heading,
- "_table_header": header_rows_or_none,
- }
- )
- has_body_content = True
- else:
- # Middle chunk: output immediately as standalone block
- blocks.append(chunk_block)
- # Update table_split_counter: add number of non-first chunks
- # (first chunk doesn't get a suffix, so we count from second chunk onwards)
- table_split_counter += len(table_chunks) - 1
- else:
- # Table is within size limit - no splitting needed
- # Store table as a paragraph with special marker
- # Use first valid paraId from table, and last valid paraId (from para_ids_end) for uuid_end
- table_para_id = find_first_valid_para_id(para_ids)
- table_para_id_end = find_last_valid_para_id(para_ids_end)
- current_paragraphs.append(
- {
- "text": f"<table>{table_json}</table>",
- "para_id": table_para_id,
- "para_id_end": table_para_id_end, # Store end paraId for uuid_end calculation
- "is_table": True,
- "_table_header": header_rows_or_none,
- }
- )
- # Mark that we have body content
- has_body_content = True
- # Reset numbering tracking after table (table end boundary)
- resolver.reset_tracking_state()
- # Save final block (respecting fixlevel mode)
- _flush_current_block(
- blocks,
- current_heading,
- current_paragraphs,
- current_parent_headings,
- current_heading_level,
- fixlevel,
- debug,
- )
- # Add table_chunk_role="none" to all blocks that don't have it (non-table or unsplit table blocks)
- for block in blocks:
- if "table_chunk_role" not in block:
- block["table_chunk_role"] = "none"
- # Perform small block merging (unified merging after all splits)
- # Disabled in fixlevel mode
- if fixlevel is None:
- if debug:
- print(f"\n[DEBUG] Before merging: {len(blocks)} blocks", file=sys.stderr)
- merged_blocks, merge_count = merge_small_blocks(blocks, debug)
- if debug and merge_count > 0:
- print(
- f"[DEBUG] After merging: {len(merged_blocks)} blocks ({merge_count} merges performed)",
- file=sys.stderr,
- )
- return merged_blocks
- # Fixed level mode: skip merging, but warn if no heading matched the requested level
- if fixlevel > 0 and not matched_fixlevel_heading:
- print(
- f"Warning: --fixlevel={fixlevel} produced {len(blocks)} block(s). "
- f"Document may not have heading levels <= {fixlevel}. "
- f"Try a higher --fixlevel value or remove the flag.",
- file=sys.stderr,
- )
- return blocks
|