Jack 1 month ago
commit d8c5c04581
  1. 345
      fetch_local_performance/fetch_local_performance.py
  2. 0
      fetch_local_performance/获取odoo中模拟后的alpha性能
  3. 117
      rpc_batch_fetch_dataset/rpc_batch_fetch_dataset.py

@ -0,0 +1,345 @@
# -*- coding: utf-8 -*-
import json
import httpx
from datetime import datetime, timedelta
class OdooClient:
"""Odoo JSON-RPC 客户端类"""
def __init__(self, url, db_name, username, password):
"""
初始化 Odoo 客户端并自动登录
Args:
url: Odoo 服务器地址
db_name: 数据库名称
username: 用户名
password: 密码
"""
self.url = url
self.db_name = db_name
self.username = username
self.password = password
self.client = None
self.uid = None
# 自动登录
self.login()
def login(self):
"""登录并获取 uid 和 client"""
try:
self.client = httpx.Client(timeout=30.0)
# Odoo 登录
payload = {
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "common",
"method": "login",
"args": [self.db_name, self.username, self.password]
},
"id": 1
}
response = self.client.post(f"{self.url}/jsonrpc", json=payload)
result = response.json()
# 检查是否有错误
if "error" in result:
raise Exception(f"登录失败: {result['error']}")
# Odoo 的登录响应中,result 直接就是 uid
self.uid = result.get("result")
if not self.uid:
raise Exception("登录失败:未获取到UID")
print(f"登录成功,UID: {self.uid}")
return True
except Exception as e:
print(f"登录失败: {e}")
if self.client:
self.client.close()
self.client = None
self.uid = None
raise
def logout(self):
"""退出登录并关闭连接"""
if self.client:
self.client.close()
self.client = None
self.uid = None
print("已退出")
def search_data(self, model, domain, fields=None, order=None, limit=None):
"""
通用搜索方法
Args:
model: 模型名称
domain: 搜索条件列表
fields: 需要返回的字段列表
order: 排序规则
limit: 返回记录数量限制
Returns:
查询结果列表失败返回 None
"""
if not self.client or not self.uid:
raise Exception("未登录或连接已断开")
# 构建参数
args = [domain]
if fields:
args.append(fields)
kwargs = {}
if order:
kwargs['order'] = order
if limit:
kwargs['limit'] = limit
payload = {
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
self.db_name,
self.uid,
self.password,
model,
"search_read",
args,
kwargs
]
},
"id": 2
}
try:
response = self.client.post(f"{self.url}/jsonrpc", json=payload)
result = response.json()
if "error" in result:
print(f"查询失败: {result['error']}")
return None
return result.get("result", [])
except Exception as e:
print(f"查询异常: {e}")
return None
def __enter__(self):
"""上下文管理器入口"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口,自动登出"""
self.logout()
# 使用 OdooClient 类重构后的函数
def fetch_local_performance(odoo_client, model, domain, fields, limit):
"""获取本地表现数据"""
# 执行搜索
result = odoo_client.search_data(model=model, domain=domain, fields=fields, order="id desc", limit=limit)
if result:
return result
else:
print("未获取到数据")
exit(1)
def deconstruction_data(all_data):
"""解构并输出数据"""
for idx, data in enumerate(all_data):
data = json.loads(data.get('performance'))
print(f"\n{'=' * 60}")
print(f"策略 #{idx + 1}")
print(f"{'=' * 60}")
# 基础信息
alpha_id = data.get("id")
alpha_type = data.get("type")
author = data.get("author")
name = data.get("name")
print(f"ID: {alpha_id}")
print(f"类型: {alpha_type}")
print(f"作者: {author}")
print(f"名称: {name if name else '未命名'}")
# Settings 配置
settings = data.get("settings", {})
print(f"\n【配置信息】")
print(f" 品种类型: {settings.get('instrumentType')}")
print(f" 地区: {settings.get('region')}")
print(f" 股票池: {settings.get('universe')}")
print(f" 延迟: {settings.get('delay')}")
print(f" 衰减: {settings.get('decay')}")
print(f" 中性化: {settings.get('neutralization')}")
print(f" 截断: {settings.get('truncation')}")
print(f" 时间区间: {settings.get('startDate')} -> {settings.get('endDate')}")
# 因子代码
regular = data.get('regular', {})
code = regular.get('code')
description = regular.get('description')
print(f"\n【因子代码】")
print(f" Code: {code}")
if description:
print(f" Desc: {description}")
print(f" 操作符数量: {regular.get('operatorCount')}")
# 样本内(IS)回测结果 - 重点
in_sample = data.get('is', {})
if in_sample:
print(f"\n【样本内回测结果 (IS)】")
print(f" 📉 PnL: {in_sample.get('pnl', 0):,.0f}")
print(f" 📊 总收益率: {in_sample.get('returns', 0) * 100:.2f}%")
print(f" 🎯 夏普比率: {in_sample.get('sharpe', 0):.2f}")
print(f" 💪 Fitness: {in_sample.get('fitness', 0):.2f}")
print(f" 📉 最大回撤: {in_sample.get('drawdown', 0) * 100:.2f}%")
print(f" 🔄 换手率: {in_sample.get('turnover', 0) * 100:.2f}%")
print(f" 📈 多头持仓数: {in_sample.get('longCount', 0)}")
print(f" 📉 空头持仓数: {in_sample.get('shortCount', 0)}")
print(f" 💰 保证金: {in_sample.get('margin', 0):.6f}")
print(f" 📅 回测区间: {in_sample.get('startDate')} -> {in_sample.get('endDate')}")
# 检查项结果 - 排序后输出
checks = in_sample.get('checks', [])
if checks:
print(f"\n 【检查项】(已排序:PASS → WARNING → FAIL → PENDING)")
# 定义排序优先级
def sort_priority(check):
result = check.get('result')
priority_order = {
'PASS': 1,
'WARNING': 2,
'FAIL': 3,
'PENDING': 4
}
return priority_order.get(result, 5)
# 排序
sorted_checks = sorted(checks, key=sort_priority)
# 输出排序后的检查项
for check in sorted_checks:
name_check = check.get('name')
result = check.get('result')
limit = check.get('limit')
value = check.get('value')
# 根据结果用不同符号标记
if result == 'PASS':
symbol = ''
elif result == 'FAIL':
symbol = ''
elif result == 'WARNING':
symbol = ''
else:
symbol = ''
if limit is not None and value is not None:
print(f" {symbol} {name_check}: {result} (要求: {limit}, 实际: {value})")
else:
print(f" {symbol} {name_check}: {result}")
else:
print(f"\n【样本内回测结果 (IS)】")
print(f" 无数据(因子可能在样本内阶段就失败了)")
# 样本外(OS)结果 - 通常为空表示策略无效
out_sample = data.get('os', {})
train = data.get('train', {})
test = data.get('test', {})
if out_sample:
print(f"\n【样本外回测结果 (OS)】")
print(f" 夏普比率: {out_sample.get('sharpe', 0):.2f}")
else:
print(f"\n【样本外回测结果 (OS)】⚠ 为空 - 因子可能未通过样本内测试")
if train:
print(f"\n【训练集结果 (Train)】")
print(f" 夏普比率: {train.get('sharpe', 0):.2f}")
if test:
print(f"\n【测试集结果 (Test)】")
print(f" 夏普比率: {test.get('sharpe', 0):.2f}")
# 主题和竞赛信息
themes = data.get('themes', [])
if themes:
print(f"\n【关联主题】")
for theme in themes:
print(f" - {theme.get('name')} (乘数: {theme.get('multiplier')}x)")
competitions = data.get('competitions', [])
if competitions:
print(f"\n【关联竞赛】")
for comp in competitions:
print(f" - {comp.get('name')}")
# 状态信息
print(f"\n【状态】")
print(f" 阶段: {data.get('stage')}")
print(f" 状态: {data.get('status')}")
print(f" 是否收藏: {data.get('favorite')}")
print(f" 创建时间: {data.get('dateCreated')}")
# 如果有分类信息
classifications = data.get('classifications', [])
if classifications:
print(f"\n【分类】")
for cls in classifications:
print(f" - {cls.get('name')}")
# 使用示例
if __name__ == "__main__":
# ============================== Odoo 连接配置 ====================================
ODOO_URL = "http://192.168.31.41:32000"
DB_NAME = "quantify"
USERNAME = "rpc"
PASSWORD = "aaaAAA111"
# ============================== 搜索设置 ====================================
days = 7
now = datetime.now()
today_zero = now.replace(hour=0, minute=0, second=0, microsecond=0)
days_ago_zero = today_zero - timedelta(days=days)
# 模型名称
model = "alpha.expression.line"
# 搜索条件
domain = [('performance', '!=', '{}'), ('write_date', '>=', days_ago_zero.strftime('%Y-%m-%d %H:%M:%S'))]
# 搜索字段
fields = ['id', 'performance']
# 搜索数量限制
limit = 1
try:
odoo = OdooClient(ODOO_URL, DB_NAME, USERNAME, PASSWORD)
all_data = fetch_local_performance(odoo, model, domain, fields, limit)
deconstruction_data(all_data)
except Exception as e:
print(f"程序执行失败: {e}")
with OdooClient(ODOO_URL, DB_NAME, USERNAME, PASSWORD) as odoo:
all_data = fetch_local_performance(odoo)
deconstruction_data(all_data)

