# -*- coding: utf-8 -*-
"""Download a comic from zhuimh.com chapter by chapter.

Workflow:
  1. Fetch the comic's index page, scrape the title and the chapter list
     (name -> url), and persist them into a per-comic SQLite table.
  2. For every chapter row still marked ``state = 0``, open the chapter
     page with Playwright, scroll to force the lazy-loaded images in,
     screenshot each image into ``downloads/<title>/<chapter>/NNN.png``
     and flip the row's state to 1 so re-runs resume where they stopped.
"""
import os
import re
import time
import asyncio
import sqlite3

import httpx
from playwright.sync_api import sync_playwright

# --- configuration -------------------------------------------------------
comico_id = '419025'                      # comic id on zhuimh.com
base_url = 'https://www.zhuimh.com'
target_href_url = 'https://www.zhuimh.com/comic/'
scroll_speed = 2                          # scroll step in percent of page height

current_dir_path = os.path.dirname(os.path.abspath(__file__))
download_folder = os.path.join(current_dir_path, 'downloads')
if not os.path.exists(download_folder):
    os.mkdir(download_folder)
db_path = os.path.join(download_folder, 'zhuimh.db')
txt_path = os.path.join(download_folder, 'target_comico_name.txt')


def _quote_ident(name):
    """Return *name* quoted as a SQLite identifier.

    The comic title is used directly as a table name.  Titles can contain
    spaces or quotes, and the fallback title is the all-digit ``comico_id``,
    which is not a valid bare identifier — the original f-string
    interpolation produced invalid SQL in that case.  Double-quoting (with
    embedded double quotes doubled) makes any string a safe identifier.
    """
    return '"' + str(name).replace('"', '""') + '"'


def create_db(title):
    """Create the per-comic chapter table if it does not exist yet."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            f'CREATE TABLE IF NOT EXISTS {_quote_ident(title)} ('
            'id INTEGER PRIMARY KEY AUTOINCREMENT, '
            'chapter_name TEXT, url TEXT, state BOOLEAN DEFAULT 0)'
        )
        conn.commit()
    finally:
        conn.close()


def write_to_db(title, chapter_name, url):
    """Insert (chapter_name, url) unless the chapter is already recorded.

    Skipping duplicates means re-running the scraper never resets the
    ``state`` flag of chapters that were already downloaded.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            f'SELECT EXISTS(SELECT 1 FROM {_quote_ident(title)} '
            'WHERE chapter_name = ?)',
            (chapter_name,))
        exists = cursor.fetchone()[0]
        if not exists:
            cursor.execute(
                f'INSERT INTO {_quote_ident(title)} (chapter_name, url) '
                'VALUES (?, ?)',
                (chapter_name, url))
            conn.commit()
        cursor.close()
    finally:
        conn.close()


async def async_get_chapter_list():
    """Scrape the comic title and chapter list into the SQLite database."""
    async with httpx.AsyncClient() as client:
        chapters_data = {}
        response = await client.get(target_href_url + comico_id)
        if response.status_code != 200:
            return
        text = response.text
        # NOTE(review): the original regex literals were garbled when this
        # file was recovered (their HTML tags were stripped).  The patterns
        # below assume the title lives in an <h1> and each chapter is an
        # <li><a href="...">name</a> entry — verify against the live page
        # markup before relying on them.
        title = re.findall(r'<h1[^>]*>(.*?)</h1>', text)
        title = title[0] if title else comico_id
        print(title)
        # Persist the title so later runs can recover the table name.
        with open(txt_path, 'w', encoding='utf-8') as f:
            print('写入当前目标名称')
            f.write(title)
        chapters = re.findall(
            r'<li[^>]*>\s*<a[^>]*href="(.*?)"[^>]*>(.*?)</a>', text)
        for href, name in chapters:
            chapters_data[name] = base_url + href
        # Store everything in SQLite so downloading is resumable.
        create_db(title)
        for chapter_name, url in chapters_data.items():
            write_to_db(title, chapter_name, url)
        print('数据ok')


async def get_chapter_list():
    """Thin async wrapper kept for the existing call site in ``main``."""
    await async_get_chapter_list()


