# -*- coding: utf-8 -*-
"""Insert 20 bridge safety-hazard detection records.

Requires user / model / media seed data to already exist in the database.
"""
from __future__ import annotations

import os
import random
import shutil
import sys
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

# Make the project root importable so that ``app`` resolves when this file is
# run directly as a script.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

# Only applied when the variable is not already set in the environment.
# NOTE(review): this default DSN embeds a root password; confirm it is meant
# for local development only.
os.environ.setdefault(
    'SQLALCHEMY_DATABASE_URI',
    'mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4',
)

# Project imports come after the path / environment setup above on purpose.
from app import create_app
from app.constants import DiseaseGrade, TaskStatus
from app.models import Detection, Media, Model, User, db

TZ = ZoneInfo('Asia/Shanghai')

# (model ID, hazard count, perimeter, area, shape complexity,
#  texture roughness, crack width, hue, status)
# Models: 1 corrosion, 2 coating, 3 spalling / exposed rebar, 4 steel crack,
#         5 concrete crack, 6 weathering, 7 pothole
SEED_ROWS = [
    # admin inspection: 7 rows
    (5, 3, 842.5, 12560.0, 0.41, 128.0, 4.2, 42.0, TaskStatus.COMPLETED),
    (5, 2, 610.0, 9800.0, 0.35, 95.0, 3.1, 38.0, TaskStatus.COMPLETED),
    (7, 5, 1200.0, 18200.0, 0.52, 210.0, 0.0, 55.0, TaskStatus.COMPLETED),
    (1, 4, 920.0, 15400.0, 0.48, 165.0, 0.0, 12.0, TaskStatus.COMPLETED),
    (3, 2, 480.0, 6200.0, 0.29, 88.0, 5.8, 28.0, TaskStatus.COMPLETED),
    (5, 1, 320.0, 4100.0, 0.22, 62.0, 2.4, 40.0, TaskStatus.COMPLETED),
    (6, 3, 750.0, 11200.0, 0.38, 142.0, 0.0, 48.0, TaskStatus.COMPLETED),
    # developer: 7 rows
    (5, 4, 1105.0, 16800.0, 0.45, 155.0, 3.8, 36.0, TaskStatus.COMPLETED),
    (4, 2, 540.0, 8900.0, 0.33, 102.0, 6.1, 15.0, TaskStatus.COMPLETED),
    (2, 3, 680.0, 9400.0, 0.36, 118.0, 0.0, 22.0, TaskStatus.COMPLETED),
    (1, 5, 1350.0, 22100.0, 0.58, 198.0, 0.0, 8.0, TaskStatus.COMPLETED),
    (7, 6, 1580.0, 24500.0, 0.61, 225.0, 0.0, 60.0, TaskStatus.COMPLETED),
    (3, 1, 210.0, 2800.0, 0.18, 45.0, 4.5, 30.0, TaskStatus.COMPLETED),
    (5, 2, 590.0, 7600.0, 0.31, 91.0, 2.9, 41.0, TaskStatus.FAILED),
    # demo: 6 rows
    (5, 2, 505.0, 7200.0, 0.27, 78.0, 3.2, 39.0, TaskStatus.COMPLETED),
    (3, 3, 890.0, 13100.0, 0.42, 136.0, 7.2, 25.0, TaskStatus.COMPLETED),
    (1, 4, 1020.0, 14900.0, 0.44, 148.0, 0.0, 10.0, TaskStatus.COMPLETED),
    (6, 2, 430.0, 5500.0, 0.24, 58.0, 0.0, 52.0, TaskStatus.COMPLETED),
    (4, 1, 180.0, 2200.0, 0.16, 40.0, 5.2, 18.0, TaskStatus.COMPLETED),
    (7, 4, 990.0, 14200.0, 0.39, 125.0, 0.0, 58.0, TaskStatus.COMPLETED),
]

# (owner user ID, number of rows) in the same order as the groups in
# SEED_ROWS, so each user receives exactly the rows annotated for them.
# NOTE(review): assumes user IDs 1 / 2 / 3 are admin / developer / demo.
USER_QUOTAS = ((1, 7), (2, 7), (3, 6))

