import ast
import os

import httpx
from httpx import BasicAuth


def login(credentials_file='account.txt'):
    """Log in to the WorldQuant Brain API with credentials read from a file.

    The credentials file must contain a Python literal whose first two items
    are the username and the password, e.g. ``['username', 'password']``.

    Args:
        credentials_file: Path of the credentials file. If it does not exist,
            an empty file is created at that path and the process exits.

    Returns:
        An authenticated ``httpx.Client`` when the server answers 201,
        otherwise ``None`` (the client is closed before returning).

    Raises:
        SystemExit: If the credentials file is missing or cannot be parsed.
    """
    if not os.path.exists(credentials_file):
        print("未找到 account.txt 文件")
        with open(credentials_file, 'w') as f:
            f.write("")
        print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password']")
        raise SystemExit(1)

    with open(credentials_file) as f:
        raw_credentials = f.read()

    # SECURITY: this used to be eval(), which executes arbitrary code found in
    # the file. literal_eval accepts the same list/tuple literal format but
    # only parses literals.
    try:
        credentials = ast.literal_eval(raw_credentials)
        username, password = credentials[0], credentials[1]
    except (ValueError, SyntaxError, TypeError, IndexError, KeyError):
        # Covers the empty file created above as well as malformed content.
        print(f"{credentials_file} 格式错误, 格式: ['username', 'password']")
        raise SystemExit(1) from None

    print(f"正在登录账户: {username}")

    client = httpx.Client(auth=BasicAuth(username, password))
    try:
        response = client.post('https://api.worldquantbrain.com/authentication')
    except Exception:
        # Do not leak the connection pool when the request itself fails.
        client.close()
        raise

    print(f"登录状态: {response.status_code}")
    if response.status_code == 201:
        print("登录成功!")
        return client

    # The error body is not guaranteed to be JSON; fall back to raw text so
    # the client is always closed below.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    print(f"登录失败: {detail}")
    client.close()
    return None


def get_alphas_data(client, page_size=10):
    """Fetch every matching alpha of the current user, following pagination.

    Pages are requested with increasing ``offset`` until the number of rows
    seen reaches the ``count`` reported by the first response, a page comes
    back empty, a non-200 status is received, or a request raises.

    Args:
        client: An object exposing ``get(url, params=...)`` that returns a
            response with ``status_code``, ``text`` and ``json()``
            (the client returned by ``login``).
        page_size: Number of rows requested per page.

    Returns:
        A dict ``{'count': <number of rows collected>, 'alphas': <list>}``.
        On failure the rows collected so far are returned.
    """
    url = 'https://api.worldquantbrain.com/users/self/alphas'
    all_alphas = []
    offset = 0
    total_count = 0

    while True:
        print(f"正在获取第 {offset//page_size + 1} 页数据 (offset={offset})...")

        # Pass raw characters and let the HTTP client do the percent-encoding;
        # pre-encoded text ('%1F', '%3E') would have its '%' escaped a second
        # time. '\x1f' is the separator between the two status values.
        params = {
            'limit': page_size,
            'offset': offset,
            'status': 'UNSUBMITTED\x1fIS_FAIL',
            'is.sharpe>1.25': '',
            'order': '-dateSubmitted',
            'hidden': 'false',
        }

        try:
            response = client.get(url, params=params)
            print(f"获取数据状态码: {response.status_code}")

            if response.status_code != 200:
                print(f"获取数据失败: {response.text}")
                break

            data = response.json()

            # The total is only read from the first page.
            if offset == 0:
                total_count = data.get('count', 0)
                print(f"总数据量: {total_count}")
                if total_count == 0:
                    print("没有符合条件的Alpha因子")
                    break

            # NOTE(review): the extractor in this file consumes a 'results'
            # list, so accept that key as a fallback for 'alphas' -- confirm
            # which key the endpoint actually returns.
            page = data.get('alphas') or data.get('results') or []
            all_alphas.extend(page)
            print(f"当前页获取到 {len(page)} 条alpha数据")

            current_count = offset + len(page)
            print(f"进度: {current_count}/{total_count}")

            if current_count >= total_count:
                print("所有数据已获取完毕")
                break

            # An empty page means later offsets cannot yield rows either.
            if not page:
                print("当前页没有数据, 停止获取")
                break

            offset += page_size

        except Exception as e:
            # Best effort: report the error and return what was collected.
            print(f"请求发生错误: {e}")
            break

    return {
        'count': len(all_alphas),
        'alphas': all_alphas,
    }


def _section(alpha, key):
    """Return ``alpha[key]`` as a dict, treating a missing or null value as empty."""
    return alpha.get(key) or {}


