"""Websocket client that keeps one persistent connection per configured account.

Each line of ``account_details.txt`` describes one account
(``email|||user_id|||proxy_web|||proxy_api``).  For every account a
:class:`Grass` instance connects to the websocket endpoint, authenticates,
checks the score reported for its proxy IP and then exchanges PING/PONG
messages until the connection drops, at which point it reconnects.
"""
import asyncio
import json
import random
import sys
import time
import traceback
import uuid

import aiohttp
from fake_useragent import UserAgent
from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_random,
)

# Paths of the configuration files.
ACCOUNTS_DETAILS_FILE_PATH = "account_details.txt"
ACCOUNTS_FILE_PATH = "accounts.txt"
PROXIES_FILE_PATH = "proxies.txt"
# Minimum score a proxy must report before the keep-alive loop is started.
MIN_PROXY_SCORE = 50


# Logging
class Logger:
    """Minimal console logger that prefixes each message with its level."""

    @staticmethod
    def info(message):
        print(f"[INFO] {message}")

    @staticmethod
    def error(message):
        print(f"[ERROR] {message}")

    @staticmethod
    def success(message):
        print(f"[SUCCESS] {message}")


logger = Logger()


# Exception classes
class ProxyForbiddenException(Exception):
    """Raised when the websocket handshake is rejected with HTTP 403."""


class LowProxyScoreException(Exception):
    """Raised when the proxy score is below the required minimum."""


class ProxyScoreNotFoundException(Exception):
    """Raised when no score is reported for the current proxy IP."""


class WebsocketClosedException(Exception):
    """Raised when the websocket yields a close or error frame."""


