You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
auto/utils/utils_sync_psql.py

219 lines
7.2 KiB

# -*- coding: utf-8 -*-
# 内部数据库同步数据脚本, 用于同一台服务器, 将mongodb数据同步到pgsql
import os
from datetime import datetime

import psycopg2
from psycopg2 import sql
from pymongo import MongoClient
# Connection settings. Every value can be overridden through an environment
# variable so credentials need not be edited in source; when a variable is
# unset the literal fallback below is used.
# NOTE(review): drop the fallback credentials from source once every
# deployment sets MONGOLINK / PGSQL_* in its environment.
MONGOLINK = os.environ.get('MONGOLINK', 'mongodb://root:aaaAAA111!!!@erhe.top:38000/')
PGSQLPARAMS = {
    # Deliberately no 'database' key: each function picks its own target
    # database ('postgres' to create the DB, 'auto' for the table and data).
    'user': os.environ.get('PGSQL_USER', 'toor'),
    'password': os.environ.get('PGSQL_PASSWORD', 'aaaAAA111'),
    'host': os.environ.get('PGSQL_HOST', '192.168.31.177'),
    'port': int(os.environ.get('PGSQL_PORT', '5432')),
}
def mongo():
    """Read every document from every collection of the MongoDB ``NEWS`` database.

    The ``_id`` field is projected out of each document. Documents are
    returned newest-first by ``create_time``. A document whose
    ``create_time`` is missing or falsy sorts last instead of making the
    whole sort fail with ``KeyError``.

    Returns:
        list[dict]: all documents sorted by ``create_time`` descending; an
        empty list when there are none.
    """
    client = MongoClient(MONGOLINK)
    try:
        db = client['NEWS']
        all_data = []
        for collection_name in db.list_collection_names():
            # {'_id': 0} drops the Mongo ObjectId from each returned document.
            all_data.extend(db[collection_name].find({}, {'_id': 0}))
    finally:
        # Always release the client's connections, even if a read fails.
        client.close()
    # `or 0` keeps documents without create_time (which pg() tolerates)
    # from raising KeyError; they end up at the tail of the list.
    return sorted(all_data, key=lambda x: x.get('create_time') or 0, reverse=True)
def _doc_to_values(doc):
    """Map one MongoDB news document onto the values stored in ``news_info``."""
    create_time_ts = doc.get('create_time') or ''
    create_datetime_str = doc.get('create_datetime')
    return {
        'name': doc.get('title'),
        'context': doc.get('context') or '',
        'source_url': doc.get('source_url') or '',
        # 'link' is preferred; 'line' is still read in case the Mongo documents
        # really use that key. NOTE(review): confirm the actual field name.
        'link': doc.get('link') or doc.get('line') or '',
        'article_type': doc.get('article_type') or '',
        'article_source': doc.get('article_source') or '',
        'img_url': doc.get('img_url') or '',
        'keyword': doc.get('keyword') or '',
        'posted_date': doc.get('posted_date') or '',
        'create_time_ts': create_time_ts,
        # create_time is a Unix timestamp; utcfromtimestamp gives a naive UTC
        # datetime, matching the TIMESTAMP WITHOUT TIME ZONE column.
        'create_time': datetime.utcfromtimestamp(create_time_ts) if create_time_ts else None,
        'create_datetime': (
            datetime.strptime(create_datetime_str, '%Y-%m-%d %H:%M:%S')
            if create_datetime_str else None
        ),
    }


