#!/usr/bin/env python3
"""
ABOUTME: Parses DOCX documents into text blocks using python-docx
ABOUTME: Extracts automatic numbering, splits by headings, converts tables to JSON
"""
import json
import sys
try:
from docx import Document
except ImportError:
print(
"Error: python-docx not installed. Run: pip install python-docx",
file=sys.stderr,
)
sys.exit(1)
from .numbering_resolver import NumberingResolver
from .table_extractor import TableExtractor
from .utils import estimate_tokens
from .drawing_image_extractor import (
DrawingExtractionContext,
extract_drawing_placeholder_from_element,
extract_vml_image_placeholder_from_element,
)
# Constants for content validation (character-based, for UI/display)
MAX_HEADING_LENGTH = 200 # Maximum heading length in characters (UI constraint)
MAX_ANCHOR_CANDIDATE_LENGTH = (
    100 # Maximum length for candidate anchor paragraphs (characters)
)
# Constants for content splitting (token-based, for LLM context management)
IDEAL_BLOCK_CONTENT_TOKENS = 6000 # Ideal target size for balanced splitting (tokens)
MAX_BLOCK_CONTENT_TOKENS = 8000 # Maximum block content (tokens, hard limit)
# Half of the headroom between IDEAL and MAX: (8000 - 6000) // 2 = 1000 tokens.
SMALL_TAIL_THRESHOLD = (
    MAX_BLOCK_CONTENT_TOKENS - IDEAL_BLOCK_CONTENT_TOKENS
) // 2 # Threshold for tail absorption (1000 tokens)
# Constants for table splitting (token-based)
TABLE_IDEAL_TOKENS = 3000 # Ideal target size for table chunks (tokens)
TABLE_MAX_TOKENS = 5000 # Maximum table size before splitting (tokens); must be smaller than IDEAL_BLOCK_CONTENT_TOKENS
# 80% of the headroom between table IDEAL and MAX: int((5000 - 3000) * 0.8) = 1600 tokens.
TABLE_MIN_LAST_CHUNK_TOKENS = int(
    (TABLE_MAX_TOKENS - TABLE_IDEAL_TOKENS) * 0.8
) # Minimum size for the last chunk, to avoid tiny fragments
TABLE_CHUNK_SUFFIX_LABEL = "表格片段" # Label prefix for split table chunk headings
# OOXML tracked-change/comment tags whose subtree must be dropped so we only
# surface the *final* revised text. w:ins / w:moveTo are kept via default
# recursion so inserted/moved-in content survives.
_SKIP_REVISION_TAGS = frozenset({"del", "moveFrom"})
_SKIP_COMMENT_TAGS = frozenset(
    {"commentRangeStart", "commentRangeEnd", "commentReference", "annotationRef"}
)
# Union of both sets: every tag name listed above, revision or comment.
_SKIP_PARAGRAPH_TAGS = _SKIP_REVISION_TAGS | _SKIP_COMMENT_TAGS
def print_error(title: str, details: str, solution: str):
    """
    Write a framed, human-readable error report to stderr.

    The report consists of a banner with the title, the detailed
    description, and a "SOLUTION:" section, delimited by 80-character rules.

    Args:
        title: Short error title shown in the banner
        details: Longer explanation of what went wrong
        solution: Suggested steps the user can take to fix the problem
    """
    rule = "=" * 80
    # Each entry is emitted by its own print() call, so every segment is
    # followed by exactly one newline.
    segments = (
        "\n" + rule,
        f"ERROR: {title}",
        rule,
        f"\n{details}",
        "\nSOLUTION:",
        solution,
        "\n" + rule + "\n",
    )
    for segment in segments:
        print(segment, file=sys.stderr)
def truncate_heading(heading_text: str, para_id: str = None) -> str:
    """
    Shorten a heading that is longer than MAX_HEADING_LENGTH characters.

    A heading within the limit is returned untouched. A longer heading is cut
    so that, together with a trailing "...", it is exactly MAX_HEADING_LENGTH
    characters long, and a warning is written to stderr.

    Args:
        heading_text: The heading text to check
        para_id: Optional paragraph ID, included in the warning when truthy

    Returns:
        str: The original heading, or its truncated form ending in "..."
    """
    original_length = len(heading_text)
    if original_length <= MAX_HEADING_LENGTH:
        return heading_text

    # Reserve three characters for the ellipsis.
    shortened = heading_text[: MAX_HEADING_LENGTH - 3] + "..."
    if para_id:
        where = f" (para_id: {para_id})"
    else:
        where = ""
    print(
        f"Warning: Heading truncated (length {original_length} > max {MAX_HEADING_LENGTH}){where}: "
        f'"{shortened}"',
        file=sys.stderr,
    )
    return shortened
def validate_heading_length(heading_text: str, para_id: str):
    """
    Abort the program when a heading is longer than MAX_HEADING_LENGTH.

    Headings within the limit pass silently. Otherwise a formatted error
    (with a preview of at most 100 characters of the heading) is printed via
    print_error and the process exits.

    Args:
        heading_text: The heading text to validate
        para_id: The paragraph ID reported as the heading's location

    Exits:
        sys.exit(1) if the heading exceeds the maximum length
    """
    length = len(heading_text)
    if length <= MAX_HEADING_LENGTH:
        return

    # Keep the error readable: show only the start of very long headings.
    if length > 100:
        preview = heading_text[:100] + "..."
    else:
        preview = heading_text

    headline = f"Heading too long ({length} characters, max {MAX_HEADING_LENGTH})"
    details = (
        f'The following heading exceeds the maximum allowed length:\n\n "{preview}"\n\n'
        f"Location: Paragraph ID {para_id}\n"
        f"Actual length: {length} characters"
    )
    remedy = (
        " 1. Open the document in Microsoft Word\n"
        f" 2. Shorten this heading to {MAX_HEADING_LENGTH} characters or less\n"
        " 3. Re-upload it to LightRAG"
    )
    print_error(headline, details, remedy)
    sys.exit(1)
def validate_table_tokens(table_json: str, block_heading: str):
    """
    Abort the program when a table's JSON is larger than MAX_BLOCK_CONTENT_TOKENS.

    The size is measured with estimate_tokens. Tables within the limit pass
    silently; otherwise a formatted error is printed via print_error and the
    process exits.

    Args:
        table_json: The JSON representation of the table
        block_heading: Heading of the block containing the table (used only
            to tell the user where the table is)

    Exits:
        sys.exit(1) if the table exceeds the maximum token limit
    """
    table_tokens = estimate_tokens(table_json)
    if table_tokens <= MAX_BLOCK_CONTENT_TOKENS:
        return

    headline = (
        f"Table too large (~{table_tokens} tokens, max {MAX_BLOCK_CONTENT_TOKENS})"
    )
    details = (
        f"A table in the document is too large for LLM processing.\n\n"
        f'Location: Under heading "{block_heading}"\n'
        f"Table size: ~{table_tokens} tokens ({len(table_json)} characters)\n\n"
        "Large tables can cause issues with file chunking."
    )
    remedy = (
        " 1. Open the document in Microsoft Word\n"
        f' 2. Locate the table under heading "{block_heading}"\n'
        " 3. Split the table into smaller tables, or\n"
        " 4. Simplify the table content\n"
        " 5. Re-upload it to LightRAG"
    )
    print_error(headline, details, remedy)
    sys.exit(1)
