You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
eh-fastapi/step2.py

187 lines
6.4 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-download the real images of EH galleries.

Usage: python step2.py
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List
import aiofiles
import httpx
from pathlib import Path
from tqdm.asyncio import tqdm_asyncio
# -------------------- Configurable constants --------------------
from config import config

# Upper bound on the number of images downloaded at the same time.
CONCURRENCY = config.concurrency
# Number of attempts made for each image before it is reported as failed.
RETRY_PER_IMG = config.retry_per_image
# Timeout handed to the shared HTTP client.
TIMEOUT = httpx.Timeout(config.image_timeout)
# JSON file that remembers the tasks that failed during the previous run.
FAILED_RECORD = "data/failed_downloads.json"
# Logging level constant looked up by its configured name; an unknown name
# raises AttributeError at import time.
LOG_LEVEL = getattr(logging, config.log_level.upper())
# -----------------------------------------------------------------

# Make sure the data directory exists. ``exist_ok=True`` makes this a single
# idempotent call instead of a racy "check, then create" sequence.
Path("data").mkdir(exist_ok=True)

# Use the project's shared logging configuration.
from logger import get_logger
from realtime_logger import realtime_logger

log = get_logger("step2", "download.log")

# Pre-compiled regular expressions.
# Captures the ``src`` attribute of the ``<img id="img" ...>`` tag.
IMG_URL_RE = re.compile(r'<img id="img" src="(.*?)"', re.S)
# Captures a known image extension at the very end of a URL (case-insensitive).
EXT_RE = re.compile(r"\.(jpg|jpeg|png|gif|webp)$", re.I)
# -------------------- 工具函数 --------------------
def load_failed() -> List[Dict[str, str]]:
    """Load the download tasks that failed during the previous run.

    Reads ``FAILED_RECORD`` and returns the task dicts stored in it. The
    result is always a list whose entries are dicts containing both
    ``img_path`` and ``img_url``, so callers can concatenate it with other
    task lists and pass every entry to the downloader.

    Returns:
        The recorded tasks, or an empty list when the record file is missing,
        unreadable, not valid JSON, or does not hold a JSON list. Entries of
        an unexpected shape are dropped (a warning is logged).
    """
    try:
        records = json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No record file simply means nothing failed last time.
        return []
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and undecodable bytes.
        log.warning(f"加载失败记录失败 -> {exc}")
        return []

    if not isinstance(records, list):
        log.warning(f"加载失败记录失败 -> 内容不是列表: {type(records).__name__}")
        return []

    tasks = [
        rec
        for rec in records
        if isinstance(rec, dict) and "img_path" in rec and "img_url" in rec
    ]
    if len(tasks) != len(records):
        log.warning(f"加载失败记录失败 -> 忽略 {len(records) - len(tasks)} 条无效记录")
    return tasks
def save_failed(failed: List[Dict[str, str]]) -> None:
    """Write the failed download tasks to ``FAILED_RECORD``.

    The list is serialized as indented JSON with non-ASCII characters kept
    as-is, and stored UTF-8 encoded, replacing any previous content.
    """
    record_file = Path(FAILED_RECORD)
    payload = json.dumps(failed, ensure_ascii=False, indent=2)
    record_file.write_text(payload, encoding="utf-8")
# -------------------- 下载核心 --------------------
async def _stream_to_file(client: httpx.AsyncClient, url: str, dest: Path) -> None:
    """Stream ``url`` into ``dest`` without ever exposing a partial file.

    The response body is written to a uniquely named temporary sibling of
    ``dest`` and renamed onto ``dest`` only after the whole body has been
    received. On any failure (including task cancellation) the temporary file
    is removed and the exception is re-raised.
    """
    from uuid import uuid4  # only needed for the temporary file name

    # A unique name keeps concurrent downloads of the same target from
    # truncating each other's temporary file.
    tmp_path = dest.with_name(f"{dest.name}.{uuid4().hex}.part")
    try:
        async with client.stream("GET", url) as img_resp:
            img_resp.raise_for_status()
            dest.parent.mkdir(parents=True, exist_ok=True)
            async with aiofiles.open(tmp_path, "wb") as fp:
                async for chunk in img_resp.aiter_bytes(chunk_size=65536):
                    await fp.write(chunk)
        # Publish the finished file in one step.
        tmp_path.replace(dest)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise


