You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
wqb-server/blueprints/feature_engineering.py

314 lines
11 KiB

from flask import Blueprint, render_template, request, jsonify
import requests
import json
import logging
feature_engineering_bp = Blueprint('feature_engineering', __name__)
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@feature_engineering_bp.route('/')
def feature_engineering():
"""特征工程主页面"""
return render_template('feature_engineering.html')
@feature_engineering_bp.route('/api/test-deepseek', methods=['POST'])
def test_deepseek_api():
"""测试 Deepseek API 连接"""
try:
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'success': False, 'error': '需要提供 API 密钥'}), 400
# 通过简单请求测试 API
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
test_data = {
'model': 'deepseek-chat',
'messages': [
{'role': 'user', 'content': '你好,这是一条测试消息。'}
],
'max_tokens': 10
}
response = requests.post(
'https://api.deepseek.com/chat/completions ',
headers=headers,
json=test_data,
timeout=10
)
if response.status_code == 200:
return jsonify({'success': True, 'message': 'API 连接成功'})
else:
error_detail = response.text
return jsonify({'success': False, 'error': f'API 返回状态 {response.status_code}{error_detail}'}), 400
except requests.exceptions.RequestException as e:
logger.error(f"API 测试出错:{str(e)}")
return jsonify({'success': False, 'error': f'网络错误:{str(e)}'}), 500
except Exception as e:
logger.error(f"API 测试中出现意外错误:{str(e)}")
return jsonify({'success': False, 'error': f'未知错误:{str(e)}'}), 500
@feature_engineering_bp.route('/api/get-recommendations', methods=['POST'])
def get_feature_engineering_recommendations():
"""从 Deepseek API 获取特征工程建议"""
try:
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'success': False, 'error': '需要提供 API 密钥'}), 400
data = request.get_json()
current_step = data.get('current_step', 1)
data_field = data.get('data_field', '')
previous_steps = data.get('previous_steps', [])
current_data_state = data.get('current_data_state', '原始数据')
if not data_field:
return jsonify({'success': False, 'error': '请提供数据字段描述'}), 400
# 构建系统提示
system_prompt = get_default_system_prompt_text()
# 构建用户提示
previous_steps_text = "" if not previous_steps else "".join([f"步骤 {i+1}{step}" for i, step in enumerate(previous_steps)])
user_prompt = f"""上下文:
当前步骤:{current_step}
当前数据字段:{data_field}
之前的步骤及所用类别:{previous_steps_text}
当前数据状态:{current_data_state}"""
# 调用 Deepseek API
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
api_data = {
'model': 'deepseek-chat',
'messages': [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': user_prompt}
],
'max_tokens': 8192,
'temperature': 0.7
}
response = requests.post(
'https://api.deepseek.com/chat/completions ',
headers=headers,
json=api_data,
timeout=30
)
if response.status_code == 200:
response_data = response.json()
recommendations = response_data['choices'][0]['message']['content']
return jsonify({
'success': True,
'recommendations': recommendations,
'current_step': current_step,
'data_field': data_field,
'previous_steps': previous_steps,
'current_data_state': current_data_state
})
else:
error_detail = response.text
logger.error(f"Deepseek API 错误:{response.status_code} - {error_detail}")
return jsonify({'success': False, 'error': f'API 返回状态 {response.status_code}{error_detail}'}), 400
except requests.exceptions.RequestException as e:
logger.error(f"API 请求出错:{str(e)}")
return jsonify({'success': False, 'error': f'网络错误:{str(e)}'}), 500
except Exception as e:
logger.error(f"获取建议时出现意外错误:{str(e)}")
return jsonify({'success': False, 'error': f'未知错误:{str(e)}'}), 500
@feature_engineering_bp.route('/api/continue-conversation', methods=['POST'])
def continue_conversation():
"""继续对话,回答追问"""
try:
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'success': False, 'error': '需要提供 API 密钥'}), 400
data = request.get_json()
conversation_history = data.get('conversation_history', [])
user_message = data.get('user_message', '')
custom_system_prompt = data.get('custom_system_prompt', None)
if not user_message:
return jsonify({'success': False, 'error': '请输入您的消息'}), 400
# 构建对话消息
messages = []
# 若提供了自定义系统提示则使用,否则用默认
if custom_system_prompt:
system_prompt = custom_system_prompt
else:
system_prompt = get_default_system_prompt_text()
messages.append({'role': 'system', 'content': system_prompt})
# 加入历史对话
for msg in conversation_history:
messages.append(msg)
# 加入新用户消息
messages.append({'role': 'user', 'content': user_message})
print(user_message)
# 调用 Deepseek API
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
api_data = {
'model': 'deepseek-chat',
'messages': messages,
'max_tokens': 8192,
'temperature': 0.7
}
response = requests.post(
'https://api.deepseek.com/chat/completions ',
headers=headers,
json=api_data,
timeout=30
)
if response.status_code == 200:
response_data = response.json()
assistant_response = response_data['choices'][0]['message']['content']
return jsonify({
'success': True,
'response': assistant_response
})
else:
error_detail = response.text
logger.error(f"Deepseek API 错误:{response.status_code} - {error_detail}")
return jsonify({'success': False, 'error': f'API 返回状态 {response.status_code}{error_detail}'}), 400
except requests.exceptions.RequestException as e:
logger.error(f"API 请求出错:{str(e)}")
return jsonify({'success': False, 'error': f'网络错误:{str(e)}'}), 500
except Exception as e:
logger.error(f"继续对话时出现意外错误:{str(e)}")
return jsonify({'success': False, 'error': f'未知错误:{str(e)}'}), 500
def get_default_system_prompt_text():
"""获取默认系统提示文本"""
return """你是一位特征工程专家助手。你的任务是针对给定的数据字段,设计一个最多包含 6 步的特征工程流水线。每一步都要根据当前数据状态、已完成的步骤以及总体目标,从 15 个类别中推荐最合适的特征工程类别。
说明:
每一步你都会收到:
当前步骤序号。
当前数据字段及其描述。
已完成的步骤及所用类别(如有)。
当前数据状态(例如:已归一化、已过滤等)。
你的任务是:
列出当前步骤最可行的特征工程类别,从以下 15 类中选择:
基础算术与数学运算
逻辑与条件运算
时间序列:变化检测与值比较
时间序列:统计特征工程
时间序列:排序、缩放与归一化
时间序列:衰减、平滑与翻转控制
时间序列:极值与位置识别
横截面:排序、缩放与归一化
横截面:回归与中性化
横截面:分布变换与截断
变换与过滤操作
分组聚合与统计摘要
分组排序、缩放与归一化
分组回归与中性化
分组插补与回填
对于每个推荐的类别,请按以下格式呈现:
重复完整上下文。
明确指出下一步选择的类别。
给出简要选择原因。
输出格式:
第 X 步可行类别:
第 X 步 选项 1:上下文:当前步骤:[序号] 当前数据字段:[描述] 之前步骤及所用类别:[列表] 当前数据状态:[详细描述之前步骤如何将数据转换到当前状态及其逻辑] 下一步选择:[类别名称] 原因:[解释]
第 X 步 选项 2:上下文:当前步骤:[序号] 当前数据字段:[描述] 之前步骤及所用类别:[列表] 当前数据状态:[详细描述之前步骤如何将数据转换到当前状态及其逻辑] 下一步选择:[类别名称] 原因:[解释]
...(继续列出所有可行选项)
额外说明:
仅推荐在当前数据状态及已完成的步骤下逻辑合理且有意义的类别。
若某类别在此步不适用,请勿列出。
解释需简洁明了。
除非特别要求,否则不要给出具体操作符。
每一步你都会收到如下输入:
上下文:
当前步骤:
当前数据字段:
之前步骤及所用类别:
当前数据状态:
收到输入后,请按上述格式回复。
重要:选项之后不要添加任何总结、建议、理由或其他解释。严格按照上述格式提供选项即可。不要添加“最推荐选择”、“理由”、“最佳选择”或“是否继续”等部分。列出所有选项后立即结束。
"""
@feature_engineering_bp.route('/api/get-default-system-prompt', methods=['GET'])
def get_default_system_prompt():
"""获取默认系统提示"""
try:
return jsonify({
'success': True,
'default_system_prompt': get_default_system_prompt_text()
})
except Exception as e:
logger.error(f"获取默认系统提示出错:{str(e)}")
return jsonify({'success': False, 'error': f'未知错误:{str(e)}'}), 500