You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
Web3Tools/porxy_tools/clash/switch_random_proxy_web3.py

164 lines
6.0 KiB

# -*- coding: utf-8 -*-
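'''
Switch every configured Clash instance to a randomly chosen proxy node
and keep the node only if it passes a delay check.
'''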
import json
import random
import subprocess
import time
from typing import List
from urllib.parse import quote

import httpx
# Scheme + host shared by every Clash instance; the port is appended per instance.
BASE_URL = "http://192.168.31.194"
# One controller port per Clash instance, kept as strings because they are
# concatenated straight into URLs.
PORT_LIST = [str(port) for port in range(58001, 58011)]
# Keywords matched case-insensitively against node names (see GROUP).
# Example: ['香港', 'HK', '新加坡', 'SG', '台湾', 'TW', '日本', 'JP', '韩国', '澳门']
PROXY_GROUP = list()
# Filter mode: 1 keeps nodes whose name contains a keyword from PROXY_GROUP,
# 2 keeps nodes whose name contains none of them.
GROUP = 1
class ClashProxyManager:
    """Point several Clash instances at random nodes that pass a delay check.

    Every instance is addressed as ``f"{base_url}:{port}"`` for each port in
    ``base_port``.  A node that has been tried once (successfully or not) is
    remembered in ``used_proxy`` and is never handed to another instance.
    """

    # Names that are never offered as candidates, whatever the filter mode.
    _RESERVED_NAMES = frozenset({'REJECT', 'GLOBAL', 'DIRECT'})
    # Upper bound (seconds) for each curl subprocess so that an unreachable
    # instance cannot block the whole run.
    _CURL_TIMEOUT = 15

    def __init__(self, base_url, base_port):
        """Store the connection settings.

        Args:
            base_url: Scheme and host shared by all instances,
                e.g. ``"http://192.168.31.194"``.
            base_port: Iterable of port strings, one per instance.
        """
        self.key_group = 0
        self.base_url = base_url
        self.base_port = base_port
        self.all_proxies = []
        self.used_proxy = []

    @classmethod
    def _filter_proxy_names(cls, names, keywords, mode):
        """Return the entries of *names* selected by *keywords* under *mode*.

        Mode 1 keeps names containing at least one keyword, mode 2 keeps names
        containing none.  Matching is case-insensitive.  Each name is emitted
        at most once, even when several keywords match it, and reserved names
        are always dropped.
        """
        lowered_keywords = [keyword.lower() for keyword in keywords]
        want_match = mode == 1
        selected = []
        for name in names:
            if name in cls._RESERVED_NAMES:
                continue
            lowered_name = name.lower()
            matched = any(keyword in lowered_name for keyword in lowered_keywords)
            if matched == want_match:
                selected.append(name)
        return selected

    def get_all_proxies(self, clash_tool_url: str) -> List[str]:
        """Fetch the node names from one instance and filter them.

        All instances are assumed to expose the same node list, so querying a
        single one is enough.  Filtering uses the module-level ``GROUP`` mode
        and ``PROXY_GROUP`` keywords.  NOTE: with an empty ``PROXY_GROUP``,
        mode 1 selects nothing and mode 2 selects every non-reserved node.

        Args:
            clash_tool_url: Base URL (scheme, host and port) of the instance.

        Returns:
            The filtered node names, or ``[]`` when the request fails.

        Raises:
            SystemExit: If ``GROUP`` is neither 1 nor 2.
        """
        url = f"{clash_tool_url}/api/proxies"
        try:
            response = httpx.get(url)
            response.raise_for_status()
            proxy_list = list(response.json()['proxies'].keys())
        except Exception as e:
            # Best-effort: the caller simply tries the next instance.
            print(f"Failed to get proxies: {e}")
            return []

        if GROUP not in (1, 2):
            print(f'选择代理组为 {GROUP}')
            raise SystemExit(1)

        result_data = self._filter_proxy_names(proxy_list, PROXY_GROUP, GROUP)
        print(f'读取所有代理节点, 一共 {len(result_data)} 个节点')
        return result_data

    def switch_proxy(self, proxy_name: str, url_and_port: str) -> None:
        """Select *proxy_name* in the ``GLOBAL`` group of one instance.

        Args:
            proxy_name: Node to activate.
            url_and_port: Base URL (scheme, host and port) of the instance.
        """
        url = f"{url_and_port}/api/proxies/GLOBAL"
        data = {"name": proxy_name}
        try:
            response = httpx.put(url, json=data)
            if response.status_code == 204:
                print(f"Switched to proxy: {proxy_name}")
            else:
                print(f"Failed to switch proxy: {response.status_code} - {proxy_name}")
        except Exception as e:
            print(f"Failed to switch proxy: {e}")

    def update_configs(self):
        """PATCH ``{"mode": "Global"}`` to ``/api/configs`` of every instance.

        Failures are reported on stdout and do not stop the loop.
        """
        for base_port in self.base_port:
            url = f"{self.base_url}:{base_port}/api/configs"
            curl_cmd = [
                "curl",
                "-X", "PATCH",
                url,
                "-H", "Content-Type: application/json",
                "-d", '{"mode": "Global"}',
            ]
            try:
                # check=True turns a non-zero curl exit status into
                # CalledProcessError, so no separate returncode test is needed.
                subprocess.run(
                    curl_cmd,
                    capture_output=True,
                    text=True,
                    check=True,
                    timeout=self._CURL_TIMEOUT,
                )
            except subprocess.CalledProcessError as exc:
                print(f"请求失败: {exc}")
                print("错误信息:", (exc.stderr or "").strip())
            except (OSError, subprocess.TimeoutExpired) as exc:
                # curl is missing / not executable, or the request hung.
                print(f"请求失败: {exc}")

    @staticmethod
    def _delay_url(proxy_url, proxy_name):
        """Build the delay-test URL for *proxy_name* on the given instance.

        *proxy_url* may or may not already carry a scheme; ``http://`` is only
        prepended when it is missing, so a value such as
        ``"http://host:port"`` is not turned into ``"http://http://host:port"``.
        """
        base = proxy_url if "://" in proxy_url else f"http://{proxy_url}"
        encoded_name = quote(proxy_name, safe="")
        return (
            f"{base}/api/proxies/{encoded_name}/delay"
            "?timeout=5000&url=http:%2F%2Fwww.gstatic.com%2Fgenerate_204"
        )

    @staticmethod
    def _delay_test_passed(output):
        """Interpret the body returned by the delay endpoint.

        A JSON object counts as success only when it has a ``"delay"`` key
        (the controller is expected to answer ``{"delay": <ms>}`` on success
        and ``{"message": ...}`` on any failure).  Output that is not a JSON
        object falls back to the plain ``'Timeout'`` substring check.
        """
        try:
            payload = json.loads(output)
        except ValueError:
            payload = None
        if isinstance(payload, dict):
            return "delay" in payload
        return 'Timeout' not in output

    def check_proxy(self, proxy_url, choose_proxy):
        """Run the delay test for *choose_proxy* on one instance.

        Args:
            proxy_url: Address of the instance, with or without scheme.
            choose_proxy: Node name to test.

        Returns:
            ``True`` when the delay test succeeded, ``False`` otherwise
            (including curl errors and timeouts).
        """
        command = ["curl", "-X", "GET", self._delay_url(proxy_url, choose_proxy)]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=True,
                timeout=self._CURL_TIMEOUT,
            )
        except subprocess.CalledProcessError as e:
            print("Error:", e.stderr)
            return False
        except (OSError, subprocess.TimeoutExpired) as e:
            print("Error:", e)
            return False
        print("Output:", result.stdout)
        return self._delay_test_passed(result.stdout)

    def _pick_unused_proxy(self):
        """Return a random node that has not been tried yet, or ``None``."""
        already_tried = set(self.used_proxy)
        remaining = [name for name in self.all_proxies if name not in already_tried]
        return random.choice(remaining) if remaining else None

    def main(self) -> None:
        """Switch every instance to a distinct random node that passes the check.

        Steps: force Global mode on all instances, load the node list from the
        first instance that answers, then for each instance keep trying unused
        nodes until one passes ``check_proxy``.  Returns early once every node
        has been tried.
        """
        self.update_configs()

        if not self.all_proxies:
            for port in self.base_port:
                self.all_proxies = self.get_all_proxies(f"{self.base_url}:{port}")
                if self.all_proxies:
                    break
        if not self.all_proxies:
            return
        print(f'待切换节点一共 {len(self.all_proxies)}')

        for base_port in self.base_port:
            url_and_port = f"{self.base_url}:{base_port}"
            while True:
                # Choosing only among untried nodes guarantees termination,
                # unlike re-drawing from the full list until a fresh one shows up.
                choose_proxy = self._pick_unused_proxy()
                if choose_proxy is None:
                    print('=' * 88)
                    print('所有节点已尝试过, 退出')
                    print('=' * 88)
                    return
                self.switch_proxy(choose_proxy, url_and_port)
                self.used_proxy.append(choose_proxy)
                if self.check_proxy(url_and_port, choose_proxy):
                    print(f"代理 {choose_proxy} 切换成功,检测通过!")
                    print('=' * 88)
                    break
                print(f"{url_and_port} 切换 xxxxxxxx {choose_proxy} xxxxxxxx 检测失败")
                time.sleep(1)
# Script entry point: build a manager from the module-level settings and run it.
if __name__ == "__main__":
    ClashProxyManager(BASE_URL, PORT_LIST).main()