| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- # -*- coding: utf-8 -*-
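"""
Import the demo snapshot produced by export_db_snapshot.py and restore the
static files and training metadata that belong to it.

Usage (on an empty database, or when the demo data should be overwritten):
    cd BridgeDiseaseBackend-main
    $env:SQLALCHEMY_DATABASE_URI="mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4"
    python scripts/import_db_snapshot.py

Optional: restore the files only, without importing the SQL
    python scripts/import_db_snapshot.py --files-only

Optional: import the SQL only, without restoring the files
    python scripts/import_db_snapshot.py --sql-only
"""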
- from __future__ import annotations
- import argparse
- import os
- import shutil
- import subprocess
- import sys
- from pathlib import Path
# Repository root: two levels above this file's resolved path.
ROOT = Path(__file__).resolve().parents[1]

# Snapshot inputs (SQL dump plus the asset directories stored next to it).
SQL_FILE = ROOT.joinpath('sql', 'seed_snapshot.sql')
SNAPSHOT_STATIC = ROOT.joinpath('seed_assets', 'snapshot', 'static')
SNAPSHOT_TRAINING = ROOT.joinpath('seed_assets', 'snapshot', 'training_meta')

# Locations the snapshot assets are restored into.
STATIC_DST = ROOT.joinpath('app', 'static')
TRAINING_DST = ROOT.joinpath('data', 'training_meta')

# Put the repository root first on sys.path so `app` can be imported later.
sys.path.insert(0, str(ROOT))

# Only supply the default connection URI when the caller has not set one.
if 'SQLALCHEMY_DATABASE_URI' not in os.environ:
    os.environ['SQLALCHEMY_DATABASE_URI'] = (
        'mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease?charset=utf8mb4'
    )
def _db_env():
    """Split ``SQLALCHEMY_DATABASE_URI`` into the pieces the mysql CLI needs.

    Returns:
        A ``(host, port, user, password, database)`` tuple of strings.
        ``port`` falls back to ``'3306'`` when the URI does not name one;
        ``user`` and ``password`` are empty strings when absent.

    Raises:
        KeyError: if ``SQLALCHEMY_DATABASE_URI`` is not set.
        ValueError: if the port in the URI is not a valid port number.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    from urllib.parse import unquote, urlsplit

    parts = urlsplit(os.environ['SQLALCHEMY_DATABASE_URI'])

    # Special characters in URI credentials are percent-encoded (for example
    # '@' is written as %40), so decode them before handing the values to an
    # external client that expects the raw text.
    user = unquote(parts.username or '')
    password = unquote(parts.password or '')

    host = parts.hostname or ''
    port = '3306' if parts.port is None else str(parts.port)

    # urlsplit has already separated the query string; the path is
    # "/<database>".
    database = unquote(parts.path.lstrip('/'))
    return host, port, user, password, database
def import_sql() -> None:
    """Empty the demo tables and load ``SQL_FILE`` into the MySQL database.

    The SQL file is piped into a local ``mysql`` client when one is on PATH;
    otherwise it is piped into ``mysql`` inside the ``bridge-disease-mysql``
    container through ``docker exec``. Both preconditions (snapshot file
    present, some client available) are checked *before* any table is
    truncated, so a missing tool can no longer leave the database wiped with
    nothing to reload it.

    Raises:
        SystemExit: with status 1 if the snapshot file is missing or neither
            ``mysql`` nor ``docker`` can be found.
        subprocess.CalledProcessError: if the client exits with a non-zero
            status.
    """
    if not SQL_FILE.is_file():
        print(f'缺少 {SQL_FILE},请先运行 export_db_snapshot.py', file=sys.stderr)
        sys.exit(1)
    host, port, user, password, database = _db_env()

    # Resolve the import tool up front; truncation below is destructive.
    local_client = shutil.which('mysql')
    if local_client is None and shutil.which('docker') is None:
        print('未找到 mysql 客户端或 docker,无法导入快照', file=sys.stderr)
        sys.exit(1)

    # A bare "-p" (empty password) makes the client prompt interactively, so
    # use the long forms and state "no password" explicitly.
    client_args = [
        f'--user={user}',
        f'--password={password}' if password else '--skip-password',
        '--default-character-set=utf8mb4',
        database,
    ]
    if local_client is not None:
        cmd = [local_client, f'--host={host}', f'--port={port}', *client_args]
    else:
        # Inside the container the server is reached with the client's
        # defaults, so host/port are not passed (as in the original fallback).
        cmd = ['docker', 'exec', '-i', 'bridge-disease-mysql', 'mysql', *client_args]

    # Imported here rather than at module top: `app` is only needed for this
    # code path.
    from app import create_app
    from app.models import db
    from sqlalchemy import text

    app = create_app()
    with app.app_context():
        db.create_all()
        # Empty the business tables (the schema is kept).
        # FOREIGN_KEY_CHECKS is a per-connection setting, so every statement
        # runs in one session transaction and is committed once at the end.
        # NOTE(review): this relies on the session keeping a single
        # connection until commit - confirm against the SQLAlchemy version
        # in use.
        db.session.execute(text('SET FOREIGN_KEY_CHECKS=0'))
        for table in ('detection', 'operation', 'media', 'model', 'user'):
            db.session.execute(text(f'TRUNCATE TABLE `{table}`'))
        db.session.execute(text('SET FOREIGN_KEY_CHECKS=1'))
        db.session.commit()
        print('[ok] 已清空 user/model/media/detection/operation')

    with open(SQL_FILE, 'rb') as f:
        subprocess.run(cmd, stdin=f, check=True)
    print(f'[ok] 已导入 {SQL_FILE}')
def restore_files() -> None:
    """Copy the snapshot's static assets and training metadata into place.

    Every entry directly under ``SNAPSHOT_STATIC`` replaces the entry of the
    same name under ``STATIC_DST``: directories are copied recursively,
    regular files are copied individually. Then every ``*.json`` file in
    ``SNAPSHOT_TRAINING`` is copied into ``TRAINING_DST``.

    A missing ``SNAPSHOT_STATIC`` only skips the static part (as its message
    says); the training metadata is still restored when present.
    """
    if SNAPSHOT_STATIC.is_dir():
        STATIC_DST.mkdir(parents=True, exist_ok=True)
        for child in SNAPSHOT_STATIC.iterdir():
            dst = STATIC_DST / child.name

            # Remove whatever currently sits at the destination so stale
            # files never survive. rmtree only accepts real directories;
            # files and symlinks have to be unlinked instead.
            if dst.is_dir() and not dst.is_symlink():
                shutil.rmtree(dst)
            elif dst.exists() or dst.is_symlink():
                dst.unlink()

            if child.is_dir():
                shutil.copytree(child, dst)
                n = sum(1 for path in dst.rglob('*') if path.is_file())
            else:
                # copytree would fail on a plain file placed directly in the
                # snapshot directory.
                shutil.copy2(child, dst)
                n = 1
            print(f'[ok] 静态 {child.name} -> {dst} ({n} files)')
    else:
        print(f'缺少 {SNAPSHOT_STATIC},跳过静态文件还原')

    if SNAPSHOT_TRAINING.is_dir():
        TRAINING_DST.mkdir(parents=True, exist_ok=True)
        for f in SNAPSHOT_TRAINING.glob('*.json'):
            shutil.copy2(f, TRAINING_DST / f.name)
        print(f'[ok] training_meta -> {TRAINING_DST}')
def main(argv=None) -> int:
    """Command-line entry point: import the SQL snapshot and/or restore files.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script did before.

    Returns:
        ``0`` on success, for use as the process exit status.

    Raises:
        SystemExit: raised by argparse on invalid arguments, including when
            ``--files-only`` and ``--sql-only`` are given together.
    """
    parser = argparse.ArgumentParser()
    # The two flags contradict each other: given together they used to skip
    # both steps and still print the completion message.
    only = parser.add_mutually_exclusive_group()
    only.add_argument('--files-only', action='store_true', help='仅还原静态与 training_meta')
    only.add_argument('--sql-only', action='store_true', help='仅导入 SQL')
    args = parser.parse_args(argv)

    if not args.files_only:
        import_sql()
    if not args.sql_only:
        restore_files()
    print('\n完成。请启动后端并刷新前端(admin / Admin123456)。')
    return 0
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|