def find_first_valid_para_id(para_ids: list) -> str | None:
"""
Find the first valid paraId in a 2D array of paraIds.
Args:
para_ids: 2D list of paraIds from table cells
Returns:
First non-None paraId found, or None when every cell lacks a paraId.
Callers must tolerate ``None`` and treat it as a tracking gap rather
than a fatal error (legacy / non-Word docx authors omit ``w14:paraId``
attributes and we want to keep parsing).
"""
for row in para_ids:
for para_id in row:
if para_id:
return para_id
return None
def find_last_valid_para_id(para_ids: list) -> str | None:
"""
Find the last valid paraId in a 2D array of paraIds.
Returns the last non-None paraId, falling back to the first valid one
when reverse-iteration does not yield anything (single-paraId tables),
and finally ``None`` when every cell lacks a paraId.
"""
for row in reversed(para_ids):
for para_id in reversed(row):
if para_id:
return para_id
return find_first_valid_para_id(para_ids)
def _table_has_any_paraid(para_ids: list) -> bool:
    """Report whether any cell of the 2D paraId grid holds a usable id."""
    first_id = find_first_valid_para_id(para_ids)
    return first_id is not None
def split_table(
    table_rows: list,
    para_ids: list,
    para_ids_end: list,
    header_indices: list,
    debug: bool = False,
) -> list:
    """
    Split a large table into chunks at row boundaries.

    Splitting strategy:
    1. Only split if the table JSON exceeds TABLE_MAX_TOKENS
    2. Derive the target chunk count from TABLE_IDEAL_TOKENS
    3. Cut at row boundaries, aiming for an equal number of rows per chunk
    4. While cutting, fold a very short row remainder (< 30% of the target
       rows per chunk) into the current chunk
    5. Afterwards, if the last chunk is below TABLE_MIN_LAST_CHUNK_TOKENS,
       merge it into the previous chunk when the result still fits within
       TABLE_MAX_TOKENS
    6. Use the first valid paraId of each chunk as its uuid

    The ``is_first`` / ``is_last`` flags mark the first and last chunk that
    was actually emitted. How the chunks are attached to surrounding content
    and headings is decided by the callers, not here.

    Args:
        table_rows: 2D array of table content
        para_ids: 2D array of paraIds - first paraId in each cell (for uuid)
        para_ids_end: 2D array of paraIds - last paraId in each cell (for uuid_end)
        header_indices: List of row indices that are table headers. Not used
            by the splitting logic; kept for interface compatibility.
        debug: If True, report a tail merge on stderr

    Returns:
        List of chunk dicts: [{
            'rows': 2D array subset,
            'para_ids': 2D array subset,
            'para_ids_end': 2D array subset,
            'uuid': first valid paraId in chunk (may be None),
            'is_first': True if first chunk,
            'is_last': True if last chunk
        }, ...]
    """
    import math

    # Measure the whole table as it will be serialized.
    total_json = json.dumps(table_rows, ensure_ascii=False)
    total_tokens = estimate_tokens(total_json)
    if total_tokens <= TABLE_MAX_TOKENS:
        # No splitting needed: a single chunk that is both first and last.
        return [
            {
                "rows": table_rows,
                "para_ids": para_ids,
                "para_ids_end": para_ids_end,
                "uuid": find_first_valid_para_id(para_ids),
                "is_first": True,
                "is_last": True,
            }
        ]

    # Need to split - calculate target number of chunks.
    target_chunks = math.ceil(total_tokens / TABLE_IDEAL_TOKENS)
    min_chunks_needed = math.ceil(total_tokens / TABLE_MAX_TOKENS)
    target_chunks = max(target_chunks, min_chunks_needed)

    chunks = []
    num_rows = len(table_rows)
    # May be fractional (and below 1 when rows are few but large).
    target_rows_per_chunk = num_rows / target_chunks
    start_row = 0
    for i in range(target_chunks):
        if i == target_chunks - 1:
            # Last planned chunk gets all remaining rows.
            end_row = num_rows
        else:
            end_row = min(int((i + 1) * target_rows_per_chunk), num_rows)
            # Avoid leaving a very short remainder for the final chunk.
            rows_remaining = num_rows - end_row
            if 0 < rows_remaining < target_rows_per_chunk * 0.3:
                end_row = num_rows

        chunk_rows = table_rows[start_row:end_row]
        # With fewer rows than planned chunks, some planned slices are empty
        # and are simply skipped.
        if chunk_rows:
            chunk_para_ids = para_ids[start_row:end_row]
            chunks.append(
                {
                    "rows": chunk_rows,
                    "para_ids": chunk_para_ids,
                    "para_ids_end": para_ids_end[start_row:end_row],
                    "uuid": find_first_valid_para_id(chunk_para_ids),
                    # Flag the first chunk actually emitted. Using the loop
                    # index here would leave no chunk flagged when leading
                    # planned slices were empty.
                    "is_first": not chunks,
                    "is_last": end_row >= num_rows,
                }
            )
        start_row = end_row
        if start_row >= num_rows:
            break

    # Post-processing: merge a very small last chunk into the previous one.
    if len(chunks) >= 2:
        last_chunk = chunks[-1]
        last_chunk_tokens = estimate_tokens(
            json.dumps(last_chunk["rows"], ensure_ascii=False)
        )
        if last_chunk_tokens < TABLE_MIN_LAST_CHUNK_TOKENS:
            prev_chunk = chunks[-2]
            combined_rows = prev_chunk["rows"] + last_chunk["rows"]
            combined_tokens = estimate_tokens(
                json.dumps(combined_rows, ensure_ascii=False)
            )
            # Only merge if the combined chunk still respects the hard limit.
            if combined_tokens <= TABLE_MAX_TOKENS:
                chunks[-2] = {
                    "rows": combined_rows,
                    "para_ids": prev_chunk["para_ids"] + last_chunk["para_ids"],
                    "para_ids_end": (
                        prev_chunk["para_ids_end"] + last_chunk["para_ids_end"]
                    ),
                    "uuid": prev_chunk["uuid"],  # Keep uuid of the earlier chunk
                    "is_first": prev_chunk["is_first"],
                    "is_last": True,  # The merged chunk is now the last one
                }
                chunks.pop()
                if debug:
                    print(
                        f"[DEBUG] Merged small last chunk (~{last_chunk_tokens} tokens) with previous chunk",
                        file=sys.stderr,
                    )
                    print(
                        f" Combined size: ~{combined_tokens} tokens", file=sys.stderr
                    )
    return chunks
def split_table_with_heading(
    table_rows: list,
    para_ids: list,
    para_ids_end: list,
    header_indices: list,
    current_heading: str,
    start_suffix: int = 0,
    debug: bool = False,
) -> list:
    """
    Split a table via split_table() and number the resulting chunks.

    Every chunk receives a ``suffix_number``: ``None`` for the first chunk,
    ``start_suffix + position`` for the others. Passing a non-zero
    ``start_suffix`` keeps numbering sequential when several tables in the
    same block are split (e.g. [1], [2] for the first table, [3], [4] for
    the second).

    Args:
        table_rows: 2D array of table content
        para_ids: 2D array of paraIds - first paraId in each cell (for uuid)
        para_ids_end: 2D array of paraIds - last paraId in each cell (for uuid_end)
        header_indices: List of row indices that are table headers
        current_heading: Current block heading, used to build the chunk
            headings shown in the debug output
        start_suffix: Offset added to the chunk position for non-first chunks
        debug: If True and the table was split, list the chunks on stderr

    Returns:
        The chunk list from split_table(), each chunk extended with
        ``suffix_number``
    """
    # split_table's own debug output is suppressed; this wrapper reports instead.
    chunks = split_table(
        table_rows, para_ids, para_ids_end, header_indices, debug=False
    )

    for position, chunk in enumerate(chunks):
        chunk["suffix_number"] = None if position == 0 else start_suffix + position

    if not debug or len(chunks) <= 1:
        return chunks

    print(
        f"\n[DEBUG] Table split into {len(chunks)} chunks (final)", file=sys.stderr
    )
    for ordinal, chunk in enumerate(chunks, start=1):
        chunk_json = json.dumps(chunk["rows"], ensure_ascii=False)
        # Only numbered chunks carry the label suffix in their heading.
        if chunk["suffix_number"] is None:
            chunk_heading = current_heading
        else:
            chunk_heading = f"{current_heading} [{TABLE_CHUNK_SUFFIX_LABEL}{chunk['suffix_number']}]"
        print(
            f" Chunk {ordinal}: heading=\"{chunk_heading}\", {len(chunk['rows'])} rows, {len(chunk_json)} chars",
            file=sys.stderr,
        )
    return chunks