# Keyword -> default model mapping, matched against the media name and
# description; used as a fallback when a seeded model ID is unavailable.
MEDIA_MODEL_HINTS = [
    (('锈蚀', '钢'), 1),
    (('涂层', '剥落', '鼓包'), 2),
    (('露筋', '剥落'), 3),
    (('钢', '施工'), 4),
    (('裂缝', '收缩', '弯曲', '下部'), 5),
    (('风化',), 6),
    (('龟裂', '铺装', '坑'), 7),
]

# Description text stored with each record, keyed by grade.
DESCRIPTIONS = {
    DiseaseGrade.MILD: '隐患程度较轻,建议纳入日常巡检观察。',
    DiseaseGrade.MODERATE: '存在中等程度结构隐患,建议安排专项复核与养护。',
    DiseaseGrade.SEVERE: '隐患较为明显,应尽快组织检测评估并制定处置方案。',
    DiseaseGrade.CRITICAL: '隐患严重,需立即采取限载或封闭措施并启动应急处置。',
}


def pick_grade(score: float) -> DiseaseGrade:
    """Map a severity score to a grade.

    Thresholds: >= 0.8 CRITICAL, >= 0.5 SEVERE, >= 0.2 MODERATE, else MILD.
    """
    if score >= 0.8:
        return DiseaseGrade.CRITICAL
    if score >= 0.5:
        return DiseaseGrade.SEVERE
    if score >= 0.2:
        return DiseaseGrade.MODERATE
    return DiseaseGrade.MILD


def score_from_metrics(count, area, perimeter, complexity) -> float:
    """Derive a severity score from the seeded metrics.

    The weighted sum of the four metrics is capped at 1.0, shifted by a
    random jitter drawn from [-0.05, 0.08], clamped to [0.05, 0.95] and
    rounded to three decimals. Consumes one value from the global ``random``
    generator per call.
    """
    base = min(1.0, (count * 0.08 + area / 50000 + perimeter / 3000 + complexity * 0.3))
    return round(max(0.05, min(0.95, base + random.uniform(-0.05, 0.08))), 3)


def resolve_model_id(media: Media, hinted: int | None) -> int:
    """Return ``hinted`` if given, otherwise guess a model ID from the media.

    The media name concatenated with its description (if any) is searched for
    the keywords in ``MEDIA_MODEL_HINTS``. Falls back to model 5 when nothing
    matches.
    """
    if hinted is not None:
        return hinted
    name = media.media_name + (media.description or '')
    # First matching entry wins, so keywords shared by several entries
    # resolve to the earliest one in the list.
    for keys, mid in MEDIA_MODEL_HINTS:
        if any(k in name for k in keys):
            return mid
    return 5


def ensure_result_image(media: Media, username: str) -> str:
    """Copy the media file as a placeholder detection result for list previews.

    Returns the forward-slash relative path (under the ``app`` directory) of
    the result file. The path is returned even when the source file does not
    exist, in which case nothing is copied.
    """
    rel = os.path.join('static', 'results', username, Path(media.media_path).name)
    rel = rel.replace('\\', '/')
    if rel.lower().endswith('.jpeg'):
        rel = rel[:-5] + '.jpg'
    elif not rel.lower().endswith(('.jpg', '.png', '.mp4')):
        # as_posix() keeps forward slashes; str(Path) would reintroduce
        # backslashes on Windows after the normalisation above.
        rel = Path(rel).with_suffix('.jpg').as_posix()
    src = ROOT / 'app' / media.media_path.replace('/', os.sep)
    dest = ROOT / 'app' / rel.replace('/', os.sep)
    dest.parent.mkdir(parents=True, exist_ok=True)
    if src.is_file():
        shutil.copy2(src, dest)
    return rel


def assign_media_pairs() -> list[tuple[int, Media]]:
    """Assign a distinct ``(owner_id, media)`` pair to each of the 20 records.

    Each user in ``USER_QUOTAS`` is paired with the first ``quota`` media
    (ordered by ``media_id``), so the resulting list lines up with the
    per-user groups in ``SEED_ROWS``. Prints a hint and exits with status 1
    when the media table holds fewer media than the largest quota.
    """
    medias = Media.query.order_by(Media.media_id.asc()).all()
    # Pairs must be unique per user, so a user with quota N needs N media.
    required = max(quota for _, quota in USER_QUOTAS)
    if len(medias) < required:
        print('媒体库为空或过少,请先运行: python scripts/seed_medias.py')
        sys.exit(1)
    pairs: list[tuple[int, Media]] = []
    for uid, quota in USER_QUOTAS:
        pairs.extend((uid, m) for m in medias[:quota])
    return pairs