def extract_alpha_info(alpha_data):
    """Flatten raw alpha records into summary dicts.

    Args:
        alpha_data: A dict with a ``'results'`` list of raw alpha dicts.

    Returns:
        A list with one dict per alpha containing the basic fields, the
        expression code, the ``settings`` and ``performance`` sub-dicts, a
        ``checks`` mapping keyed by check name, and a ``classifications``
        list of names. An empty list is returned (after printing a message)
        when ``alpha_data`` is falsy or has no ``'results'`` key.
    """
    extracted_data = []

    if not alpha_data or 'results' not in alpha_data:
        print("无效的数据格式")
        return extracted_data

    for alpha in alpha_data['results']:
        # A key that is present with a null value would make
        # alpha.get(key, {}) return None, so sub-sections are normalised.
        regular = _section(alpha, 'regular')
        settings = _section(alpha, 'settings')
        in_sample = _section(alpha, 'is')

        info = {
            # Basic information
            'id': alpha.get('id', 'N/A'),
            'name': alpha.get('name', 'N/A'),
            'author': alpha.get('author', 'N/A'),
            'status': alpha.get('status', 'N/A'),
            'grade': alpha.get('grade', 'N/A'),
            'stage': alpha.get('stage', 'N/A'),
            'date_created': alpha.get('dateCreated', 'N/A'),
            'date_submitted': alpha.get('dateSubmitted', 'N/A'),
            # Expression code
            'code': regular.get('code', 'N/A'),
            'operator_count': regular.get('operatorCount', 0),
            # Simulation settings
            'settings': {
                'region': settings.get('region', 'N/A'),
                'universe': settings.get('universe', 'N/A'),
                'neutralization': settings.get('neutralization', 'N/A'),
                'truncation': settings.get('truncation', 'N/A'),
                'start_date': settings.get('startDate', 'N/A'),
                'end_date': settings.get('endDate', 'N/A'),
            },
            # Metrics read from the 'is' section
            'performance': {
                'sharpe': in_sample.get('sharpe', 0),
                'fitness': in_sample.get('fitness', 0),
                'returns': in_sample.get('returns', 0),
                'turnover': in_sample.get('turnover', 0),
                'drawdown': in_sample.get('drawdown', 0),
                'pnl': in_sample.get('pnl', 0),
                'book_size': in_sample.get('bookSize', 0),
                'long_count': in_sample.get('longCount', 0),
                'short_count': in_sample.get('shortCount', 0),
                'margin': in_sample.get('margin', 0),
            },
            # Check results, filled in below
            'checks': {},
        }

        for check in in_sample.get('checks') or []:
            info['checks'][check.get('name', 'N/A')] = {
                'result': check.get('result', 'N/A'),
                'limit': check.get('limit', 'N/A'),
                'value': check.get('value', 'N/A'),
            }

        info['classifications'] = [
            cls.get('name', 'N/A')
            for cls in alpha.get('classifications') or []
        ]

        extracted_data.append(info)

    return extracted_data


def _format_metric(value, spec):
    """Format a numeric metric with ``spec``; non-numeric values become 'N/A'."""
    if isinstance(value, (int, float)):
        return format(value, spec)
    return 'N/A'


def print_alpha_summary(extracted_data):
    """Print a human-readable summary of the dicts built by ``extract_alpha_info``.

    Args:
        extracted_data: List of alpha summary dicts.
    """
    print(f"\n总共提取了 {len(extracted_data)} 个Alpha因子")
    print("=" * 100)

    key_checks = ['LOW_SHARPE', 'LOW_FITNESS', 'LOW_TURNOVER', 'HIGH_TURNOVER']

    for i, alpha in enumerate(extracted_data, 1):
        settings = alpha['settings']
        performance = alpha['performance']
        # The code field can be null in the raw record; slicing None would raise.
        code = alpha['code'] if isinstance(alpha['code'], str) else 'N/A'

        print(f"\n{i}. Alpha ID: {alpha['id']}")
        print(f" 代码: {code[:80]}{'...' if len(code) > 80 else ''}")
        print(f" 状态: {alpha['status']} | 等级: {alpha['grade']} | 阶段: {alpha['stage']}")
        print(f" 创建时间: {alpha['date_created']}")
        print(f" 设置: {settings['region']} | {settings['universe']} | "
              f"中性化: {settings['neutralization']}")
        # Metrics may be null in the raw record; format them defensively.
        print(f" 表现: Sharpe={_format_metric(performance['sharpe'], '.2f')} | "
              f"Fitness={_format_metric(performance['fitness'], '.2f')} | "
              f"收益={_format_metric(performance['returns'], '.2%')} | "
              f"换手率={_format_metric(performance['turnover'], '.2f')}")

        # Only the key checks are shown, with a pass/fail mark each.
        check_results = [
            f"{name}: {'✓' if alpha['checks'][name]['result'] == 'PASS' else '✗'}"
            for name in key_checks
            if name in alpha['checks']
        ]
        if check_results:
            print(f" 检查: {', '.join(check_results)}")


def process_alpha_data(raw_data):
    """Extract summaries from ``raw_data``, print them, and return them.

    Args:
        raw_data: A dict with a ``'results'`` list of raw alpha dicts.

    Returns:
        The list produced by ``extract_alpha_info``.
    """
    extracted_data = extract_alpha_info(raw_data)
    print_alpha_summary(extracted_data)
    return extracted_data


def main():
    """Log in, download all matching alphas, and print their summary."""
    client = login()
    if client is None:
        return

    try:
        all_data = get_alphas_data(client, page_size=10)
        print(f"\n最终结果: 总共获取到 {all_data['count']} 条alpha数据")
        process_alpha_data({'results': all_data['alphas']})
    finally:
        # Always release the connection, even if processing fails.
        client.close()
        print("\n连接已关闭")


# Script entry point
if __name__ == "__main__":
    main()