def load_db(title):
    """Return all not-yet-downloaded chapter rows, oldest id first."""
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            f'SELECT * FROM {_quote_ident(title)} '
            'WHERE state = 0 ORDER BY id ASC')
        rows = cursor.fetchall()
        cursor.close()
    finally:
        conn.close()
    return rows


def change_db_data_state(data_id, t_name):
    """Mark chapter row *data_id* in table *t_name* as downloaded."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            f'UPDATE {_quote_ident(t_name)} SET state = 1 WHERE id = ?',
            (data_id,))
        conn.commit()
    finally:
        conn.close()


def scroll_to_percentage(page):
    """Scroll the page from 5% to 100% of its height in small steps.

    The slow walk down the page triggers the site's lazy image loading.
    The height is re-read on every step because loaded images grow the
    document.
    """
    for percentage in range(5, 101, scroll_speed):
        height = page.evaluate("() => document.body.scrollHeight")
        scroll_position = height * (percentage / 100)
        page.evaluate(f"window.scrollTo(0, {scroll_position})")
        time.sleep(0.5)


def request_chapter_data(title, data_id, chapter_name, chapter_url):
    """Download one chapter's images by screenshotting them via Playwright.

    Returns True on success (after marking the DB row done); returns False
    when the page failed to load or some images were still lazy
    placeholders, so the caller can retry the chapter.
    """
    chapter_folder = os.path.join(current_dir_path, 'downloads', title,
                                  chapter_name)
    with sync_playwright() as playwright:
        try:
            browser = playwright.chromium.launch(headless=True)
        except Exception as e:
            print(e)
            return False
        try:
            page = browser.new_page()
            page.goto(chapter_url)
            page.wait_for_load_state('networkidle')
        except Exception as e:
            print(e)
            # Bug fix: the original returned without closing the browser.
            browser.close()
            return False
        print('开始滚动页面')
        scroll_to_percentage(page)
        # Jump back to the top and scroll once more to catch stragglers.
        page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")
        scroll_to_percentage(page)
        print('滚动完成')
        time.sleep(2)
        # If any image src is still the lazy placeholder gif, give up and
        # let the caller reopen the browser and retry.
        html_content = page.content()
        check_list = re.findall(r'img class="lazy-read" src="(.*?)"',
                                html_content)
        if any('lazy-read.gif' in src for src in check_list):
            # Bug fix: this early return also leaked the browser.
            browser.close()
            return False
        if not os.path.exists(chapter_folder):
            os.makedirs(chapter_folder)
        # Screenshot every lazy-read image in document order.
        total_images = page.locator('.lazy-read').count()
        for page_num in range(1, total_images + 1):
            # 'chpater-images' reproduces the site's own class name as-is.
            img_locator = (f'body > div.chpater-images > '
                           f'img:nth-child({page_num})')
            img_path = os.path.join(chapter_folder,
                                    f'{str(page_num).zfill(3)}.png')
            page.locator(img_locator).screenshot(path=img_path)
            print(f'已下载 {img_path}')
        print(f'{chapter_name} 已下载完成\n\n')
        change_db_data_state(data_id, title)
        browser.close()
    return True


def main():
    """Refresh the chapter list, then download pending chapters with retry."""
    asyncio.run(get_chapter_list())
    # The table name is the comic title stored by the scrape step.
    with open(txt_path, 'r', encoding='utf-8') as f:
        title = f.read()
    folder_name = os.path.join(download_folder, title)
    if not os.path.exists(folder_name):
        os.mkdir(folder_name)
    for _retry in range(999):
        load_data = load_db(title)
        if not load_data:
            print('The database has no data or all done!')
            return
        for data_id, chapter_name, chapter_url, _state in load_data:
            print(f'准备获取图片: {title} {chapter_name}')
            ok = request_chapter_data(title, data_id, chapter_name,
                                      chapter_url)
            if not ok:
                # Failed chapter: wait, then restart from the DB so the
                # same (still state=0) chapter is retried first.
                print(f'图片加载失败: {title} {chapter_name} 重试\n\n')
                time.sleep(5)
                break


if __name__ == "__main__":
    main()