def main() -> None:
    """Insert the seed detection records and commit them in one transaction.

    Does nothing when the detection table already holds 20 or more rows.
    Rows whose owner or model is missing, or whose (owner, media) pair already
    has a detection, are skipped with a message.
    """
    # Fixed seed so the generated scores and durations are reproducible.
    random.seed(20260401)
    app = create_app()
    with app.app_context():
        existing = Detection.query.count()
        if existing >= 20:
            print(f'检测记录已有 {existing} 条,跳过(如需重建请先清空 detection 表)。')
            return

        users = {u.user_id: u for u in User.query.all()}
        models = {m.model_id: m for m in Model.query.all()}
        media_pairs = assign_media_pairs()
        if len(SEED_ROWS) != len(media_pairs):
            print('内部配置条数与媒体分配不一致')
            sys.exit(1)

        base_time = datetime.now(TZ) - timedelta(days=28)
        added = 0
        for idx, (pair, row) in enumerate(zip(media_pairs, SEED_ROWS)):
            owner_id, media = pair
            (
                model_id,
                disease_count,
                disease_perimeter,
                disease_area,
                shape_complexity,
                texture_roughness,
                crack_width,
                avg_hue,
                status,
            ) = row

            owner = users.get(owner_id)
            if not owner:
                print(f'跳过:用户 {owner_id} 不存在')
                continue
            if Detection.query.filter_by(owner_id=owner_id, media_id=media.media_id).first():
                print(f'[skip] 已存在 owner={owner_id} media={media.media_id}')
                continue

            # Fall back to a keyword-based guess when the seeded model is absent.
            mid = model_id if model_id in models else resolve_model_id(media, None)
            model = models.get(mid)
            if model is None:
                # The fallback model may be missing too; skip instead of
                # aborting the whole run with a KeyError.
                print(f'跳过:模型 {mid} 不存在')
                continue

            severity = score_from_metrics(
                disease_count, disease_area, disease_perimeter, shape_complexity
            )
            grade = pick_grade(severity)

            # Spread the timestamps deterministically after base_time.
            t = base_time + timedelta(
                days=idx % 27,
                hours=9 + (idx % 8),
                minutes=(idx * 13) % 60,
            )
            # Only completed tasks get a result image.
            result_path = (
                ensure_result_image(media, owner.username)
                if status == TaskStatus.COMPLETED
                else None
            )
            det = Detection(
                result_path=result_path,
                disease_count=disease_count,
                disease_perimeter=round(disease_perimeter, 2),
                disease_area=round(disease_area, 2),
                shape_complexity=round(shape_complexity, 3),
                texture_roughness=round(texture_roughness, 2),
                crack_width=round(crack_width, 2),
                avg_hue=round(avg_hue, 2),
                disease_severity_score=severity,
                disease_grade=grade,
                disease_description=(
                    f'【{model.disease_category}】{DESCRIPTIONS[grade]} '
                    f'检出隐患 {disease_count} 处,覆盖面积约 {int(disease_area)} 像素。'
                ),
                detection_duration=round(1200 + random.uniform(200, 3500), 2),
                avg_frame_detection_duration=round(800 + random.uniform(50, 400), 2),
                status=status,
                detection_at=t,
                updated_at=t + timedelta(minutes=random.randint(1, 45)),
                owner_id=owner_id,
                model_id=mid,
                media_id=media.media_id,
            )
            db.session.add(det)
            added += 1
            print(
                f'[insert] #{added} {owner.username} | {media.media_name} | '
                f'{model.model_name} | {grade.name} | {status.name}'
            )

        db.session.commit()
        print(f'完成:新增 {added} 条检测记录,当前共 {Detection.query.count()} 条。')


if __name__ == '__main__':
    main()