# file_service.py
"""File processing service."""
import asyncio
import gc
import io
import logging
import mimetypes
import os
import re
import time
from collections.abc import Iterator
from datetime import datetime
from typing import Optional, List, Dict, Tuple

import aiofiles
import aiohttp
import docx
import PyPDF2
from fastapi import UploadFile

from ..config import settings

logger = logging.getLogger(__name__)

# Optional third-party libraries that enable richer document extraction
try:
    import pdfplumber
    import fitz  # PyMuPDF
    from docx2python import docx2python
    from PIL import Image

    HAS_ADVANCED_LIBS = True
except ImportError as e:
    HAS_ADVANCED_LIBS = False
    logger.warning("高级文档处理库未安装: %s", e)
  27. class FileService:
  28. """文件处理服务"""
  29. ALLOWED_DOCUMENT_TYPES = {
  30. "application/pdf",
  31. "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
  32. }
  33. # 图片上传配置
  34. IMAGE_UPLOAD_URL = "https://mt.agnet.top/image/upload"
  35. IMAGE_UPLOAD_TIMEOUT = 30 # 超时时间(秒)
  36. @staticmethod
  37. def is_supported_document(content_type: str | None) -> bool:
  38. """判断上传文件类型是否受支持。"""
  39. return bool(content_type and content_type in FileService.ALLOWED_DOCUMENT_TYPES)
  40. @staticmethod
  41. async def upload_image_to_server(image_data: bytes, filename: str) -> Optional[str]:
  42. """上传图片到外部服务器"""
  43. try:
  44. # 准备multipart/form-data格式的数据
  45. form_data = aiohttp.FormData()
  46. form_data.add_field(
  47. "file",
  48. io.BytesIO(image_data),
  49. filename=filename,
  50. content_type="image/jpeg",
  51. )
  52. timeout = aiohttp.ClientTimeout(total=FileService.IMAGE_UPLOAD_TIMEOUT)
  53. async with aiohttp.ClientSession(timeout=timeout) as session:
  54. async with session.post(
  55. FileService.IMAGE_UPLOAD_URL, data=form_data
  56. ) as response:
  57. if response.status == 200:
  58. result = await response.json()
  59. # 根据实际API返回格式获取图片URL
  60. return result.get("file_url")
  61. else:
  62. logger.warning("图片上传失败,状态码: %s", response.status)
  63. return None
  64. except Exception as e:
  65. logger.warning("图片上传异常: %s", e)
  66. return None
  67. @staticmethod
  68. def extract_images_from_pdf(file_path: str) -> List[Tuple[bytes, str, int, int]]:
  69. """从PDF提取图片,返回 (图片数据, 扩展名, 页码, 图片索引) 列表"""
  70. if not HAS_ADVANCED_LIBS:
  71. return []
  72. images = []
  73. try:
  74. doc = fitz.open(file_path)
  75. for page_num in range(doc.page_count):
  76. page = doc[page_num]
  77. image_list = page.get_images(full=True)
  78. for img_index, img in enumerate(image_list):
  79. try:
  80. # 获取图片数据
  81. xref = img[0]
  82. pix = fitz.Pixmap(doc, xref)
  83. # 转换为RGB格式(如果是CMYK)
  84. if pix.n - pix.alpha < 4:
  85. img_data = pix.tobytes("jpeg")
  86. ext = "jpg"
  87. else:
  88. pix1 = fitz.Pixmap(fitz.csRGB, pix)
  89. img_data = pix1.tobytes("jpeg")
  90. ext = "jpg"
  91. pix1 = None
  92. pix = None
  93. images.append((img_data, ext, page_num + 1, img_index + 1))
  94. except Exception as e:
  95. logger.warning(
  96. "提取PDF第%s页图片%s失败: %s",
  97. page_num + 1,
  98. img_index + 1,
  99. e,
  100. )
  101. continue
  102. doc.close()
  103. return images
  104. except Exception as e:
  105. logger.warning("PDF图片提取失败: %s", e)
  106. return []
  107. @staticmethod
  108. def extract_images_from_docx(file_path: str) -> List[Tuple[bytes, str, int]]:
  109. """从Word文档提取图片,返回 (图片数据, 扩展名, 图片索引) 列表"""
  110. images = []
  111. doc = None
  112. try:
  113. doc = docx.Document(file_path)
  114. # 获取文档中的所有关系
  115. rels = doc.part.rels
  116. img_index = 0
  117. for rel in rels.values():
  118. if "image" in rel.target_ref:
  119. try:
  120. # 读取图片数据
  121. img_data = rel.target_part.blob
  122. # 根据content_type确定扩展名
  123. content_type = rel.target_part.content_type
  124. if "jpeg" in content_type:
  125. ext = "jpg"
  126. elif "png" in content_type:
  127. ext = "png"
  128. elif "gif" in content_type:
  129. ext = "gif"
  130. elif "bmp" in content_type:
  131. ext = "bmp"
  132. else:
  133. ext = "jpg" # 默认
  134. img_index += 1
  135. images.append((img_data, ext, img_index))
  136. except Exception as e:
  137. logger.warning("提取Word文档图片%s失败: %s", img_index + 1, e)
  138. continue
  139. if doc:
  140. del doc
  141. gc.collect()
  142. return images
  143. except Exception as e:
  144. if doc:
  145. del doc
  146. gc.collect()
  147. logger.warning("Word文档图片提取失败: %s", e)
  148. return []
  149. @staticmethod
  150. def _safe_file_cleanup(file_path: str, max_retries: int = 3) -> bool:
  151. """安全删除文件,带重试机制"""
  152. for attempt in range(max_retries):
  153. try:
  154. if os.path.exists(file_path):
  155. # 强制垃圾回收,释放可能的文件句柄
  156. gc.collect()
  157. time.sleep(0.1 * (attempt + 1)) # 递增延迟
  158. os.remove(file_path)
  159. return True
  160. except OSError as e:
  161. if attempt == max_retries - 1:
  162. logger.warning("无法删除文件 %s: %s", file_path, e)
  163. return False
  164. time.sleep(0.5) # 等待后重试
  165. return True
  166. @staticmethod
  167. async def save_uploaded_file(file: UploadFile) -> str:
  168. """保存上传的文件并返回文件路径"""
  169. # 创建上传目录
  170. os.makedirs(settings.upload_dir, exist_ok=True)
  171. # 生成带时间戳的文件名,防止重复
  172. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] # 精确到毫秒
  173. filename = file.filename or "unknown_file"
  174. # 分离文件名和扩展名
  175. name, ext = os.path.splitext(filename)
  176. # 生成新的文件名:原文件名_时间戳.扩展名
  177. new_filename = f"{name}_{timestamp}{ext}"
  178. file_path = os.path.join(settings.upload_dir, new_filename)
  179. # 异步保存文件
  180. async with aiofiles.open(file_path, "wb") as f:
  181. content = await file.read()
  182. await f.write(content)
  183. return file_path
  184. @staticmethod
  185. async def extract_text_from_pdf(file_path: str) -> str:
  186. """从PDF文件提取文本,支持表格内容和图片"""
  187. if HAS_ADVANCED_LIBS:
  188. return await FileService._extract_pdf_with_pdfplumber(file_path)
  189. else:
  190. # 降级到原来的PyPDF2方法
  191. return FileService._extract_pdf_with_pypdf2(file_path)
  192. @staticmethod
  193. async def _extract_pdf_with_pdfplumber(file_path: str) -> str:
  194. """使用pdfplumber提取PDF文本,包含表格和图片(确保及时释放文件句柄)"""
  195. try:
  196. extracted_text = []
  197. image_references = [] # 存储图片引用映射
  198. global_img_counter = 1
  199. # 获取PDF文档的所有图片信息,用于后续匹配
  200. all_images = FileService.extract_images_from_pdf(file_path)
  201. page_images_map = {}
  202. for img_data, ext, page_num, img_index in all_images:
  203. if page_num not in page_images_map:
  204. page_images_map[page_num] = []
  205. page_images_map[page_num].append((img_data, ext, img_index))
  206. # 使用上下文管理器,避免在Windows上产生文件锁
  207. with pdfplumber.open(file_path) as pdf:
  208. for page_num, page in enumerate(pdf.pages, 1):
  209. # 添加页码标识
  210. extracted_text.append(f"\n--- 第 {page_num} 页 ---\n")
  211. # 提取普通文本
  212. text = page.extract_text()
  213. if text:
  214. # 检查文本中是否有图片标记
  215. import re
  216. img_pattern = r"----.*?(?:image|img|media).*?----"
  217. img_matches = list(
  218. re.finditer(img_pattern, text, re.IGNORECASE)
  219. )
  220. if img_matches and page_num in page_images_map:
  221. # 按顺序处理页面中的图片
  222. page_images = page_images_map[page_num]
  223. processed_text = text
  224. for i, match in enumerate(img_matches):
  225. if i < len(page_images):
  226. # 获取对应的图片数据
  227. img_data, ext, img_index = page_images[i]
  228. filename = (
  229. f"pdf_page{page_num}_img{img_index}.{ext}"
  230. )
  231. # 上传图片
  232. image_url = (
  233. await FileService.upload_image_to_server(
  234. img_data, filename
  235. )
  236. )
  237. if image_url:
  238. # 替换图片标记
  239. old_mark = match.group()
  240. new_mark = f"[图片{global_img_counter}]"
  241. processed_text = processed_text.replace(
  242. old_mark, new_mark, 1
  243. )
  244. # 记录图片引用
  245. image_references.append(
  246. f"[图片{global_img_counter}]: {image_url}"
  247. )
  248. global_img_counter += 1
  249. extracted_text.append(processed_text)
  250. else:
  251. extracted_text.append(text)
  252. # 提取表格
  253. tables = page.extract_tables()
  254. for table_num, table in enumerate(tables, 1):
  255. extracted_text.append(f"\n[表格 {table_num}]")
  256. for row in table:
  257. if row: # 跳过空行
  258. # 过滤空值并连接单元格
  259. row_text = " | ".join(
  260. [str(cell) if cell else "" for cell in row]
  261. )
  262. extracted_text.append(row_text)
  263. extracted_text.append("[表格结束]\n")
  264. # 在文档末尾添加图片引用映射
  265. if image_references:
  266. extracted_text.append(f"\n\n--- 图片引用 ---")
  267. extracted_text.extend(image_references)
  268. result = "\n".join(extracted_text).strip()
  269. gc.collect()
  270. return result
  271. except Exception as e:
  272. gc.collect()
  273. # 如果pdfplumber失败,尝试PyMuPDF
  274. try:
  275. return await FileService._extract_pdf_with_pymupdf(file_path)
  276. except Exception:
  277. raise Exception(f"PDF文件读取失败: {str(e)}")
  278. @staticmethod
  279. async def _extract_pdf_with_pymupdf(file_path: str) -> str:
  280. """使用PyMuPDF提取PDF文本和图片"""
  281. try:
  282. doc = fitz.open(file_path)
  283. extracted_text = []
  284. for page_num in range(doc.page_count):
  285. page = doc[page_num]
  286. extracted_text.append(f"\n--- 第 {page_num + 1} 页 ---\n")
  287. # 提取文本
  288. text = page.get_text()
  289. if text:
  290. extracted_text.append(text)
  291. # 尝试提取表格
  292. try:
  293. tables = page.find_tables()
  294. for table_num, table in enumerate(tables, 1):
  295. extracted_text.append(f"\n[表格 {table_num}]")
  296. table_data = table.extract()
  297. for row in table_data:
  298. if row:
  299. row_text = " | ".join(
  300. [str(cell) if cell else "" for cell in row]
  301. )
  302. extracted_text.append(row_text)
  303. extracted_text.append("[表格结束]\n")
  304. except:
  305. # 如果表格提取失败,跳过
  306. pass
  307. doc.close()
  308. return "\n".join(extracted_text).strip()
  309. except Exception as e:
  310. raise Exception(f"PDF文件读取失败: {str(e)}")
  311. @staticmethod
  312. def _extract_pdf_with_pypdf2(file_path: str) -> str:
  313. """使用PyPDF2提取PDF文本(原方法)"""
  314. try:
  315. with open(file_path, "rb") as file:
  316. pdf_reader = PyPDF2.PdfReader(file)
  317. text = ""
  318. for page in pdf_reader.pages:
  319. text += page.extract_text() + "\n"
  320. return text.strip()
  321. except Exception as e:
  322. raise Exception(f"PDF文件读取失败: {str(e)}")
  323. @staticmethod
  324. async def extract_text_from_docx(file_path: str) -> str:
  325. """从Word文档提取文本,支持表格内容和图片"""
  326. if HAS_ADVANCED_LIBS:
  327. return await FileService._extract_docx_with_docx2python(file_path)
  328. else:
  329. # 降级到原来的python-docx方法,但增强表格处理
  330. return await FileService._extract_docx_with_python_docx(file_path)
  331. @staticmethod
  332. async def _extract_docx_with_docx2python(file_path: str) -> str:
  333. """使用docx2python提取Word文档内容和图片(确保及时释放文件句柄)"""
  334. try:
  335. extracted_text = []
  336. image_references = [] # 存储图片引用映射
  337. global_img_counter = 1
  338. # 获取Word文档的所有图片信息
  339. all_images = FileService.extract_images_from_docx(file_path)
  340. # 使用上下文管理器确保文件及时关闭,避免Windows上的锁定
  341. with docx2python(file_path) as content:
  342. # 处理文档内容
  343. if hasattr(content, "document"):
  344. for section in content.document:
  345. for element in section:
  346. if isinstance(element, list):
  347. # 这可能是表格
  348. extracted_text.append("\n[表格内容]")
  349. for row in element:
  350. if isinstance(row, list):
  351. row_text = " | ".join(
  352. [str(cell).strip() for cell in row if cell]
  353. )
  354. if row_text:
  355. extracted_text.append(row_text)
  356. else:
  357. extracted_text.append(str(row))
  358. extracted_text.append("[表格结束]\n")
  359. else:
  360. # 普通文本,检查是否包含图片标记
  361. text = str(element).strip()
  362. if text:
  363. # 检查文本中是否有图片标记
  364. import re
  365. img_pattern = r"----.*?(?:image|img|media).*?----"
  366. img_matches = list(
  367. re.finditer(img_pattern, text, re.IGNORECASE)
  368. )
  369. if img_matches and all_images:
  370. processed_text = text
  371. for match in img_matches:
  372. if global_img_counter <= len(all_images):
  373. # 获取对应的图片数据
  374. img_data, ext, img_index = all_images[
  375. global_img_counter - 1
  376. ]
  377. filename = f"docx_img{global_img_counter}.{ext}"
  378. # 上传图片
  379. image_url = await FileService.upload_image_to_server(
  380. img_data, filename
  381. )
  382. if image_url:
  383. # 替换图片标记
  384. old_mark = match.group()
  385. new_mark = (
  386. f"[图片{global_img_counter}]"
  387. )
  388. processed_text = (
  389. processed_text.replace(
  390. old_mark, new_mark, 1
  391. )
  392. )
  393. # 记录图片引用
  394. image_references.append(
  395. f"[图片{global_img_counter}]: {image_url}"
  396. )
  397. global_img_counter += 1
  398. extracted_text.append(processed_text)
  399. else:
  400. extracted_text.append(text)
  401. # 在文档末尾添加图片引用映射
  402. if image_references:
  403. extracted_text.append(f"\n\n--- 图片引用 ---")
  404. extracted_text.extend(image_references)
  405. result = "\n".join(extracted_text).strip()
  406. gc.collect()
  407. return result
  408. except Exception as e:
  409. gc.collect()
  410. # 如果docx2python失败,回退到增强的python-docx
  411. try:
  412. return await FileService._extract_docx_with_python_docx(file_path)
  413. except Exception:
  414. raise Exception(f"Word文档读取失败: {str(e)}")
  415. @staticmethod
  416. async def _extract_docx_with_python_docx(file_path: str) -> str:
  417. """使用python-docx提取Word文档内容和图片(增强版)"""
  418. doc = None
  419. try:
  420. doc = docx.Document(file_path)
  421. extracted_text = []
  422. image_references = [] # 存储图片引用映射
  423. global_img_counter = 1
  424. # 获取Word文档的所有图片信息
  425. all_images = FileService.extract_images_from_docx(file_path)
  426. # 提取段落文本,同时处理图片
  427. for paragraph in doc.paragraphs:
  428. text = paragraph.text.strip()
  429. if text:
  430. # 检查文本中是否有图片标记
  431. import re
  432. img_pattern = r"----.*?(?:image|img|media).*?----"
  433. img_matches = list(re.finditer(img_pattern, text, re.IGNORECASE))
  434. if img_matches and all_images:
  435. processed_text = text
  436. for match in img_matches:
  437. if global_img_counter <= len(all_images):
  438. # 获取对应的图片数据
  439. img_data, ext, img_index = all_images[
  440. global_img_counter - 1
  441. ]
  442. filename = f"docx_img{global_img_counter}.{ext}"
  443. # 上传图片
  444. image_url = await FileService.upload_image_to_server(
  445. img_data, filename
  446. )
  447. if image_url:
  448. # 替换图片标记
  449. old_mark = match.group()
  450. new_mark = f"[图片{global_img_counter}]"
  451. processed_text = processed_text.replace(
  452. old_mark, new_mark, 1
  453. )
  454. # 记录图片引用
  455. image_references.append(
  456. f"[图片{global_img_counter}]: {image_url}"
  457. )
  458. global_img_counter += 1
  459. extracted_text.append(processed_text)
  460. else:
  461. extracted_text.append(text)
  462. # 提取表格内容
  463. for table_num, table in enumerate(doc.tables, 1):
  464. extracted_text.append(f"\n[表格 {table_num}]")
  465. for row in table.rows:
  466. row_data = []
  467. for cell in row.cells:
  468. cell_text = cell.text.strip()
  469. row_data.append(cell_text if cell_text else "")
  470. row_text = " | ".join(row_data)
  471. if row_text.strip():
  472. extracted_text.append(row_text)
  473. extracted_text.append("[表格结束]\n")
  474. # 在文档末尾添加图片引用映射
  475. if image_references:
  476. extracted_text.append(f"\n\n--- 图片引用 ---")
  477. extracted_text.extend(image_references)
  478. result = "\n".join(extracted_text).strip()
  479. # 确保释放资源
  480. if doc:
  481. del doc
  482. gc.collect()
  483. return result
  484. except Exception as e:
  485. # 确保释放资源
  486. if doc:
  487. del doc
  488. gc.collect()
  489. raise Exception(f"Word文档读取失败: {str(e)}")
  490. @staticmethod
  491. async def process_uploaded_file(file: UploadFile) -> str:
  492. """处理上传的文件并提取文本内容"""
  493. # 检查文件大小
  494. content = await file.read()
  495. if len(content) > settings.max_file_size:
  496. raise Exception(
  497. f"文件大小超过限制 ({settings.max_file_size / 1024 / 1024}MB)"
  498. )
  499. # 重置文件指针
  500. await file.seek(0)
  501. # 保存文件
  502. file_path = await FileService.save_uploaded_file(file)
  503. try:
  504. # 根据文件类型提取文本和图片
  505. if file.content_type == "application/pdf":
  506. text = await FileService.extract_text_from_pdf(file_path)
  507. elif (
  508. file.content_type
  509. == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
  510. ):
  511. text = await FileService.extract_text_from_docx(file_path)
  512. else:
  513. raise Exception("不支持的文件类型,请上传PDF或Word文档")
  514. # 成功提取后,使用安全的文件清理方法
  515. FileService._safe_file_cleanup(file_path)
  516. return text
  517. except Exception as e:
  518. # 异常情况下也使用安全的文件清理方法
  519. FileService._safe_file_cleanup(file_path)
  520. raise e