events.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. from fastapi import APIRouter, Depends, HTTPException, Request
  2. from sqlalchemy.orm import Session
  3. from sse_starlette.sse import EventSourceResponse
  4. import json
  5. import asyncio
  6. from datetime import datetime
  7. from flowsint_core.core.postgre_db import get_db
  8. from flowsint_core.core.events import event_emitter
  9. from flowsint_core.core.models import Profile
  10. from flowsint_core.core.services import (
  11. create_log_service,
  12. NotFoundError,
  13. PermissionDeniedError,
  14. DatabaseError,
  15. )
  16. from app.api.deps import get_current_user, get_current_user_sse
  17. router = APIRouter()
  18. @router.get("/sketch/{sketch_id}/logs")
  19. def get_logs_by_sketch(
  20. sketch_id: str,
  21. limit: int = 100,
  22. since: datetime | None = None,
  23. db: Session = Depends(get_db),
  24. current_user: Profile = Depends(get_current_user),
  25. ):
  26. """Get historical logs for a specific sketch with optional filtering."""
  27. service = create_log_service(db)
  28. try:
  29. return service.get_logs_by_sketch(sketch_id, current_user.id, limit, since)
  30. except NotFoundError as e:
  31. raise HTTPException(status_code=404, detail=str(e))
  32. except PermissionDeniedError:
  33. raise HTTPException(status_code=403, detail="Forbidden")
  34. @router.get("/sketch/{sketch_id}/stream")
  35. async def stream_events(
  36. request: Request,
  37. sketch_id: str,
  38. db: Session = Depends(get_db),
  39. current_user: Profile = Depends(get_current_user_sse),
  40. ):
  41. """Stream events for a specific sketch in real-time."""
  42. service = create_log_service(db)
  43. try:
  44. # Verify permission
  45. service._get_sketch_with_permission(sketch_id, current_user.id, ["read"])
  46. except NotFoundError as e:
  47. raise HTTPException(status_code=404, detail=str(e))
  48. except PermissionDeniedError:
  49. raise HTTPException(status_code=403, detail="Forbidden")
  50. async def event_generator():
  51. channel = sketch_id
  52. await event_emitter.subscribe(channel)
  53. try:
  54. yield 'data: {"event": "connected", "data": "Connected to log stream"}\n\n'
  55. while True:
  56. if await request.is_disconnected():
  57. break
  58. data = await event_emitter.get_message(channel)
  59. if data is None:
  60. await asyncio.sleep(0.1)
  61. continue
  62. if isinstance(data, dict) and data.get("type") == "enricher_complete":
  63. yield json.dumps({"event": "enricher_complete", "data": data})
  64. else:
  65. yield json.dumps({"event": "log", "data": data})
  66. await asyncio.sleep(0.1)
  67. except asyncio.CancelledError:
  68. print(f"[EventEmitter] Client disconnected from sketch_id: {sketch_id}")
  69. except Exception as e:
  70. print(f"[EventEmitter] Error in stream_logs: {str(e)}")
  71. finally:
  72. await event_emitter.unsubscribe(channel)
  73. return EventSourceResponse(
  74. event_generator(),
  75. media_type="text/event-stream",
  76. headers={
  77. "Cache-Control": "no-cache",
  78. "Connection": "keep-alive",
  79. "X-Accel-Buffering": "no",
  80. },
  81. )
  82. @router.delete("/sketch/{sketch_id}/logs")
  83. def delete_scan_logs(
  84. sketch_id: str,
  85. db: Session = Depends(get_db),
  86. current_user: Profile = Depends(get_current_user),
  87. ):
  88. """Delete all logs for a specific sketch."""
  89. service = create_log_service(db)
  90. try:
  91. return service.delete_logs_by_sketch(sketch_id, current_user.id)
  92. except NotFoundError as e:
  93. raise HTTPException(status_code=404, detail=str(e))
  94. except PermissionDeniedError:
  95. raise HTTPException(status_code=403, detail="Forbidden")
  96. except DatabaseError as e:
  97. raise HTTPException(status_code=500, detail=str(e))
  98. @router.get("/sketch/{sketch_id}/status/stream")
  99. async def stream_sketch_status(
  100. request: Request,
  101. sketch_id: str,
  102. db: Session = Depends(get_db),
  103. current_user: Profile = Depends(get_current_user_sse),
  104. ):
  105. """Stream COMPLETED events for a specific sketch (for graph refresh)."""
  106. service = create_log_service(db)
  107. try:
  108. service._get_sketch_with_permission(sketch_id, current_user.id, ["read"])
  109. except NotFoundError as e:
  110. raise HTTPException(status_code=404, detail=str(e))
  111. except PermissionDeniedError:
  112. raise HTTPException(status_code=403, detail="Forbidden")
  113. async def status_generator():
  114. channel = f"{sketch_id}_status"
  115. await event_emitter.subscribe(channel)
  116. try:
  117. yield json.dumps({"event": "connected", "data": "Connected to status stream"})
  118. while True:
  119. if await request.is_disconnected():
  120. break
  121. data = await event_emitter.get_message(channel)
  122. if data is None:
  123. await asyncio.sleep(0.1)
  124. continue
  125. yield json.dumps({"event": "status", "data": data})
  126. await asyncio.sleep(0.1)
  127. except asyncio.CancelledError:
  128. print(f"[EventEmitter] Client disconnected from status stream for sketch_id: {sketch_id}")
  129. except Exception as e:
  130. print(f"[EventEmitter] Error in stream_sketch_status: {str(e)}")
  131. finally:
  132. await event_emitter.unsubscribe(channel)
  133. return EventSourceResponse(
  134. status_generator(),
  135. media_type="text/event-stream",
  136. headers={
  137. "Cache-Control": "no-cache",
  138. "Connection": "keep-alive",
  139. "X-Accel-Buffering": "no",
  140. },
  141. )
  142. @router.get("/status/scan/{scan_id}/stream")
  143. async def stream_status(
  144. request: Request,
  145. scan_id: str,
  146. db: Session = Depends(get_db),
  147. current_user: Profile = Depends(get_current_user_sse),
  148. ):
  149. """Stream status updates for a specific scan in real-time."""
  150. service = create_log_service(db)
  151. try:
  152. service.get_scan_with_permission(scan_id, current_user.id)
  153. except NotFoundError as e:
  154. raise HTTPException(status_code=404, detail=str(e))
  155. except PermissionDeniedError:
  156. raise HTTPException(status_code=403, detail="Forbidden")
  157. async def status_generator():
  158. print("[EventEmitter] Start status generator")
  159. await event_emitter.subscribe(f"scan_{scan_id}_status")
  160. try:
  161. yield 'data: {"event": "connected", "data": "Connected to status stream"}\n\n'
  162. while True:
  163. data = await event_emitter.get_message(f"scan_{scan_id}_status")
  164. if data is None:
  165. await asyncio.sleep(0.1)
  166. continue
  167. print(f"[EventEmitter] Received status data: {data}")
  168. yield f"data: {data}\n\n"
  169. except asyncio.CancelledError:
  170. print(f"[EventEmitter] Client disconnected from status stream for scan_id: {scan_id}")
  171. finally:
  172. await event_emitter.unsubscribe(f"scan_{scan_id}_status")
  173. return EventSourceResponse(
  174. status_generator(),
  175. media_type="text/event-stream",
  176. headers={
  177. "Cache-Control": "no-cache",
  178. "Connection": "keep-alive",
  179. "X-Accel-Buffering": "no",
  180. },
  181. )