first commit

main
jack 3 months ago
commit f9b23bc448
  1. 67
      .gitignore
  2. 78
      clash_check_now_node.py
  3. 37
      clash_set_global.py

67
.gitignore vendored

@ -0,0 +1,67 @@
.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
.idea/*
xml_files/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
other/split_clash_config/split_config
ai_news/save_data
manual/clash/clash_each_node
manual/singbox/singbox_each_node

@ -0,0 +1,78 @@
import asyncio
import httpx
from typing import Optional, Dict, Any, List, Tuple
async def check_now_node(url_and_port: str) -> Optional[str]:
    """Switch one Clash controller to Global mode and report its active node.

    Args:
        url_and_port: controller address in ``host:port`` form.

    Returns:
        The name of the node currently selected by the GLOBAL group, or
        ``None`` when either HTTP call fails (the error is printed).
    """
    async with httpx.AsyncClient(timeout=10.0) as api:
        try:
            # Force the controller into Global mode first.
            patch_resp = await api.patch(
                f"http://{url_and_port}/api/configs",
                json={"mode": "Global"},
            )
            patch_resp.raise_for_status()

            # Then read back the proxy table to see the active selection.
            proxies_resp = await api.get(f"http://{url_and_port}/api/proxies")
            proxies_resp.raise_for_status()

            payload: Dict[str, Any] = proxies_resp.json().get("proxies", {})
            global_group: Dict[str, Any] = payload.get("GLOBAL", {})
            return global_group.get("now")
        except httpx.HTTPError as exc:
            print(f"请求失败 {url_and_port}: {exc}")
            return None
async def batch_check_nodes(ip: str, ports: List[str]) -> Dict[str, Optional[str]]:
    """Query the current node of every controller at ``ip:port`` concurrently.

    Args:
        ip: controller host shared by all ports.
        ports: list of port strings to probe.

    Returns:
        Mapping of ``ip:port`` -> node name (``None`` for failed probes).
    """
    endpoints = [f"{ip}:{port}" for port in ports]
    nodes = await asyncio.gather(*(check_now_node(ep) for ep in endpoints))
    return dict(zip(endpoints, nodes))
def find_duplicate_nodes(results: Dict[str, Optional[str]]) -> List[Tuple[str, str]]:
    """Find pairs of controller URLs that resolved to the same node.

    Args:
        results: mapping of ``ip:port`` -> node name; ``None`` entries
            (failed probes) are ignored.

    Returns:
        One ``(url_a, url_b)`` tuple for every unordered pair of URLs that
        share a node, preserving the insertion order of ``results``.
    """
    # Group URLs by the node they report; setdefault replaces the
    # check-then-insert (LBYL) pattern of the original.
    node_to_urls: Dict[str, List[str]] = {}
    for url, node in results.items():
        if node:  # skip failed probes
            node_to_urls.setdefault(node, []).append(url)

    # Emit every unordered pair within each group of size >= 2.
    duplicates: List[Tuple[str, str]] = []
    for urls in node_to_urls.values():
        duplicates.extend(
            (urls[i], urls[j])
            for i in range(len(urls))
            for j in range(i + 1, len(urls))
        )
    return duplicates
if __name__ == "__main__":
    ip = '192.168.31.201'
    # Ports 58001..58010 — one Clash controller per port.
    ports = [f'{58000 + i}' for i in range(1, 11)]
    results = asyncio.run(batch_check_nodes(ip, ports))

    # Report every endpoint's current node (or a failure marker).
    for url, node in results.items():
        print(f"{url}: {node or '检测失败'}")

    # Report endpoints that ended up on the same node.
    duplicates = find_duplicate_nodes(results)
    if duplicates:
        print("\n发现重复节点:")
        for url1, url2 in duplicates:
            # Bug fix: the original f-string concatenated the two URLs
            # with no separator, producing an unreadable run-together line.
            print(f"{url1} 和 {url2} 重复")
    else:
        print("\n没有发现重复节点")

@ -0,0 +1,37 @@
import asyncio
import httpx
async def set_global(url_and_port):
    """PATCH one Clash controller into Global mode.

    Args:
        url_and_port: controller address in ``host:port`` form.

    Returns:
        True when the controller accepted the mode change, False on any
        HTTP failure (the error is printed).
    """
    endpoint = f"http://{url_and_port}" + "/api/configs"
    payload = {"mode": "Global"}
    json_headers = {"Content-Type": "application/json"}
    async with httpx.AsyncClient(timeout=10.0) as api:
        try:
            resp = await api.patch(endpoint, json=payload, headers=json_headers)
            # raise_for_status turns any 4xx/5xx into httpx.HTTPError.
            resp.raise_for_status()
        except httpx.HTTPError as exc:
            print(f"请求失败 {url_and_port}: {exc}")
            return False
        print(f"成功设置 {url_and_port}")
        return True
async def main():
    """Switch controllers on ports 58001-58010 to Global mode concurrently."""
    ip = '192.168.31.201'
    port_list = [f'{58000 + i}' for i in range(1, 11)]
    # Fan out one set_global call per controller and await them all.
    outcomes = await asyncio.gather(
        *(set_global(f"{ip}:{port}") for port in port_list)
    )
    success_count = sum(outcomes)
    print(f"\n完成设置: {success_count}/{len(port_list)} 个代理成功")


if __name__ == "__main__":
    asyncio.run(main())
Loading…
Cancel
Save