@ -1,8 +1,7 @@
import xmlrpc.client import httpx
import time import time
import sys import sys
# Odoo 连接配置
ODOO_URL = "http://192.168.31.41:32000" ODOO_URL = "http://192.168.31.41:32000"
DB_NAME = "quantify" DB_NAME = "quantify"
USERNAME = "rpc" USERNAME = "rpc"
@ -16,75 +15,91 @@ class OdooClient:
self.username = username self.username = username
self.password = password self.password = password
self.uid = None self.uid = None
self.models = None self.client = None
self._connect() self._connect()
def _connect(self): def _connect(self):
"""建立 XML-RPC 连接"""
try: try:
common = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/common") self.client = httpx.Client(timeout=30.0)
self.uid = common.authenticate(self.db, self.username, self.password, {})
payload = {
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "common",
"method": "login",
"args": [self.db, self.username, self.password]
},
"id": 1
}
response = self.client.post(f"{self.url}/jsonrpc", json=payload)
result = response.json()
if "error" in result:
raise Exception(f"登录失败: {result['error']}")
self.uid = result.get("result")
if not self.uid: if not self.uid:
raise Exception("认证失败,请检查用户名和密码") raise Exception("认证失败,请检查用户名和密码")
self.models = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/object")
print(f"[INFO] 连接成功,UID: {self.uid}") print(f"[INFO] 连接成功,UID: {self.uid}")
except Exception as e: except Exception as e:
print(f"[ERROR] 连接失败: {e}") print(f"[ERROR] 连接失败: {e}")
sys.exit(1) sys.exit(1)
def _execute(self, model, method, args=None, kwargs=None):
if args is None:
args = []
if kwargs is None:
kwargs = {}
payload = {
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [self.db, self.uid, self.password, model, method, args, kwargs]
},
"id": 2
}
response = self.client.post(f"{self.url}/jsonrpc", json=payload)
result = response.json()
if "error" in result:
raise Exception(f"RPC错误: {result['error']}")
return result.get("result")
def search_draft_records(self): def search_draft_records(self):
"""搜索所有状态为 draft 的 alpha.datasets 记录"""
try: try:
ids = self.models.execute_kw( ids = self._execute('alpha.datasets', 'search', [[('status', '=', 'draft')]])
self.db, self.uid, self.password,
'alpha.datasets', 'search',
[[('status', '=', 'draft')]]
)
print(f"[INFO] 找到 {len(ids)} 条 draft 记录: {ids}") print(f"[INFO] 找到 {len(ids)} 条 draft 记录: {ids}")
return ids return ids
except xmlrpc.client.Fault as e:
print(f"[ERROR] XML-RPC Fault 搜索记录失败: {e.faultCode} - {e.faultString}")
return []
except Exception as e: except Exception as e:
print(f"[ERROR] 搜索 draft 记录失败: {e}") print(f"[ERROR] 搜索 draft 记录失败: {e}")
return [] return []
def get_record_status(self, record_id): def get_record_status(self, record_id):
"""获取单条记录的状态"""
try: try:
records = self.models.execute_kw( records = self._execute('alpha.datasets', 'read', [[record_id], ['status', 'result_message', 'datasets_id', 'name']])
self.db, self.uid, self.password,
'alpha.datasets', 'read',
[[record_id], ['status', 'result_message', 'datasets_id', 'name']]
)
if records: if records:
return records[0] return records[0]
return None return None
except xmlrpc.client.Fault as e:
print(f"[ERROR] XML-RPC Fault 获取记录 {record_id} 状态失败: {e.faultCode} - {e.faultString}")
return None
except Exception as e: except Exception as e:
print(f"[ERROR] 获取记录 {record_id} 状态失败: {e}") print(f"[ERROR] 获取记录 {record_id} 状态失败: {e}")
return None return None
def trigger_btn_get_datasets(self, record_id): def trigger_btn_get_datasets(self, record_id):
"""
调用 btn_get_datasets 方法
返回: (success, message)
"""
try: try:
result = self.models.execute_kw( result = self._execute('alpha.datasets', 'btn_get_datasets', [[record_id]])
self.db, self.uid, self.password,
'alpha.datasets', 'btn_get_datasets',
[[record_id]]
)
# 处理不同的返回值
if result is True: if result is True:
print(f"[INFO] 记录 {record_id} 的 btn_get_datasets 调用成功") print(f"[INFO] 记录 {record_id} 的 btn_get_datasets 调用成功")
return True, "调用成功" return True, "调用成功"
elif isinstance(result, dict) and result.get('type') == 'ir.actions.client': elif isinstance(result, dict) and result.get('type') == 'ir.actions.client':
# 返回的是 notification 字典
message = result.get('params', {}).get('message', '') message = result.get('params', {}).get('message', '')
msg_type = result.get('params', {}).get('type', 'info') msg_type = result.get('params', {}).get('type', 'info')
if msg_type == 'danger': if msg_type == 'danger':
@ -97,20 +112,12 @@ class OdooClient:
print(f"[INFO] 记录 {record_id} 的 btn_get_datasets 调用完成,返回: {result}") print(f"[INFO] 记录 {record_id} 的 btn_get_datasets 调用完成,返回: {result}")
return True, "调用成功" return True, "调用成功"
except xmlrpc.client.Fault as e:
error_msg = f"XML-RPC Fault: {e.faultCode} - {e.faultString}"
print(f"[ERROR] 调用记录 {record_id} 的 btn_get_datasets 失败: {error_msg}")
return False, error_msg
except Exception as e: except Exception as e:
error_msg = f"Exception: {str(e)}" error_msg = f"Exception: {str(e)}"
print(f"[ERROR] 调用记录 {record_id} 的 btn_get_datasets 失败: {error_msg}") print(f"[ERROR] 调用记录 {record_id} 的 btn_get_datasets 失败: {error_msg}")
return False, error_msg return False, error_msg
def wait_for_status_change(self, record_id, check_interval=5, max_wait_minutes=30): def wait_for_status_change(self, record_id, check_interval=5, max_wait_minutes=30):
"""
轮询等待记录状态变为 done failed
返回: (status, result_message)
"""
max_attempts = (max_wait_minutes * 60) // check_interval max_attempts = (max_wait_minutes * 60) // check_interval
attempt = 0 attempt = 0
@ -139,22 +146,18 @@ class OdooClient:
def main(): def main():
"""主函数"""
print("=" * 60) print("=" * 60)
print("Odoo 批量处理 draft 数据集脚本") print("Odoo 批量处理 draft 数据集脚本")
print("=" * 60) print("=" * 60)
# 初始化客户端
client = OdooClient(ODOO_URL, DB_NAME, USERNAME, PASSWORD) client = OdooClient(ODOO_URL, DB_NAME, USERNAME, PASSWORD)
# 统计信息
total_processed = 0 total_processed = 0
success_count = 0 success_count = 0
failed_count = 0 failed_count = 0
timeout_count = 0 timeout_count = 0
while True: while True:
# 搜索所有 draft 记录
draft_ids = client.search_draft_records() draft_ids = client.search_draft_records()
if not draft_ids: if not draft_ids:
@ -166,7 +169,6 @@ def main():
for record_id in draft_ids: for record_id in draft_ids:
print(f"\n--- 处理记录 ID: {record_id} ---") print(f"\n--- 处理记录 ID: {record_id} ---")
# 再次确认状态(防止在搜索后状态被改变)
current_record = client.get_record_status(record_id) current_record = client.get_record_status(record_id)
if not current_record: if not current_record:
print(f"[ERROR] 无法获取记录 {record_id} 的信息,跳过") print(f"[ERROR] 无法获取记录 {record_id} 的信息,跳过")
@ -184,7 +186,6 @@ def main():
print(f"[INFO] 数据集名称: {name}") print(f"[INFO] 数据集名称: {name}")
print(f"[INFO] 数据集 ID: {datasets_id}") print(f"[INFO] 数据集 ID: {datasets_id}")
# 触发 btn_get_datasets
success, message = client.trigger_btn_get_datasets(record_id) success, message = client.trigger_btn_get_datasets(record_id)
if not success: if not success:
@ -192,21 +193,15 @@ def main():
failed_count += 1 failed_count += 1
total_processed += 1 total_processed += 1
# 更新记录状态为 failed(如果触发失败)
try: try:
client.models.execute_kw( client._execute('alpha.datasets', 'write', [[record_id], {
client.db, client.uid, client.password, 'status': 'failed',
'alpha.datasets', 'write', 'result_message': f'Trigger failed: {message}'
[[record_id], { }])
'status': 'failed',
'result_message': f'Trigger failed: {message}'
}]
)
except Exception as e: except Exception as e:
print(f"[WARN] 无法更新记录状态: {e}") print(f"[WARN] 无法更新记录状态: {e}")
continue continue
# 等待状态变化
status, result_msg = client.wait_for_status_change(record_id) status, result_msg = client.wait_for_status_change(record_id)
if status == 'done': if status == 'done':
@ -225,10 +220,8 @@ def main():
total_processed += 1 total_processed += 1
# 短暂延迟,避免请求过快
time.sleep(1) time.sleep(1)
# 输出统计
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("处理完成统计:") print("处理完成统计:")
print(f" 总处理记录数: {total_processed}") print(f" 总处理记录数: {total_processed}")

Loading…
Cancel
Save