#!/usr/bin/env python3
"""Read alpha expressions from PostgreSQL and extract data-field names.

All results are written to files; everything printed to the terminal is
also mirrored into a timestamped log file via TeeLogger.
"""
import csv
import re
import sys
from collections import Counter
from datetime import datetime

# Database connection settings.
# NOTE(review): credentials are hard-coded; consider environment variables.
DB_CONFIG = {
    'host': '192.168.31.41',
    'port': 30018,
    'user': 'jack',
    'password': 'aaaAAA111',
    'database': 'quantify'
}

# Operator names (84 entries, extracted from a CSV). Tokens matching one of
# these are treated as operators, never as data fields.
OPERATORS = {
    'add', 'abs', 'log', 'subtract', 'signed_power', 'sign', 'reverse',
    'power', 'multiply', 'min', 'max', 'inverse', 'sqrt', 's_log_1p',
    'densify', 'divide', 'not', 'and', 'less', 'equal', 'or', 'not_equal',
    'greater', 'greater_equal', 'less_equal', 'is_nan', 'if_else', 'ts_sum',
    'ts_scale', 'ts_mean', 'ts_zscore', 'ts_std_dev', 'kth_element',
    'inst_tvr', 'ts_corr', 'ts_count_nans', 'ts_target_tvr_decay',
    'ts_median', 'ts_covariance', 'ts_decay_linear', 'ts_product',
    'ts_regression', 'ts_delta_limit', 'ts_step', 'ts_decay_exp_window',
    'ts_quantile', 'days_from_last_change', 'hump', 'last_diff_value',
    'ts_arg_max', 'ts_arg_min', 'ts_av_diff', 'ts_backfill', 'ts_rank',
    'ts_delay', 'ts_delta', 'winsorize', 'truncate', 'regression_neut',
    'scale', 'rank', 'quantile', 'normalize', 'zscore', 'vec_min',
    'vec_count', 'vec_stddev', 'vec_range', 'vec_avg', 'vec_sum', 'vec_max',
    'left_tail', 'trade_when', 'right_tail', 'bucket', 'group_rank',
    'group_cartesian_product', 'group_backfill', 'group_mean',
    'group_neutralize', 'group_normalize', 'group_median', 'group_scale',
    'group_zscore'
}

# Tokens that look like identifiers but are expression-language literals.
_NON_FIELDS = frozenset({'true', 'false', 'nan', 'null', 'constant'})

# Candidate identifier: a lowercase letter followed by lowercase letters,
# digits or underscores. Compiled once — extract_fields() runs per row.
_TOKEN_RE = re.compile(r'\b[a-z][a-z0-9_]*\b')


class TeeLogger:
    """File-like object that duplicates every write to the terminal and a log file."""

    def __init__(self, filename):
        self.terminal = sys.stdout
        self.log_file = open(filename, 'w', encoding='utf-8')

    def write(self, message):
        self.terminal.write(message)
        self.log_file.write(message)

    def flush(self):
        self.terminal.flush()
        self.log_file.flush()

    def close(self):
        self.log_file.close()


def extract_fields(expression):
    """Return the data-field names found in an expression string.

    A field is a lowercase identifier (letters/digits/underscores, starting
    with a letter) of at least 3 characters that is neither an operator nor
    a literal keyword. Duplicates are preserved so callers can count
    occurrences.

    Args:
        expression: expression source text to scan.

    Returns:
        list[str]: field names in order of appearance (with repeats).
    """
    fields = []
    for token in _TOKEN_RE.findall(expression):
        # No isdigit() check needed: the regex requires a leading letter,
        # so a pure number can never match.
        if len(token) < 3 or token in OPERATORS or token in _NON_FIELDS:
            continue
        fields.append(token)
    return fields


def _save_fields_csv(sorted_fields, filename='extracted_fields.csv'):
    """Write (field, count) rows, most frequent first, to a CSV file."""
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['field_name', 'occurrence_count'])
        writer.writerows(sorted_fields)
    return filename


def _save_field_names(sorted_fields, filename='field_names.txt'):
    """Write one field name per line (plain text, convenient for copying)."""
    with open(filename, 'w', encoding='utf-8') as f:
        f.writelines(f"{field}\n" for field, _count in sorted_fields)
    return filename


