# -*- coding: utf-8 -*- """将 seed_assets/medias 中的真实照片导入 static/medias 并同步 media 表。""" import os import shutil import sys from pathlib import Path from PIL import Image ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) os.environ.setdefault( 'SQLALCHEMY_DATABASE_URI', 'mysql+pymysql://root:bridgedisease_root@127.0.0.1:3307/bridge_disease', ) from app import create_app from app.models import Media, db ASSETS_DIR = ROOT / 'seed_assets' / 'medias' MEDIAS_DIR = ROOT / 'app' / 'static' / 'medias' MAX_WIDTH = 1600 # 源文件(seed_assets 内) -> 目标与元数据 SEED_ITEMS = [ { 'source': '01_concrete_crack_bridge.jpg', 'filename': '01_concrete_crack_bridge.jpg', 'media_name': '混凝土桥面裂缝_01.jpg', 'description': '桥梁混凝土底板裂缝(德国 Darmsheim 桥,实景)', 'owner_id': 1, }, { 'source': '02_bridge_concrete_cracks.jpg', 'filename': '02_bridge_concrete_cracks.jpg', 'media_name': '桥梁混凝土结构裂缝_02.jpg', 'description': '桥梁混凝土结构多处裂缝(实景)', 'owner_id': 2, }, { 'source': '03_steel_bridge_corrosion.jpg', 'filename': '03_steel_bridge_corrosion.jpg', 'media_name': '钢构件锈蚀_03.jpg', 'description': '金属构件锈蚀与氧化皮(实景)', 'owner_id': 2, }, { 'source': '04_concrete_bending_cracks.jpg', 'filename': '04_concrete_bending_cracks.jpg', 'media_name': '混凝土弯曲裂缝_04.jpg', 'description': '混凝土试件弯曲裂缝(RILEM 三点弯曲试验实景)', 'owner_id': 2, }, { 'source': '05_bridge_substructure.jpg', 'filename': '05_bridge_substructure.jpg', 'media_name': '桥梁下部结构_05.jpg', 'description': '桥梁混凝土底板与下部结构裂缝(实景)', 'owner_id': 1, 'fallback_source': '01_concrete_crack_bridge.jpg', }, { 'source': '06_shrinkage_cracks_concrete.jpg', 'filename': '06_shrinkage_cracks_concrete.jpg', 'media_name': '混凝土收缩裂缝_06.jpg', 'description': '钢筋混凝土收缩裂缝(实景)', 'owner_id': 3, }, { 'source': '07_asphalt_crocodile_cracking.jpg', 'filename': '07_asphalt_crocodile_cracking.jpg', 'media_name': '桥面铺装龟裂_07.jpg', 'description': '沥青路面龟裂网状裂缝(实景)', 'owner_id': 1, }, { 'source': '08_concrete_rebar_corrosion.jpg', 'filename': '08_concrete_rebar_corrosion.jpg', 'media_name': '混凝土钢筋锈蚀_08.jpg', 'description': '混凝土结构钢筋锈蚀与表面劣化(实景)', 'owner_id': 3, }, { 'source': '09_steel_beam_site.jpg', 'filename': '09_steel_beam_site.jpg', 'media_name': '施工现场钢梁_09.jpg', 'description': '施工现场钢梁吊装与拼装(实景)', 'owner_id': 1, }, ] LEGACY_NAMES = [ 'main_span_deck_inspection.png', 'steel_box_girder_u_rib.png', 'expansion_joint_j03.png', 'pier_cap_east_view.png', 'bearing_pad_top_view.png', 'deck_pavement_drone_01.png', 'cable_saddle_interior.png', 'parapet_root_seepage.png', 'concrete_crack_deck.jpg', 'steel_corrosion_girder.jpg', 'expansion_joint_spalling.jpg', 'steel_beam_construction_site.jpg', 'bearing_pad_surface.jpg', 'coating_peel_steel.jpg', 'concrete_rebar_exposure.jpg', 'pavement_distress.jpg', '混凝土桥面裂缝_01.jpg', '钢箱梁锈蚀_02.jpg', '伸缩缝剥落_03.jpg', '施工现场钢梁_04.jpg', '支座垫石开裂_05.jpg', '涂层剥落_06.jpg', '混凝土露筋_07.jpg', '桥面铺装破损_08.jpg', ] def prepare_image(src: Path, dest: Path) -> None: """压缩并统一为 JPEG(PNG 源文件)。""" dest.parent.mkdir(parents=True, exist_ok=True) with Image.open(src) as im: im = im.convert('RGB') w, h = im.size if w > MAX_WIDTH: nh = int(h * MAX_WIDTH / w) im = im.resize((MAX_WIDTH, nh), Image.Resampling.LANCZOS) out = dest if dest.suffix.lower() == '.png': out = dest.with_suffix('.jpg') im.save(out, format='JPEG', quality=88, optimize=True) if out != dest and dest.exists(): dest.unlink() return out def resolve_source(item: dict) -> Path | None: p = ASSETS_DIR / item['source'] if p.is_file(): return p fb = item.get('fallback_source') if fb: p2 = ASSETS_DIR / fb if p2.is_file(): print(f" [fallback] {item['source']} -> {fb}") return p2 return None def main(): if not ASSETS_DIR.is_dir(): print(f'缺少目录 {ASSETS_DIR},请先运行: python scripts/download_real_medias.py') sys.exit(1) MEDIAS_DIR.mkdir(parents=True, exist_ok=True) app = create_app() with app.app_context(): Media.query.filter(Media.media_name.in_(LEGACY_NAMES)).delete(synchronize_session=False) for legacy in LEGACY_NAMES: p = MEDIAS_DIR / legacy if p.is_file(): p.unlink() db.session.commit() added, updated = 0, 0 for item in SEED_ITEMS: src = resolve_source(item) if not src: print(f'[skip] 缺少源图: {item["source"]}') continue dest_name = Path(item['filename']) if dest_name.suffix.lower() == '.png': dest_name = dest_name.with_suffix('.jpg') item = {**item, 'filename': dest_name.name, 'media_name': item['media_name'].replace('.png', '.jpg')} dest = MEDIAS_DIR / dest_name.name saved = prepare_image(src, dest) rel_path = f'static/medias/{saved.name}'.replace('\\', '/') abs_path = ROOT / 'app' / rel_path.replace('/', os.sep) with Image.open(abs_path) as im: w, h = im.size file_size = os.path.getsize(abs_path) / 1024 file_type = 'jpeg' existing = Media.query.filter_by(media_name=item['media_name']).first() if existing: existing.media_path = rel_path existing.description = item['description'] existing.file_size = round(file_size, 2) existing.file_type = file_type existing.resolution_width = w existing.resolution_height = h existing.frame_count = 1 updated += 1 print(f'[update] {item["media_name"]}') else: db.session.add( Media( media_name=item['media_name'], media_path=rel_path, description=item['description'], file_size=round(file_size, 2), file_type=file_type, resolution_width=w, resolution_height=h, frame_count=1, owner_id=item['owner_id'], ) ) added += 1 print(f'[insert] {item["media_name"]}') db.session.commit() print(f'完成:新增 {added},更新 {updated},媒体库共 {Media.query.count()} 条。') print('照片来源说明见 seed_assets/medias/ATTRIBUTION.md') if __name__ == '__main__': main()