first commit

main
jack 4 weeks ago
commit 62948d9d81
  1. 41
      .gitignore
  2. 142
      README.md
  3. 108
      alpha_strategy.py
  4. 398
      brain_batch_alpha.py
  5. 54
      build.py
  6. 52
      build_windows.py
  7. 88
      create_zipapp.py
  8. 110
      dataset_config.py
  9. BIN
      icon.ico
  10. 69
      mac/build_mac.py
  11. 48
      mac/create_icns.py
  12. BIN
      mac/icon.icns
  13. BIN
      mac/icon.ico
  14. BIN
      mac/icon.iconset/icon_1024x1024.png
  15. BIN
      mac/icon.iconset/icon_128x128.png
  16. BIN
      mac/icon.iconset/icon_128x128@2x.png
  17. BIN
      mac/icon.iconset/icon_16x16.png
  18. BIN
      mac/icon.iconset/icon_16x16@2x.png
  19. BIN
      mac/icon.iconset/icon_256x256.png
  20. BIN
      mac/icon.iconset/icon_256x256@2x.png
  21. BIN
      mac/icon.iconset/icon_32x32.png
  22. BIN
      mac/icon.iconset/icon_32x32@2x.png
  23. BIN
      mac/icon.iconset/icon_512x512.png
  24. BIN
      mac/icon.iconset/icon_512x512@2x.png
  25. BIN
      mac/icon.iconset/icon_64x64.png
  26. BIN
      mac/icon.iconset/icon_64x64@2x.png
  27. BIN
      mac/icon.png
  28. 98
      main.py
  29. 4
      requirements.txt
  30. 29
      setup.py
  31. BIN
      tor_alpha_browser_logo_icon_152957.ico

41
.gitignore vendored

@ -0,0 +1,41 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
*.manifest
*.spec
WorldQuant_Brain_Alpha_Generator.spec
# Credentials
brain_credentials.txt
# Generated files
alpha_ids.txt
# Mac
.DS_Store
mac/dist/
mac/build/
# Windows
dist/
build/