# Grass class: handles authentication, proxy-score checks and the
# persistent websocket connection.
class Grass:
    """One account's websocket session, optionally routed through a proxy."""

    # Frame types after which no further data frames can be read.
    _TERMINAL_WS_TYPES = (
        aiohttp.WSMsgType.CLOSE,
        aiohttp.WSMsgType.CLOSING,
        aiohttp.WSMsgType.CLOSED,
        aiohttp.WSMsgType.ERROR,
    )

    def __init__(self, _id: int, email: str, user_id: str, proxy: str = None):
        """Store the account data and open the shared HTTP session.

        Args:
            _id: Numeric identifier used as a prefix in log messages.
            email: Account e-mail; also sent as the ``browser_id``.
            user_id: Account user id sent in the AUTH message.
            proxy: Proxy address without scheme; ``socks5://`` is prepended.
        """
        self.proxy = f"socks5://{proxy}" if proxy else None  # add the scheme
        self.email = email
        self.user_id = user_id
        self.user_agent = UserAgent().random
        self.proxy_score = None
        self.id = _id
        self.session = aiohttp.ClientSession(
            trust_env=True, connector=aiohttp.TCPConnector(ssl=False)
        )
        self.all_proxies_name = []
        # Declared up front so cleanup code can rely on the attributes existing.
        self.websocket = None
        self.ip = None

    async def start(self):
        """Run the client until it fails, then release the HTTP session."""
        try:
            user_id = self.user_id
            browser_id = str(self.email)  # the e-mail doubles as the browser id
            await self.run(browser_id, user_id)
        except Exception as e:
            logger.error(f"{self.id} | Error: {e}")
        finally:
            await self.close()

    async def run(self, browser_id: str, user_id: str):
        """Connect, authenticate and keep the connection alive forever.

        Websocket closure, connection resets and ``TypeError`` trigger a
        reconnect after one second; any other exception propagates to the
        caller.
        """
        while True:
            try:
                await self.connection_handler()
                await self.auth_to_extension(browser_id, user_id)

                # The score only needs to be validated once per instance.
                if self.proxy_score is None:
                    await asyncio.sleep(1)
                    await self.handle_proxy_score(MIN_PROXY_SCORE)

                while True:
                    await self.send_ping()
                    await self.send_pong()
                    logger.info(f"{self.id} | Mined grass.")
                    await asyncio.sleep(19.9)
            except WebsocketClosedException as e:
                logger.info(f"Websocket closed: {e}. Retrying...")
            except ConnectionResetError as e:
                logger.info(f"Connection reset: {e}. Retrying...")
            except TypeError as e:
                logger.info(f"Type error: {e}. Retrying...")
            finally:
                # Drop the old socket before reconnecting (or propagating) so
                # repeated reconnects do not accumulate open connections.
                await self._close_websocket()
            await asyncio.sleep(1)

    # The ConnectionError clause is subsumed by the second predicate: every
    # exception except ProxyForbiddenException is retried, up to 30 attempts.
    @retry(stop=stop_after_attempt(30),
           retry=(retry_if_exception_type(ConnectionError) |
                  retry_if_not_exception_type(ProxyForbiddenException)),
           wait=wait_random(0.5, 1),
           reraise=True)
    async def connection_handler(self):
        """Open the websocket, retrying on failure (see decorator)."""
        logger.info(f"{self.id} | Connecting...")
        await self.connect()
        logger.info(f"{self.id} | Connected")

    @retry(stop=stop_after_attempt(10),
           retry=retry_if_not_exception_type(LowProxyScoreException),
           before_sleep=lambda retry_state, **kwargs: logger.info(f"{retry_state.outcome.exception()}"),
           wait=wait_random(5, 7),
           reraise=True)
    async def handle_proxy_score(self, min_score: int):
        """Fetch the proxy score and require it to be at least ``min_score``.

        Returns:
            ``True`` once a sufficient score has been stored in
            ``self.proxy_score``.

        Raises:
            ProxyScoreNotFoundException: No score was found (retried).
            LowProxyScoreException: The score is below ``min_score``
                (not retried).
        """
        if (proxy_score := await self.get_proxy_score_by_device_id()) is None:
            raise ProxyScoreNotFoundException(
                f"{self.id} | Proxy score not found for {self.proxy}. Guess Bad proxies!")
        elif proxy_score >= min_score:
            self.proxy_score = proxy_score
            logger.success(f"{self.id} | Proxy score: {self.proxy_score}")
            return True
        else:
            raise LowProxyScoreException(
                f"{self.id} | Too low proxy score: {proxy_score} for {self.proxy}. Exit...")

    async def auth_to_extension(self, browser_id: str, user_id: str):
        """Answer the server's next message with an AUTH result payload."""
        connection_id = await self.get_connection_id()
        message = json.dumps(
            {
                "id": connection_id,
                "origin_action": "AUTH",
                "result": {
                    "browser_id": browser_id,
                    "user_id": user_id,
                    "user_agent": self.user_agent,
                    "timestamp": int(time.time()),
                    "device_type": "extension",
                    "version": "4.26.2"
                }
            }
        )
        print(message)  # debug output of the outgoing AUTH payload
        await self.send_message(message)

    async def connect(self):
        """Open the websocket and store it in ``self.websocket``.

        Raises:
            ProxyForbiddenException: The handshake failed with status 403.
        """
        uri = "wss://proxy.wynd.network"
        headers = {
            'Pragma': 'no-cache',
            'Origin': 'chrome-extension://ilehaonighjijnmpnagapkhpcdbhclfg',
            'Accept-Language': 'uk-UA,uk;q=0.9,en-US;q=0.8,en;q=0.7',
            'User-Agent': self.user_agent,
            'Upgrade': 'websocket',
            'Cache-Control': 'no-cache',
            'Connection': 'Upgrade',
            'Sec-WebSocket-Version': '13',
            'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits',
        }
        try:
            # NOTE(review): the headers are passed as ``proxy_headers``, i.e.
            # addressed to the proxy rather than the websocket server; confirm
            # whether ``headers=`` was intended.
            self.websocket = await self.session.ws_connect(
                uri, proxy_headers=headers, proxy=self.proxy)
        except Exception as e:
            if getattr(e, 'status', None) == 403:
                raise ProxyForbiddenException(
                    f"Low proxy score. Can't connect. Error: {e}") from e
            raise

    async def get_proxy_score_by_device_id(self):
        """Return the ``final_score`` of the device matching the current IP.

        Returns:
            The score, or ``None`` when the response has no ``data`` or no
            device entry matches the IP reported by :meth:`get_ip`.
        """
        url = 'https://api.getgrass.io/extension/user-score'
        headers = {
            'authority': 'api.getgrass.io',
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'uk-UA,uk;q=0.9,en-US;q=0.8,en;q=0.7',
            'content-type': 'application/json',
            'origin': 'https://app.getgrass.io',
            'referer': 'https://app.getgrass.io/',
            'sec-ch-ua': '"Not(A:Brand";v="99", "Brave";v="133", "Chromium";v="133"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'user-agent': self.user_agent,
        }
        # ``async with`` returns the connection to the pool even if decoding
        # the body fails.
        async with self.session.get(url, headers=headers, proxy=self.proxy) as response:
            res_json = await response.json()
        print(res_json)  # debug output of the raw score response
        if not isinstance(res_json, dict) or res_json.get("data") is None:
            return None
        devices = res_json['data']['currentDeviceData']
        self.ip = await self.get_ip()
        logger.info(f"当前代理IP: {self.ip}")
        return next(
            (device['final_score'] for device in devices if device['device_ip'] == self.ip),
            None,
        )

    async def send_message(self, message):
        """Send ``message`` as a text frame over the websocket."""
        await self.websocket.send_str(message)

    async def receive_message(self):
        """Return the payload of the next websocket frame.

        Raises:
            WebsocketClosedException: The frame is CLOSE, CLOSING, CLOSED or
                ERROR.  Checking only CLOSED would let the other terminal
                frames reach ``json.loads`` with a non-string payload.
        """
        msg = await self.websocket.receive()
        if msg.type in self._TERMINAL_WS_TYPES:
            raise WebsocketClosedException(f"Websocket closed: {msg}")
        return msg.data

    async def get_connection_id(self):
        """Return the ``id`` field of the next JSON message from the server."""
        msg = await self.receive_message()
        return json.loads(msg)['id']

    async def send_ping(self):
        """Send a PING action with a freshly generated UUID as its id."""
        message = json.dumps(
            {"id": str(uuid.uuid4()), "version": "1.0.0", "action": "PING", "data": {}}
        )
        await self.send_message(message)

    async def send_pong(self):
        """Answer the server's next message with a PONG carrying the same id."""
        connection_id = await self.get_connection_id()
        message = json.dumps(
            {"id": connection_id, "origin_action": "PONG"}
        )
        await self.send_message(message)

    async def get_ip(self):
        """Return the public IP as seen by api.ipify.org through the proxy."""
        async with self.session.get('https://api.ipify.org', proxy=self.proxy) as response:
            # Strip so stray whitespace cannot break the equality check
            # against ``device_ip``.
            return (await response.text()).strip()

    async def _close_websocket(self):
        """Best-effort close of the current websocket, if one is open."""
        ws = getattr(self, "websocket", None)
        self.websocket = None
        if ws is None or ws.closed:
            return
        try:
            await ws.close()
        except Exception as e:
            # A failure while closing a broken socket must not mask the error
            # that led here.
            logger.error(f"{self.id} | Failed to close websocket: {e}")

    async def close(self):
        """Close the websocket (if any) and the HTTP session."""
        await self._close_websocket()
        if self.session:
            await self.session.close()


