You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
11 KiB
11 KiB
02_generate_alpha_idea.py 微服务迁移方案
一、功能总结
1.1 核心功能
02_generate_alpha_idea.py 是一个 Odoo 工作流自动化脚本,主要功能:
- 搜索数据:在
alpha.idea模型中查找当天创建且状态为generated_prompt的记录 - 遍历处理:单线程依次处理每条记录
- 调用 post_to_ms:触发
alpha.idea模型的post_to_ms()方法,将数据发送到微服务 - 等待 LLM 返回:死循环轮询状态,每5秒检查一次
- 超时处理:10分钟超时,超时后在
result_message字段写入 "timeout" - 调用 decode_template:状态变为
llm_received后,调用decode_template()方法解码模板
1.2 关键特性
- 单线程处理:不并发,避免 LLM 压力过大
- 10分钟硬编码超时:正常2-3分钟,超时即放弃
- 状态机驱动:依赖 Odoo 模型的状态字段流转
二、微服务架构设计
2.1 接口设计
POST /api/alpha-idea/process
请求参数:
{
"idea_id": 123 // alpha.idea 记录的 ID
}
响应:
{
"success": true,
"message": "处理完成",
"idea_id": 123,
"wait_time": 45, // 等待秒数
"status": "llm_received"
}
2.2 处理流程
Odoo 按钮点击
↓
httpx 请求 FastAPI 微服务
↓
POST /api/alpha-idea/process (携带 idea_id)
↓
微服务调用 Odoo XML-RPC
↓
1. 调用 post_to_ms()
2. 轮询等待状态 (每5秒)
3. 超时10分钟或成功
4. 调用 decode_template()
↓
返回结果给 Odoo
三、代码实现
3.1 目录结构
rpc_alpha_workflow/
├── 01_generate_direction.py
├── 02_generate_alpha_idea.py # 原脚本(保留备份)
├── reference_01.py
├── reference_02.py
├── migration_plan.md # 本文档
└── alpha_idea_service/ # 微服务目录
├── main.py # FastAPI 入口
├── config.py # 配置
├── odoo_client.py # Odoo XML-RPC 客户端
└── requirements.txt # 依赖
3.2 核心代码
config.py
# -*- coding: utf-8 -*-
"""配置文件"""
# Odoo 连接配置
ODOO_URL = "http://192.168.31.41:32000"
ODOO_DB = "quantify"
ODOO_USERNAME = "rpc"
ODOO_PASSWORD = "aaaAAA111"
# 超时配置(硬编码,不对外暴露)
TIMEOUT_SECONDS = 10 * 60 # 10分钟
POLL_INTERVAL = 5 # 轮询间隔秒数
odoo_client.py
# -*- coding: utf-8 -*-
"""Odoo XML-RPC 客户端"""
from xmlrpc.client import ServerProxy, Fault
from . import config
class OdooClient:
def __init__(self):
self.url = config.ODOO_URL
self.db = config.ODOO_DB
self.username = config.ODOO_USERNAME
self.password = config.ODOO_PASSWORD
self.uid = None
self.models = None
self._authenticate()
def _authenticate(self):
"""认证并获取 uid"""
common = ServerProxy(f"{self.url}/xmlrpc/2/common")
self.uid = common.authenticate(self.db, self.username, self.password, {})
self.models = ServerProxy(f"{self.url}/xmlrpc/2/object")
def call(self, model, method, args=None, kwargs=None):
"""调用 Odoo 方法"""
args = args or []
kwargs = kwargs or {}
return self.models.execute_kw(self.db, self.uid, self.password, model, method, args, kwargs)
def post_to_ms(self, idea_id: int):
"""调用 post_to_ms 方法"""
try:
result = self.call('alpha.idea', 'post_to_ms', [[idea_id]])
return {"success": True, "result": result}
except Fault as e:
if "cannot marshal None" in str(e):
return {"success": True, "result": None}
return {"success": False, "error": str(e)}
def read_record(self, idea_id: int, fields: list):
"""读取记录"""
result = self.call('alpha.idea', 'read', [[idea_id], fields])
return result[0] if result else None
def write_record(self, idea_id: int, values: dict):
"""写入记录"""
return self.call('alpha.idea', 'write', [[idea_id], values])
def decode_template(self, idea_id: int):
"""调用 decode_template 方法"""
try:
result = self.call('alpha.idea', 'decode_template', [[idea_id]])
return {"success": True, "result": result}
except Fault as e:
if "cannot marshal None" in str(e):
return {"success": True, "result": None}
return {"success": False, "error": str(e)}
main.py
# -*- coding: utf-8 -*-
"""FastAPI 微服务入口"""
import time
from fastapi import FastAPI
from pydantic import BaseModel
from .odoo_client import OdooClient
from . import config
app = FastAPI(title="Alpha Idea Service")
class ProcessRequest(BaseModel):
idea_id: int
class ProcessResponse(BaseModel):
success: bool
message: str
idea_id: int
wait_time: float = 0
status: str = ""
@app.post("/api/alpha-idea/process", response_model=ProcessResponse)
async def process_idea(request: ProcessRequest):
"""
处理单个 alpha.idea 记录
1. 调用 post_to_ms
2. 等待状态变为 llm_received(10分钟超时)
3. 调用 decode_template
"""
idea_id = request.idea_id
client = OdooClient()
# 1. 调用 post_to_ms
post_result = client.post_to_ms(idea_id)
if not post_result["success"]:
return ProcessResponse(
success=False,
message=f"post_to_ms 失败: {post_result.get('error')}",
idea_id=idea_id,
status="failed"
)
# 2. 等待状态变为 llm_received
start_time = time.time()
current_status = None
while True:
elapsed_time = time.time() - start_time
# 检查超时
if elapsed_time > config.TIMEOUT_SECONDS:
# 写入 timeout
try:
client.write_record(idea_id, {"result_message": "timeout"})
except Exception:
pass
return ProcessResponse(
success=False,
message=f"超时!已等待 {elapsed_time/60:.1f} 分钟",
idea_id=idea_id,
wait_time=elapsed_time,
status="timeout"
)
time.sleep(config.POLL_INTERVAL)
# 读取状态
record = client.read_record(idea_id, ["status", "name"])
if not record:
return ProcessResponse(
success=False,
message="记录不存在",
idea_id=idea_id,
wait_time=elapsed_time,
status="not_found"
)
current_status = record.get("status")
if current_status == "llm_received":
break
elif current_status == "failed":
return ProcessResponse(
success=False,
message="处理失败",
idea_id=idea_id,
wait_time=elapsed_time,
status="failed"
)
# 3. 调用 decode_template
decode_result = client.decode_template(idea_id)
return ProcessResponse(
success=decode_result["success"],
message="处理完成" if decode_result["success"] else f"decode_template 失败: {decode_result.get('error')}",
idea_id=idea_id,
wait_time=time.time() - start_time,
status=current_status
)
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "ok"}
requirements.txt
fastapi
uvicorn
pydantic
四、Odoo 端改造
4.1 修改 post_to_ms 方法
在 reference_02.py 的 post_to_ms 方法中,移除直接调用 httpx 发送请求到微服务的逻辑,改为:
def post_to_ms(self):
"""
修改为:只准备数据,不直接发送请求
真正的发送由外部微服务触发
"""
# ... 原有校验逻辑不变 ...
# 组装请求数据
payload = {
'record_id': self.id,
'prompt': full_prompt,
'model_name': model_name,
'base_url': base_url,
'api_key': api_key,
'callback_url': callback_url,
'system_prompt': self.system_prompt,
'user_prompt': self.user_prompt,
'final_prompt': self.final_prompt,
}
# 将 payload 保存到某个字段,或者返回给调用方
# 这里我们返回 payload,让调用方(微服务)来处理
return payload
或者更简单的方式:Odoo 按钮点击时,直接 httpx 请求微服务:
def btn_trigger_alpha_idea_service(self):
"""按钮方法:触发微服务处理"""
import httpx
# 获取微服务地址(从配置读取)
ms_url = self.env['ir.config_parameter'].sudo().get_param('alpha.idea.service.url')
try:
response = httpx.post(
f"{ms_url}/api/alpha-idea/process",
json={"idea_id": self.id},
timeout=600 # 10分钟超时
)
result = response.json()
if result["success"]:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': '成功',
'message': f'处理完成,等待时间: {result["wait_time"]:.0f}秒',
'type': 'success',
'sticky': False,
}
}
else:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': '失败',
'message': result["message"],
'type': 'danger',
'sticky': False,
}
}
except Exception as e:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': '错误',
'message': str(e),
'type': 'danger',
'sticky': False,
}
}
五、部署步骤
5.1 微服务部署
- 创建目录结构
- 复制代码文件
- 安装依赖:
pip install -r requirements.txt - 启动服务:
uvicorn alpha_idea_service.main:app --host 0.0.0.0 --port 32005
5.2 Odoo 配置
- 添加系统参数:
alpha.idea.service.url=http://微服务地址:32005 - 在
alpha.idea模型添加按钮方法btn_trigger_alpha_idea_service - 在视图添加按钮
六、注意事项
- 单线程保证:微服务内部不开启多线程/异步并发处理单个请求
- 超时处理:HTTP 客户端(Odoo 端)和微服务端都要设置 10 分钟超时
- 状态回写:超时后必须回写
result_message = 'timeout' - 错误处理:任何异常都不能阻塞流程,要返回给调用方
七、待确认事项
- 微服务端口是否固定为 32005?
- Odoo 端是否需要批量处理接口(一次性传入多个 idea_id)?
- 是否需要添加日志记录(loguru 等)?
- 是否需要限制同时处理的请求数(比如只处理一个,其他排队)?