You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
wqb-server/blueprints/idea_house.py

221 lines
8.5 KiB

"""
idea库蓝图 - 使用 Coze API 分析数据字段的 Flask 蓝图
"""
from flask import Blueprint, render_template, request, jsonify, session as flask_session
import os
import sys
from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageContentType
from cozepy import COZE_CN_BASE_URL
import json
# 创建蓝图
idea_house_bp = Blueprint('idea_house', __name__, url_prefix='/idea-house')
@idea_house_bp.route('/')
def idea_house():
"""idea库页面"""
return render_template('idea_house.html')
@idea_house_bp.route('/api/process-fields', methods=['POST'])
def process_fields():
"""使用 Coze API 处理选定的数据字段"""
try:
# 从请求获取选定的字段
data = request.get_json()
selected_fields = data.get('selected_fields', {})
coze_api_token = data.get('coze_api_token')
workflow_id = data.get('workflow_id')
dataset_description = data.get('dataset_description', '')
print("=" * 60)
print("🚀 COZE API 请求已启动")
print("=" * 60)
print(f"📋 选定字段数量: {len(selected_fields)}")
print(f"🔑 API 令牌 (最后10位): ...{coze_api_token[-10:] if coze_api_token else ''}")
print(f" 工作流 ID: {workflow_id}")
print("=" * 60)
if not selected_fields:
print("❌ 错误: 未选择任何字段")
return jsonify({'error': '未选择任何字段'}), 400
if not coze_api_token:
print("❌ 错误: 需要 Coze API 令牌")
return jsonify({'error': '需要 Coze API 令牌'}), 400
if not workflow_id:
print("❌ 错误: 需要工作流 ID")
return jsonify({'error': '需要工作流 ID'}), 400
# 从选定字段准备输入字符串
# 格式: "字段ID (描述, 单位)"
input_lines = []
for field_id, description in selected_fields.items():
input_lines.append(f"{field_id} ({description})")
input_string = "\n".join(input_lines)
# 如果数据集描述可用,则附加
if dataset_description:
input_string = f"数据集描述: {dataset_description}\n\n选定字段:\n{input_string}"
print("📤 准备 COZE API 请求:")
print(f" 输入字符串: {input_string[:100]}..." if len(input_string) > 100 else f" 输入字符串: {input_string}")
# 设置 Coze API
coze_api_base = COZE_CN_BASE_URL
print(f"🌐 使用 Coze API 基础 URL: {coze_api_base}")
# 初始化 Coze 客户端
print("🔧 初始化 Coze 客户端...")
coze = Coze(auth=TokenAuth(token=coze_api_token), base_url=coze_api_base)
# 准备参数
parameters = {
"input": input_string
}
print("📊 请求参数:")
print(f" {json.dumps(parameters, indent=2)}")
# 使用重试调用工作流
workflow_result = None
error_message = None
print("🔄 开始带重试的工作流执行...")
for attempt in range(3):
try:
print(f" 尝试 {attempt + 1}/3: 调用 Coze 工作流...")
workflow = coze.workflows.runs.create(
workflow_id=workflow_id,
parameters=parameters
)
workflow_result = workflow
print(f" ✅ 尝试 {attempt + 1} 成功!")
print(f" 📥 工作流响应类型: {type(workflow_result)}")
if hasattr(workflow_result, 'data'):
print(f" 📥 响应数据长度: {len(str(workflow_result.data)) if workflow_result.data else 0}")
break
except Exception as e:
error_message = f"尝试 {attempt + 1} 失败,错误: {str(e)}"
print(f"{error_message}")
continue
if workflow_result is None:
print("💥 所有重试后工作流执行失败")
print(f" 最后错误: {error_message}")
return jsonify({
'error': '3次尝试后运行工作流失败',
'details': error_message
}), 500
print("✅ 工作流执行成功")
print("🔍 处理工作流结果...")
# 处理结果
try:
# 尝试从工作流数据中提取输出
if hasattr(workflow_result, 'data') and workflow_result.data:
print(" 📊 找到工作流数据,正在处理...")
# 如果是字符串,尝试评估数据
try:
result_data = eval(workflow_result.data) if isinstance(workflow_result.data, str) else workflow_result.data
output = result_data.get('output', '')
print(f" 📄 提取的输出长度: {len(str(output))}")
except:
# 如果评估失败,按原样使用数据
output = str(workflow_result.data)
print(f" 📄 使用原始数据作为输出 (长度: {len(output)})")
else:
output = "工作流未返回数据"
print(" 工作流未返回数据")
print("=" * 60)
print("🎉 COZE API 请求成功完成")
print(f"📊 最终输出预览: {str(output)[:150]}..." if len(str(output)) > 150 else f"📊 最终输出: {str(output)}")
print("=" * 60)
return jsonify({
'success': True,
'output': output,
'parameters': parameters,
'selected_fields': selected_fields
})
except Exception as parse_error:
print(f"💥 解析工作流结果错误: {str(parse_error)}")
print(f" 原始 workflow.data: {workflow_result.data if hasattr(workflow_result, 'data') else '无数据属性'}")
return jsonify({
'error': '解析工作流结果时出错',
'details': str(parse_error),
'raw_data': str(workflow_result.data) if hasattr(workflow_result, 'data') else None
}), 500
except Exception as e:
print(f"💥 处理字段错误: {str(e)}")
print("=" * 60)
return jsonify({'error': str(e)}), 500
@idea_house_bp.route('/api/get-datafields-proxy', methods=['GET'])
def get_datafields_proxy():
"""代理端点,从主应用的 BRAIN API 端点获取数据字段"""
try:
from flask import current_app
# 使用测试客户端发出内部请求
with current_app.test_client() as client:
# 从原始请求复制头部
headers = {
'Session-ID': request.headers.get('Session-ID') or flask_session.get('brain_session_id')
}
# 复制查询参数
params = request.args.to_dict()
# 向主应用的数据字段端点发出内部请求
response = client.get('/api/datafields', headers=headers, query_string=params)
# 返回响应数据和状态
return response.get_json(), response.status_code
except Exception as e:
print(f"数据字段代理错误: {str(e)}")
return jsonify({'error': str(e)}), 500
@idea_house_bp.route('/api/get-dataset-description', methods=['GET'])
def get_dataset_description():
"""代理端点,从主应用的 BRAIN API 端点获取数据集描述"""
print("\n" + "="*60)
print("🔍 数据集描述代理端点被调用")
print("="*60)
try:
from flask import current_app
# 使用测试客户端发出内部请求
with current_app.test_client() as client:
# 从原始请求复制头部
headers = {
'Session-ID': request.headers.get('Session-ID') or flask_session.get('brain_session_id')
}
# 复制查询参数
params = request.args.to_dict()
print(f"📊 代理请求参数: {params}")
print(f"📌 会话 ID: {headers.get('Session-ID')}")
# 向主应用的数据集描述端点发出内部请求
response = client.get('/api/dataset-description', headers=headers, query_string=params)
print(f"📥 代理响应状态: {response.status_code}")
# 返回响应数据和状态
return response.get_json(), response.status_code
except Exception as e:
print(f"💥 数据集描述代理错误: {str(e)}")
print("="*60 + "\n")
return jsonify({'error': str(e)}), 500