# -*- coding: utf-8 -*-
"""Fetch the proxy currently selected on each node and check its latency."""
import json
import subprocess
from typing import Optional
from urllib.parse import quote

import httpx

# Each entry is [host, [ports queried on that host]].
nodes = [
    [
        "192.168.31.201",
        ['58001', '58002', '58003', '58004', '58005',
         '58006', '58007', '58008', '58009', '58010'],
    ],
    [
        "192.168.31.201",
        ['32001', '32002', '32003', '32004', '32005', '32006',
         '32007', '32008', '32009', '32010', '32011', '32012'],
    ],
    ["127.0.0.1", ['17888']],
]

# The host/port group that main() scans.
selected_nodes = nodes[0]


def check_proxy(proxy_url, choose_proxy):
    """Run a delay test for one proxy through the node's HTTP API via curl.

    Args:
        proxy_url: ``host:port`` of the API endpoint. No scheme is added
            here; the value is handed to curl as-is.
        choose_proxy: Name of the proxy to test. It is percent-encoded
            (including ``/``) before being placed in the URL path.

    Returns:
        ``"Timeout"`` when the response body contains ``Timeout``,
        ``"meanDelay: <value>"`` when the body is a JSON object (the value
        is ``None`` if the key is absent), or a string describing the
        failure when curl cannot be run or the body is not a JSON object.
    """
    encode_proxy_name = quote(choose_proxy, safe="")
    command = [
        "curl",
        "-X",
        "GET",
        f"{proxy_url}/api/proxies/{encode_proxy_name}/delay?timeout=5000&url=http:%2F%2Fwww.gstatic.com%2Fgenerate_204",
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        print("Error:", e.stderr)
        return str(e)
    except FileNotFoundError as e:
        # curl is not installed / not on PATH; report instead of crashing.
        print("Error:", e)
        return str(e)

    if 'Timeout' in result.stdout:
        return "Timeout"

    # Parse as JSON rather than eval(): the body comes from a network
    # service, and JSON literals such as null/true/false are not Python.
    try:
        payload = json.loads(result.stdout)
    except ValueError as e:
        print("Error:", e)
        return f"Invalid response: {result.stdout.strip()!r}"

    if not isinstance(payload, dict):
        return f"Invalid response: {result.stdout.strip()!r}"

    res = payload.get("meanDelay")
    return f"meanDelay: {res}"


def _current_global_proxy(json_data) -> Optional[str]:
    """Return ``json_data["proxies"]["GLOBAL"]["now"]``, or None if absent."""
    if not isinstance(json_data, dict):
        return None
    proxies = json_data.get("proxies")
    if not isinstance(proxies, dict):
        return None
    proxy_global = proxies.get("GLOBAL")
    if not isinstance(proxy_global, dict):
        return None
    return proxy_global.get("now") or None


def check_now_proxy(client: httpx.Client, ip, port) -> Optional[str]:
    """Look up the proxy the GLOBAL group currently uses and test its delay.

    Prints one ``<ip:port> --- <proxy> --- <result>`` line on success.

    Args:
        client: Shared httpx client used for the ``/api/proxies`` request.
        ip: Host of the node.
        port: Port of the node.

    Returns:
        The name of the current proxy, or ``None`` when the request fails,
        the response is not usable JSON, or no current proxy is reported.
    """
    url_and_port = f"{ip}:{port}"
    url = f"http://{ip}:{port}/api/proxies"
    headers = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.8",
        "Connection": "keep-alive",
        "Host": url_and_port,
        "Referer": f"http://{url_and_port}/",
        "Sec-CH-UA": '"Chromium";v="134", "Not:A-Brand";v="24", "Brave";v="134"',
        "Sec-CH-UA-Mobile": "?0",
        "Sec-CH-UA-Platform": '"macOS"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "Sec-GPC": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
    }

    try:
        response = client.get(url, headers=headers)
    except httpx.RequestError as e:
        print(f"Request failed: {e}")
        return None

    # Check the status before decoding: an error page is often not JSON,
    # and a decode failure must not abort the scan of the remaining ports.
    json_data = None
    if response.status_code == 200:
        try:
            json_data = response.json()
        except ValueError:
            json_data = None
    if not json_data:
        print("JSON data is empty or request failed")
        return None

    now_proxy = _current_global_proxy(json_data)
    if not now_proxy:
        print(f"{url_and_port} --- no current GLOBAL proxy in response")
        return None

    check_result = check_proxy(url_and_port, now_proxy)
    message = f"{url_and_port} --- {now_proxy} --- {check_result}"
    print(message)
    return now_proxy


def main():
    """Check every port of ``selected_nodes`` and report duplicated proxies."""
    ip = selected_nodes[0]
    now_proxy_list = []
    with httpx.Client() as client:
        for port in selected_nodes[1]:
            now_proxy = check_now_proxy(client, ip, port)
            if now_proxy:
                now_proxy_list.append(now_proxy)

    # Check whether any proxy node is in use on more than one port.
    if len(now_proxy_list) != len(set(now_proxy_list)):
        print("当前代理节点有使用重复")


if __name__ == "__main__":
    main()