#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-crawl image links from E-H galleries and save one JSON file per album.

python eh_crawler.py
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional

import httpx
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio

# -------------------- Configurable constants --------------------
from config import config

CONCURRENCY = config.concurrency
MAX_PAGE = config.max_page
RETRY_PER_PAGE = config.retry_per_page
TIMEOUT = httpx.Timeout(config.timeout)
IMG_SELECTOR = "#gdt"  # image entry area
FAILED_RECORD = "data/failed_keys.json"
LOG_LEVEL = getattr(logging, config.log_level.upper())
# -----------------------------------------------------------------
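# The `config` object imported above is expected to expose at least: concurrency,
# max_page, retry_per_page, timeout (in seconds) and log_level (presumably defined in config.py).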

# Make sure the data directory exists
if not os.path.exists("data"):
    os.mkdir("data")

# Use the shared logging configuration
from logger import get_logger
from realtime_logger import realtime_logger

log = get_logger("step1", "crawl.log")

# Pre-compiled regex
ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')


# -------------------- Helpers --------------------
def clean_folder_name(title: str) -> str:
    """Sanitize a folder name: strip illegal characters, spaces and underscores."""
    return ILLEGAL_CHARS.sub("_", title).replace(" ", "").replace("_", "").strip() or "gallery"
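

# data/targets.txt format (as parsed below): one gallery URL per line;
# blank lines and lines starting with '#' are ignored, and duplicates are dropped.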
def load_targets() -> List[str]:
    """Read data/targets.txt."""
    tgt = Path("data/targets.txt")
    lines = []
    for ln in tgt.read_text(encoding="utf-8").splitlines():
        url = ln.strip()
        if url and not url.startswith('#'):
            lines.append(url)
    if not lines:
        log.error("targets.txt is empty; please add URLs first")
        return []
    return list(set(lines))  # de-duplicate


def load_failed() -> List[str]:
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Failed to load the failure record -> {exc}")
    return []


def save_failed(keys: List[str]) -> None:
    Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")
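

# FAILED_RECORD is a plain JSON array of the gallery URLs that failed on the previous
# run; it is re-read at startup and removed once every gallery has been crawled.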


# -------------------- Crawler core --------------------
async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Fetch the HTML of a single page."""
    for attempt in range(1, RETRY_PER_PAGE + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text
        except httpx.HTTPError as exc:
            log.error(f"[{attempt}/{RETRY_PER_PAGE}] Request failed {url} -> {exc}")
            await asyncio.sleep(2 ** attempt)  # exponential backoff: 2 s, 4 s, 8 s, ...
    return None


async def crawl_single_gallery(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, gallery_url: str
) -> bool:
    """Crawl a single gallery; return True on success."""
    async with sem:
        base_url = gallery_url.rstrip("/")
        key = base_url.split("/")[-1]  # use the last URL segment as the key
        json_name = f"{key}.json"

        folder_path: Optional[Path] = None
        json_data: Dict[str, str] = {}
        img_count = 1
        last_page = False
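
        # Walk the gallery pages (?p=0, ?p=1, ...) up to MAX_PAGE; the loop stops early
        # once a page repeats an already-collected link or contains no image entries.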
        for page in range(MAX_PAGE):
            if last_page:
                break
            url = f"{base_url}?p={page}"
            html = await fetch_page(client, url)
            if html is None:
                continue

            soup = BeautifulSoup(html, "lxml")
            title = soup.title.string if soup.title and soup.title.string else "gallery"
            clean_title = clean_folder_name(title)
            folder_path = Path("data/downloads") / clean_title
            folder_path.mkdir(parents=True, exist_ok=True)

            # Skip the whole gallery if its JSON already exists
            json_path = folder_path / json_name
            if json_path.exists():
                log.info(f"{json_name} already exists, skipping")
                return True

            log.info(f"Current page: {page + 1} {url}")

            selected = soup.select_one(IMG_SELECTOR)
            if not selected:
                log.warning(f"Selector {IMG_SELECTOR} not found")
                continue

            links = re.findall(r'<a href="(.*?)"', selected.prettify())
            if not links:
                log.info("No image entries on this page; treating it as the last page")
                last_page = True
                continue

            for img_entry in links:
                if img_entry in json_data.values():
                    last_page = True
                    break
                json_data[f"{img_count:04d}"] = img_entry
                img_count += 1

        if json_data:
            json_path.write_text(
                json.dumps(json_data, ensure_ascii=False, indent=2), encoding="utf-8"
            )
            log.info(f"Saved -> {json_path} ({len(json_data)} images)")
            # Send a realtime log entry
            try:
                realtime_logger.broadcast_log_sync(
                    f"Gallery {key} crawled, {len(json_data)} images in total", "SUCCESS", "step1"
                )
            except Exception as e:
                log.warning(f"Failed to send realtime log: {e}")
            return True
        else:
            log.warning(f"{key}: no image links parsed")
            # Send a realtime log entry
            try:
                realtime_logger.broadcast_log_sync(
                    f"Gallery {key}: no image links parsed", "WARNING", "step1"
                )
            except Exception as e:
                log.warning(f"Failed to send realtime log: {e}")
            return False
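

# Output format note: each gallery JSON maps a zero-padded index to the image page
# URL found under IMG_SELECTOR, e.g. {"0001": "<image page url>", "0002": ...}.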


# -------------------- Main flow --------------------
async def main(proxy: str | None = None) -> None:
    targets = load_targets()
    failed = load_failed()
    if failed:
        log.info(f"Retrying galleries that failed last run first: {len(failed)}")
    all_urls = list(set(targets + failed))

    log.info(f"Proxy: {proxy}")
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(
        limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
    ) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[crawl_single_gallery(client, sem, u) for u in all_urls],
            desc="Galleries",
            total=len(all_urls),
        )

    # Persist failures
    new_failed = [u for u, ok in zip(all_urls, results) if not ok]
    if new_failed:
        save_failed(new_failed)
        log.warning(f"{len(new_failed)} galleries still failed this round; written to {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All galleries crawled!")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user; crawl stopped")
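

# To route every request through a proxy, pass it to main(), e.g.
# asyncio.run(main("http://127.0.0.1:7890"))  # the proxy address here is only an example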