def _build_merged_block(head: dict, absorbed: list, content: str) -> dict:
    """Build the block that results from folding ``absorbed`` (document order) into ``head``."""
    last = absorbed[-1]
    merged = {
        "uuid": head["uuid"],
        "uuid_end": last.get("uuid_end", last["uuid"]),
        "heading": head["heading"],
        "content": content,
        "type": "text",
        "parent_headings": head["parent_headings"],
        # The merged block takes the level of the block whose heading is kept.
        "level": head.get("level", 1),
        "table_chunk_role": "none",
    }
    table_headers = list(head.get("table_headers", []))
    for block in absorbed:
        table_headers.extend(block.get("table_headers", []))
    if table_headers:
        merged["table_headers"] = table_headers
    return merged


def _merge_pair_if_fits(first: dict, second: dict) -> dict | None:
    """Merge two adjacent blocks, or return None if the result would exceed MAX_BLOCK_CONTENT_TOKENS."""
    content = first["content"] + "\n\n" + second["content"]
    if estimate_tokens(content) > MAX_BLOCK_CONTENT_TOKENS:
        return None
    return _build_merged_block(first, [second], content)


def _merge_pass_for_level(
    blocks: list, current_level: int, cross_level: bool, debug: bool
) -> tuple:
    """Run one left-to-right merge sweep for ``current_level``; return (new_blocks, merges_done)."""
    # Phase B (cross_level) additionally lets a 'last' table chunk act as the
    # leading block of a merge; Phase A does not.
    if cross_level:
        forward_roles = ("none", "first", "last")
    else:
        forward_roles = ("none", "first")
    backward_roles = ("none", "last")

    output = []
    merges = 0
    i = 0
    while i < len(blocks):
        current = blocks[i]
        current_tokens = estimate_tokens(current["content"])
        at_level = current.get("level", 1) == current_level
        role = current.get("table_chunk_role", "none")

        if at_level and 0 < current_tokens < IDEAL_BLOCK_CONTENT_TOKENS:
            # Forward merge: the current block keeps its heading and takes
            # in the next block.
            if role in forward_roles and i + 1 < len(blocks):
                next_block = blocks[i + 1]
                next_level = next_block.get("level", 1)
                if cross_level:
                    level_ok = next_level > current_level  # absorb deeper level
                else:
                    level_ok = next_level == current_level
                next_role = next_block.get("table_chunk_role", "none")
                if level_ok and next_role in backward_roles:
                    merged = _merge_pair_if_fits(current, next_block)
                    if merged is not None:
                        output.append(merged)
                        merges += 1
                        i += 2
                        continue

            # Backward merge: the previously emitted block keeps its heading
            # and takes in the current block. The previous block must itself
            # still be below IDEAL (otherwise it is considered locked).
            if role in backward_roles and output:
                prev_block = output[-1]
                prev_level = prev_block.get("level", 1)
                if cross_level:
                    level_ok = prev_level < current_level  # absorbed by higher level
                else:
                    level_ok = prev_level == current_level
                prev_role = prev_block.get("table_chunk_role", "none")
                if (
                    level_ok
                    and prev_role in forward_roles
                    and estimate_tokens(prev_block["content"])
                    < IDEAL_BLOCK_CONTENT_TOKENS
                ):
                    merged = _merge_pair_if_fits(prev_block, current)
                    if merged is not None:
                        output[-1] = merged
                        merges += 1
                        i += 1
                        continue

            # No merge was possible; keep the block unchanged.
            output.append(current)
            i += 1
            continue

        # Tail absorption (Phase A only): a block already at/above IDEAL may
        # swallow the run of same-level blocks that follows it when that run
        # is small in total.
        if (
            not cross_level
            and at_level
            and current_tokens >= IDEAL_BLOCK_CONTENT_TOKENS
        ):
            tail_tokens = 0
            tail_end = i + 1
            for j in range(i + 1, len(blocks)):
                candidate = blocks[j]
                if candidate.get("level", 1) != current_level:
                    break
                if candidate.get("table_chunk_role", "none") == "middle":
                    # Middle table chunks can never be absorbed.
                    break
                tail_tokens += estimate_tokens(candidate["content"])
                tail_end = j + 1

            # Size is checked with the summed estimates of the parts.
            if (
                0 < tail_tokens < SMALL_TAIL_THRESHOLD
                and current_tokens + tail_tokens <= MAX_BLOCK_CONTENT_TOKENS
            ):
                absorbed = blocks[i + 1 : tail_end]
                content = current["content"] + "".join(
                    "\n\n" + block["content"] for block in absorbed
                )
                output.append(_build_merged_block(current, absorbed, content))
                # Count before moving the cursor so the debug line is correct.
                num_absorbed = len(absorbed)
                merges += num_absorbed
                i = tail_end
                if debug:
                    print(
                        f" Tail absorption: block at IDEAL ({current_tokens} tokens) absorbed {num_absorbed} small tail blocks ({tail_tokens} tokens)",
                        file=sys.stderr,
                    )
                continue

        # Not eligible for any merge in this sweep; keep the block as-is.
        output.append(current)
        i += 1
    return output, merges


def merge_small_blocks(blocks: list, debug: bool = False) -> tuple:
    """
    Merge blocks below IDEAL_BLOCK_CONTENT_TOKENS, bottom-up and level-aware.

    Strategy:
    1. Process levels from deepest (largest number) to shallowest.
    2. For each level, repeat each phase until a sweep makes no merge:
       - Phase A: same-level merging of adjacent blocks, plus tail
         absorption (a block at/above IDEAL swallows a following same-level
         run whose total is below SMALL_TAIL_THRESHOLD)
       - Phase B: cross-level merging - a block of the current level absorbs
         a following deeper-level block, or is absorbed by a preceding
         shallower-level block
    3. Table chunk roles ('table_chunk_role'):
       - 'middle': never merged with any block
       - 'first': may only lead a merge (takes in the following block)
       - 'last': may be taken in by the preceding block; in Phase B it may
         additionally lead a merge
       - 'none': no restrictions
    4. Only blocks with 0 < tokens < IDEAL start a pair merge, and a
       preceding block is only extended while it is itself below IDEAL.
    5. A merge is rejected if the combined size > MAX_BLOCK_CONTENT_TOKENS.
    6. The merged block keeps uuid, heading, parent_headings and level of the
       earlier block, takes uuid_end from the later block, concatenates
       table_headers, and gets role 'none'.

    After merging, a warning listing every block whose estimated size still
    exceeds MAX_BLOCK_CONTENT_TOKENS is printed to stderr.

    Args:
        blocks: List of block dicts; each needs 'uuid', 'heading', 'content'
            and 'parent_headings', and may carry 'level' (default 1),
            'table_chunk_role' (default 'none'), 'uuid_end', 'table_headers'
        debug: If True, print progress information to stderr

    Returns:
        Tuple of (merged_blocks, merge_count). With zero or one input block
        the input list itself is returned with a count of 0.
    """
    if len(blocks) <= 1:
        return blocks, 0

    merged_count = 0
    result = blocks.copy()

    # Deepest level first, so small sub-sections are consolidated before
    # their parents are considered.
    levels = sorted({block.get("level", 1) for block in result}, reverse=True)
    if debug:
        print(
            f"\n[DEBUG] merge_small_blocks: Processing {len(result)} blocks across levels {levels}",
            file=sys.stderr,
        )

    for current_level in levels:
        if debug:
            print(f"[DEBUG] Processing level {current_level}", file=sys.stderr)

        for phase_label, cross_level in (("Phase A", False), ("Phase B", True)):
            iteration = 0
            while True:
                iteration += 1
                result, merges = _merge_pass_for_level(
                    result, current_level, cross_level, debug
                )
                if not merges:
                    break
                merged_count += merges
                if debug:
                    print(
                        f" {phase_label} iteration {iteration}: {merged_count} total merges",
                        file=sys.stderr,
                    )

    if debug:
        print(
            f"[DEBUG] merge_small_blocks complete: {len(result)} blocks, {merged_count} total merges",
            file=sys.stderr,
        )

    # Report blocks that are still above the hard limit.
    oversized_blocks = []
    for idx, block in enumerate(result):
        block_tokens = estimate_tokens(block["content"])
        if block_tokens > MAX_BLOCK_CONTENT_TOKENS:
            oversized_blocks.append(
                {
                    "index": idx,
                    "heading": block.get("heading", "(no heading)"),
                    "level": block.get("level", "N/A"),
                    "tokens": block_tokens,
                }
            )
    if oversized_blocks:
        print(
            f"\n[WARNING] Found {len(oversized_blocks)} oversized blocks after merging:",
            file=sys.stderr,
        )
        for info in oversized_blocks:
            print(
                f" Block #{info['index']}: level={info['level']}, tokens={info['tokens']}, heading=\"{info['heading']}\"",
                file=sys.stderr,
            )
    return result, merged_count
