sketch_utils.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. """Utilities for sketch operations, including automatic timestamp updates."""
  2. from functools import wraps
  3. from datetime import datetime
  4. from typing import Callable
  5. from uuid import UUID
  6. from fastapi import BackgroundTasks
  7. from sqlalchemy.orm import Session
  8. from flowsint_core.core.models import Sketch, Investigation
  9. def update_sketch_last_modified(db: Session, sketch_id: str | UUID) -> None:
  10. """
  11. Update the last_updated_at timestamp for a sketch and its parent investigation.
  12. This function is designed to be run as a background task to avoid
  13. blocking the response. It updates both the sketch's and its parent
  14. investigation's last_updated_at fields to the current time.
  15. Args:
  16. db: SQLAlchemy database session
  17. sketch_id: The ID of the sketch to update
  18. """
  19. try:
  20. sketch = db.query(Sketch).filter(Sketch.id == sketch_id).first()
  21. if sketch:
  22. current_time = datetime.now()
  23. # Update sketch timestamp
  24. sketch.last_updated_at = current_time
  25. # Update parent investigation timestamp if it exists
  26. if sketch.investigation_id:
  27. investigation = db.query(Investigation).filter(
  28. Investigation.id == sketch.investigation_id
  29. ).first()
  30. if investigation:
  31. investigation.last_updated_at = current_time
  32. db.commit()
  33. except Exception as e:
  34. # Log error but don't raise to avoid disrupting background task
  35. print(f"Error updating sketch/investigation timestamp for {sketch_id}: {e}")
  36. db.rollback()
  37. def update_sketch_timestamp(func: Callable) -> Callable:
  38. """
  39. Decorator to automatically update sketch's last_updated_at timestamp.
  40. This decorator:
  41. 1. Extracts the sketch_id from route parameters
  42. 2. Schedules a background task to update last_updated_at
  43. 3. Returns the response immediately (non-blocking)
  44. Usage:
  45. @router.post("/{sketch_id}/nodes/add")
  46. @update_sketch_timestamp
  47. def add_node(
  48. sketch_id: str,
  49. node: NodeInput,
  50. background_tasks: BackgroundTasks,
  51. db: Session = Depends(get_db),
  52. ...
  53. ):
  54. # Your route logic here
  55. pass
  56. Requirements:
  57. - Route must have a 'sketch_id' parameter (path or query)
  58. - Route must have 'background_tasks: BackgroundTasks' parameter
  59. - Route must have 'db: Session' parameter
  60. """
  61. @wraps(func)
  62. async def async_wrapper(*args, **kwargs):
  63. # Extract required dependencies from kwargs
  64. sketch_id = kwargs.get("sketch_id")
  65. background_tasks: BackgroundTasks = kwargs.get("background_tasks")
  66. db: Session = kwargs.get("db")
  67. if not sketch_id:
  68. raise ValueError("sketch_id parameter is required for @update_sketch_timestamp")
  69. if not background_tasks:
  70. raise ValueError("background_tasks parameter is required for @update_sketch_timestamp")
  71. if not db:
  72. raise ValueError("db parameter is required for @update_sketch_timestamp")
  73. # Schedule the timestamp update as a background task
  74. background_tasks.add_task(update_sketch_last_modified, db, sketch_id)
  75. # Execute the original route function
  76. return await func(*args, **kwargs)
  77. @wraps(func)
  78. def sync_wrapper(*args, **kwargs):
  79. # Extract required dependencies from kwargs
  80. sketch_id = kwargs.get("sketch_id")
  81. background_tasks: BackgroundTasks = kwargs.get("background_tasks")
  82. db: Session = kwargs.get("db")
  83. if not sketch_id:
  84. raise ValueError("sketch_id parameter is required for @update_sketch_timestamp")
  85. if not background_tasks:
  86. raise ValueError("background_tasks parameter is required for @update_sketch_timestamp")
  87. if not db:
  88. raise ValueError("db parameter is required for @update_sketch_timestamp")
  89. # Schedule the timestamp update as a background task
  90. background_tasks.add_task(update_sketch_last_modified, db, sketch_id)
  91. # Execute the original route function
  92. return func(*args, **kwargs)
  93. # Return the appropriate wrapper based on whether the function is async
  94. import inspect
  95. if inspect.iscoroutinefunction(func):
  96. return async_wrapper
  97. else:
  98. return sync_wrapper