You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
alpha_tools/fetch-alphas/wqb-get-alphas-one-week.py

330 lines
12 KiB

import httpx
from httpx import BasicAuth, Timeout
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import logging
from typing import List, Optional
import time
from datetime import datetime, timedelta
class AlphaManager:
def __init__(self, credentials_file='account.txt', base_path=None):
"""
初始化Alpha管理器
Args:
credentials_file: 账号文件路径(备用)
base_path: 输出文件保存路径
"""
self.client = None
self.base_path = Path(base_path) if base_path else Path.cwd()
self.logger = self._setup_logger()
# 登录
self.login(credentials_file)
def _setup_logger(self):
"""设置日志记录器"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def login(self, credentials_file='account.txt'):
"""登录WorldQuant Brain API"""
try:
# 从nacos获取账号密码
with httpx.Client(timeout=10.0) as temp_client:
nacos_resp = temp_client.get(
'http://192.168.31.41:30848/nacos/v1/cs/configs?dataId=wq_account&group=quantify'
)
if nacos_resp.status_code != 200:
self.logger.error('获取账号密码失败')
return False
config = nacos_resp.json()
username = config.get('user_name')
password = config.get('password')
if not username or not password:
self.logger.error('账号密码不完整')
return False
self.logger.info(f"正在登录账户: {username}")
# 创建客户端并设置超时(关键修复)
# 设置更长的超时时间:连接30秒,读取60秒
timeout = Timeout(connect=30.0, read=60.0, write=30.0, pool=30.0)
self.client = httpx.Client(
auth=BasicAuth(username, password),
timeout=timeout
)
# 发送登录请求
response = self.client.post('https://api.worldquantbrain.com/authentication')
if response.status_code == 201:
self.logger.info("登录成功!")
return True
else:
self.logger.error(f"登录失败: {response.status_code} - {response.text}")
self.client.close()
self.client = None
return False
except Exception as e:
self.logger.error(f"登录异常: {e}")
return False
def update_alpha_color(self, alpha_id: str, color: str) -> bool:
"""标记Alpha颜色"""
if not self.client:
self.logger.error("客户端未登录")
return False
try:
update_data = {"color": color}
response = self.client.patch(
f"https://api.worldquantbrain.com/alphas/{alpha_id}",
json=update_data
)
return response.status_code == 200
except Exception as e:
self.logger.error(f"标记颜色失败: {e}")
return False
def _make_request_with_retry(self, url: str, max_retries: int = 3, retry_delay: float = 2.0):
"""
带重试机制的请求
Args:
url: 请求URL
max_retries: 最大重试次数
retry_delay: 重试延迟(秒)
Returns:
httpx.Response 或 None
"""
for attempt in range(max_retries):
try:
response = self.client.get(url)
return response
except Exception as e:
self.logger.warning(f"请求失败 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
self.logger.error(f"请求最终失败: {url}")
raise
return None
def get_alphas(
self,
total_alphas: Optional[int] = None,
limit: int = 100,
delay: int = 1,
days_back: int = 7,
required_fields: Optional[List[str]] = None,
match_mode: str = "all",
min_sharpe: Optional[float] = None,
min_fitness: Optional[float] = None,
hidden: str = "false",
submittable: bool = False,
auto_color: bool = False,
color: str = "GREEN",
output_file_name: str = "alpha_search_list.csv",
mode: str = "w",
max_retries: int = 3,
):
"""
搜索Alpha并筛选(按时间范围,默认最近一周)
Args:
total_alphas: 最多获取多少个Alpha
limit: 每次API请求获取的数量
delay: API请求的延迟设置
days_back: 查询多少天前的数据(默认7天,即一周)
required_fields: 关键词列表,如 ['put', 'call']
match_mode: 关键词匹配模式:"all"(全匹配) 或 "any"(任一匹配)
min_sharpe: 最小夏普比率阈值
min_fitness: 最小适应度阈值
hidden: 是否搜索已隐藏的Alpha ("true"/"false")
submittable: 是否只筛选可提交(无FAIL检查项)的Alpha
auto_color: 是否自动给符合条件的Alpha标记颜色
color: 标记的颜色
output_file_name: 输出的CSV文件名
mode: 写入模式:"w"(覆盖写入) 或 "a"(追加写入)
max_retries: 请求失败时的最大重试次数
Returns:
list: 符合条件的Alpha列表
"""
if not self.client:
self.logger.error("客户端未登录,无法执行搜索")
return []
# 验证颜色参数
valid_colors = [None, "GREEN", "YELLOW", "RED", "BLUE", "PURPLE", "ORANGE"]
if color not in valid_colors:
raise ValueError(f"颜色必须是以下之一: {valid_colors}")
# 计算时间范围(美国东部时间 UTC-5,夏令时 UTC-4)
now = datetime.now()
end_date = now
start_date = now - timedelta(days=days_back)
# 格式化为 ISO 8601 格式(带时区偏移)
# 使用 UTC-4(夏令时),如果是标准时间则改为 -05:00
start_date_str = start_date.strftime("%Y-%m-%dT%H:%M:%S-04:00")
end_date_str = end_date.strftime("%Y-%m-%dT%H:%M:%S-04:00")
self.logger.info(f"查询时间范围: {start_date_str}{end_date_str}")
fetched_alphas = []
offset = 0
total_accessed = 0
colored_count = 0
# 先获取总数(带重试)
count_url = (
f"https://api.worldquantbrain.com/users/self/alphas?stage=IS&hidden={hidden}"
f"&limit=1&status=UNSUBMITTED%1FIS_FAIL"
f"&dateCreated%3E={start_date_str}&dateCreated%3C={end_date_str}"
)
total_available = 0
for attempt in range(max_retries):
try:
count_response = self.client.get(count_url)
total_available = count_response.json()["count"]
break
except Exception as e:
self.logger.warning(f"获取Alpha总数失败 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2.0)
else:
self.logger.error("获取Alpha总数最终失败")
return []
if total_available == 0:
self.logger.warning("未找到任何Alpha")
return []
self.logger.info(f"共找到 {total_available} 个Alpha,开始筛选...")
pbar = tqdm(total=min(total_available, 10000), desc="扫描Alpha", unit="")
while (total_alphas is None or len(fetched_alphas) < total_alphas) and offset < total_available:
# 构建请求URL(使用时间范围,移除 region 和 universe)
url = (
f"https://api.worldquantbrain.com/users/self/alphas?stage=IS&limit={limit}"
f"&offset={offset}&hidden={hidden}&status=UNSUBMITTED%1FIS_FAIL"
f"&dateCreated%3E={start_date_str}&dateCreated%3C={end_date_str}&order=dateCreated"
)
try:
# 使用重试机制
response = None
for attempt in range(max_retries):
try:
response = self.client.get(url)
break
except Exception as e:
self.logger.warning(f"请求失败 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2.0)
else:
raise
if response.status_code == 400:
self.logger.warning(f"遇到API限制 (offset={offset}),停止获取更多数据")
break
response_data = response.json()
if not isinstance(response_data, dict) or "results" not in response_data:
self.logger.error(f"API返回了意外的数据: {response_data}")
break
alphas = response_data["results"]
if not alphas:
break
total_accessed += len(alphas)
# 直接保存所有获取到的Alpha,不做任何筛选
fetched_alphas.extend(alphas)
# 更新进度条
pbar.update(len(alphas))
pbar.set_postfix({
"时间范围": f"{days_back}",
"已扫描": total_accessed,
"已获取": len(fetched_alphas),
})
if len(alphas) < limit:
break
offset += limit
# 添加延迟避免请求过快
time.sleep(0.5)
except Exception as e:
self.logger.error(f"请求失败: {e}")
break
pbar.close()
# 获取所有数据,不限制数量
alpha_list = fetched_alphas
if not alpha_list:
self.logger.warning("未获取到任何Alpha数据!")
else:
df = pd.DataFrame(alpha_list)
output_path = self.base_path / output_file_name
if mode == "w":
df.to_csv(output_path, index=False)
elif mode == "a":
df.to_csv(output_path, mode="a", index=False, header=False)
self.logger.info(f"一周内Alpha数据下载完成!共{len(alpha_list)}条记录!\n保存至: {output_path}")
return alpha_list
def close(self):
"""关闭客户端"""
if self.client:
self.client.close()
# 使用示例
if __name__ == "__main__":
# 创建管理器实例(会自动登录)
manager = AlphaManager()
if manager.client:
# 获取最近一周内的所有Alpha数据(不做任何筛选)
results = manager.get_alphas(
days_back=7, # 查询最近7天(一周)的数据
output_file_name="alpha_one_week_all.csv"
)
print(f"共获取 {len(results) if results else 0} 条Alpha数据")
# 关闭客户端
manager.close()