def split_long_block(
    block_heading: str,
    paragraphs: list,
    parent_headings: list,
    block_level: int,
    debug: bool = False,
) -> list:
    """
    Split a long text block into smaller blocks using anchor paragraphs.

    Strategy:
    1. If the content fits within MAX_BLOCK_CONTENT_TOKENS, return one block
    2. Otherwise derive the target block count from IDEAL_BLOCK_CONTENT_TOKENS
       (never fewer than needed to stay under MAX_BLOCK_CONTENT_TOKENS)
    3. Collect candidate anchors: non-table, non-empty paragraphs of at most
       MAX_ANCHOR_CANDIDATE_LENGTH characters, excluding the first paragraph
       (a split before it would not create a boundary)
    4. Pick the candidates closest to the ideal split positions
    5. Cut before each selected anchor; the anchor text becomes the heading
       of the block it starts
    6. Recursively split any resulting block that is still over the limit

    Tables are not split here: paragraphs flagged ``is_table`` are never used
    as anchors and their text stays intact.

    Args:
        block_heading: Original heading text
        paragraphs: List of dicts with 'text' and 'para_id' keys, optionally
            'is_table', 'para_id_end' and '_chunk_heading'
        parent_headings: Parent heading stack
        block_level: Heading level of this block (1=Heading 1, 2=Heading 2, etc.)
        debug: If True, output debug information when splitting occurs

    Returns:
        List of block dictionaries (possibly several), each with a 'level' field

    Exits:
        sys.exit(1) if the content exceeds the limit and no suitable anchor
        paragraph exists, or if an anchor fails heading validation
    """
    import math

    # A block that starts with a split table chunk carries its own heading in
    # the `_chunk_heading` metadata; it is used for the single-block result.
    effective_heading = block_heading
    if paragraphs and paragraphs[0].get("_chunk_heading"):
        effective_heading = paragraphs[0]["_chunk_heading"]

    total_content = "\n".join(p["text"] for p in paragraphs)
    total_tokens = estimate_tokens(total_content)
    if total_tokens <= MAX_BLOCK_CONTENT_TOKENS:
        # Within limit: single block. uuid_end prefers para_id_end (set when
        # the last element is a table) over para_id.
        last_para = paragraphs[-1] if paragraphs else {}
        uuid_end = last_para.get("para_id_end") or last_para.get("para_id")
        block = {
            "uuid": paragraphs[0]["para_id"] if paragraphs else None,
            "uuid_end": uuid_end,
            "heading": effective_heading,
            "content": total_content,
            "type": "text",
            "parent_headings": parent_headings,
            "level": block_level,
        }
        # Collect per-table cross-page headers (aligned with table tag order)
        table_headers = _collect_table_headers(paragraphs)
        if table_headers:
            block["table_headers"] = table_headers
        return [block]

    # Content exceeds the limit: work out how many blocks to aim for.
    target_blocks = math.ceil(total_tokens / IDEAL_BLOCK_CONTENT_TOKENS)
    min_blocks_needed = math.ceil(total_tokens / MAX_BLOCK_CONTENT_TOKENS)
    target_blocks = max(target_blocks, min_blocks_needed)
    target_size = total_tokens / target_blocks

    # Candidate anchors, each tagged with the token offset at which it starts.
    # Character length is used for eligibility (UI/readability constraint).
    candidates = []
    cumulative_tokens = 0
    for idx, para in enumerate(paragraphs):
        if (
            # Splitting before paragraph 0 creates no boundary. If it were
            # the only anchor, the same oversized block would be re-split
            # forever by the recursion below.
            idx > 0
            and not para.get("is_table", False)
            and 0 < len(para["text"]) <= MAX_ANCHOR_CANDIDATE_LENGTH
        ):
            candidates.append(
                {
                    "index": idx,
                    "text": para["text"],
                    "para_id": para["para_id"],
                    "position": cumulative_tokens,
                }
            )
        cumulative_tokens += estimate_tokens(para["text"])

    if not candidates:
        preview = (
            block_heading[:80] + "..." if len(block_heading) > 80 else block_heading
        )
        print_error(
            "Cannot split long block (no suitable anchor paragraphs found)",
            f"A text block is too long (~{total_tokens} tokens, max {MAX_BLOCK_CONTENT_TOKENS})\n"
            f"but no paragraphs <= {MAX_ANCHOR_CANDIDATE_LENGTH} characters were found to use as split points.\n\n"
            f'Location: Under heading "{preview}"\n'
            f"Block size: ~{total_tokens} tokens ({len(total_content)} characters)\n"
            f"Number of paragraphs: {len(paragraphs)}\n"
            f"Calculated target blocks: {target_blocks}",
            " 1. Open the document in Microsoft Word\n"
            f' 2. Locate the section under heading "{preview}"\n'
            f" 3. Add short headings or paragraph breaks (≤{MAX_ANCHOR_CANDIDATE_LENGTH} chars) to divide the content\n"
            " 4. Re-upload it to LightRAG",
        )
        sys.exit(1)

    # Select up to target_blocks - 1 anchors, each the closest remaining
    # candidate to its ideal token position.
    selected_anchors = []
    remaining_candidates = candidates.copy()
    for i in range(1, target_blocks):
        if not remaining_candidates:
            break
        ideal_position = i * target_size
        best_candidate = min(
            remaining_candidates, key=lambda c: abs(c["position"] - ideal_position)
        )
        selected_anchors.append(best_candidate)
        remaining_candidates.remove(best_candidate)
    # Restore document order.
    selected_anchors.sort(key=lambda a: a["index"])

    def _block_from(block_paragraphs: list, heading: str, parents: list) -> dict:
        """Assemble one output block from a non-empty paragraph slice."""
        last_para = block_paragraphs[-1]
        new_block = {
            "uuid": block_paragraphs[0]["para_id"],
            # para_id_end is preferred when the last element is a table.
            "uuid_end": last_para.get("para_id_end") or last_para.get("para_id"),
            "heading": heading,
            "content": "\n".join(p["text"] for p in block_paragraphs),
            "type": "text",
            "parent_headings": parents,
            # Kept temporarily so an oversized block can be re-split.
            "_paragraphs": block_paragraphs,
        }
        headers = _collect_table_headers(block_paragraphs)
        if headers:
            new_block["table_headers"] = headers
        return new_block

    result_blocks = []
    prev_idx = 0
    current_parent_headings = parent_headings
    current_block_heading = block_heading
    for anchor in selected_anchors:
        split_idx = anchor["index"]
        # Paragraphs up to (not including) the anchor close the current block.
        block_paragraphs = paragraphs[prev_idx:split_idx]
        if block_paragraphs:
            result_blocks.append(
                _block_from(
                    block_paragraphs, current_block_heading, current_parent_headings
                )
            )
        # The anchor becomes the heading of the next block.
        validate_heading_length(anchor["text"], anchor["para_id"])
        current_block_heading = anchor["text"]
        # Blocks after the first nest under the original heading, unless it
        # is the "Preface/Uncategorized" placeholder.
        if block_heading != "Preface/Uncategorized":
            current_parent_headings = parent_headings + [block_heading]
        # The anchor is not skipped: it is also the first paragraph of the
        # next block's content.
        prev_idx = split_idx

    final_paragraphs = paragraphs[prev_idx:]
    if final_paragraphs:
        result_blocks.append(
            _block_from(
                final_paragraphs, current_block_heading, current_parent_headings
            )
        )

    # Post-split validation: recursively split blocks that are still over the
    # limit (sparse anchors can leave uneven pieces).
    validated_blocks = []
    for block in result_blocks:
        block_tokens = estimate_tokens(block["content"])
        if block_tokens > MAX_BLOCK_CONTENT_TOKENS:
            block_paragraphs = block.get("_paragraphs", [])
            if not block_paragraphs:
                # Should not happen; fail with a clear message if it does.
                preview = (
                    block["heading"][:80] + "..."
                    if len(block["heading"]) > 80
                    else block["heading"]
                )
                print_error(
                    "Cannot re-split oversized block (internal error)",
                    f"A block exceeded MAX_BLOCK_CONTENT_TOKENS but paragraph metadata was lost.\n\n"
                    f"Location: Under heading \"{preview}\"\n"
                    f"Block size: ~{block_tokens} tokens ({len(block['content'])} characters)",
                    "This is an internal error. Please report this issue.",
                )
                sys.exit(1)
            # The recursive call either finds more anchors or exits with the
            # "no suitable anchor" error. Every split point has index >= 1,
            # so each sub-block has fewer paragraphs than its parent.
            sub_blocks = split_long_block(
                block["heading"],
                block_paragraphs,
                block["parent_headings"],
                block_level,
                debug,
            )
            validated_blocks.extend(sub_blocks)
        else:
            # Strip the internal field before the block leaves this function.
            block.pop("_paragraphs", None)
            validated_blocks.append(block)

    for block in validated_blocks:
        block["level"] = block_level

    if debug and len(validated_blocks) > 1:
        print(f'\n[DEBUG] Block split: "{block_heading}"', file=sys.stderr)
        print(
            f" Original size: ~{total_tokens} tokens ({len(total_content)} characters)",
            file=sys.stderr,
        )
        block_tokens = [estimate_tokens(block["content"]) for block in validated_blocks]
        print(
            f" Final result: {len(validated_blocks)} blocks: ~{block_tokens} tokens",
            file=sys.stderr,
        )
    return validated_blocks
