# -*- coding: utf-8 -*- """ 将当前 MySQL 演示数据与静态资源导出到仓库,便于他人 clone 后复现相同界面效果。 生成物: sql/seed_snapshot.sql — user / model / media / detection / operation 全量数据 seed_assets/snapshot/static/ — medias、models、results、avatars seed_assets/snapshot/training_meta/ — 训练任务与数据集元数据 JSON seed_assets/snapshot/manifest.json — 导出摘要 用法(项目根或 BridgeDiseaseBackend-main 下,需 MySQL 3307 可连): python scripts/export_db_snapshot.py """ from __future__ import annotations import json import os import shutil import subprocess import sys from datetime import datetime, timezone from pathlib import Path ROOT = Path(__file__).resolve().parents[1] SQL_OUT = ROOT / 'sql' / 'seed_snapshot.sql' SNAPSHOT_ROOT = ROOT / 'seed_assets' / 'snapshot' STATIC_SRC = ROOT / 'app' / 'static' STATIC_DST = SNAPSHOT_ROOT / 'static' TRAINING_SRC = ROOT / 'data' / 'training_meta' TRAINING_DST = SNAPSHOT_ROOT / 'training_meta' TABLES = ('user', 'model', 'media', 'detection', 'operation') STATIC_DIRS = ('medias', 'models', 'results', 'avatars') def _db_env(): uri = os.environ.get( 'SQLALCHEMY_DATABASE_URI', 'mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4', ) # mysql+pymysql://user:pass@host:port/db?... body = uri.split('://', 1)[-1] auth, rest = body.split('@', 1) user, password = auth.split(':', 1) host_port, db_part = rest.split('/', 1) database = db_part.split('?', 1)[0] host, port = (host_port.split(':', 1) + ['3306'])[:2] return { 'host': host, 'port': port, 'user': user, 'password': password, 'database': database, } def dump_mysql(cfg: dict) -> None: SQL_OUT.parent.mkdir(parents=True, exist_ok=True) header = ( '-- 检澜 DockScope 演示数据快照(由 scripts/export_db_snapshot.py 生成)\n' f"-- 导出时间 UTC: {datetime.now(timezone.utc).isoformat()}\n" 'SET NAMES utf8mb4;\n' 'SET FOREIGN_KEY_CHECKS=0;\n' 'SET sql_mode = \'NO_AUTO_VALUE_ON_ZERO\';\n\n' ) cmd = [ 'mysqldump', f"--host={cfg['host']}", f"--port={cfg['port']}", f"-u{cfg['user']}", f"-p{cfg['password']}", '--default-character-set=utf8mb4', '--no-create-info', '--skip-triggers', '--complete-insert', '--hex-blob', cfg['database'], *TABLES, ] try: proc = subprocess.run(cmd, capture_output=True, check=True) except FileNotFoundError: # 尝试 Docker 中的 mysql 客户端 docker_cmd = [ 'docker', 'exec', 'bridge-disease-mysql', 'mysqldump', f"-u{cfg['user']}", f"-p{cfg['password']}", '--default-character-set=utf8mb4', '--no-create-info', '--skip-triggers', '--complete-insert', '--hex-blob', cfg['database'], *TABLES, ] proc = subprocess.run(docker_cmd, capture_output=True, check=True) body = proc.stdout.decode('utf-8', errors='replace') footer = '\nSET FOREIGN_KEY_CHECKS=1;\n' SQL_OUT.write_text(header + body + footer, encoding='utf-8') print(f'[ok] SQL -> {SQL_OUT} ({SQL_OUT.stat().st_size // 1024} KB)') def copy_tree(src: Path, dst: Path) -> int: if not src.is_dir(): return 0 if dst.exists(): shutil.rmtree(dst) shutil.copytree(src, dst) count = sum(1 for _ in dst.rglob('*') if _.is_file()) print(f'[ok] {src.name} -> {dst} ({count} files)') return count def main() -> int: cfg = _db_env() print(f"Export from {cfg['user']}@{cfg['host']}:{cfg['port']}/{cfg['database']}") dump_mysql(cfg) STATIC_DST.parent.mkdir(parents=True, exist_ok=True) file_counts = {} for name in STATIC_DIRS: src = STATIC_SRC / name if src.is_dir(): file_counts[name] = copy_tree(src, STATIC_DST / name) # 用户头像(SQL 引用 static/avatars/1~3.jpg) avatars_dst = STATIC_DST / 'avatars' avatars_dst.mkdir(parents=True, exist_ok=True) media_pool = list((STATIC_SRC / 'medias').glob('*.jpg')) or list( (ROOT / 'seed_assets' / 'medias').glob('*.jpg') ) for i in range(1, 4): dst = avatars_dst / f'{i}.jpg' if not dst.is_file() and media_pool: shutil.copy2(media_pool[(i - 1) % len(media_pool)], dst) if avatars_dst.is_dir(): file_counts['avatars'] = sum(1 for _ in avatars_dst.glob('*.jpg')) print(f'[ok] avatars -> {avatars_dst} ({file_counts["avatars"]} files)') training_files = 0 if TRAINING_SRC.is_dir(): TRAINING_DST.parent.mkdir(parents=True, exist_ok=True) if TRAINING_DST.exists(): shutil.rmtree(TRAINING_DST) shutil.copytree(TRAINING_SRC, TRAINING_DST) training_files = sum(1 for _ in TRAINING_DST.rglob('*') if _.is_file()) print(f'[ok] training_meta -> {TRAINING_DST} ({training_files} files)') manifest = { 'exported_at': datetime.now(timezone.utc).isoformat(), 'database': cfg['database'], 'tables': list(TABLES), 'static_files': file_counts, 'training_meta_files': training_files, 'sql_file': 'sql/seed_snapshot.sql', } SNAPSHOT_ROOT.mkdir(parents=True, exist_ok=True) manifest_path = SNAPSHOT_ROOT / 'manifest.json' manifest_path.write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding='utf-8' ) print(f'[ok] manifest -> {manifest_path}') print('\n请提交 sql/seed_snapshot.sql 与 seed_assets/snapshot/ 到 Git,供他人运行 import_db_snapshot.py') return 0 if __name__ == '__main__': sys.exit(main())