def _save_report(sorted_fields, total, processed):
    """Write the detailed analysis report and return its filename."""
    filename = f'field_analysis_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
    safe_processed = processed or 1  # avoid ZeroDivisionError on an empty table
    with open(filename, 'w', encoding='utf-8') as f:
        f.write("字段分析报告\n")
        f.write("=" * 60 + "\n")
        f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"总表达式数: {total:,}\n")
        f.write(f"不同字段数: {len(sorted_fields):,}\n\n")
        f.write("高频字段 Top 100:\n")
        f.write("-" * 60 + "\n")
        for i, (field, count) in enumerate(sorted_fields[:100], 1):
            pct = count * 100.0 / safe_processed
            f.write(f"{i:3}. {field:30} {count:8,} 次 ({pct:.2f}%)\n")
        f.write("\n\n完整字段列表:\n")
        f.write("-" * 60 + "\n")
        for field, count in sorted_fields:
            f.write(f"{field},{count}\n")
    return filename


def main():
    """Stream expressions from the database, count field usage, write reports."""
    log_filename = f'extract_fields_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
    logger = TeeLogger(log_filename)
    sys.stdout = logger
    try:
        print("=" * 60)
        print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        # Imported lazily so extract_fields() remains importable/usable on
        # machines without the PostgreSQL driver installed.
        import psycopg2

        print("\n🔌 连接数据库...")
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = True  # read-only workload; no explicit transaction
        try:
            with conn.cursor() as count_cursor:
                print("📊 统计总表达式数...")
                count_cursor.execute("SELECT COUNT(*) FROM alpha_expression_line")
                total = count_cursor.fetchone()[0]
                print(f" 总表达式数: {total:,}")

            batch_size = 5000
            processed = 0
            field_counter = Counter()

            # Use a named (server-side) cursor: one consistent streaming
            # scan of the table. The previous LIMIT/OFFSET pagination had
            # no ORDER BY, so batches could duplicate or skip rows, and
            # each OFFSET re-scanned from the start (quadratic overall).
            # withhold=True is required because autocommit is enabled —
            # TODO confirm against the psycopg2 named-cursor docs.
            read_cursor = conn.cursor(name='field_extractor', withhold=True)
            try:
                read_cursor.itersize = batch_size
                read_cursor.execute("SELECT expression FROM alpha_expression_line")
                for row in read_cursor:
                    if processed % batch_size == 0:
                        # Guard the percentage against an empty table.
                        pct = processed * 100 // total if total else 100
                        print(f"📖 读取进度: {processed:,} / {total:,} ({pct}%)")
                    expr = row[0]
                    if expr and isinstance(expr, str):
                        field_counter.update(extract_fields(expr))
                    processed += 1
            finally:
                read_cursor.close()

            print(f"\n✅ 提取完成,共处理 {processed:,} 条表达式")
            print(f" 发现 {len(field_counter)} 个不同的字段")

            # Most frequent fields first.
            sorted_fields = sorted(field_counter.items(),
                                   key=lambda item: item[1], reverse=True)

            csv_filename = _save_fields_csv(sorted_fields)
            print(f"\n💾 完整字段列表已保存到: {csv_filename}")
            print(f" 共 {len(sorted_fields)} 个字段")

            txt_filename = _save_field_names(sorted_fields)
            print(f"💾 字段名列表(纯文本)已保存到: {txt_filename}")

            print("\n" + "=" * 60)
            print("📊 统计信息")
            print("=" * 60)
            print(f"总表达式数: {total:,}")
            print(f"有效表达式数: {processed:,}")
            print(f"不同字段数: {len(sorted_fields):,}")

            print("\n📋 出现次数最多的前 50 个字段:")
            print("-" * 40)
            safe_processed = processed or 1  # guard against an empty table
            for i, (field, count) in enumerate(sorted_fields[:50], 1):
                pct = count * 100.0 / safe_processed
                print(f"{i:3}. {field:30} {count:8,} 次 ({pct:.2f}%)")

            # Fields that appeared exactly once (likely noise or rare data).
            once_fields = [name for name, cnt in sorted_fields if cnt == 1]
            print(f"\n📋 仅出现 1 次的字段数: {len(once_fields)}")
            if once_fields:
                print(f" 示例: {', '.join(once_fields[:10])}")

            report_filename = _save_report(sorted_fields, total, processed)
            print(f"\n💾 详细分析报告已保存到: {report_filename}")

            print("\n" + "=" * 60)
            print(f"完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("=" * 60)
        finally:
            conn.close()
            print("\n🔌 数据库连接已关闭")
    except Exception as e:
        print(f"\n❌ 错误: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Restore stdout before closing the log so the final message
        # goes only to the real terminal.
        sys.stdout = logger.terminal
        logger.close()
        print(f"\n✅ 所有输出已保存到: {log_filename}")


if __name__ == "__main__":
    main()