def extract_para_id(para_element) -> str:
"""
Extract w14:paraId attribute from paragraph element.
Args:
para_element: lxml paragraph element
Returns:
8-character hex paraId, or ``None`` when the paragraph carries no
``w14:paraId`` attribute (legacy / non-Word docx authors). Callers
propagate the ``None`` upward — the LightRAG adapter counts these
and surfaces a single warning per document.
"""
return para_element.get(
"{http://schemas.microsoft.com/office/word/2010/wordml}paraId"
)
def parse_styles_outline_levels(docx_path: str) -> dict:
    """
    Parse styles.xml to extract outlineLvl definitions for each style,
    following the style inheritance chain (basedOn).

    The parse is best-effort: an unreadable archive or a missing/malformed
    ``word/styles.xml`` yields an empty mapping instead of an exception. A
    single style whose ``w:outlineLvl`` value is missing or non-numeric is
    treated as having no direct level, so it cannot prevent the remaining
    styles (and their inheritance) from being resolved.

    Args:
        docx_path: Path to DOCX file

    Returns:
        dict: styleId -> outlineLvl (0-8 for headings, 9 for body text).
        Styles with neither a direct nor an inherited level are absent.
    """
    import zipfile

    try:
        from defusedxml import ElementTree as ET
    except ImportError:
        from xml.etree import ElementTree as ET

    ns = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    styles_outline = {}  # styleId -> outlineLvl (direct, later also inherited)
    style_based_on = {}  # styleId -> parent styleId

    try:
        with zipfile.ZipFile(docx_path, "r") as zf:
            if "word/styles.xml" not in zf.namelist():
                return styles_outline
            # Close the member stream explicitly instead of leaking the handle.
            with zf.open("word/styles.xml") as styles_file:
                root = ET.parse(styles_file).getroot()
    except Exception:
        # Deliberately best-effort: heading detection can still rely on
        # paragraph-level outlineLvl when styles.xml cannot be read.
        return styles_outline

    # First pass: collect outlineLvl and basedOn for all styles
    for style in root.findall(f".//{{{ns}}}style"):
        style_id = style.get(f"{{{ns}}}styleId")
        if not style_id:
            continue

        # Style inheritance link
        based_on = style.find(f"{{{ns}}}basedOn")
        if based_on is not None:
            parent_id = based_on.get(f"{{{ns}}}val")
            if parent_id:
                style_based_on[style_id] = parent_id

        # Directly defined outlineLvl in the style's pPr
        pPr = style.find(f"{{{ns}}}pPr")
        if pPr is None:
            continue
        outline_lvl_elem = pPr.find(f"{{{ns}}}outlineLvl")
        if outline_lvl_elem is None:
            continue
        try:
            styles_outline[style_id] = int(outline_lvl_elem.get(f"{{{ns}}}val"))
        except (TypeError, ValueError):
            # Missing or non-numeric w:val: ignore this one definition only.
            continue

    def resolve_inherited_level(style_id: str):
        """Walk the basedOn chain until a style with a known level is found."""
        seen = set()
        current = style_id
        while current is not None and current not in seen:
            if current in styles_outline:
                return styles_outline[current]
            seen.add(current)  # guards against circular basedOn references
            current = style_based_on.get(current)
        return None

    # Second pass: fill in levels for styles that only inherit one. Only
    # styles with a basedOn link can gain a level this way.
    for style_id in style_based_on:
        if style_id in styles_outline:
            continue
        level = resolve_inherited_level(style_id)
        if level is not None:
            styles_outline[style_id] = level

    return styles_outline