def _parse_account_line(line_no: int, line: str):
    """Split one ``email|||user_id|||proxy_web|||proxy_api`` record.

    Raises:
        ValueError: The line does not contain exactly four fields.
    """
    fields = line.split("|||")
    if len(fields) != 4:
        raise ValueError(
            f"{ACCOUNTS_DETAILS_FILE_PATH}:{line_no}: expected 4 fields "
            f"separated by '|||', got {len(fields)}"
        )
    return fields


# Entry point
async def main():
    """Start one :class:`Grass` client per account line and wait for all."""
    with open(ACCOUNTS_DETAILS_FILE_PATH, 'r', encoding="utf-8") as f:
        account_details = f.readlines()

    # Parse everything before opening any session so a malformed line cannot
    # leave already-created sessions unclosed.  Blank lines are ignored.
    accounts = [
        (i, _parse_account_line(i + 1, stripped))
        for i, stripped in enumerate(line.strip() for line in account_details)
        if stripped
    ]

    grass_instances = []
    try:
        for i, (email, user_id, _proxy_web, proxy_api) in accounts:
            grass_instances.append(Grass(i, email, user_id, proxy_api))
        try:
            await asyncio.gather(*(grass.start() for grass in grass_instances))
        except Exception as e:
            logger.error(f"An error occurred: {e}")
    finally:
        # Make sure every session is closed.
        for grass in grass_instances:
            await grass.close()


if __name__ == "__main__":
    if sys.platform == 'win32':
        # NOTE(review): the selector policy is set, but the loop actually used
        # is the explicitly created ProactorEventLoop below; confirm which one
        # is intended.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(main())
    else:
        asyncio.run(main())