async def download_one(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str]
) -> bool:
    """Download a single image and report whether it succeeded.

    The page at ``item["img_url"]`` is fetched first; the real image URL is
    taken from its ``<img id="img">`` tag and streamed to ``item["img_path"]``
    with the extension found in that URL (``jpg`` when none is recognised).

    Args:
        client: Shared HTTP client used for both requests.
        sem: Semaphore limiting how many downloads run at the same time; it is
            held for the whole call, including retries.
        item: Task dict with ``img_path`` (target path without extension) and
            ``img_url`` (detail page URL).

    Returns:
        ``True`` when the image was downloaded or the target file already
        exists; ``False`` when the page has no image tag, the server answered
        with a non-429 HTTP error, or all ``RETRY_PER_IMG`` attempts failed.
    """
    img_path, img_url = Path(item["img_path"]), item["img_url"]
    async with sem:
        for attempt in range(1, RETRY_PER_IMG + 1):
            try:
                # 1. Fetch the detail page and extract the real image URL.
                resp = await client.get(img_url)
                resp.raise_for_status()
                real_url_match = IMG_URL_RE.search(resp.text)
                if not real_url_match:
                    # Retrying cannot help when the page has no image tag.
                    log.warning(f"未解析到真实图片链接: {img_url}")
                    return False
                real_url = real_url_match.group(1)

                # 2. Work out the final file name and skip finished images.
                ext_match = EXT_RE.search(real_url)
                ext = ext_match.group(1).lower() if ext_match else "jpg"
                final_path = img_path.with_suffix(f".{ext}")
                if final_path.exists():
                    log.info(f"已存在,跳过: {final_path.name}")
                    return True

                # 3. Stream the image. Only complete files ever appear at
                #    ``final_path``, so the existence check above can never
                #    mistake a truncated download for a finished one.
                await _stream_to_file(client, real_url, final_path)

                # Real-time log delivery is best effort; a failure there must
                # not turn a successful download into a failed one.
                try:
                    realtime_logger.broadcast_log_sync(
                        f"下载完成: {final_path.name}", "SUCCESS", "step2"
                    )
                except Exception as e:
                    log.warning(f"发送实时日志失败: {e}")
                return True
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    # Rate limited: back off exponentially (1s, 2s, 4s, ...).
                    wait = 2 ** (attempt - 1)
                    log.warning(f"[429] 等待 {wait}s 后重试({attempt}/{RETRY_PER_IMG})")
                    await asyncio.sleep(wait)
                else:
                    # Any other HTTP error status is treated as permanent.
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break
            except Exception as exc:
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})")
                await asyncio.sleep(1)
        return False
# -------------------- 扫描待下载列表 --------------------
async def scan_tasks() -> List[Dict[str, str]]:
    """Collect the images under ``data/downloads`` that still need downloading.

    Every ``*.json`` file found recursively below ``data/downloads`` is read
    as a mapping of image name to URL. An image is pending when no file with
    that name and one of the known image extensions exists next to the JSON
    file.

    Returns:
        One ``{"img_path": ..., "img_url": ...}`` dict per pending image, where
        ``img_path`` is the extension-less target path as a string. The list
        is empty when ``data/downloads`` does not exist. JSON files that
        cannot be read or parsed, or whose top-level value is not an object,
        are skipped with a warning.
    """
    known_suffixes = (".jpg", ".jpeg", ".png", ".gif", ".webp")
    result: List[Dict[str, str]] = []
    root = Path("data/downloads")
    if not root.exists():
        return result

    for json_path in root.rglob("*.json"):
        folder = json_path.parent
        try:
            data = json.loads(json_path.read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"读取 json 失败 {json_path} -> {exc}")
            continue
        if not isinstance(data, dict):
            # A list/number/string has no name -> URL pairs to iterate.
            log.warning(f"读取 json 失败 {json_path} -> 顶层不是对象")
            continue

        for img_name, img_url in data.items():
            img_path = folder / img_name  # no extension yet
            # The image counts as downloaded if it exists with any known suffix.
            if not any(img_path.with_suffix(ext).exists() for ext in known_suffixes):
                result.append({"img_path": str(img_path), "img_url": img_url})
    return result
# -------------------- 主流程 --------------------
async def main(proxy: str | None = None) -> None:
    """Run one download pass over all pending images.

    Tasks recorded as failed by the previous run are queued first, followed by
    the tasks found by scanning the download folders. All tasks are downloaded
    concurrently through one shared HTTP client. Tasks that fail again are
    written to ``FAILED_RECORD``; when everything succeeds that file is
    removed.

    Args:
        proxy: Optional proxy URL for the HTTP client; ``None`` means no
            explicit proxy is configured.
    """
    # 1. Retry the failures of the previous run first.
    failed_tasks = load_failed()
    if failed_tasks:
        log.info(f"优先重试上次失败任务: {len(failed_tasks)}")

    # A previously failed image is still missing on disk, so the scan reports
    # it again. Keep only the first task per target path; otherwise the same
    # file would be downloaded twice, concurrently.
    tasks: List[Dict[str, str]] = []
    seen_paths = set()
    for task in failed_tasks + await scan_tasks():
        if task["img_path"] not in seen_paths:
            seen_paths.add(task["img_path"])
            tasks.append(task)

    if not tasks:
        log.info("没有需要下载的图片,收工!")
        return

    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    client_kwargs = {"limits": limits, "timeout": TIMEOUT, "verify": True}
    if proxy is not None:
        # Only pass the keyword when a proxy was requested.
        # NOTE(review): httpx 0.26 deprecated ``proxies`` in favour of
        # ``proxy`` and 0.28 removed it — confirm against the pinned httpx
        # version before switching the keyword.
        client_kwargs["proxies"] = proxy

    async with httpx.AsyncClient(**client_kwargs) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[download_one(client, sem, t) for t in tasks],
            desc="Downloading",
            total=len(tasks),
        )

    # 2. Summarize and persist the tasks that failed again.
    failed_again = [t for t, ok in zip(tasks, results) if not ok]
    if failed_again:
        save_failed(failed_again)
        log.warning(f"本轮仍有 {len(failed_again)} 张下载失败,已写入 {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("全部下载完成!")
def _run_cli() -> None:
    """Run the download pass as a script, ending quietly on Ctrl+C."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("用户中断,下载结束")


if __name__ == "__main__":
    _run_cli()