# camera_detector.py (2.7 KB)
  1. import cv2
  2. import torch
  3. import numpy as np
  4. from models.yolo import YOLOv5
  5. class CameraDetector:
  6. def __init__(self, config):
  7. """初始化摄像头检测器"""
  8. self.config = config
  9. self.device = torch.device(config['device'])
  10. # 初始化YOLOv5模型
  11. weights_path = f"{config['weights_path']}/yolov5{config['model_size']}.pt"
  12. self.model = YOLOv5(weights_path, self.device)
  13. self.model.conf = config['conf_threshold']
  14. self.model.iou = config['iou_threshold']
  15. # 初始化摄像头
  16. self.cap = None
  17. self.is_running = False
  18. def start_camera(self, camera_id=0):
  19. """启动摄像头"""
  20. try:
  21. self.cap = cv2.VideoCapture(camera_id)
  22. if not self.cap.isOpened():
  23. raise Exception("无法打开摄像头")
  24. self.is_running = True
  25. return True
  26. except Exception as e:
  27. print(f"摄像头启动失败: {e}")
  28. return False
  29. def stop_camera(self):
  30. """停止摄像头"""
  31. self.is_running = False
  32. if self.cap:
  33. self.cap.release()
  34. def detect_frame(self, frame):
  35. """对单帧图像进行检测"""
  36. if frame is None:
  37. return None, []
  38. # 调整图像大小
  39. img = cv2.resize(frame, (self.config['image_size'], self.config['image_size']))
  40. # 执行检测
  41. results = self.model(img)
  42. # 处理检测结果
  43. detections = []
  44. for *xyxy, conf, cls in results.xyxy[0]:
  45. x1, y1, x2, y2 = map(int, xyxy)
  46. label = self.model.names[int(cls)]
  47. confidence = float(conf)
  48. # 在图像上绘制边界框
  49. color = (0, 0, 255) if label == 'fire' else (0, 255, 0) # 火焰用红色,烟雾用绿色
  50. cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
  51. cv2.putText(frame, f'{label} {confidence:.2f}',
  52. (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
  53. 0.5, color, 2)
  54. detections.append({
  55. 'label': label,
  56. 'confidence': confidence,
  57. 'bbox': (x1, y1, x2, y2)
  58. })
  59. return frame, detections
  60. def get_frame(self):
  61. """获取一帧并进行检测"""
  62. if not self.is_running or not self.cap:
  63. return None, []
  64. ret, frame = self.cap.read()
  65. if not ret:
  66. return None, []
  67. return self.detect_frame(frame)
  68. def __del__(self):
  69. """析构函数,确保释放摄像头资源"""
  70. self.stop_camera()