# -*- coding: utf-8 -*-
"""Send a prompt file to an OpenAI-compatible chat endpoint and save the reply.

The script reads the service settings from ``ai_config.json``, the prompt
from ``alpha_prompt.txt`` and (optionally) an operator reference from
``wqb_operator.txt``.  The reply is written to a timestamped text file in
the ``generated_alpha`` folder.
"""
import json
import os
from datetime import datetime

# The two SDKs are only needed for the network calls.  Guarding the imports
# keeps the file-handling helpers importable when they are not installed;
# the functions that need them report the problem instead of crashing.
try:
    import openai
except ImportError:
    openai = None

try:
    import httpx
except ImportError:
    httpx = None

CONFIG_FILE = "ai_config.json"
PROMPT_FILE = "alpha_prompt.txt"
OPERATOR_FILE = "wqb_operator.txt"
RESULT_FOLDER = "generated_alpha"
USER_INFO_URL = "https://api.siliconflow.cn/v1/user/info"
REQUIRED_FIELDS = ("api_keys", "model", "base_url")


def load_config(config_file=CONFIG_FILE):
    """Load the JSON configuration.

    Args:
        config_file: Path of the JSON configuration file.

    Returns:
        The parsed configuration, or ``None`` when the file did not exist
        (a template is created in that case) or does not contain valid JSON.
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"创建配置文件: {config_file}")
        # Write the expected schema rather than an empty object so the user
        # can see which fields have to be filled in.
        template = {
            "siliconflow": {
                "api_keys": "",
                "model": "",
                "base_url": "https://api.siliconflow.cn/v1",
            }
        }
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(template, f, indent=4, ensure_ascii=False)
        print(f"请在{config_file}中配置siliconflow的api信息")
        return None
    except json.JSONDecodeError as e:
        print(f"配置文件格式错误: {e}")
        return None


def create_prompt_file(prompt_file=PROMPT_FILE):
    """Make sure the prompt file exists.

    Args:
        prompt_file: Path of the prompt file.

    Returns:
        ``True`` when the file already existed, ``False`` when an empty one
        was just created and still has to be filled in by the user.
    """
    if os.path.exists(prompt_file):
        return True
    with open(prompt_file, 'w', encoding='utf-8') as f:
        f.write("")
    print(f"已创建{prompt_file},请填入提示词后重新运行")
    return False


def read_prompt(prompt_file=PROMPT_FILE):
    """Return the stripped prompt text, or ``None`` when the file is empty.

    Args:
        prompt_file: Path of the prompt file; it must exist.
    """
    with open(prompt_file, 'r', encoding='utf-8') as f:
        prompt = f.read().strip()
    if not prompt:
        print(f"{prompt_file}是空的,请填入提示词")
        return None
    return prompt


def read_operator(operator_file=OPERATOR_FILE):
    """Return the stripped operator reference text.

    The operator text is optional for the caller, so a missing file is
    reported and treated like an empty one instead of raising.

    Args:
        operator_file: Path of the operator reference file.

    Returns:
        The file content, or ``None`` when the file is missing or empty.
    """
    try:
        with open(operator_file, 'r', encoding='utf-8') as f:
            operator = f.read().strip()
    except FileNotFoundError:
        print(f"未找到{operator_file},将不附加操作符说明")
        return None
    if not operator:
        print(f"{operator_file}是空的,请填入操作符")
        return None
    return operator


def create_result_folder(folder_name=RESULT_FOLDER):
    """Create the output folder if necessary and return its name."""
    os.makedirs(folder_name, exist_ok=True)
    return folder_name


def call_siliconflow(api_key, prompt, model, base_url):
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        api_key: API key passed to the client.
        prompt: Content of the user message.
        model: Model name passed to the chat completion call.
        base_url: Base URL of the OpenAI-compatible endpoint.

    Returns:
        The content of the first choice, or ``None`` when the call failed
        (the reason is printed).
    """
    if openai is None:
        print("未安装openai库,无法调用AI")
        return None
    try:
        client = openai.OpenAI(api_key=api_key, base_url=base_url)
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
    except openai.AuthenticationError:
        print("API密钥错误")
    except openai.RateLimitError:
        print("调用频率限制")
    except openai.APIError as e:
        print(f"API错误: {e}")
    except Exception as e:
        # Last-resort boundary: report anything unexpected and let the
        # caller see the failure through the None return value.
        print(f"其他错误: {e}")
    return None


def save_result(result, folder):
    """Write ``result`` to ``<folder>/<YYYYmmddHHMMSS>.txt``.

    Args:
        result: Text to store.
        folder: Existing directory that receives the file.

    Returns:
        The path of the written file.
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    filepath = os.path.join(folder, f"{timestamp}.txt")
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(result)
    print(f"结果保存到: {filepath}")
    return filepath


def get_ai_config(config):
    """Pick the first service entry of ``config`` and validate it.

    Args:
        config: Parsed configuration mapping service names to settings.

    Returns:
        ``(service_name, service_config)``, or ``None`` when the
        configuration is empty, the entry is not a mapping, or one of the
        required fields is missing or empty.
    """
    if not config:
        return None

    service_name = next(iter(config))
    service_config = config[service_name]
    if not isinstance(service_config, dict):
        print(f"配置格式错误: {service_name}")
        return None

    for field in REQUIRED_FIELDS:
        # An empty value is as unusable as a missing one.
        if not service_config.get(field):
            print(f"缺少配置字段: {field}")
            return None
    return service_name, service_config


def get_user_info(config_file=CONFIG_FILE):
    """Print the account balance reported by the SiliconFlow user-info API.

    The balance is informational only, so every failure (missing library,
    unreadable configuration, network or HTTP error, unexpected response)
    is reported and swallowed rather than aborting the run.

    Args:
        config_file: Path of the JSON configuration file; the key is read
            from its ``siliconflow`` / ``api_keys`` entry.

    Returns:
        The balance value from the response, or ``None`` on failure.
    """
    if httpx is None:
        print("未安装httpx库,无法查询余额")
        return None

    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        token = config['siliconflow']['api_keys']
    except (OSError, ValueError, KeyError, TypeError) as e:
        print(f"无法读取siliconflow配置: {e}")
        return None

    headers = {"Authorization": f"Bearer {token}"}
    try:
        response = httpx.get(USER_INFO_URL, headers=headers)
        response.raise_for_status()
        balance = response.json()['data']['balance']
    except (httpx.HTTPError, ValueError, KeyError, TypeError) as e:
        print(f"获取余额失败: {e}")
        return None

    print(f"余额: {balance}")
    return balance


def main():
    """Run one generation: load inputs, call the model, save the reply."""
    # Load the configuration first so that a missing file produces the
    # template and a hint instead of a crash in the balance lookup.
    config = load_config()
    if not config:
        return

    get_user_info()

    if not create_prompt_file():
        return

    prompt = read_prompt()
    if not prompt:
        return

    config_result = get_ai_config(config)
    if not config_result:
        return

    operator = read_operator()
    if operator:
        prompt = prompt + "\n\n以下是我的账号有权限使用的操作符以及操作符的使用方法, 请严格按照操作符, 进行生成,组合因子\n\n" + operator

    service_name, service_config = config_result
    print(f"使用服务: {service_name}")
    print(f"使用模型: {service_config['model']}")

    folder = create_result_folder()

    print("正在调用AI...")
    result = call_siliconflow(
        service_config["api_keys"],
        prompt,
        service_config["model"],
        service_config["base_url"],
    )

    if result:
        print(f"AI回复: {result[:200]}...")
        save_result(result, folder)
    else:
        print("AI调用失败")


if __name__ == "__main__":
    main()