# -*- coding: utf-8 -*-
"""
Import the demo snapshot produced by export_db_snapshot.py and restore the
static files and training metadata that accompany it.

Usage (on an empty database, or when the demo data should be overwritten):
  cd BridgeDiseaseBackend-main
  $env:SQLALCHEMY_DATABASE_URI="mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4"
  python scripts/import_db_snapshot.py

Optional: restore files only, without importing SQL
  python scripts/import_db_snapshot.py --files-only
"""
from __future__ import annotations

import argparse
import os
import shutil
import subprocess
import sys
from pathlib import Path
from urllib.parse import unquote

ROOT = Path(__file__).resolve().parents[1]
SQL_FILE = ROOT / 'sql' / 'seed_snapshot.sql'
SNAPSHOT_STATIC = ROOT / 'seed_assets' / 'snapshot' / 'static'
SNAPSHOT_TRAINING = ROOT / 'seed_assets' / 'snapshot' / 'training_meta'
STATIC_DST = ROOT / 'app' / 'static'
TRAINING_DST = ROOT / 'data' / 'training_meta'

# Container used when no local `mysql` client is installed.
MYSQL_CONTAINER = 'bridge-disease-mysql'
# Tables emptied before the snapshot is loaded (structure is kept).
TABLES_TO_CLEAR = ('detection', 'operation', 'media', 'model', 'user')

# Make the project package importable when the script is run directly.
sys.path.insert(0, str(ROOT))

os.environ.setdefault(
    'SQLALCHEMY_DATABASE_URI',
    'mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4',
)


def _db_env():
    """Split SQLALCHEMY_DATABASE_URI into mysql CLI connection settings.

    Returns:
        A ``(host, port, user, password, database)`` tuple of strings.
        ``port`` defaults to ``'3306'`` and ``password`` to ``''`` when the
        URI omits them.

    Raises:
        KeyError: the environment variable is not set.
        ValueError: the URI has no host or no database name.
    """
    uri = os.environ['SQLALCHEMY_DATABASE_URI']
    body = uri.split('://', 1)[-1]

    auth, at_sign, rest = body.partition('@')
    if not at_sign:
        # No credentials section at all: everything is host/database.
        auth, rest = '', body
    user, _, password = auth.partition(':')

    host_port, _, db_part = rest.partition('/')
    database = db_part.split('?', 1)[0]

    if host_port.startswith('['):
        # Bracketed IPv6 literal, e.g. "[::1]:3306".
        host, _, tail = host_port[1:].partition(']')
        port = tail.lstrip(':')
    else:
        host, _, port = host_port.partition(':')

    if not host or not database:
        raise ValueError(
            'SQLALCHEMY_DATABASE_URI must look like '
            'dialect://user:password@host[:port]/database'
        )

    # Credentials in a SQLAlchemy URL are percent-encoded; the mysql CLI
    # needs the decoded values.
    return host, port or '3306', unquote(user), unquote(password), database


def _mysql_command(host, port, user, password, database):
    """Build the argv that feeds SQL into MySQL, or None if no client exists.

    A local ``mysql`` binary is preferred; otherwise the client inside the
    ``MYSQL_CONTAINER`` docker container is used (host/port are then
    irrelevant because the client runs next to the server).
    """
    client_args = []
    if user:
        client_args.append(f'--user={user}')
    # A bare "-p" would prompt interactively; be explicit in both cases.
    # NOTE: the password is visible in the process list while mysql runs.
    client_args.append(f'--password={password}' if password else '--skip-password')
    client_args += ['--default-character-set=utf8mb4', database]

    if shutil.which('mysql'):
        return ['mysql', f'--host={host}', f'--port={port}', *client_args]
    if shutil.which('docker'):
        return ['docker', 'exec', '-i', MYSQL_CONTAINER, 'mysql', *client_args]
    return None