def get_heading_level(para_element, styles_outline_map: dict) -> int:
"""
Get heading level from paragraph, checking both direct format and style.
Priority: paragraph outlineLvl > style outlineLvl
Args:
para_element: lxml paragraph element
styles_outline_map: dict of styleId -> outlineLvl from styles.xml
Returns:
int: 0-8 for heading levels (0=level 1, 1=level 2, etc.), None for non-heading
"""
# 1. Check paragraph direct format
pPr = para_element.find(
"{http://schemas.openxmlformats.org/wordprocessingml/2006/main}pPr"
)
if pPr is not None:
outline_elem = pPr.find(
"{http://schemas.openxmlformats.org/wordprocessingml/2006/main}outlineLvl"
)
if outline_elem is not None:
level = int(
outline_elem.get(
"{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val"
)
)
# Only 0-8 are true heading levels (9 is body text)
if level < 9:
return level
else:
return None # Level 9 is body text
# 2. Check style definition's outlineLvl
if pPr is not None:
pStyle_elem = pPr.find(
"{http://schemas.openxmlformats.org/wordprocessingml/2006/main}pStyle"
)
if pStyle_elem is not None:
style_id = pStyle_elem.get(
"{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val"
)
if style_id and style_id in styles_outline_map:
level = styles_outline_map[style_id]
if level < 9:
return level
else:
return None
return None
def extract_text_from_run(
    run,
    ns: dict,
    drawing_context: "DrawingExtractionContext | None" = None,
) -> str:
    """
    Extract text from a run element, preserving superscript/subscript with markup.

    Converts Word formatting to HTML-like tags:
    - Superscript: ``<sup>text</sup>``
    - Subscript: ``<sub>text</sub>``
    - Normal text: unchanged

    Args:
        run: lxml run element (w:r)
        ns: XML namespace dictionary (must map the ``w`` prefix)
        drawing_context: Optional context forwarded unchanged to the
            drawing / VML placeholder extractors

    Returns:
        Text string with ``<sup>``/``<sub>`` markup for formatted portions.
        Tabs become ``"\\t"``, soft line breaks become ``"\\n"``, page and
        column breaks are dropped, and drawings / VML pictures contribute
        whatever placeholder text the extractor helpers return. An empty run
        yields ``""`` with no markup.
    """
    w_ns = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

    # Check for vertAlign in rPr (superscript/subscript)
    vert_align = None
    rPr = run.find("w:rPr", ns)
    if rPr is not None:
        vert_elem = rPr.find("w:vertAlign", ns)
        if vert_elem is not None:
            vert_align = vert_elem.get(f"{w_ns}val")

    # Extract text content from run children, in document order
    parts = []
    for child in run:
        if not isinstance(child.tag, str):
            # XML comments / processing instructions carry no run content.
            continue
        tag = child.tag.split("}")[-1]  # Remove namespace
        if tag == "t":
            if child.text:
                parts.append(child.text)
        elif tag == "tab":
            parts.append("\t")
        elif tag == "br":
            # textWrapping or no type = soft line break; page and column
            # breaks are layout-only and intentionally skipped.
            if child.get(f"{w_ns}type") in (None, "textWrapping"):
                parts.append("\n")
        elif tag == "drawing":
            parts.append(
                extract_drawing_placeholder_from_element(
                    child,
                    context=drawing_context,
                    include_extended_attrs=True,
                )
            )
        elif tag in ("pict", "object"):
            parts.append(
                extract_vml_image_placeholder_from_element(
                    child,
                    context=drawing_context,
                    include_extended_attrs=True,
                )
            )

    text = "".join(parts)

    # Apply superscript/subscript markup only when there is text to wrap
    if text and vert_align == "superscript":
        return f"<sup>{text}</sup>"
    if text and vert_align == "subscript":
        return f"<sub>{text}</sub>"
    return text
def extract_paragraph_content(
    element,
    ns,
    drawing_context: DrawingExtractionContext = None,
) -> str:
    """
    Flatten a paragraph element into text, visiting its content in document order.

    Text runs (w:r) are rendered by ``extract_text_from_run``; inline
    equations (m:oMath) and the m:oMath children of block equations
    (m:oMathPara) are rendered by ``convert_omml_to_latex``. Any other
    element is treated as a container and searched recursively, so content
    nested in wrappers such as w:hyperlink, w:ins, w:sdt, w:fldSimple or
    w:smartTag is not lost. Elements whose local name is listed in
    ``_SKIP_PARAGRAPH_TAGS`` are dropped together with their subtree.

    Args:
        element: lxml paragraph element (w:p)
        ns: XML namespace dictionary
        drawing_context: Optional context forwarded to ``extract_text_from_run``

    Returns:
        The concatenated text of all collected fragments.
    """
    fragments = []

    def walk(node) -> None:
        local_name = node.tag.split("}")[-1]

        if local_name in _SKIP_PARAGRAPH_TAGS:
            # Skipped tags contribute nothing, and neither do their children.
            return

        if local_name == "r":
            run_text = extract_text_from_run(
                node, ns, drawing_context=drawing_context
            )
            fragments.append(run_text)
        elif local_name == "oMath":
            # Imported lazily, only once an equation is actually encountered.
            from .omml import convert_omml_to_latex

            latex = convert_omml_to_latex(node)
            # NOTE(review): the LaTeX is appended without wrapper markup,
            # although earlier documentation mentioned wrapping tags —
            # confirm the literal is as intended.
            if latex:
                fragments.append(f"{latex}")
        elif local_name == "oMathPara":
            from .omml import convert_omml_to_latex

            # Only direct m:oMath children of the block equation are rendered.
            equations = (
                inner for inner in node if inner.tag.split("}")[-1] == "oMath"
            )
            for equation in equations:
                latex = convert_omml_to_latex(equation)
                if latex:
                    fragments.append(f"{latex}")
        else:
            # Generic container: descend into its children.
            for nested in node:
                walk(nested)

    for top_level in element:
        walk(top_level)

    return "".join(fragments)
def _is_table_empty(rows: list) -> bool:
"""Return True iff every cell in ``rows`` is whitespace-only."""
return all(not (cell or "").strip() for row in rows for cell in row)
def _collect_table_headers(paragraphs: list) -> list:
"""Collect per-table cross-page header rows from ``is_table`` paragraphs.
The returned list is aligned 1:1 with the order of ```` placeholder
tags emitted into the block's content; entries are either the list of
header rows captured from ``w:tblHeader`` or ``None`` when the table has
no cross-page repeating header.
"""
return [p.get("_table_header") for p in paragraphs if p.get("is_table")]
def _build_unsplit_block(
    heading: str, paragraphs: list, parent_headings: list, level: int
) -> dict:
    """Assemble one block from all given paragraphs, with no size-based splitting.

    The block starts at the first paragraph's ``para_id`` and ends at the
    last paragraph's ``para_id_end`` (falling back to its ``para_id`` when
    no end id is stored). Paragraph texts are joined with newlines. A
    ``table_headers`` entry is added only when at least one table paragraph
    is present. ``paragraphs`` must be non-empty.
    """
    closing_para = paragraphs[-1]
    end_id = closing_para.get("para_id_end") or closing_para.get("para_id")

    block = {
        "uuid": paragraphs[0]["para_id"],
        "uuid_end": end_id,
        "heading": heading,
        "content": "\n".join(entry["text"] for entry in paragraphs),
        "type": "text",
        "parent_headings": parent_headings,
        "level": level,
    }

    headers = _collect_table_headers(paragraphs)
    if headers:
        block["table_headers"] = headers
    return block
def _flush_current_block(
    blocks: list,
    heading: str,
    paragraphs: list,
    parent_headings: list,
    level: int,
    fixlevel: int,
    debug: bool,
) -> None:
    """
    Append the accumulated paragraphs to ``blocks`` as one or more blocks.

    Nothing happens when ``paragraphs`` is empty. With ``fixlevel`` set, the
    paragraphs become exactly one unsplit block, and a warning is printed to
    stderr when its estimated size exceeds MAX_BLOCK_CONTENT_TOKENS.
    Without ``fixlevel`` (None), the paragraphs are handed to
    split_long_block and every block it returns is appended.
    """
    if not paragraphs:
        return

    if fixlevel is not None:
        # Fixed-level mode: never split, only report oversized blocks.
        block = _build_unsplit_block(heading, paragraphs, parent_headings, level)
        block_tokens = estimate_tokens(block["content"])
        if block_tokens > MAX_BLOCK_CONTENT_TOKENS:
            if len(heading) > 80:
                preview = heading[:80] + "..."
            else:
                preview = heading
            print(
                f"Warning: fixlevel block exceeds {MAX_BLOCK_CONTENT_TOKENS} tokens "
                f'(~{block_tokens} tokens) under heading "{preview}". '
                f"Consider increasing --fixlevel=N or removing --fixlevel for automatic splitting.",
                file=sys.stderr,
            )
        blocks.append(block)
        return

    # Default mode: token-based splitting.
    split_result = split_long_block(
        heading, paragraphs, parent_headings, level, debug
    )
    blocks.extend(split_result)