def pg(sorted_data):
    """Insert MongoDB news documents into the PostgreSQL ``news_info`` table.

    A document is skipped when a row with the same ``name`` (its ``title``)
    already exists. Once 500 such duplicates have been seen, the loop stops
    early to avoid wasting work on data that is presumably already synced.
    That shortcut relies on *sorted_data* being newest-first, as returned
    by ``mongo()``.

    Each inserted document is committed on its own. If a document fails
    (bad ``create_datetime`` format, NULL title, value too long for its
    column, ...), its transaction is rolled back, the error is printed and
    processing continues.

    NOTE(review): the ``create_time_ts`` column of ``news_info`` is not
    written by the INSERT below.

    Args:
        sorted_data: iterable of document dicts with optional keys
            ``title``, ``context``, ``source_url``, ``link``,
            ``article_type``, ``article_source``, ``img_url``, ``keyword``,
            ``posted_date``, ``create_time`` (Unix timestamp) and
            ``create_datetime`` (``'%Y-%m-%d %H:%M:%S'`` string).
    """
    db_name = 'auto'
    # Build a private copy so the shared PGSQLPARAMS dict is not mutated.
    conn = psycopg2.connect(**{**PGSQLPARAMS, 'database': db_name})
    # Remaining number of already-present titles tolerated before stopping.
    same_data = 500
    insert_query = """
    INSERT INTO news_info (name, context, source_url, link, article_type, article_source, img_url, keyword, posted_date, create_time, create_datetime)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    try:
        for doc in sorted_data:
            if same_data <= 0:
                print('相同数据计数已到最大值, 退出程序')
                break
            try:
                values = _doc_to_values(doc)
                with conn.cursor() as cur:
                    # De-duplicate on the title.
                    cur.execute("SELECT id FROM news_info WHERE name = %s;", (values['name'],))
                    if cur.fetchone():
                        same_data -= 1
                        continue
                    cur.execute(insert_query, (
                        values['name'],
                        values['context'],
                        values['source_url'],
                        values['link'],
                        values['article_type'],
                        values['article_source'],
                        values['img_url'],
                        values['keyword'],
                        values['posted_date'],
                        values['create_time'],
                        values['create_datetime'],
                    ))
                conn.commit()
                print(f'已保存{values}')
            except Exception as e:
                # A failed statement leaves the transaction aborted; without a
                # rollback every later document on this connection would fail.
                conn.rollback()
                print("Error while syncing document: ", e)
    finally:
        conn.close()
def create_db():
    """Create the PostgreSQL database ``auto`` (owner ``toor``) if it is missing.

    Connects to the ``postgres`` maintenance database with autocommit enabled,
    because ``CREATE DATABASE`` cannot run inside a transaction block. Errors
    from the existence check or the CREATE are printed rather than raised. A
    failure to connect still propagates.
    """
    new_db = 'auto'
    owner = 'toor'
    # Build a private copy so the shared PGSQLPARAMS dict is not mutated.
    conn = psycopg2.connect(**{**PGSQLPARAMS, 'database': 'postgres'})
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            cur.execute(
                sql.SQL("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s"),
                (new_db,),
            )
            if cur.fetchone():
                print(f"Database '{new_db}' already exists.")
            else:
                # Identifiers are quoted via sql.Identifier; they cannot be
                # passed as ordinary query parameters.
                cur.execute(sql.SQL("CREATE DATABASE {} WITH OWNER {} ENCODING 'UTF8'").format(
                    sql.Identifier(new_db),
                    sql.Identifier(owner),
                ))
                print(f"Database '{new_db}' created successfully.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        conn.close()
def create_table():
    """Create the ``news_info`` table in the ``auto`` database if it does not exist.

    Errors are printed and the transaction is rolled back rather than raised.
    A failure to connect still propagates.
    """
    db_name = 'auto'
    # Build a private copy so the shared PGSQLPARAMS dict is not mutated.
    conn = psycopg2.connect(**{**PGSQLPARAMS, 'database': db_name})
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS news_info (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        context TEXT,
        source_url VARCHAR(255),
        link VARCHAR(255),
        article_type VARCHAR(50),
        article_source VARCHAR(50),
        img_url VARCHAR(255),
        keyword VARCHAR(255),
        posted_date VARCHAR,
        create_time_ts BIGINT,
        create_time TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        create_datetime TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(create_table_sql)
        conn.commit()
        # IF NOT EXISTS makes this a no-op when the table already exists, so
        # report readiness instead of claiming it was just created.
        print(f"Table 'news_info' is ready in database '{db_name}'.")
    except Exception as e:
        print(f"An error occurred while creating table: {e}")
        conn.rollback()
    finally:
        conn.close()
if __name__ == '__main__':
    # Make sure the target database and table exist, pull every news document
    # from MongoDB, then insert the ones whose title is not yet in PostgreSQL.
    create_db()
    create_table()
    news_docs = mongo()
    if news_docs:
        pg(news_docs)
    print("Done!")