def import_sql() -> None:
    """Empty the business tables and load ``SQL_FILE`` through the mysql client.

    Exits the process with a non-zero status when the snapshot file is
    missing, when neither ``mysql`` nor ``docker`` is available, or when the
    client reports a failure.
    """
    if not SQL_FILE.is_file():
        print(f'缺少 {SQL_FILE},请先运行 export_db_snapshot.py', file=sys.stderr)
        sys.exit(1)

    # Resolve the client BEFORE truncating anything, so a missing client
    # cannot leave the database emptied with nothing to reload it.
    cmd = _mysql_command(*_db_env())
    if cmd is None:
        print('未找到 mysql 客户端或 docker,无法导入 SQL', file=sys.stderr)
        sys.exit(1)

    from app import create_app
    from app.models import db
    from sqlalchemy import text

    app = create_app()
    with app.app_context():
        db.create_all()
        # Clear the business tables while keeping their structure.
        db.session.execute(text('SET FOREIGN_KEY_CHECKS=0'))
        try:
            for table in TABLES_TO_CLEAR:
                db.session.execute(text(f'TRUNCATE TABLE `{table}`'))
        finally:
            # Re-enable the checks before the commit, even if a TRUNCATE fails.
            db.session.execute(text('SET FOREIGN_KEY_CHECKS=1'))
        db.session.commit()
    print('[ok] 已清空 user/model/media/detection/operation')

    try:
        with open(SQL_FILE, 'rb') as sql_stream:
            subprocess.run(cmd, stdin=sql_stream, check=True)
    except subprocess.CalledProcessError as exc:
        # Do not let the traceback echo the argv: it contains the password.
        print(
            f'导入 {SQL_FILE} 失败(退出码 {exc.returncode}),业务表已被清空',
            file=sys.stderr,
        )
        sys.exit(exc.returncode or 1)
    print(f'[ok] 已导入 {SQL_FILE}')


def _replace_entry(src: Path, dst: Path) -> int:
    """Replace ``dst`` with a copy of ``src`` and return the file count copied."""
    if dst.is_dir() and not dst.is_symlink():
        shutil.rmtree(dst)
    elif dst.exists() or dst.is_symlink():
        dst.unlink()

    if src.is_dir():
        shutil.copytree(src, dst)
        return sum(1 for path in dst.rglob('*') if path.is_file())
    shutil.copy2(src, dst)
    return 1


def restore_files() -> None:
    """Restore snapshot static assets and training metadata into the app tree.

    Each top-level entry of ``SNAPSHOT_STATIC`` replaces the entry of the same
    name under ``STATIC_DST``; every ``*.json`` file in ``SNAPSHOT_TRAINING``
    is copied into ``TRAINING_DST``. A missing source directory is skipped
    without affecting the other one.
    """
    if SNAPSHOT_STATIC.is_dir():
        STATIC_DST.mkdir(parents=True, exist_ok=True)
        for child in sorted(SNAPSHOT_STATIC.iterdir()):
            dst = STATIC_DST / child.name
            n = _replace_entry(child, dst)
            print(f'[ok] 静态 {child.name} -> {dst} ({n} files)')
    else:
        print(f'缺少 {SNAPSHOT_STATIC},跳过静态文件还原')

    if SNAPSHOT_TRAINING.is_dir():
        TRAINING_DST.mkdir(parents=True, exist_ok=True)
        for meta in sorted(SNAPSHOT_TRAINING.glob('*.json')):
            if meta.is_file():
                shutil.copy2(meta, TRAINING_DST / meta.name)
        print(f'[ok] training_meta -> {TRAINING_DST}')


def main() -> int:
    """Parse the command line and run the requested restore steps.

    Returns:
        ``0`` on success (failures exit the process from the step itself).
    """
    parser = argparse.ArgumentParser()
    # The two flags contradict each other; together they used to do nothing.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument('--files-only', action='store_true', help='仅还原静态与 training_meta')
    mode.add_argument('--sql-only', action='store_true', help='仅导入 SQL')
    args = parser.parse_args()

    if not args.files_only:
        import_sql()
    if not args.sql_only:
        restore_files()
    print('\n完成。请启动后端并刷新前端(admin / Admin123456)。')
    return 0


if __name__ == '__main__':
    sys.exit(main())