def extract_docx_blocks(
    file_path: str,
    debug: bool = False,
    fixlevel: int | None = None,
    drawing_context: DrawingExtractionContext | None = None,
    parse_warnings: dict | None = None,
    parse_metadata: dict | None = None,
) -> list:
    """
    Extract text blocks (chunks) from a DOCX file for chunking later.

    Walks the document body in order and accumulates paragraphs and tables
    under the current heading, flushing a block whenever a splitting heading
    arrives. Along the way it:
    1. Captures automatic numbering (list labels) via NumberingResolver
    2. Splits the document by headings
    3. Serializes tables to JSON (2D array) to estimate their size
    4. Validates and truncates heading text
    5. Splits long blocks and oversized tables (default mode only)
    6. Delegates run-level text handling (superscript/subscript, equations,
       drawings) to extract_paragraph_content

    Args:
        file_path: Path to the DOCX file
        debug: If True, output debug information when splitting blocks
        fixlevel: If specified, disable smart splitting/merging and only split at heading levels <= fixlevel
            (0 = split at all heading levels, 1 = Heading 1 only, 2 = Heading 1-2, etc.)
        drawing_context: Optional context passed through unchanged to
            extract_paragraph_content and TableExtractor.extract_with_metadata.
        parse_warnings: Optional out-dict that this function mutates with
            non-fatal warnings observed during parsing. Currently used for
            ``missing_paraid_count`` — incremented once per body-level
            paragraph (heading or text) that lacks a ``w14:paraId`` and once
            per non-empty table for which ``_table_has_any_paraid`` is false.
        parse_metadata: Optional out-dict that this function mutates with
            document-level metadata derived during parsing. Currently used
            for ``first_heading`` — the (possibly truncated) text of the
            first heading encountered in document order, regardless of level.

    Returns:
        List of block dictionaries with heading, content, type, and metadata.
        In default mode this is the result of merge_small_blocks; in fixlevel
        mode the blocks are returned unmerged.
    """
    doc = Document(file_path)
    resolver = NumberingResolver(file_path)
    styles_outline = parse_styles_outline_levels(file_path)
    blocks = []
    current_heading = "Preface/Uncategorized"
    current_heading_level = 1  # Default level for "Preface/Uncategorized"
    current_heading_stack = {}  # {level: heading_text} - Use dict to correctly track heading hierarchy
    current_parent_headings = []  # Parent headings for current block
    current_paragraphs = []  # Track paragraphs with metadata for splitting
    has_body_content = (
        False  # Track if current block has body content (non-heading paragraphs/tables)
    )
    matched_fixlevel_heading = False  # Track whether --fixlevel matched any heading
    table_split_counter = (
        0  # Track cumulative table split suffix numbers within current block
    )
    first_heading_recorded = (
        False  # Track whether the document's first heading has been captured
    )
    # Iterate through document body elements (paragraphs and tables)
    body = doc._element.body
    for element in body:
        tag = element.tag.split("}")[-1]  # Remove namespace
        if tag == "sectPr":  # Document-level section break
            resolver.reset_tracking_state()
            continue
        if tag == "p":  # Paragraph
            # Get paragraph text with run-level markup and equations
            para_text = ""
            ns = {
                "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
                "wp": "http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing",
                "m": "http://schemas.openxmlformats.org/officeDocument/2006/math",
            }
            para_text = extract_paragraph_content(
                element,
                ns,
                drawing_context=drawing_context,
            )
            para_text = para_text.strip()
            if not para_text:
                # NOTE(review): this `continue` also bypasses the
                # paragraph-level sectPr check at the end of this branch, so
                # a text-less paragraph that carries a section break does not
                # reset numbering tracking — confirm this is intended.
                continue
            # Get numbering label using our resolver
            label = resolver.get_label(element)
            full_text = f"{label} {para_text}".strip() if label else para_text
            # Check whether this paragraph is a heading (direct or style outline level)
            outline_level = get_heading_level(element, styles_outline)
            if outline_level is not None:
                # This is a heading (outline level 0-8)
                # Convert 0-based to 1-based level
                level = outline_level + 1
                # In fixlevel mode, check if this heading should trigger a block split
                should_split = True
                if fixlevel is not None and fixlevel > 0:
                    # If fixlevel is specified and > 0, only split at levels <= fixlevel
                    should_split = level <= fixlevel
                # Extract paraId for this heading
                heading_para_id = extract_para_id(element)
                if parse_warnings is not None and not heading_para_id:
                    parse_warnings["missing_paraid_count"] = (
                        parse_warnings.get("missing_paraid_count", 0) + 1
                    )
                # Validate heading length
                validate_heading_length(full_text, heading_para_id)
                # Truncate heading if needed before storing
                truncated_text = truncate_heading(full_text, heading_para_id)
                # Record the document's first heading (any level) for meta.doc_title.
                # The flag is set even when no parse_metadata dict was supplied.
                if not first_heading_recorded:
                    if parse_metadata is not None:
                        parse_metadata["first_heading"] = truncated_text
                    first_heading_recorded = True
                if should_split:
                    if fixlevel is not None and fixlevel > 0:
                        matched_fixlevel_heading = True
                    # This heading triggers a block split
                    # Only save previous block if it has body content; a run
                    # of headings without body text keeps accumulating in
                    # the same block.
                    if has_body_content and current_paragraphs:
                        _flush_current_block(
                            blocks,
                            current_heading,
                            current_paragraphs,
                            current_parent_headings,
                            current_heading_level,
                            fixlevel,
                            debug,
                        )
                        # Reset for new block
                        current_paragraphs = []
                        has_body_content = False
                        table_split_counter = (
                            0  # Reset table split counter for new heading
                        )
                    # Add heading to current_paragraphs
                    current_paragraphs.append(
                        {
                            "text": truncated_text,
                            "para_id": heading_para_id,
                            "is_table": False,
                        }
                    )
                    # Update current_heading and parent_headings for the FIRST heading in a block
                    # (when current_paragraphs just had this heading added as its first element)
                    if len(current_paragraphs) == 1:
                        current_heading = truncated_text
                        current_heading_level = (
                            level  # Only set level when setting heading
                        )
                        # Parent headings = all headings from levels strictly less than current level
                        # Sort by level to maintain hierarchy order
                        current_parent_headings = [
                            current_heading_stack[lvl]
                            for lvl in sorted(current_heading_stack.keys())
                            if lvl < level
                        ]
                    # Update heading stack: drop this level and every deeper
                    # level, then record the current heading. This happens
                    # after parent_headings is computed so a heading is never
                    # its own parent.
                    current_heading_stack = {
                        k: v for k, v in current_heading_stack.items() if k < level
                    }
                    current_heading_stack[level] = truncated_text
                else:
                    # This heading doesn't trigger split - treat as regular paragraph
                    para_id = heading_para_id
                    # Store as regular paragraph with metadata
                    current_paragraphs.append(
                        {"text": truncated_text, "para_id": para_id, "is_table": False}
                    )
                    # Mark that we have body content
                    has_body_content = True
            else:
                # Regular paragraph content
                para_id = extract_para_id(element)
                if parse_warnings is not None and not para_id:
                    parse_warnings["missing_paraid_count"] = (
                        parse_warnings.get("missing_paraid_count", 0) + 1
                    )
                # Store paragraph with metadata for potential splitting
                current_paragraphs.append(
                    {"text": full_text, "para_id": para_id, "is_table": False}
                )
                # Mark that we have body content
                has_body_content = True
            # Check for paragraph-level section break (after processing paragraph)
            # sectPr in pPr means this paragraph ends a section
            pPr = element.find(
                "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}pPr"
            )
            if pPr is not None:
                sectPr = pPr.find(
                    "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}sectPr"
                )
                if sectPr is not None:
                    # Section break after this paragraph - reset tracking
                    resolver.reset_tracking_state()
        elif tag == "tbl":  # Table
            # Reset numbering tracking before table (table start boundary)
            resolver.reset_tracking_state()
            # Directly create Table object from XML element to avoid index mismatch
            # (doc.tables may have different order due to nested tables)
            from docx.table import Table

            table = Table(element, doc)
            table_metadata = TableExtractor.extract_with_metadata(
                table,
                numbering_resolver=resolver,
                drawing_context=drawing_context,
            )
            table_rows = table_metadata["rows"]
            para_ids = table_metadata["para_ids"]
            para_ids_end = table_metadata["para_ids_end"]  # Last paraId in each cell
            header_indices = table_metadata["header_indices"]
            # Skip tables whose every cell is whitespace-only — otherwise an
            # empty table placeholder would be added to the block content.
            if _is_table_empty(table_rows):
                resolver.reset_tracking_state()
                continue
            # Count tables whose cells carry no w14:paraId. Legacy / non-Word
            # docx authors omit these attributes; this is recorded as a
            # warning count rather than treated as a fatal error.
            if parse_warnings is not None and not _table_has_any_paraid(para_ids):
                parse_warnings["missing_paraid_count"] = (
                    parse_warnings.get("missing_paraid_count", 0) + 1
                )
            # Convert table to JSON and estimate token count
            table_json = json.dumps(table_rows, ensure_ascii=False)
            table_tokens = estimate_tokens(table_json)
            # Extract cross-page repeating header rows (w:tblHeader) once per
            # table so both split and unsplit branches can attach them to the
            # paragraph entry as ``_table_header``. Out-of-range indices are
            # ignored.
            header_rows = []
            if header_indices:
                header_rows = [
                    table_rows[idx] for idx in header_indices if idx < len(table_rows)
                ]
            header_rows_or_none = header_rows if header_rows else None
            # Check if table needs splitting (disabled in fixlevel mode)
            if fixlevel is None and table_tokens > TABLE_MAX_TOKENS:
                # Table exceeds limit - split it
                # Pass table_split_counter to ensure sequential numbering across multiple tables
                table_chunks = split_table_with_heading(
                    table_rows,
                    para_ids,
                    para_ids_end,
                    header_indices,
                    current_heading,
                    table_split_counter,
                    debug,
                )
                for chunk_idx, chunk in enumerate(table_chunks):
                    chunk_json = json.dumps(chunk["rows"], ensure_ascii=False)
                    # Get uuid_end from last valid paraId in chunk (use para_ids_end for last cell's last paragraph)
                    chunk_para_id_end = find_last_valid_para_id(chunk["para_ids_end"])
                    if chunk["is_first"]:
                        # First chunk: add to current_paragraphs (will merge with preceding content)
                        # NOTE(review): the placeholder text below is an empty
                        # f-string and chunk_json is not otherwise used —
                        # confirm the literal is as intended.
                        current_paragraphs.append(
                            {
                                "text": f"",
                                "para_id": chunk["uuid"],
                                "para_id_end": chunk_para_id_end,  # Store end paraId for uuid_end calculation
                                "is_table": True,
                                "_table_header": header_rows_or_none,
                            }
                        )
                        has_body_content = True
                    else:
                        # Middle or last chunk: save current block first
                        if current_paragraphs:
                            _flush_current_block(
                                blocks,
                                current_heading,
                                current_paragraphs,
                                current_parent_headings,
                                current_heading_level,
                                fixlevel,
                                debug,
                            )
                            current_paragraphs = []
                            has_body_content = False
                        # Generate heading using suffix_number from chunk
                        if chunk["suffix_number"] is not None:
                            chunk_heading = f"{current_heading} [{TABLE_CHUNK_SUFFIX_LABEL}{chunk['suffix_number']}]"
                        else:
                            chunk_heading = current_heading
                        # Build block for this table chunk
                        # Get uuid_end from last valid paraId in chunk (use para_ids_end for last cell's last paragraph)
                        chunk_uuid_end = find_last_valid_para_id(chunk["para_ids_end"])
                        # Determine table_chunk_role based on chunk position.
                        # NOTE(review): this code runs only for non-first
                        # chunks, so the two arms testing chunk["is_first"]
                        # look unreachable — confirm before removing.
                        if chunk["is_first"] and chunk["is_last"]:
                            table_chunk_role = "none"  # Not split
                        elif chunk["is_first"]:
                            table_chunk_role = "first"
                        elif chunk["is_last"]:
                            table_chunk_role = "last"
                        else:
                            table_chunk_role = "middle"
                        # NOTE(review): same empty f-string placeholder as above.
                        chunk_block = {
                            "uuid": chunk["uuid"],
                            "uuid_end": chunk_uuid_end,
                            "heading": chunk_heading,
                            "content": f"",
                            "type": "text",
                            "parent_headings": current_parent_headings,
                            "level": current_heading_level,
                            "table_chunk_role": table_chunk_role,
                        }
                        # Always emit a per-table headers list for this
                        # standalone block; the entry is None when the table
                        # has no cross-page repeating header so downstream
                        # counters stay aligned.
                        chunk_block["table_headers"] = [header_rows_or_none]
                        if chunk["is_last"]:
                            # Last chunk: add to current_paragraphs for merging with following content.
                            # chunk_block is not emitted in this case; only
                            # the paragraph entry below carries the chunk.
                            current_paragraphs.append(
                                {
                                    "text": f"",
                                    "para_id": chunk["uuid"],
                                    "para_id_end": chunk_para_id_end,  # Store end paraId for uuid_end calculation
                                    "is_table": True,
                                    "_chunk_heading": chunk_heading,
                                    "_table_header": header_rows_or_none,
                                }
                            )
                            has_body_content = True
                        else:
                            # Middle chunk: output immediately as standalone block
                            blocks.append(chunk_block)
                # Update table_split_counter: add number of non-first chunks
                # (first chunk doesn't get a suffix, so we count from second chunk onwards)
                table_split_counter += len(table_chunks) - 1
            else:
                # Table is within size limit (or fixlevel mode) - no splitting
                # Store table as a paragraph with special marker
                # Use first valid paraId from table, and last valid paraId (from para_ids_end) for uuid_end
                table_para_id = find_first_valid_para_id(para_ids)
                table_para_id_end = find_last_valid_para_id(para_ids_end)
                # NOTE(review): same empty f-string placeholder as in the
                # split branch; table_json is used only for the size estimate.
                current_paragraphs.append(
                    {
                        "text": f"",
                        "para_id": table_para_id,
                        "para_id_end": table_para_id_end,  # Store end paraId for uuid_end calculation
                        "is_table": True,
                        "_table_header": header_rows_or_none,
                    }
                )
                # Mark that we have body content
                has_body_content = True
            # Reset numbering tracking after table (table end boundary)
            resolver.reset_tracking_state()
    # Save final block (respecting fixlevel mode)
    _flush_current_block(
        blocks,
        current_heading,
        current_paragraphs,
        current_parent_headings,
        current_heading_level,
        fixlevel,
        debug,
    )
    # Add table_chunk_role="none" to all blocks that don't have it (non-table or unsplit table blocks)
    for block in blocks:
        if "table_chunk_role" not in block:
            block["table_chunk_role"] = "none"
    # Perform small block merging (unified merging after all splits)
    # Disabled in fixlevel mode
    if fixlevel is None:
        if debug:
            print(f"\n[DEBUG] Before merging: {len(blocks)} blocks", file=sys.stderr)
        merged_blocks, merge_count = merge_small_blocks(blocks, debug)
        if debug and merge_count > 0:
            print(
                f"[DEBUG] After merging: {len(merged_blocks)} blocks ({merge_count} merges performed)",
                file=sys.stderr,
            )
        return merged_blocks
    # Fixed level mode: skip merging, but warn if no heading matched the requested level
    # (fixlevel is known to be non-None here, so the comparison is safe)
    if fixlevel > 0 and not matched_fixlevel_heading:
        print(
            f"Warning: --fixlevel={fixlevel} produced {len(blocks)} block(s). "
            f"Document may not have heading levels <= {fixlevel}. "
            f"Try a higher --fixlevel value or remove the flag.",
            file=sys.stderr,
        )
    return blocks