diff --git a/wqb-filter/account.txt b/wqb-filter-alpha/account.txt similarity index 100% rename from wqb-filter/account.txt rename to wqb-filter-alpha/account.txt diff --git a/wqb-filter-alpha/filter-alpha.py b/wqb-filter-alpha/filter-alpha.py new file mode 100644 index 0000000..ce85ed5 --- /dev/null +++ b/wqb-filter-alpha/filter-alpha.py @@ -0,0 +1,245 @@ +import httpx +from httpx import BasicAuth +import os + +def login(credentials_file='account.txt'): + """登录WorldQuant Brain API""" + # 读取本地账号密码 + if not os.path.exists(credentials_file): + print("未找到 account.txt 文件") + with open(credentials_file, 'w') as f: f.write("") + print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password]") + exit(1) + + with open(credentials_file) as f: + credentials = eval(f.read()) + username, password = credentials[0], credentials[1] + + print(f"正在登录账户: {username}") + + # 创建客户端并认证 + client = httpx.Client(auth=BasicAuth(username, password)) + + # 发送登录请求 + response = client.post('https://api.worldquantbrain.com/authentication') + print(f"登录状态: {response.status_code}") + + if response.status_code == 201: + print("登录成功!") + return client + else: + print(f"登录失败: {response.json()}") + client.close() + return None + +def get_alphas_data(client, page_size=10): + """获取所有alpha数据(自动处理分页)""" + all_alphas = [] + offset = 0 + total_count = 0 + + while True: + print(f"正在获取第 {offset//page_size + 1} 页数据 (offset={offset})...") + + # 构建请求参数 + params = { + 'limit': page_size, + 'offset': offset, + 'status': 'UNSUBMITTED%1FIS_FAIL', + 'is.sharpe%3E1.25': '', + 'order': '-dateSubmitted', + 'hidden': 'false' + } + + url = 'https://api.worldquantbrain.com/users/self/alphas' + + try: + response = client.get(url, params=params) + print(f"获取数据状态码: {response.status_code}") + + if response.status_code != 200: + print(f"获取数据失败: {response.text}") + break + + data = response.json() + + # 第一次请求时获取总数量 + if offset == 0: + total_count = data.get('count', 0) + print(f"总数据量: {total_count}") + if total_count == 0: + print("没有符合条件的Alpha因子") + break + + # 添加当前页的数据 + if 'alphas' in data: + current_page_alphas = data['alphas'] + all_alphas.extend(current_page_alphas) + print(f"当前页获取到 {len(current_page_alphas)} 条alpha数据") + + # 检查是否还有更多数据 + current_count = offset + len(data.get('alphas', [])) + print(f"进度: {current_count}/{total_count}") + + # 如果已经获取了所有数据,则退出循环 + if current_count >= total_count: + print("所有数据已获取完毕") + break + + # 更新offset继续获取下一页 + offset += page_size + + except Exception as e: + print(f"请求发生错误: {e}") + break + + return { + 'count': len(all_alphas), + 'alphas': all_alphas + } + +def extract_alpha_info(alpha_data): + """ + 从alpha数据中提取有用信息 + """ + extracted_data = [] + + # 检查数据结构 + if not alpha_data or 'results' not in alpha_data: + print("无效的数据格式") + return extracted_data + + for alpha in alpha_data['results']: + # 提取基本信息 + info = { + # 基础信息 + 'id': alpha.get('id', 'N/A'), + 'name': alpha.get('name', 'N/A'), + 'author': alpha.get('author', 'N/A'), + 'status': alpha.get('status', 'N/A'), + 'grade': alpha.get('grade', 'N/A'), + 'stage': alpha.get('stage', 'N/A'), + 'date_created': alpha.get('dateCreated', 'N/A'), + 'date_submitted': alpha.get('dateSubmitted', 'N/A'), + + # 因子代码和设置 + 'code': alpha.get('regular', {}).get('code', 'N/A'), + 'operator_count': alpha.get('regular', {}).get('operatorCount', 0), + + # 回测设置 + 'settings': { + 'region': alpha.get('settings', {}).get('region', 'N/A'), + 'universe': alpha.get('settings', {}).get('universe', 'N/A'), + 'neutralization': alpha.get('settings', {}).get('neutralization', 'N/A'), + 'truncation': alpha.get('settings', {}).get('truncation', 'N/A'), + 'start_date': alpha.get('settings', {}).get('startDate', 'N/A'), + 'end_date': alpha.get('settings', {}).get('endDate', 'N/A') + }, + + # 回测表现指标 + 'performance': { + 'sharpe': alpha.get('is', {}).get('sharpe', 0), + 'fitness': alpha.get('is', {}).get('fitness', 0), + 'returns': alpha.get('is', {}).get('returns', 0), + 'turnover': alpha.get('is', {}).get('turnover', 0), + 'drawdown': alpha.get('is', {}).get('drawdown', 0), + 'pnl': alpha.get('is', {}).get('pnl', 0), + 'book_size': alpha.get('is', {}).get('bookSize', 0), + 'long_count': alpha.get('is', {}).get('longCount', 0), + 'short_count': alpha.get('is', {}).get('shortCount', 0), + 'margin': alpha.get('is', {}).get('margin', 0) + }, + + # 检查结果 + 'checks': {} + } + + # 提取检查结果 + checks = alpha.get('is', {}).get('checks', []) + for check in checks: + check_name = check.get('name', 'N/A') + info['checks'][check_name] = { + 'result': check.get('result', 'N/A'), + 'limit': check.get('limit', 'N/A'), + 'value': check.get('value', 'N/A') + } + + # 分类信息 + classifications = alpha.get('classifications', []) + if classifications: + info['classifications'] = [cls.get('name', 'N/A') for cls in classifications] + else: + info['classifications'] = [] + + extracted_data.append(info) + + return extracted_data + +def print_alpha_summary(extracted_data): + """ + 打印提取的alpha信息摘要 + """ + print(f"\n总共提取了 {len(extracted_data)} 个Alpha因子") + print("=" * 100) + + for i, alpha in enumerate(extracted_data, 1): + print(f"\n{i}. Alpha ID: {alpha['id']}") + print(f" 代码: {alpha['code'][:80]}{'...' if len(alpha['code']) > 80 else ''}") + print(f" 状态: {alpha['status']} | 等级: {alpha['grade']} | 阶段: {alpha['stage']}") + print(f" 创建时间: {alpha['date_created']}") + print(f" 设置: {alpha['settings']['region']} | {alpha['settings']['universe']} | " + f"中性化: {alpha['settings']['neutralization']}") + print(f" 表现: Sharpe={alpha['performance']['sharpe']:.2f} | " + f"Fitness={alpha['performance']['fitness']:.2f} | " + f"收益={alpha['performance']['returns']:.2%} | " + f"换手率={alpha['performance']['turnover']:.2f}") + + # 显示关键检查结果 + key_checks = ['LOW_SHARPE', 'LOW_FITNESS', 'LOW_TURNOVER', 'HIGH_TURNOVER'] + check_results = [] + for check_name in key_checks: + if check_name in alpha['checks']: + check = alpha['checks'][check_name] + result_symbol = "✓" if check['result'] == 'PASS' else "✗" + check_results.append(f"{check_name}: {result_symbol}") + + if check_results: + print(f" 检查: {', '.join(check_results)}") + +# 使用示例 +def process_alpha_data(raw_data): + """ + 处理原始alpha数据的完整流程 + """ + # 提取信息 + extracted_data = extract_alpha_info(raw_data) + + # 打印摘要 + print_alpha_summary(extracted_data) + + return extracted_data + +def main(): + # 登录 + client = login() + if not client: + return + + try: + # 一次性获取所有alpha数据 + all_data = get_alphas_data(client, page_size=10) + + print(f"\n最终结果: 总共获取到 {all_data['count']} 条alpha数据") + + if all_data and 'alphas' in all_data: + # 处理提取信息 + processed_data = process_alpha_data({'results': all_data['alphas']}) + + finally: + # 确保客户端被关闭 + client.close() + print("\n连接已关闭") + +# 使用示例 +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/wqb-filter/filter.py b/wqb-filter/filter.py deleted file mode 100644 index 6a8c8f6..0000000 --- a/wqb-filter/filter.py +++ /dev/null @@ -1,99 +0,0 @@ -import httpx -from httpx import BasicAuth -import os - -def login(credentials_file='account.txt'): - """登录WorldQuant Brain API""" - # 读取本地账号密码 - if not os.path.exists(credentials_file): - print("未找到 account.txt 文件") - with open(credentials_file, 'w') as f: f.write("") - print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password]") - exit(1) - - with open(credentials_file) as f: - credentials = eval(f.read()) - username, password = credentials[0], credentials[1] - - print(f"正在登录账户: {username}") - - # 创建客户端并认证 - client = httpx.Client(auth=BasicAuth(username, password)) - - # 发送登录请求 - response = client.post('https://api.worldquantbrain.com/authentication') - print(f"登录状态: {response.status_code}") - - if response.status_code == 201: - print("登录成功!") - print(response.json()) - return client - else: - print(f"登录失败: {response.json()}") - client.close() - return None - -def get_alphas_data(client, limit=10, offset=0): - """获取alpha数据""" - # 使用params参数而不是直接拼接URL - params = { - 'limit': limit, - 'offset': offset, - 'status': 'UNSUBMITTED%1FIS_FAIL', # 注意:这里可能需要根据实际API要求调整编码 - 'is.sharpe%3E1.25': '', # 注意:这里可能需要调整参数格式 - 'order': '-dateSubmitted', - 'hidden': 'false' - } - - url = 'https://api.worldquantbrain.com/users/self/alphas' - - try: - response = client.get(url, params=params) - print(f"获取数据状态码: {response.status_code}") - - if response.status_code == 200: - return response.json() - else: - print(f"获取数据失败: {response.text}") - return None - except Exception as e: - print(f"请求发生错误: {e}") - return None - -def main(): - # 登录 - client = login() - if not client: - return - - try: - # 第一次请求获取count - first_data = get_alphas_data(client, limit=10, offset=0) - - if first_data and 'count' in first_data: - count = first_data['count'] - print(f"获取到的count值: {count}") - - if int(count) == 0: - print("没有符合条件的Alpha因子") - return - - # 第二次请求,使用count作为offset - second_data = get_alphas_data(client, limit=count, offset=count) - - if second_data: - print("第二次请求返回的数据:") - print(second_data) - else: - print("第二次请求失败") - else: - print("第一次请求失败或未找到count字段") - - finally: - # 确保客户端被关闭 - client.close() - print("\n连接已关闭") - -# 使用示例 -if __name__ == "__main__": - main() \ No newline at end of file