@ -0,0 +1,142 @@
# 🚀 WorldQuant Brain Alpha Generator
<div align="center">
![GitHub stars](https://img.shields.io/github/stars/YHYYDS666/WorldQuant-Brain-Alpha?style=social)
![GitHub forks](https://img.shields.io/github/forks/YHYYDS666/WorldQuant-Brain-Alpha?style=social)
![GitHub watchers](https://img.shields.io/github/watchers/YHYYDS666/WorldQuant-Brain-Alpha?style=social)
```txt
____ _____ _____ ____ _ _ _____
| _ \ |_ _| | ___| / ___| | | | | |_ _|
| |_) | | | | |_ | | _ | |_| | | |
| _ < | | | _| | |_| | | _ | | |
|_| \_\ |_| |_| \____| |_| |_| |_|
```
</div>
## 📖 项目介绍
这是一个用于自动生成和提交 WorldQuant Brain Alpha 表达式的工具。它可以帮助用户自动化测试和提交 Alpha 策略。
## 🗂 目录结构
```txt
WorldQuant-Brain-Alpha/
├── 📜 main.py # 主程序入口
├── 🧠 brain_batch_alpha.py # 核心处理模块
├── 📊 alpha_strategy.py # 策略生成模块
├── ⚙ dataset_config.py # 数据集配置
├── 📋 requirements.txt # 依赖列表
├── 🔨 build.py # 通用构建脚本
├── 🪟 build_windows.py # Windows构建脚本
├── 📦 setup.py # 打包配置
├── 🗜 create_zipapp.py # ZIP打包脚本
└── 🍎 mac/ # Mac相关文件
├── build_mac.py # Mac构建脚本
├── create_icns.py # 图标生成
└── icon.png # 图标源文件
```
## ✨ 功能特点
- 🤖 自动生成 Alpha 策略
- 📈 自动测试性能指标
- 🚀 自动提交合格策略
- 💾 保存策略 ID
- 🔄 支持多种运行模式
## 🛠 安装方法
上传文件出问题了,所有就分开放了两个版本。之后会合并成一个版本。
### Windows 用户
```bash
# 下载发布版本
✨ 从 Releases选择Alpha_Tool_v1.0版本 下载 Alpha_.zip
# 从源码构建
🔨 pip install -r requirements.txt
🚀 python build_windows.py
```
### Mac 用户
```bash
# 下载发布版本
✨ 从 Releases选择最新版 下载 Alpha_Tool_Mac.zip
# 解压文件
unzip Alpha_Tool_Mac.zip
# 进入解压目录
cd Alpha_Tool_Mac
# 添加执行权限
chmod +x Alpha_Tool
# 运行程序
./Alpha_Tool
# 从源码构建
🔨 pip install -r requirements.txt
🚀 cd mac && python build_mac.py
```
## 📊 数据集支持
| 数据集 | 描述 | 股票范围 |
|--------|------|----------|
| 📈 fundamental6 | 基础财务数据 | TOP3000 |
| 📊 analyst4 | 分析师预测 | TOP1000 |
| 📉 pv1 | 成交量数据 | TOP1000 |
## 👍 性能要求
```txt
___________
| METRICS |
|-----------|
| ✓ Sharpe | > 1.5
| ✓ Fitness | > 1.0
| ✓ Turnover| 0.1-0.9
| ✓ IC Mean | > 0.02
|___________|
```
## 🎯 使用流程
1. 📝 配置账号信息
2. 🎲 选择数据集
3. 🔄 选择运行模式
4. 📊 等待结果生成
5. 🚀 自动提交策略
## 🤝 贡献指南
欢迎提交 Issue 和 Pull Request!
## 📄 许可证
MIT License
## 👨💻 联系方式
- 📧 Email: <666@woaiys.filegear-sg.me>
- 🌟 GitHub: [YHYYDS666](https://github.com/YHYYDS666)
---
⭐ 如果这个项目帮助到你,请给一个 star! ⭐
## Star History
<a href="https://star-history.com/#WorldQuant-Brain-AlphaP/WorldQuant-Brain-AlphaP&Date">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=WorldQuant-Brain-AlphaP/WorldQuant-Brain-AlphaP&type=Date&theme=dark" />
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=WorldQuant-Brain-AlphaP/WorldQuant-Brain-AlphaP&type=Date" />
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=WorldQuant-Brain-AlphaP/WorldQuant-Brain-AlphaP&type=Date" />
</picture>
</a>

@ -0,0 +1,108 @@
"""Alpha 策略生成模块"""
class AlphaStrategy:
def get_simulation_data(self, datafields, mode=1):
"""根据模式生成策略列表"""
if mode == 1:
return self.generate_basic_strategy(datafields)
elif mode == 2:
return self.generate_multi_factor_strategy(datafields)
else:
print("❌ 无效的策略模式")
return []
def generate_basic_strategy(self, datafields):
"""生成基础策略"""
strategies = []
for field in datafields:
# 1. 日内策略
strategies.extend([
# 日内收益率
"group_rank((close - open)/open, subindustry)",
# 隔夜收益率
"group_rank((open - delay(close, 1))/delay(close, 1), subindustry)",
# 高低价差异
"group_rank((high - low)/open, subindustry)"
])
# 2. 波动率策略
strategies.extend([
# 波动率偏度
f"power(ts_std_dev(abs({field}), 30), 2) - power(ts_std_dev({field}, 30), 2)",
# 波动率动态调整
f"group_rank(std({field}, 20)/mean({field}, 20) * (1/cap), subindustry)"
])
# 3. 成交量策略
if field in ['volume', 'turnover']:
strategies.extend([
# 成交量异常
"group_rank((volume/sharesout - mean(volume/sharesout, 20))/std(volume/sharesout, 20), subindustry)",
# 成交量趋势
"ts_corr(volume/sharesout, abs(returns), 10)"
])
# 4. 市场微观结构
strategies.extend([
# 小单买卖压力
f"group_neutralize(power(rank({field} - group_mean({field}, 1, subindustry)), 3), bucket(rank(cap), range='0,1,0.1'))",
# 流动性压力
f"group_rank(correlation({field}, volume/sharesout, 20), subindustry)"
])
# 5. 条件触发策略
strategies.extend([
# 条件触发
f"trade_when(ts_rank(ts_std_dev(returns, 10), 252) < 0.9, {field}, -1)",
# 市场状态过滤
f"trade_when(volume > mean(volume, 20), {field}, -1)"
])
return strategies
def generate_multi_factor_strategy(self, datafields):
"""生成多因子组合策略"""
strategies = []
n = len(datafields)
for i in range(0, n-1, 2):
field1 = datafields[i]
field2 = datafields[i+1]
# 1. 回归中性化
strategies.extend([
f"regression_neut(vector_neut({field1}, {field2}), abs(ts_mean(returns, 252)/ts_std_dev(returns, 252)))",
# 多重回归
f"regression_neut(regression_neut({field1}, {field2}), ts_std_dev(returns, 30))"
])
# 2. 条件组合
strategies.extend([
# 条件选择
f"if_else(rank({field1}) > 0.5, {field2}, -1 * {field2})",
# 分组组合
f"group_neutralize({field1} * {field2}, bucket(rank(cap), range='0.1,1,0.1'))"
])
# 3. 复杂信号
strategies.extend([
# 信号强度
f"power(rank(group_neutralize(-ts_decay_exp_window(ts_sum(if_else({field1}-group_mean({field1},1,industry)-0.02>0,1,0)*ts_corr({field2},cap,5),3),50),industry)),2)",
# 市场状态
f"trade_when(ts_rank(ts_std_dev(returns,10),252)<0.9, {field1} * {field2}, -1)"
])
return strategies

@ -0,0 +1,398 @@
"""WorldQuant Brain API 批量处理模块"""
import json
import os
from datetime import datetime
from os.path import expanduser
from time import sleep
import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from alpha_strategy import AlphaStrategy
from dataset_config import get_api_settings, get_dataset_config
class BrainBatchAlpha:
API_BASE_URL = 'https://api.worldquantbrain.com'
def __init__(self, credentials_file='brain_credentials.txt'):
"""初始化 API 客户端"""
self.session = requests.Session()
self._setup_authentication(credentials_file)
def _setup_authentication(self, credentials_file):
"""设置认证"""
try:
with open(expanduser(credentials_file)) as f:
credentials = json.load(f)
username, password = credentials
self.session.auth = HTTPBasicAuth(username, password)
response = self.session.post(f"{self.API_BASE_URL}/authentication")
if response.status_code not in [200, 201]:
raise Exception(f"认证失败: HTTP {response.status_code}")
print("✅ 认证成功!")
except Exception as e:
print(f"❌ 认证错误: {str(e)}")
raise
def simulate_alphas(self, datafields=None, strategy_mode=1, dataset_name=None):
"""模拟 Alpha 列表"""
try:
datafields = self._get_datafields_if_none(datafields, dataset_name)
if not datafields:
return []
alpha_list = self._generate_alpha_list(datafields, strategy_mode)
if not alpha_list:
return []
print(f"\n🚀 开始模拟 {len(alpha_list)} 个 Alpha 表达式...")
results = []
for i, alpha in enumerate(alpha_list, 1):
print(f"\n[{i}/{len(alpha_list)}] 正在模拟 Alpha...")
result = self._simulate_single_alpha(alpha)
if result and result.get('passed_all_checks'):
results.append(result)
self._save_alpha_id(result['alpha_id'], result)
if i < len(alpha_list):
sleep(5)
return results
except Exception as e:
print(f"❌ 模拟过程出错: {str(e)}")
return []
def _simulate_single_alpha(self, alpha):
"""模拟单个 Alpha"""
try:
print(f"表达式: {alpha.get('regular', 'Unknown')}")
# 发送模拟请求
sim_resp = self.session.post(
f"{self.API_BASE_URL}/simulations",
json=alpha
)
if sim_resp.status_code != 201:
print(f"❌ 模拟请求失败 (状态码: {sim_resp.status_code})")
return None
try:
sim_progress_url = sim_resp.headers['Location']
start_time = datetime.now()
total_wait = 0
while True:
sim_progress_resp = self.session.get(sim_progress_url)
retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0))
if retry_after_sec == 0: # simulation done!
alpha_id = sim_progress_resp.json()['alpha']
print(f"✅ 获得 Alpha ID: {alpha_id}")
# 等待一下让指标计算完成
sleep(3)
# 获取 Alpha 详情
alpha_url = f"{self.API_BASE_URL}/alphas/{alpha_id}"
alpha_detail = self.session.get(alpha_url)
alpha_data = alpha_detail.json()
# 检查是否有 is 字段
if 'is' not in alpha_data:
print("❌ 无法获取指标数据")
return None
is_qualified = self.check_alpha_qualification(alpha_data)
return {
'expression': alpha.get('regular'),
'alpha_id': alpha_id,
'passed_all_checks': is_qualified,
'metrics': alpha_data.get('is', {}),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
# 更新等待时间和进度
total_wait += retry_after_sec
elapsed = (datetime.now() - start_time).total_seconds()
progress = min(95, (elapsed / 30) * 100) # 假设通常需要 30 秒完成
print(f"⏳ 等待模拟结果... ({elapsed:.1f} 秒 | 进度约 {progress:.0f}%)")
sleep(retry_after_sec)
except KeyError:
print("❌ 无法获取模拟进度 URL")
return None
except Exception as e:
print(f" Alpha 模拟失败: {str(e)}")
return None
def check_alpha_qualification(self, alpha_data):
"""检查 Alpha 是否满足所有提交条件"""
try:
# 从 'is' 字段获取指标
is_data = alpha_data.get('is', {})
if not is_data:
print("❌ 无法获取指标数据")
return False
# 获取指标值
sharpe = float(is_data.get('sharpe', 0))
fitness = float(is_data.get('fitness', 0))
turnover = float(is_data.get('turnover', 0))
ic_mean = float(is_data.get('margin', 0)) # margin 对应 IC Mean
# 获取子宇宙 Sharpe
sub_universe_check = next(
(
check for check in is_data.get('checks', [])
if check['name'] == 'LOW_SUB_UNIVERSE_SHARPE'
),
{}
)
subuniverse_sharpe = float(sub_universe_check.get('value', 0))
required_subuniverse_sharpe = float(sub_universe_check.get('limit', 0))
# 打印指标
print("\n📊 Alpha 指标详情:")
print(f" Sharpe: {sharpe:.3f} (>1.5)")
print(f" Fitness: {fitness:.3f} (>1.0)")
print(f" Turnover: {turnover:.3f} (0.1-0.9)")
print(f" IC Mean: {ic_mean:.3f} (>0.02)")
print(f" 子宇宙 Sharpe: {subuniverse_sharpe:.3f} (>{required_subuniverse_sharpe:.3f})")
print("\n📝 指标评估结果:")
# 检查每个指标并输出结果
is_qualified = True
if sharpe < 1.5:
print("❌ Sharpe ratio 不达标")
is_qualified = False
else:
print("✅ Sharpe ratio 达标")
if fitness < 1.0:
print("❌ Fitness 不达标")
is_qualified = False
else:
print("✅ Fitness 达标")
if turnover < 0.1 or turnover > 0.9:
print("❌ Turnover 不在合理范围")
is_qualified = False
else:
print("✅ Turnover 达标")
if ic_mean < 0.02:
print("❌ IC Mean 不达标")
is_qualified = False
else:
print("✅ IC Mean 达标")
if subuniverse_sharpe < required_subuniverse_sharpe:
print(f"❌ 子宇宙 Sharpe 不达标 ({subuniverse_sharpe:.3f} < {required_subuniverse_sharpe:.3f})")
is_qualified = False
else:
print(f"✅ 子宇宙 Sharpe 达标 ({subuniverse_sharpe:.3f} > {required_subuniverse_sharpe:.3f})")
print("\n🔍 检查项结果:")
checks = is_data.get('checks', [])
for check in checks:
name = check.get('name')
result = check.get('result')
value = check.get('value', 'N/A')
limit = check.get('limit', 'N/A')
if result == 'PASS':
print(f"{name}: {value} (限制: {limit})")
elif result == 'FAIL':
print(f"{name}: {value} (限制: {limit})")
is_qualified = False
elif result == 'PENDING':
print(f" {name}: 检查尚未完成")
is_qualified = False
print("\n📋 最终评判:")
if is_qualified:
print("✅ Alpha 满足所有条件,可以提交!")
else:
print("❌ Alpha 未达到提交标准")
return is_qualified
except Exception as e:
print(f"❌ 检查 Alpha 资格时出错: {str(e)}")
return False
def submit_alpha(self, alpha_id):
"""提交单个 Alpha"""
submit_url = f"{self.API_BASE_URL}/alphas/{alpha_id}/submit"
for attempt in range(5):
print(f"🔄 第 {attempt + 1} 次尝试提交 Alpha {alpha_id}")
# POST 请求
res = self.session.post(submit_url)
if res.status_code == 201:
print("✅ POST: 成功,等待提交完成...")
elif res.status_code in [400, 403]:
print(f"❌ 提交被拒绝 ({res.status_code})")
return False
else:
sleep(3)
continue
# 检查提交状态
while True:
res = self.session.get(submit_url)
retry = float(res.headers.get('Retry-After', 0))
if retry == 0:
if res.status_code == 200:
print("✅ 提交成功!")
return True
return False
sleep(retry)
return False
def submit_multiple_alphas(self, alpha_ids):
"""批量提交 Alpha"""
successful = []
failed = []
for alpha_id in alpha_ids:
if self.submit_alpha(alpha_id):
successful.append(alpha_id)
else:
failed.append(alpha_id)
if alpha_id != alpha_ids[-1]:
sleep(10)
return successful, failed
def _get_datafields_if_none(self, datafields=None, dataset_name=None):
"""获取数据字段列表"""
try:
if datafields is not None:
return datafields
if dataset_name is None:
print("❌ 未指定数据集")
return None
config = get_dataset_config(dataset_name)
if not config:
print(f"❌ 无效的数据集: {dataset_name}")
return None
# 获取数据字段
search_scope = {
'instrumentType': 'EQUITY',
'region': 'USA',
'delay': '1',
'universe': config['universe']
}
url_template = (
f"{self.API_BASE_URL}/data-fields?"
f"instrumentType={search_scope['instrumentType']}"
f"&region={search_scope['region']}"
f"&delay={search_scope['delay']}"
f"&universe={search_scope['universe']}"
f"&dataset.id={config['id']}"
"&limit=50&offset={offset}"
)
# 获取总数
initial_resp = self.session.get(url_template.format(offset=0))
if initial_resp.status_code != 200:
print("❌ 获取数据字段失败")
return None
total_count = initial_resp.json()['count']
# 获取所有数据字段
all_fields = []
for offset in range(0, total_count, 50):
resp = self.session.get(url_template.format(offset=offset))
if resp.status_code != 200:
continue
all_fields.extend(resp.json()['results'])
# 过滤矩阵类型字段
matrix_fields = [
field['id'] for field in all_fields
if field.get('type') == 'MATRIX'
]
if not matrix_fields:
print("❌ 未找到可用的数据字段")
return None
print(f"✅ 获取到 {len(matrix_fields)} 个数据字段")
return matrix_fields
except Exception as e:
print(f"❌ 获取数据字段时出错: {str(e)}")
return None
def _generate_alpha_list(self, datafields, strategy_mode):
"""生成 Alpha 表达式列表"""
try:
# 初始化策略生成器
strategy_generator = AlphaStrategy()
# 生成策略列表
strategies = strategy_generator.get_simulation_data(datafields, strategy_mode)
print(f"生成了 {len(strategies)} 个Alpha表达式")
# 转换为 API 所需的格式
alpha_list = []
for strategy in strategies:
simulation_data = {
'type': 'REGULAR',
'settings': {
'instrumentType': 'EQUITY',
'region': 'USA',
'universe': 'TOP3000',
'delay': 1,
'decay': 0,
'neutralization': 'SUBINDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'unitHandling': 'VERIFY',
'nanHandling': 'ON',
'language': 'FASTEXPR',
'visualization': False,
},
'regular': strategy
}
alpha_list.append(simulation_data)
return alpha_list
except Exception as e:
print(f"❌ 生成 Alpha 列表失败: {str(e)}")
return []

@ -0,0 +1,54 @@
import PyInstaller.__main__
import os
import sys
import shutil
# 确保目录存在
if not os.path.exists('dist'):
os.makedirs('dist')
# 设置命令行参数
args = [
'main.py', # 主程序入口
'--name=Alpha_工具', # 可执行文件名
'--onefile', # 打包成单个文件
'--console', # 使用控制台窗口(改为控制台模式)
'--add-data=dataset_config.py{0}.'.format(os.pathsep), # 添加配置文件
'--add-data=alpha_strategy.py{0}.'.format(os.pathsep), # 添加策略文件
'--add-data=brain_batch_alpha.py{0}.'.format(os.pathsep), # 添加核心处理文件
'--clean', # 清理临时文件
'--noconfirm', # 不确认覆盖
]
# 如果有图标文件,添加图标
if os.path.exists('icon.ico'):
args.append('--icon=icon.ico')
# 运行打包命令
PyInstaller.__main__.run(args)
# 打包完成后,复制或创建配置文件到dist目录
print("\n正在处理配置文件...")
try:
# 处理认证文件
if os.path.exists('brain_credentials.txt'):
shutil.copy2('brain_credentials.txt', 'dist/')
print("✅ brain_credentials.txt 复制成功")
else:
# 创建示例认证文件
with open('dist/brain_credentials.txt', 'w') as f:
f.write('["your_email@example.com","your_password"]')
print("✅ 创建了示例 brain_credentials.txt")
# 处理Alpha ID文件
if os.path.exists('alpha_ids.txt'):
shutil.copy2('alpha_ids.txt', 'dist/')
print("✅ alpha_ids.txt 复制成功")
else:
# 创建空的alpha_ids.txt
with open('dist/alpha_ids.txt', 'w') as f:
...
print("✅ 创建了空的 alpha_ids.txt")
except Exception as e:
print(f"❌ 处理配置文件时出错: {str(e)}")

@ -0,0 +1,52 @@
import PyInstaller.__main__
import os
import sys
import shutil
# 确保目录存在
if not os.path.exists('dist'):
os.makedirs('dist')
# 设置命令行参数
args = [
'main.py', # 主程序入口
'--name=Alpha_工具', # 可执行文件名
'--onefile', # 打包成单个文件
'--console', # 使用控制台窗口
'--add-data=dataset_config.py{0}.'.format(os.pathsep), # 添加配置文件
'--add-data=alpha_strategy.py{0}.'.format(os.pathsep), # 添加策略文件
'--add-data=brain_batch_alpha.py{0}.'.format(os.pathsep), # 添加核心处理文件
'--clean', # 清理临时文件
'--noconfirm', # 不确认覆盖
]
# 如果有图标文件,添加图标
if os.path.exists('icon.ico'):
args.append('--icon=icon.ico')
# 运行打包命令
PyInstaller.__main__.run(args)
# 打包完成后,复制或创建配置文件到dist目录
print("\n正在处理配置文件...")
try:
# 处理认证文件
if os.path.exists('brain_credentials.txt'):
shutil.copy2('brain_credentials.txt', 'dist/')
print("✅ brain_credentials.txt 复制成功")
else:
with open('dist/brain_credentials.txt', 'w') as f:
f.write('["your_email@example.com","your_password"]')
print("✅ 创建了示例 brain_credentials.txt")
# 处理Alpha ID文件
if os.path.exists('alpha_ids.txt'):
shutil.copy2('alpha_ids.txt', 'dist/')
print("✅ alpha_ids.txt 复制成功")
else:
with open('dist/alpha_ids.txt', 'w') as f:
...
print("✅ 创建了空的 alpha_ids.txt")
except Exception as e:
print(f"❌ 处理配置文件时出错: {str(e)}")

@ -0,0 +1,88 @@
import zipapp
import os
import shutil
import subprocess
import sys
def create_zipapp():
# 创建临时目录
build_dir = "build"
if os.path.exists(build_dir):
shutil.rmtree(build_dir)
os.makedirs(build_dir)
# 复制源文件
source_files = [
"main.py",
"brain_batch_alpha.py",
"alpha_strategy.py",
"dataset_config.py",
]
for file in source_files:
shutil.copy2(file, build_dir)
# 复制配置文件
config_files = [
"brain_credentials.txt",
"alpha_ids.txt",
]
for file in config_files:
if os.path.exists(file):
shutil.copy2(file, build_dir)
else:
print(f"Warning: {file} not found, will be created on first run")
# 创建 requirements.txt
with open(os.path.join(build_dir, "requirements.txt"), "w") as f:
f.write("requests>=2.31.0\npandas>=2.0.0\n")
# 创建 __main__.py
with open(os.path.join(build_dir, "__main__.py"), "w") as f:
f.write("""
import sys
import os
def install_deps():
import subprocess
import pkg_resources
required = {'requests>=2.31.0', 'pandas>=2.0.0'}
installed = {f"{pkg.key}=={pkg.version}" for pkg in pkg_resources.working_set}
missing = required - installed
if missing:
subprocess.check_call([sys.executable, '-m', 'pip', 'install', *missing])
if __name__ == '__main__':
# 安装依赖
install_deps()
# 导入主程序
from main import main
main()
""")
# 创建可执行文件
output = "Alpha_Tool.pyz"
if os.path.exists(output):
os.remove(output)
zipapp.create_archive(
build_dir,
output,
main="__main__:main",
compressed=True
)
print(f"\n✅ 成功创建 {output}")
print("使用方法:")
print(f"python {output}")
if __name__ == "__main__":
create_zipapp()

@ -0,0 +1,110 @@
"""数据集配置管理模块"""
DATASET_CONFIGS = {
'fundamental6': {
'id': 'fundamental6',
'universe': 'TOP3000',
'description': '基础财务数据',
'api_settings': {
'instrumentType': 'EQUITY',
'region': 'USA',
'delay': 1,
'decay': 0,
'neutralization': 'SUBINDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'unitHandling': 'VERIFY',
'nanHandling': 'ON',
'language': 'FASTEXPR'
},
'fields': [
'assets', 'liabilities', 'revenue', 'netincome',
'cash', 'debt', 'equity', 'eps', 'pe_ratio',
'pb_ratio', 'market_cap', 'dividend_yield'
]
},
'analyst4': {
'id': 'analyst4',
'universe': 'TOP1000',
'description': '分析师预测数据',
'api_settings': {
'instrumentType': 'EQUITY',
'region': 'USA',
'delay': 1,
'decay': 0,
'neutralization': 'SUBINDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'unitHandling': 'VERIFY',
'nanHandling': 'ON',
'language': 'FASTEXPR'
},
'fields': [
'anl4_tbvps_low', 'anl4_tbvps_high',
'anl4_tbvps_mean', 'anl4_tbvps_median'
]
},
'pv1': {
'id': 'pv1',
'universe': 'TOP1000',
'description': '股市成交量数据',
'api_settings': {
'instrumentType': 'EQUITY',
'region': 'USA',
'delay': 1,
'decay': 0,
'neutralization': 'SUBINDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'unitHandling': 'VERIFY',
'nanHandling': 'ON',
'language': 'FASTEXPR'
},
'fields': [
'volume', 'close', 'open', 'high', 'low',
'vwap', 'returns', 'turnover', 'volatility'
]
}
}
def get_dataset_list():
"""获取所有可用数据集列表"""
return [
f"{idx+1}: {name} ({config['universe']}) - {config['description']}"
for idx, (name, config) in enumerate(DATASET_CONFIGS.items())
]
def get_dataset_config(dataset_name):
"""获取指定数据集的配置"""
return DATASET_CONFIGS.get(dataset_name)
def get_dataset_by_index(index):
"""通过索引获取数据集名称"""
try:
return list(DATASET_CONFIGS.keys())[int(index)-1]
except (IndexError, ValueError):
return None
def get_dataset_fields(dataset_name):
"""获取指定数据集的字段列表"""
config = DATASET_CONFIGS.get(dataset_name)
return config['fields'] if config else []
def get_api_settings(dataset_name):
"""获取指定数据集的API设置"""
config = DATASET_CONFIGS.get(dataset_name)
if config and 'api_settings' in config:
settings = config['api_settings'].copy()
settings['universe'] = config['universe']
return settings
return None

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

@ -0,0 +1,69 @@
import PyInstaller.__main__
import os
import sys
import shutil
# 获取项目根目录的绝对路径
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 创建 mac 目录
mac_dir = os.path.dirname(os.path.abspath(__file__))
mac_dist_dir = os.path.join(mac_dir, 'dist')
if not os.path.exists(mac_dir):
os.makedirs(mac_dir)
if not os.path.exists(mac_dist_dir):
os.makedirs(mac_dist_dir)
# 设置命令行参数
args = [
os.path.join(ROOT_DIR, 'main.py'), # 主程序入口
'--name=Alpha_Tool', # Mac版本名称
'--onefile', # 打包成单个文件
'--console', # 使用控制台窗口
f'--add-data={os.path.join(ROOT_DIR, "dataset_config.py")}{os.pathsep}.',
f'--add-data={os.path.join(ROOT_DIR, "alpha_strategy.py")}{os.pathsep}.',
f'--add-data={os.path.join(ROOT_DIR, "brain_batch_alpha.py")}{os.pathsep}.',
'--clean',
'--noconfirm',
f'--distpath={mac_dist_dir}',
f'--workpath={os.path.join(mac_dir, "build")}',
f'--specpath={mac_dir}'
]
# 如果有 Mac 图标文件,添加图标
icon_path = os.path.join(mac_dir, 'icon.icns')
if os.path.exists(icon_path):
args.append(f'--icon={icon_path}')
try:
# 运行打包命令
PyInstaller.__main__.run(args)
# 打包完成后,复制或创建配置文件到 dist 目录
print("\n正在处理配置文件...")
# 处理认证文件
credentials_src = os.path.join(ROOT_DIR, 'brain_credentials.txt')
if os.path.exists(credentials_src):
shutil.copy2(credentials_src, mac_dist_dir)
print("✅ brain_credentials.txt 复制成功")
else:
with open(os.path.join(mac_dist_dir, 'brain_credentials.txt'), 'w') as f:
f.write('["your_email@example.com","your_password"]')
print("✅ 创建了示例 brain_credentials.txt")
# 处理 Alpha ID 文件
alpha_ids_src = os.path.join(ROOT_DIR, 'alpha_ids.txt')
if os.path.exists(alpha_ids_src):
shutil.copy2(alpha_ids_src, mac_dist_dir)
print("✅ alpha_ids.txt 复制成功")
else:
with open(os.path.join(mac_dist_dir, 'alpha_ids.txt'), 'w') as f:
...
print("✅ 创建了空的 alpha_ids.txt")
print(f"\n✅ Mac 版本打包完成! 文件位于 {mac_dist_dir}")
except Exception as e:
print(f"\n❌ 打包过程出错: {str(e)}")
sys.exit(1)

@ -0,0 +1,48 @@
import os
from PIL import Image
def create_icns():
"""创建 .icns 文件"""
try:
# 检查源图片
png_path = os.path.join(os.path.dirname(__file__), 'icon.png')
if not os.path.exists(png_path):
print(" 未找到 icon.png,将创建默认图标")
# 创建一个简单的默认图标
img = Image.new('RGB', (512, 512), color='white')
img.save(png_path)
# 创建临时 iconset 目录
iconset_name = "icon.iconset"
if not os.path.exists(iconset_name):
os.makedirs(iconset_name)
# 需要的图标尺寸
icon_sizes = [16, 32, 64, 128, 256, 512]
# 打开原始图片
img = Image.open(png_path)
# 生成不同尺寸的图标
for size in icon_sizes:
img_copy = img.copy()
img_copy.thumbnail((size, size), Image.Resampling.LANCZOS)
img_copy.save(f"{iconset_name}/icon_{size}x{size}.png")
# 使用 iconutil 创建 .icns 文件(仅在 Mac 上可用)
if os.system('which iconutil') == 0:
os.system(f'iconutil -c icns {iconset_name}')
else:
print(" iconutil 不可用,跳过 .icns 创建")
print("✅ 图标处理完成")
except Exception as e:
print(f"❌ 创建图标时出错: {str(e)}")
# 不抛出异常,让构建继续进行
if __name__ == "__main__":
create_icns()

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 823 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

@ -0,0 +1,98 @@
"""WorldQuant Brain 批量 Alpha 生成系统"""
import os
from brain_batch_alpha import BrainBatchAlpha
from dataset_config import get_dataset_by_index, get_dataset_list
STORAGE_ALPHA_ID_PATH = "alpha_ids.txt"
def submit_alpha_ids(brain, num_to_submit=2):
"""提交保存的 Alpha ID"""
try:
if not os.path.exists(STORAGE_ALPHA_ID_PATH):
print("❌ 没有找到保存的Alpha ID文件")
return
with open(STORAGE_ALPHA_ID_PATH, 'r') as f:
alpha_ids = [line.strip() for line in f.readlines() if line.strip()]
if not alpha_ids:
print("❌ 没有可提交的Alpha ID")
return
print("\n📝 已保存的Alpha ID列表:")
for i, alpha_id in enumerate(alpha_ids, 1):
print(f"{i}. {alpha_id}")
if num_to_submit > len(alpha_ids):
num_to_submit = len(alpha_ids)
selected_ids = alpha_ids[:num_to_submit]
successful, failed = brain.submit_multiple_alphas(selected_ids)
# 更新 alpha_ids.txt
remaining_ids = [id for id in alpha_ids if id not in successful]
with open(STORAGE_ALPHA_ID_PATH, 'w') as f:
f.writelines([f"{id}\n" for id in remaining_ids])
except Exception as e:
print(f"❌ 提交 Alpha 时出错: {str(e)}")
def main():
"""主程序入口"""
try:
print("🚀 启动 WorldQuant Brain 批量 Alpha 生成系统")
print("\n📋 请选择运行模式:")
print("1: 自动模式 (测试并自动提交 2 个合格 Alpha)")
print("2: 仅测试模式 (测试并保存合格 Alpha ID)")
print("3: 仅提交模式 (提交已保存的合格 Alpha ID)")
mode = int(input("\n请选择模式 (1-3): "))
if mode not in [1, 2, 3]:
print("❌ 无效的模式选择")
return
brain = BrainBatchAlpha()
if mode in [1, 2]:
print("\n📊 可用数据集列表:")
for dataset in get_dataset_list():
print(dataset)
dataset_index = input("\n请选择数据集编号: ")
dataset_name = get_dataset_by_index(dataset_index)
if not dataset_name:
print("❌ 无效的数据集编号")
return
print("\n📈 可用策略模式:")
print("1: 基础策略模式")
print("2: 多因子组合模式")
strategy_mode = int(input("\n请选择策略模式 (1-2): "))
if strategy_mode not in [1, 2]:
print("❌ 无效的策略模式")
return
results = brain.simulate_alphas(None, strategy_mode, dataset_name)
if mode == 1:
submit_alpha_ids(brain, 2)
elif mode == 3:
num_to_submit = int(input("\n请输入要提交的 Alpha 数量: "))
if num_to_submit <= 0:
print("❌ 无效的提交数量")
return
submit_alpha_ids(brain, num_to_submit)
except Exception as e:
print(f"❌ 程序运行出错: {str(e)}")
if __name__ == "__main__":
main()

@ -0,0 +1,4 @@
requests>=2.31.0
pandas>=2.0.0
pyinstaller>=5.13.2
pillow>=10.0.0

@ -0,0 +1,29 @@
from setuptools import setup, find_packages
setup(
name="Alpha_Tool",
version="1.0.0",
author="YourName",
description="WorldQuant Brain Alpha Generator",
packages=find_packages(),
python_requires=">=3.8",
install_requires=[
"requests>=2.31.0",
"pandas>=2.0.0",
],
entry_points={
'console_scripts': [
'alpha_tool=main:main',
],
},
# 包含非Python文件
package_data={
'': ['*.txt', '*.json', '*.ico', '*.png'],
},
# 额外信息
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
],
)

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

Loading…
Cancel
Save