# -*- coding: utf-8 -*- import json import httpx import random import time from datetime import datetime, timedelta class OdooClient: """Odoo JSON-RPC 客户端类""" def __init__(self, url, db_name, username, password): """ 初始化 Odoo 客户端并自动登录 Args: url: Odoo 服务器地址 db_name: 数据库名称 username: 用户名 password: 密码 """ self.url = url self.db_name = db_name self.username = username self.password = password self.client = None self.uid = None # 自动登录 self.login() def login(self): """登录并获取 uid 和 client""" try: self.client = httpx.Client(timeout=30.0) # Odoo 登录 payload = { "jsonrpc": "2.0", "method": "call", "params": { "service": "common", "method": "login", "args": [self.db_name, self.username, self.password] }, "id": 1 } response = self.client.post(f"{self.url}/jsonrpc", json=payload) result = response.json() # 检查是否有错误 if "error" in result: raise Exception(f"登录失败: {result['error']}") # Odoo 的登录响应中,result 直接就是 uid self.uid = result.get("result") if not self.uid: raise Exception("登录失败:未获取到UID") print(f"登录成功,UID: {self.uid}") return True except Exception as e: print(f"登录失败: {e}") if self.client: self.client.close() self.client = None self.uid = None raise def logout(self): """退出登录并关闭连接""" if self.client: self.client.close() self.client = None self.uid = None print("已退出") def search_data(self, model, domain, fields=None, order=None, limit=None): """ 通用搜索方法 Args: model: 模型名称 domain: 搜索条件列表 fields: 需要返回的字段列表 order: 排序规则 limit: 返回记录数量限制 Returns: 查询结果列表,失败返回 None """ if not self.client or not self.uid: raise Exception("未登录或连接已断开") # 构建参数 args = [domain] if fields: args.append(fields) kwargs = {} if order: kwargs['order'] = order if limit: kwargs['limit'] = limit payload = { "jsonrpc": "2.0", "method": "call", "params": { "service": "object", "method": "execute_kw", "args": [ self.db_name, self.uid, self.password, model, "search_read", args, kwargs ] }, "id": 2 } try: response = self.client.post(f"{self.url}/jsonrpc", json=payload) result = response.json() if "error" in result: print(f"查询失败: {result['error']}") return None return result.get("result", []) except Exception as e: print(f"查询异常: {e}") return None def write_data(self, model, record_id, values): """ 更新记录 Args: model: 模型名称 record_id: 记录ID values: 要更新的字段值字典 Returns: 是否更新成功 """ if not self.client or not self.uid: raise Exception("未登录或连接已断开") payload = { "jsonrpc": "2.0", "method": "call", "params": { "service": "object", "method": "execute_kw", "args": [ self.db_name, self.uid, self.password, model, "write", [[record_id], values] ] }, "id": 3 } try: response = self.client.post(f"{self.url}/jsonrpc", json=payload) result = response.json() if "error" in result: print(f"更新失败: {result['error']}") return False return result.get("result", False) except Exception as e: print(f"更新异常: {e}") return False def __enter__(self): """上下文管理器入口""" return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器出口,自动登出""" self.logout() class SimpleAlphaFetcher: def __init__(self): """ 初始化 Alpha 获取器 """ self.client = None self.login() def login(self): """登录 WorldQuant Brain API""" try: # 从 nacos 获取账号密码 with httpx.Client(timeout=10.0) as temp_client: nacos_resp = temp_client.get( 'http://192.168.31.41:30848/nacos/v1/cs/configs?dataId=wq_account&group=quantify' ) if nacos_resp.status_code != 200: print('获取账号密码失败') return False config = nacos_resp.json() username = config.get('user_name') password = config.get('password') if not username or not password: print('账号密码不完整') return False print(f"正在登录账户: {username}") # 创建客户端并设置超时 timeout = httpx.Timeout(connect=30.0, read=60.0, write=30.0, pool=30.0) self.client = httpx.Client( auth=httpx.BasicAuth(username, password), timeout=timeout ) # 发送登录请求 response = self.client.post('https://api.worldquantbrain.com/authentication') if response.status_code == 201: print("登录成功!") return True else: print(f"登录失败: {response.status_code} - {response.text}") self.client.close() self.client = None return False except Exception as e: print(f"登录异常: {e}") return False def get_alpha_detail(self, alpha_id): """ 获取 Alpha 详细信息,带3次重试 Args: alpha_id: Alpha ID Returns: Alpha 详细信息字典,失败返回 None """ if not self.client: print("客户端未初始化") return None url = f"https://api.worldquantbrain.com/alphas/{alpha_id}" for attempt in range(3): try: response = self.client.get(url) if response.status_code == 200: return response.json() else: print(f"获取 Alpha 失败 (尝试 {attempt + 1}/3): {response.status_code} - {response.text}") except Exception as e: print(f"获取 Alpha 异常 (尝试 {attempt + 1}/3): {e}") if attempt < 2: sleep_time = random.uniform(5, 8) print(f"等待 {sleep_time:.1f} 秒后重试...") time.sleep(sleep_time) return None def logout(self): """退出登录""" if self.client: self.client.close() self.client = None print("Alpha 客户端已退出") # 使用 OdooClient 类重构后的函数 def fetch_local_performance(odoo_client, model, domain, fields, limit): """获取本地表现数据""" # 执行搜索 result = odoo_client.search_data(model=model, domain=domain, fields=fields, order="id desc", limit=limit) if result: return result else: print("未获取到数据") exit(1) # 使用示例 if __name__ == "__main__": # ============================== Odoo 连接配置 ==================================== ODOO_URL = "http://192.168.31.41:32000" DB_NAME = "quantify" USERNAME = "rpc" PASSWORD = "aaaAAA111" # ============================== 搜索设置 ==================================== days = 7 now = datetime.now() today_zero = now.replace(hour=0, minute=0, second=0, microsecond=0) days_ago_zero = today_zero - timedelta(days=days) time_range = ('write_date', '>=', days_ago_zero.strftime('%Y-%m-%d %H:%M:%S')) # 模型名称 model = "alpha.expression.line" # 搜索条件 domain = [('status', '=', 'success'), ('performance', '=', '{}')] # 搜索字段 fields = ['alpha_id'] # 搜索数量限制 limit = 1 try: with OdooClient(ODOO_URL, DB_NAME, USERNAME, PASSWORD) as odoo: all_data = fetch_local_performance(odoo, model, domain, fields, limit) # 初始化 Alpha 获取器 alpha_fetcher = SimpleAlphaFetcher() try: for data in all_data: alpha_expression_line_id = data.get('id') alpha_id = data.get('alpha_id') print(f'正在处理: {alpha_id}') if not alpha_id: print(f"记录 {alpha_expression_line_id} 没有 alpha_id,跳过") continue # 获取 Alpha 详细信息 alpha_detail = alpha_fetcher.get_alpha_detail(alpha_id) if alpha_detail: # 反写到 Odoo update_values = {'performance': json.dumps(alpha_detail, indent=4, ensure_ascii=False)} success = odoo.write_data(model, alpha_expression_line_id, update_values) if success: print(f"成功更新记录 {alpha_expression_line_id}") else: print(f"更新记录 {alpha_expression_line_id} 失败") else: print(f"获取 Alpha {alpha_id} 失败,跳过该记录") finally: alpha_fetcher.logout() except Exception as e: print(f"程序执行失败: {e}")