You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
281 lines
9.0 KiB
281 lines
9.0 KiB
# -*- coding: utf-8 -*-
|
|
import os
|
|
import random
|
|
import sys
|
|
import json
|
|
import openai
|
|
import httpx
|
|
import csv
|
|
from datetime import datetime
|
|
import jieba
|
|
|
|
sys.path.append(os.path.join(os.path.abspath(__file__).split('AlphaGenerator')[0] + 'AlphaGenerator'))
|
|
PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AlphaGenerator')[0] + 'AlphaGenerator')
|
|
|
|
PREPARE_PROMPT = os.path.join(PROJECT_PATH, 'prepare_prompt')
|
|
KEYS_TEXT = os.path.join(PREPARE_PROMPT, 'keys_text.txt')
|
|
|
|
SELECT_DATA_SET_QTY = 30
|
|
|
|
SILICONFLOW_API_KEY = "sk-pvdiisdowmuwkrpnxsrlhxaovicqibmlljwrwwvbbdjaitdl"
|
|
SILICONFLOW_BASE_URL = "https://api.siliconflow.cn/v1"
|
|
MODELS = [
|
|
'deepseek-ai/DeepSeek-V3.2-Exp',
|
|
'Qwen/Qwen3-VL-235B-A22B-Instruct',
|
|
# 'MiniMaxAI/MiniMax-M2',
|
|
# 'zai-org/GLM-4.6',
|
|
# 'inclusionAI/Ring-flash-2.0',
|
|
# 'zai-org/GLM-4.6',
|
|
# 'inclusionAI/Ling-flash-2.0',
|
|
# 'inclusionAI/Ring-flash-2.0',
|
|
]
|
|
|
|
def process_text(text):
|
|
filter_list = ['\n', '\t', '\r', '\b', '\f', '\v', ':', '的', '或', '10', '天', '了', '可', '是', '该', ',', ' ', '、', '让', '和', '集',
|
|
'/', '日', '在', '(', '_', '-', ')', '(', '上', '距', '与', '比', '下', '及', ')', '...', ';', '%', '&', '+', ',', '.',
|
|
':', ';', '<', '=', '>', '?', '[', ']', '|', '—', '。'
|
|
]
|
|
|
|
text_list = jieba.lcut(text)
|
|
results = []
|
|
for tl in text_list:
|
|
should_include = True
|
|
for fl in filter_list:
|
|
if fl == tl:
|
|
should_include = False
|
|
break
|
|
if should_include:
|
|
results.append(tl)
|
|
|
|
if results:
|
|
return list(set(results))
|
|
else:
|
|
return None
|
|
|
|
def load_keys_text():
|
|
if not os.path.exists(KEYS_TEXT):
|
|
print(f"文件不存在: {KEYS_TEXT}")
|
|
exit(1)
|
|
with open(KEYS_TEXT, 'r', encoding='utf-8') as f:
|
|
text_list = [line.strip() for line in f if line.strip()]
|
|
if not text_list:
|
|
print('关键词文本无数据, 程序退出')
|
|
exit(1)
|
|
|
|
return process_text(';'.join(text_list))
|
|
|
|
def txtFileLoader(file_path):
|
|
if not os.path.exists(file_path):
|
|
print(f"文件不存在: {file_path}")
|
|
exit(1)
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
return [line.strip() for line in f if line.strip()]
|
|
|
|
|
|
def csvFileLoader(file_path, keys_text):
|
|
if not os.path.exists(file_path):
|
|
print(f"文件不存在: {file_path}")
|
|
exit(1)
|
|
|
|
data_dict = {} # 使用字典来存储,以id为键
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
reader = csv.reader(f)
|
|
for row in reader:
|
|
for key in keys_text:
|
|
if key in row[11] or key in row[12]:
|
|
item_id = row[0]
|
|
# 如果id不存在,或者想要保留第一个出现的记录
|
|
if item_id not in data_dict:
|
|
data_dict[item_id] = {
|
|
'id': item_id,
|
|
'data_set_name': row[1],
|
|
'description': row[2],
|
|
'description_cn': row[11],
|
|
}
|
|
|
|
# 将字典的值转换为列表
|
|
return list(data_dict.values())
|
|
|
|
|
|
def read_prompt(alpha_prompt_path):
|
|
if not os.path.exists(alpha_prompt_path):
|
|
print("alpha_prompt.txt文件不存在")
|
|
exit(1)
|
|
with open(alpha_prompt_path, 'r', encoding='utf-8') as f:
|
|
prompt = f.read().strip()
|
|
if not prompt:
|
|
print("alpha_prompt.txt是空的")
|
|
exit(1)
|
|
return prompt.replace('\n\n', '\n')
|
|
|
|
|
|
def read_operator(operator_prompt_path):
|
|
if not os.path.exists(operator_prompt_path):
|
|
print("wqb_operator.txt文件不存在")
|
|
exit(1)
|
|
with open(operator_prompt_path, 'r', encoding='utf-8') as f:
|
|
operator_lines = [line.strip() for line in f.readlines() if line.strip()]
|
|
if not operator_lines:
|
|
print("wqb_operator.txt是空的")
|
|
exit(1)
|
|
return "\n".join(operator_lines)
|
|
|
|
|
|
def create_result_folder():
|
|
folder_name = "generated_alpha"
|
|
if not os.path.exists(folder_name):
|
|
os.makedirs(folder_name)
|
|
return folder_name
|
|
|
|
|
|
def call_siliconflow(prompt, model):
|
|
try:
|
|
client = openai.OpenAI(
|
|
api_key=SILICONFLOW_API_KEY,
|
|
base_url=SILICONFLOW_BASE_URL
|
|
)
|
|
|
|
response = client.chat.completions.create(
|
|
model=model,
|
|
messages=[{"role": "user", "content": prompt}]
|
|
)
|
|
|
|
return response.choices[0].message.content
|
|
|
|
except openai.AuthenticationError:
|
|
print("API密钥错误")
|
|
except openai.RateLimitError:
|
|
print("调用频率限制")
|
|
except openai.APIError as e:
|
|
print(f"API错误: {e}")
|
|
except Exception as e:
|
|
print(f"其他错误: {e}")
|
|
exit(1)
|
|
|
|
|
|
def save_result(result, folder):
|
|
now = datetime.now()
|
|
date_folder = now.strftime("%Y-%m-%d")
|
|
time_filename = now.strftime("%H%M%S")
|
|
full_folder_path = os.path.join(folder, date_folder)
|
|
|
|
if not os.path.exists(full_folder_path):
|
|
os.makedirs(full_folder_path)
|
|
print(f"创建文件夹: {full_folder_path}")
|
|
|
|
filename = f"{time_filename}.txt"
|
|
filepath = os.path.join(full_folder_path, filename)
|
|
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
f.write(result)
|
|
|
|
print(f"结果保存到: {filepath}")
|
|
|
|
|
|
def get_user_info():
|
|
headers = {"Authorization": f"Bearer {SILICONFLOW_API_KEY}"}
|
|
url = "https://api.siliconflow.cn/v1/user/info"
|
|
response = httpx.get(url, headers=headers)
|
|
data = response.json()['data']
|
|
balance = data['balance']
|
|
print(f"余额: {balance}")
|
|
return float(balance)
|
|
|
|
|
|
def manual_prompt(prompt):
|
|
manual_prompt_path = os.path.join(PROJECT_PATH, "manual_prompt")
|
|
|
|
if not os.path.exists(manual_prompt_path):
|
|
os.makedirs(manual_prompt_path)
|
|
print(f"创建文件夹: {manual_prompt_path}")
|
|
|
|
# 文件名后添加保存时间
|
|
now = datetime.now()
|
|
filename = f"manual_prompt_{now.strftime('%Y%m%d%H%M%S')}.txt"
|
|
filepath = os.path.join(manual_prompt_path, filename)
|
|
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
f.write(prompt)
|
|
|
|
print(f"手动提示词保存到: {filepath}")
|
|
|
|
|
|
def call_ai(prompt, model):
|
|
balance = get_user_info()
|
|
|
|
folder = create_result_folder()
|
|
|
|
print(f"正在调用AI...{model}")
|
|
result = call_siliconflow(prompt, model)
|
|
|
|
if result:
|
|
print(f"AI回复: {result[:200]}...")
|
|
save_result(result, folder)
|
|
used_balance = balance - get_user_info()
|
|
print(f'本次调用 api 使用额度 {used_balance}')
|
|
else:
|
|
print("AI调用失败")
|
|
|
|
|
|
def prepare_prompt(data_sets):
|
|
prompt = ''
|
|
|
|
# 读取基础提示词
|
|
alpha_prompt_path = os.path.join(PREPARE_PROMPT, "alpha_prompt.txt")
|
|
prompt += read_prompt(alpha_prompt_path)
|
|
|
|
# 读取操作符
|
|
prompt += "\n\n以下是我的账号有权限使用的操作符, 请严格按照操作符, 进行生成,组合因子\n\n"
|
|
prompt += "========================= 操作符开始 ======================================="
|
|
prompt += "注意: Operator: 后面的是操作符,\nDescription: 此字段后面的是操作符对应的描述或使用说明, Description字段后面的内容是使用说明, 不是操作符\n"
|
|
prompt += "特别注意!!!! 必须按照操作符字段Operator的使用说明生成 alpha"
|
|
operator_prompt_path = os.path.join(PREPARE_PROMPT, "operator.txt")
|
|
operator = read_operator(operator_prompt_path)
|
|
prompt += operator
|
|
prompt += "\n========================= 操作符结束 =======================================\n\n"
|
|
|
|
prompt += "========================= 数据字段开始 ======================================="
|
|
prompt += "注意: data_set_name: 后面的是数据字段(可以使用), description: 此字段后面的是数据字段对应的描述或使用说明(不能使用), description_cn字段后面的内容是中文使用说明(不能使用)\n\n"
|
|
for data_set in data_sets:
|
|
prompt += str(data_set) + '\n'
|
|
|
|
prompt += "========================= 数据字段结束 =======================================\n\n"
|
|
|
|
return prompt
|
|
|
|
|
|
def main():
|
|
# 将金融逻辑, 分割成标签
|
|
keys_text = load_keys_text()
|
|
|
|
# 分割好的标签, 搜索对应的数据集, 返回匹配到的结果
|
|
data_sets_path = os.path.join(PREPARE_PROMPT, "all_data_combined.csv")
|
|
result_data_sets = csvFileLoader(data_sets_path, keys_text)
|
|
|
|
if not result_data_sets:
|
|
print(f'搜索数据集为空, 程序退出')
|
|
exit(1)
|
|
|
|
|
|
data_sets = 0
|
|
print(f'从数据集中提取了 {len(result_data_sets)} 条数据')
|
|
if len(result_data_sets) > 500:
|
|
data_sets = random.sample(my_list, 10)
|
|
else:
|
|
data_sets = result_data_sets
|
|
|
|
|
|
# 组合提示词
|
|
prompt = prepare_prompt(data_sets)
|
|
|
|
# # 如果需要手动在页面段模型, 使用提示词, 打开这个, 将生成的提示词存到本地
|
|
manual_prompt(prompt)
|
|
|
|
for model in MODELS:
|
|
# 如果需要使用模型, 打开这个
|
|
call_ai(prompt, model)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|