""" idea库蓝图 - 使用 Coze API 分析数据字段的 Flask 蓝图 """ from flask import Blueprint, render_template, request, jsonify, session as flask_session import os import sys from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageContentType from cozepy import COZE_CN_BASE_URL import json # 创建蓝图 idea_house_bp = Blueprint('idea_house', __name__, url_prefix='/idea-house') @idea_house_bp.route('/') def idea_house(): """idea库页面""" return render_template('idea_house.html') @idea_house_bp.route('/api/process-fields', methods=['POST']) def process_fields(): """使用 Coze API 处理选定的数据字段""" try: # 从请求获取选定的字段 data = request.get_json() selected_fields = data.get('selected_fields', {}) coze_api_token = data.get('coze_api_token') workflow_id = data.get('workflow_id') dataset_description = data.get('dataset_description', '') print("=" * 60) print("🚀 COZE API 请求已启动") print("=" * 60) print(f"📋 选定字段数量: {len(selected_fields)}") print(f"🔑 API 令牌 (最后10位): ...{coze_api_token[-10:] if coze_api_token else '无'}") print(f"⚙️ 工作流 ID: {workflow_id}") print("=" * 60) if not selected_fields: print("❌ 错误: 未选择任何字段") return jsonify({'error': '未选择任何字段'}), 400 if not coze_api_token: print("❌ 错误: 需要 Coze API 令牌") return jsonify({'error': '需要 Coze API 令牌'}), 400 if not workflow_id: print("❌ 错误: 需要工作流 ID") return jsonify({'error': '需要工作流 ID'}), 400 # 从选定字段准备输入字符串 # 格式: "字段ID (描述, 单位)" input_lines = [] for field_id, description in selected_fields.items(): input_lines.append(f"{field_id} ({description})") input_string = "\n".join(input_lines) # 如果数据集描述可用,则附加 if dataset_description: input_string = f"数据集描述: {dataset_description}\n\n选定字段:\n{input_string}" print("📤 准备 COZE API 请求:") print(f" 输入字符串: {input_string[:100]}..." if len(input_string) > 100 else f" 输入字符串: {input_string}") # 设置 Coze API coze_api_base = COZE_CN_BASE_URL print(f"🌐 使用 Coze API 基础 URL: {coze_api_base}") # 初始化 Coze 客户端 print("🔧 初始化 Coze 客户端...") coze = Coze(auth=TokenAuth(token=coze_api_token), base_url=coze_api_base) # 准备参数 parameters = { "input": input_string } print("📊 请求参数:") print(f" {json.dumps(parameters, indent=2)}") # 使用重试调用工作流 workflow_result = None error_message = None print("🔄 开始带重试的工作流执行...") for attempt in range(3): try: print(f" 尝试 {attempt + 1}/3: 调用 Coze 工作流...") workflow = coze.workflows.runs.create( workflow_id=workflow_id, parameters=parameters ) workflow_result = workflow print(f" ✅ 尝试 {attempt + 1} 成功!") print(f" 📥 工作流响应类型: {type(workflow_result)}") if hasattr(workflow_result, 'data'): print(f" 📥 响应数据长度: {len(str(workflow_result.data)) if workflow_result.data else 0}") break except Exception as e: error_message = f"尝试 {attempt + 1} 失败,错误: {str(e)}" print(f" ❌ {error_message}") continue if workflow_result is None: print("💥 所有重试后工作流执行失败") print(f" 最后错误: {error_message}") return jsonify({ 'error': '3次尝试后运行工作流失败', 'details': error_message }), 500 print("✅ 工作流执行成功") print("🔍 处理工作流结果...") # 处理结果 try: # 尝试从工作流数据中提取输出 if hasattr(workflow_result, 'data') and workflow_result.data: print(" 📊 找到工作流数据,正在处理...") # 如果是字符串,尝试评估数据 try: result_data = eval(workflow_result.data) if isinstance(workflow_result.data, str) else workflow_result.data output = result_data.get('output', '') print(f" 📄 提取的输出长度: {len(str(output))}") except: # 如果评估失败,按原样使用数据 output = str(workflow_result.data) print(f" 📄 使用原始数据作为输出 (长度: {len(output)})") else: output = "工作流未返回数据" print(" ⚠️ 工作流未返回数据") print("=" * 60) print("🎉 COZE API 请求成功完成") print(f"📊 最终输出预览: {str(output)[:150]}..." if len(str(output)) > 150 else f"📊 最终输出: {str(output)}") print("=" * 60) return jsonify({ 'success': True, 'output': output, 'parameters': parameters, 'selected_fields': selected_fields }) except Exception as parse_error: print(f"💥 解析工作流结果错误: {str(parse_error)}") print(f" 原始 workflow.data: {workflow_result.data if hasattr(workflow_result, 'data') else '无数据属性'}") return jsonify({ 'error': '解析工作流结果时出错', 'details': str(parse_error), 'raw_data': str(workflow_result.data) if hasattr(workflow_result, 'data') else None }), 500 except Exception as e: print(f"💥 处理字段错误: {str(e)}") print("=" * 60) return jsonify({'error': str(e)}), 500 @idea_house_bp.route('/api/get-datafields-proxy', methods=['GET']) def get_datafields_proxy(): """代理端点,从主应用的 BRAIN API 端点获取数据字段""" try: from flask import current_app # 使用测试客户端发出内部请求 with current_app.test_client() as client: # 从原始请求复制头部 headers = { 'Session-ID': request.headers.get('Session-ID') or flask_session.get('brain_session_id') } # 复制查询参数 params = request.args.to_dict() # 向主应用的数据字段端点发出内部请求 response = client.get('/api/datafields', headers=headers, query_string=params) # 返回响应数据和状态 return response.get_json(), response.status_code except Exception as e: print(f"数据字段代理错误: {str(e)}") return jsonify({'error': str(e)}), 500 @idea_house_bp.route('/api/get-dataset-description', methods=['GET']) def get_dataset_description(): """代理端点,从主应用的 BRAIN API 端点获取数据集描述""" print("\n" + "="*60) print("🔍 数据集描述代理端点被调用") print("="*60) try: from flask import current_app # 使用测试客户端发出内部请求 with current_app.test_client() as client: # 从原始请求复制头部 headers = { 'Session-ID': request.headers.get('Session-ID') or flask_session.get('brain_session_id') } # 复制查询参数 params = request.args.to_dict() print(f"📊 代理请求参数: {params}") print(f"📌 会话 ID: {headers.get('Session-ID')}") # 向主应用的数据集描述端点发出内部请求 response = client.get('/api/dataset-description', headers=headers, query_string=params) print(f"📥 代理响应状态: {response.status_code}") # 返回响应数据和状态 return response.get_json(), response.status_code except Exception as e: print(f"💥 数据集描述代理错误: {str(e)}") print("="*60 + "\n") return jsonify({'error': str(e)}), 500