# executor.py (2.7 KB)
  1. from __future__ import annotations
  2. import asyncio
  3. import atexit
  4. from concurrent.futures import ProcessPoolExecutor
  5. from threading import BoundedSemaphore, Lock
  6. from pathlib import Path
  7. from .config import settings
  8. from .conversion import convert_path_to_markdown
  9. from .errors import ConversionError
  10. _executor: ProcessPoolExecutor | None = None
  11. _submission_limiter: BoundedSemaphore | None = None
  12. _executor_lock = Lock()
  13. _limiter_lock = Lock()
  14. def get_executor() -> ProcessPoolExecutor:
  15. global _executor
  16. if _executor is None:
  17. with _executor_lock:
  18. if _executor is None:
  19. _executor = ProcessPoolExecutor(max_workers=settings.max_workers)
  20. return _executor
  21. def get_submission_limiter() -> BoundedSemaphore:
  22. global _submission_limiter
  23. if _submission_limiter is None:
  24. with _limiter_lock:
  25. if _submission_limiter is None:
  26. _submission_limiter = BoundedSemaphore(
  27. value=max(1, settings.max_pending_tasks)
  28. )
  29. return _submission_limiter
  30. def release_submission_slot() -> None:
  31. limiter = get_submission_limiter()
  32. limiter.release()
  33. def shutdown_executor() -> None:
  34. global _executor
  35. if _executor is not None:
  36. _executor.shutdown(wait=False, cancel_futures=True)
  37. _executor = None
  38. atexit.register(shutdown_executor)
  39. async def run_conversion(
  40. input_path: Path, include_images: bool, original_name: str
  41. ) -> str:
  42. limiter = get_submission_limiter()
  43. if not limiter.acquire(blocking=False):
  44. raise ConversionError(
  45. code="queue_full",
  46. message="Too many conversion requests are in progress, please retry later",
  47. status_code=429,
  48. details={
  49. "filename": original_name,
  50. "max_workers": settings.max_workers,
  51. "max_queue_size": settings.max_queue_size,
  52. },
  53. )
  54. loop = asyncio.get_running_loop()
  55. executor = get_executor()
  56. future = None
  57. try:
  58. future = loop.run_in_executor(
  59. executor,
  60. convert_path_to_markdown,
  61. str(input_path),
  62. include_images,
  63. original_name,
  64. )
  65. future.add_done_callback(lambda _future: release_submission_slot())
  66. return await asyncio.wait_for(future, timeout=settings.request_timeout_seconds)
  67. except asyncio.TimeoutError as exc:
  68. raise ConversionError(
  69. code="conversion_timeout",
  70. message="Document conversion timed out",
  71. status_code=504,
  72. details={"filename": original_name},
  73. ) from exc
  74. except Exception:
  75. if future is None:
  76. release_submission_slot()
  77. raise