| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- # -*- coding: utf-8 -*-
- """
- 将当前 MySQL 演示数据与静态资源导出到仓库,便于他人 clone 后复现相同界面效果。
- 生成物:
- sql/seed_snapshot.sql — user / model / media / detection / operation 全量数据
- seed_assets/snapshot/static/ — medias、models、results、avatars
- seed_assets/snapshot/training_meta/ — 训练任务与数据集元数据 JSON
- seed_assets/snapshot/manifest.json — 导出摘要
- 用法(项目根或 BridgeDiseaseBackend-main 下,需 MySQL 3307 可连):
- python scripts/export_db_snapshot.py
- """
- from __future__ import annotations
- import json
- import os
- import shutil
- import subprocess
- import sys
- from datetime import datetime, timezone
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- SQL_OUT = ROOT / 'sql' / 'seed_snapshot.sql'
- SNAPSHOT_ROOT = ROOT / 'seed_assets' / 'snapshot'
- STATIC_SRC = ROOT / 'app' / 'static'
- STATIC_DST = SNAPSHOT_ROOT / 'static'
- TRAINING_SRC = ROOT / 'data' / 'training_meta'
- TRAINING_DST = SNAPSHOT_ROOT / 'training_meta'
- TABLES = ('user', 'model', 'media', 'detection', 'operation')
- STATIC_DIRS = ('medias', 'models', 'results', 'avatars')
- def _db_env():
- uri = os.environ.get(
- 'SQLALCHEMY_DATABASE_URI',
- 'mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4',
- )
- # mysql+pymysql://user:pass@host:port/db?...
- body = uri.split('://', 1)[-1]
- auth, rest = body.split('@', 1)
- user, password = auth.split(':', 1)
- host_port, db_part = rest.split('/', 1)
- database = db_part.split('?', 1)[0]
- host, port = (host_port.split(':', 1) + ['3306'])[:2]
- return {
- 'host': host,
- 'port': port,
- 'user': user,
- 'password': password,
- 'database': database,
- }
- def dump_mysql(cfg: dict) -> None:
- SQL_OUT.parent.mkdir(parents=True, exist_ok=True)
- header = (
- '-- 检澜 DockScope 演示数据快照(由 scripts/export_db_snapshot.py 生成)\n'
- f"-- 导出时间 UTC: {datetime.now(timezone.utc).isoformat()}\n"
- 'SET NAMES utf8mb4;\n'
- 'SET FOREIGN_KEY_CHECKS=0;\n'
- 'SET sql_mode = \'NO_AUTO_VALUE_ON_ZERO\';\n\n'
- )
- cmd = [
- 'mysqldump',
- f"--host={cfg['host']}",
- f"--port={cfg['port']}",
- f"-u{cfg['user']}",
- f"-p{cfg['password']}",
- '--default-character-set=utf8mb4',
- '--no-create-info',
- '--skip-triggers',
- '--complete-insert',
- '--hex-blob',
- cfg['database'],
- *TABLES,
- ]
- try:
- proc = subprocess.run(cmd, capture_output=True, check=True)
- except FileNotFoundError:
- # 尝试 Docker 中的 mysql 客户端
- docker_cmd = [
- 'docker',
- 'exec',
- 'bridge-disease-mysql',
- 'mysqldump',
- f"-u{cfg['user']}",
- f"-p{cfg['password']}",
- '--default-character-set=utf8mb4',
- '--no-create-info',
- '--skip-triggers',
- '--complete-insert',
- '--hex-blob',
- cfg['database'],
- *TABLES,
- ]
- proc = subprocess.run(docker_cmd, capture_output=True, check=True)
- body = proc.stdout.decode('utf-8', errors='replace')
- footer = '\nSET FOREIGN_KEY_CHECKS=1;\n'
- SQL_OUT.write_text(header + body + footer, encoding='utf-8')
- print(f'[ok] SQL -> {SQL_OUT} ({SQL_OUT.stat().st_size // 1024} KB)')
- def copy_tree(src: Path, dst: Path) -> int:
- if not src.is_dir():
- return 0
- if dst.exists():
- shutil.rmtree(dst)
- shutil.copytree(src, dst)
- count = sum(1 for _ in dst.rglob('*') if _.is_file())
- print(f'[ok] {src.name} -> {dst} ({count} files)')
- return count
- def main() -> int:
- cfg = _db_env()
- print(f"Export from {cfg['user']}@{cfg['host']}:{cfg['port']}/{cfg['database']}")
- dump_mysql(cfg)
- STATIC_DST.parent.mkdir(parents=True, exist_ok=True)
- file_counts = {}
- for name in STATIC_DIRS:
- src = STATIC_SRC / name
- if src.is_dir():
- file_counts[name] = copy_tree(src, STATIC_DST / name)
- # 用户头像(SQL 引用 static/avatars/1~3.jpg)
- avatars_dst = STATIC_DST / 'avatars'
- avatars_dst.mkdir(parents=True, exist_ok=True)
- media_pool = list((STATIC_SRC / 'medias').glob('*.jpg')) or list(
- (ROOT / 'seed_assets' / 'medias').glob('*.jpg')
- )
- for i in range(1, 4):
- dst = avatars_dst / f'{i}.jpg'
- if not dst.is_file() and media_pool:
- shutil.copy2(media_pool[(i - 1) % len(media_pool)], dst)
- if avatars_dst.is_dir():
- file_counts['avatars'] = sum(1 for _ in avatars_dst.glob('*.jpg'))
- print(f'[ok] avatars -> {avatars_dst} ({file_counts["avatars"]} files)')
- training_files = 0
- if TRAINING_SRC.is_dir():
- TRAINING_DST.parent.mkdir(parents=True, exist_ok=True)
- if TRAINING_DST.exists():
- shutil.rmtree(TRAINING_DST)
- shutil.copytree(TRAINING_SRC, TRAINING_DST)
- training_files = sum(1 for _ in TRAINING_DST.rglob('*') if _.is_file())
- print(f'[ok] training_meta -> {TRAINING_DST} ({training_files} files)')
- manifest = {
- 'exported_at': datetime.now(timezone.utc).isoformat(),
- 'database': cfg['database'],
- 'tables': list(TABLES),
- 'static_files': file_counts,
- 'training_meta_files': training_files,
- 'sql_file': 'sql/seed_snapshot.sql',
- }
- SNAPSHOT_ROOT.mkdir(parents=True, exist_ok=True)
- manifest_path = SNAPSHOT_ROOT / 'manifest.json'
- manifest_path.write_text(
- json.dumps(manifest, ensure_ascii=False, indent=2), encoding='utf-8'
- )
- print(f'[ok] manifest -> {manifest_path}')
- print('\n请提交 sql/seed_snapshot.sql 与 seed_assets/snapshot/ 到 Git,供他人运行 import_db_snapshot.py')
- return 0
- if __name__ == '__main__':
- sys.exit(main())
|