You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
eh-fastapi/step1.py

207 lines
6.7 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
异步批量抓取 E-H 画廊图片链接,按专辑保存 json
python eh_crawler.py
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional
import httpx
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio
from pathlib import Path
# -------------------- 可配置常量 --------------------
from config import config
CONCURRENCY = config.concurrency
MAX_PAGE = config.max_page
RETRY_PER_PAGE = config.retry_per_page
TIMEOUT = httpx.Timeout(config.timeout)
IMG_SELECTOR = "#gdt" # 图片入口区域
FAILED_RECORD = "data/failed_keys.json"
LOG_LEVEL = getattr(logging, config.log_level.upper())
# ----------------------------------------------------
# 确保数据目录存在
if not os.path.exists("data"):
os.mkdir("data")
# 使用统一的日志配置
from logger import get_logger
from realtime_logger import realtime_logger
log = get_logger("step1", "crawl.log")
# 预编译正则
ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
# -------------------- 工具函数 --------------------
def clean_folder_name(title: str) -> str:
"""清洗文件夹名"""
return ILLEGAL_CHARS.sub("_", title).replace(" ", "").replace("_", "").strip() or "gallery"
def load_targets() -> List[str]:
"""读取 targets.txt"""
tgt = Path("data/targets.txt")
with open(tgt, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f.readlines() if line.strip()]
lines = []
for ln in tgt.read_text(encoding="utf-8").splitlines():
url = ln.strip()
if url and not url.startswith('#'):
lines.append(url)
if not lines:
log.error("targets.txt 为空,请先填写 URL")
return
return list(set(lines)) # 去重
def load_failed() -> List[str]:
if Path(FAILED_RECORD).exists():
try:
return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
except Exception as exc:
log.warning(f"加载失败记录失败 -> {exc}")
return []
def save_failed(keys: List[str]) -> None:
Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")
# -------------------- 爬虫核心 --------------------
async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
"""获取单页 HTML"""
for attempt in range(1, RETRY_PER_PAGE + 1):
try:
resp = await client.get(url)
resp.raise_for_status()
return resp.text
except httpx.HTTPError as exc:
log.error(f"[{attempt}/{RETRY_PER_PAGE}] 请求失败 {url} -> {exc}")
await asyncio.sleep(2 ** attempt)
return None
async def crawl_single_gallery(
client: httpx.AsyncClient, sem: asyncio.Semaphore, gallery_url: str
) -> bool:
"""抓取单个画廊,成功返回 True"""
async with sem:
base_url = gallery_url.rstrip("/")
key = base_url.split("/")[-1] # 用最后一截当 key
json_name = f"{key}.json"
folder_path: Optional[Path] = None
json_data: Dict[str, str] = {}
img_count = 1
last_page = False
for page in range(MAX_PAGE):
if last_page:
break
url = f"{base_url}?p={page}"
html = await fetch_page(client, url)
if html is None:
continue
soup = BeautifulSoup(html, "lxml")
title = soup.title.string if soup.title else "gallery"
clean_title = clean_folder_name(title)
folder_path = Path("data/downloads") / clean_title
folder_path.mkdir(parents=True, exist_ok=True)
# 如果 json 已存在则跳过整个画廊
json_path = folder_path / json_name
if json_path.exists():
log.info(f"{json_name} 已存在,跳过")
return True
log.info(f"当前页码:{page + 1} {url}")
selected = soup.select_one(IMG_SELECTOR)
if not selected:
log.warning(f"未找到选择器 {IMG_SELECTOR}")
continue
links = re.findall(r'<a href="(.*?)"', selected.prettify())
if not links:
log.info("本页无图片入口,视为最后一页")
last_page = True
continue
for img_entry in links:
if img_entry in json_data.values():
last_page = True
break
json_data[f"{img_count:04d}"] = img_entry
img_count += 1
if json_data:
json_path.write_text(
json.dumps(json_data, ensure_ascii=False, indent=2), encoding="utf-8"
)
log.info(f"保存成功 -> {json_path} ({len(json_data)} 张)")
# 发送实时日志
try:
realtime_logger.broadcast_log_sync(f"画廊 {key} 抓取完成,共 {len(json_data)} 张图片", "SUCCESS", "step1")
except Exception as e:
log.warning(f"发送实时日志失败: {e}")
return True
else:
log.warning(f"{key} 未解析到任何图片链接")
# 发送实时日志
try:
realtime_logger.broadcast_log_sync(f"画廊 {key} 未解析到任何图片链接", "WARNING", "step1")
except Exception as e:
log.warning(f"发送实时日志失败: {e}")
return False
# -------------------- 主流程 --------------------
async def main(proxy: str | None = None) -> None:
targets = load_targets()
failed = load_failed()
if failed:
log.info(f"优先重试上次失败画廊: {len(failed)}")
all_urls = list(set(targets + failed))
print(proxy)
limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
async with httpx.AsyncClient(
limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
) as client:
sem = asyncio.Semaphore(CONCURRENCY)
results = await tqdm_asyncio.gather(
*[crawl_single_gallery(client, sem, u) for u in all_urls],
desc="Galleries",
total=len(all_urls),
)
# 失败持久化
new_failed = [u for u, ok in zip(all_urls, results) if not ok]
if new_failed:
save_failed(new_failed)
log.warning(f"本轮仍有 {len(new_failed)} 个画廊失败,已写入 {FAILED_RECORD}")
else:
Path(FAILED_RECORD).unlink(missing_ok=True)
log.info("全部画廊抓取完成!")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
log.info("用户中断,抓取结束")