#!/usr/bin/env python3
"""
Read expressions from PostgreSQL and extract the field names they reference.
All output is saved to files so the terminal is not flooded with text.
"""

import csv
import re
import sys
from collections import Counter
from datetime import datetime

import psycopg2

# Database connection settings (consumed by psycopg2.connect in main)
DB_CONFIG = {
    'host': '192.168.31.41',
    'port': 30018,
    'user': 'jack',
    'password': 'aaaAAA111',
    'database': 'quantify'
}

# Operator list (84 operators, extracted from the CSV)
OPERATORS = {
    'add', 'abs', 'log', 'subtract', 'signed_power', 'sign', 'reverse', 'power',
    'multiply', 'min', 'max', 'inverse', 'sqrt', 's_log_1p', 'densify', 'divide',
    'not', 'and', 'less', 'equal', 'or', 'not_equal', 'greater', 'greater_equal',
    'less_equal', 'is_nan', 'if_else', 'ts_sum', 'ts_scale', 'ts_mean', 'ts_zscore',
    'ts_std_dev', 'kth_element', 'inst_tvr', 'ts_corr', 'ts_count_nans',
    'ts_target_tvr_decay', 'ts_median', 'ts_covariance', 'ts_decay_linear',
    'ts_product', 'ts_regression', 'ts_delta_limit', 'ts_step', 'ts_decay_exp_window',
    'ts_quantile', 'days_from_last_change', 'hump', 'last_diff_value', 'ts_arg_max',
    'ts_arg_min', 'ts_av_diff', 'ts_backfill', 'ts_rank', 'ts_delay', 'ts_delta',
    'winsorize', 'truncate', 'regression_neut', 'scale', 'rank', 'quantile',
    'normalize', 'zscore', 'vec_min', 'vec_count', 'vec_stddev', 'vec_range',
    'vec_avg', 'vec_sum', 'vec_max', 'left_tail', 'trade_when', 'right_tail',
    'bucket', 'group_rank', 'group_cartesian_product', 'group_backfill', 'group_mean',
    'group_neutralize', 'group_normalize', 'group_median', 'group_scale', 'group_zscore'
}
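
# OPERATORS is a set, so the per-token membership test in extract_fields()
# is O(1); any token listed here is treated as an operator, never as a field.
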

class TeeLogger:
    """Write every message to both the terminal and a log file."""

    def __init__(self, filename):
        self.terminal = sys.stdout
        self.log_file = open(filename, 'w', encoding='utf-8')

    def write(self, message):
        self.terminal.write(message)
        self.log_file.write(message)

    def flush(self):
        self.terminal.flush()
        self.log_file.flush()

    def close(self):
        self.log_file.close()
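
# TeeLogger implements only write/flush/close, which is all print() needs;
# code that expects other file attributes on sys.stdout (isatty, fileno, ...)
# would not find them on this wrapper.
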

def extract_fields(expression):
    """
    Extract field names from an expression string.

    A field name consists of lowercase letters, digits and underscores,
    is not in the operator list, and is not a number.
    """
    candidates = re.findall(r'\b[a-z][a-z0-9_]*\b', expression)

    fields = []
    for c in candidates:
        # Filter out operators, numeric tokens, very short names and keywords
        if (c.isdigit() or
                c in OPERATORS or
                len(c) < 3 or
                c in ['true', 'false', 'nan', 'null', 'constant']):
            continue
        fields.append(c)

    return fields
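
# A quick illustration with hypothetical field names (not taken from the database):
#   extract_fields("ts_rank(close, 20) + rank(volume)")  ->  ['close', 'volume']
# The operator names 'ts_rank' and 'rank' are dropped, and '20' never matches
# the leading-letter pattern in the first place.
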

def main():
    # Set up the log file and tee all prints to it
    log_filename = f'extract_fields_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
    logger = TeeLogger(log_filename)
    sys.stdout = logger

    try:
        print("=" * 60)
        print(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        print("\n🔌 Connecting to the database...")
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = True  # no explicit transaction, so no long-held locks

        try:
            cursor = conn.cursor()

            # Count the total number of expressions first
            print("📊 Counting expressions...")
            cursor.execute("SELECT COUNT(*) FROM alpha_expression_line")
            total = cursor.fetchone()[0]
            print(f"   Total expressions: {total:,}")

            # Read in batches
            batch_size = 5000
            offset = 0
            processed = 0

            field_counter = Counter()
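
            # Note: the paging query below has no ORDER BY, so it relies on the
            # table's scan order staying stable while the batches run; ordering
            # by a key column (assuming one exists) would make the LIMIT/OFFSET
            # pages deterministic.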
            while offset < total:
                print(f"📖 Progress: {processed:,} / {total:,} ({processed * 100 // total}%)")
                cursor.execute(
                    "SELECT expression FROM alpha_expression_line LIMIT %s OFFSET %s",
                    (batch_size, offset)
                )
                rows = cursor.fetchall()

                for row in rows:
                    expr = row[0]
                    if expr and isinstance(expr, str):
                        fields = extract_fields(expr)
                        field_counter.update(fields)
                        processed += 1

                offset += batch_size

            print(f"\n✅ Extraction finished, processed {processed:,} expressions")
            print(f"   Found {len(field_counter)} distinct fields")

            # Sort by occurrence count, most frequent first
            sorted_fields = sorted(field_counter.items(), key=lambda x: x[1], reverse=True)

            # Save the full field list to CSV
            csv_filename = 'extracted_fields.csv'
            with open(csv_filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(['field_name', 'occurrence_count'])
                for field, count in sorted_fields:
                    writer.writerow([field, count])

            print(f"\n💾 Full field list saved to: {csv_filename}")
            print(f"   {len(sorted_fields)} fields in total")

            # Save the field names as plain text (easy to copy from)
            txt_filename = 'field_names.txt'
            with open(txt_filename, 'w', encoding='utf-8') as f:
                for field, count in sorted_fields:
                    f.write(f"{field}\n")

            print(f"💾 Field name list (plain text) saved to: {txt_filename}")

            # Summary statistics
            print("\n" + "=" * 60)
            print("📊 Statistics")
            print("=" * 60)
            print(f"Total expressions: {total:,}")
            print(f"Valid expressions: {processed:,}")
            print(f"Distinct fields: {len(sorted_fields):,}")

            # Top 50 most frequent fields
            print("\n📋 Top 50 fields by occurrence count:")
            print("-" * 40)
            for i, (field, count) in enumerate(sorted_fields[:50], 1):
                pct = count * 100.0 / processed
                print(f"{i:3}. {field:30} {count:8,} times ({pct:.2f}%)")

            # Low-frequency fields (those that appear only once)
            once_fields = [f for f, c in sorted_fields if c == 1]
            print(f"\n📋 Fields that appear only once: {len(once_fields)}")
            if once_fields:
                print(f"   Examples: {', '.join(once_fields[:10])}")

            # Save a detailed analysis report
            report_filename = f'field_analysis_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
            with open(report_filename, 'w', encoding='utf-8') as f:
                f.write("Field analysis report\n")
                f.write("=" * 60 + "\n")
                f.write(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total expressions: {total:,}\n")
                f.write(f"Distinct fields: {len(sorted_fields):,}\n\n")

                f.write("Top 100 fields:\n")
                f.write("-" * 60 + "\n")
                for i, (field, count) in enumerate(sorted_fields[:100], 1):
                    pct = count * 100.0 / processed
                    f.write(f"{i:3}. {field:30} {count:8,} times ({pct:.2f}%)\n")

                f.write("\n\nFull field list:\n")
                f.write("-" * 60 + "\n")
                for field, count in sorted_fields:
                    f.write(f"{field},{count}\n")

            print(f"\n💾 Detailed analysis report saved to: {report_filename}")

            print("\n" + "=" * 60)
            print(f"Finish time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("=" * 60)

        finally:
            cursor.close()
            conn.close()
            print("\n🔌 Database connection closed")

    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Restore the real stdout before closing the log file
        sys.stdout = logger.terminal
        logger.close()
        print(f"\n✅ All output saved to: {log_filename}")
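
# A run of this script produces four files: extracted_fields.csv, field_names.txt,
# a timestamped field_analysis_report_*.txt, and a timestamped extract_fields_*.log
# that captures everything printed above.
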
if __name__ == "__main__":
    main()