You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
219 lines
7.2 KiB
219 lines
7.2 KiB
# -*- coding: utf-8 -*-
|
|
# 内部数据库同步数据脚本, 用于同一台服务器, 将mongodb数据同步到pgsql
|
|
import os
from datetime import datetime

import psycopg2
from psycopg2 import sql
from pymongo import MongoClient
|
|
|
|
# Connection settings.
#
# Every value can be overridden through an environment variable so that real
# credentials do not have to live in the source file. The literals below are
# only the backward-compatible defaults used when a variable is not set.
# NOTE(review): the default credentials are committed in plain text; rotate
# them and supply the real values through the environment.
MONGOLINK = os.environ.get(
    'NEWS_SYNC_MONGO_URI',
    'mongodb://root:aaaAAA111!!!@erhe.top:38000/',
)

PGSQLPARAMS = {
    # 'database' is deliberately not set here: each function in this script
    # chooses the database it connects to ('postgres' for administration,
    # 'auto' for the news data).
    'user': os.environ.get('NEWS_SYNC_PG_USER', 'toor'),
    'password': os.environ.get('NEWS_SYNC_PG_PASSWORD', 'aaaAAA111'),
    'host': os.environ.get('NEWS_SYNC_PG_HOST', '192.168.31.177'),
    # Environment variables are strings, so normalise the port to an int.
    'port': int(os.environ.get('NEWS_SYNC_PG_PORT', '5432')),
}
|
|
|
|
|
|
def mongo():
    """Read every document from the ``NEWS`` MongoDB database, newest first.

    All collections of the database are scanned and their documents are
    merged into one list. The ``_id`` field is excluded from each document.

    Returns:
        list[dict]: The merged documents sorted by ``create_time`` in
        descending order. Documents without a (truthy) ``create_time`` are
        treated as timestamp 0 and therefore sort last. The list is empty
        when the database holds no documents.
    """
    db_name = 'NEWS'  # Replace with your database name.

    # Use the client as a context manager so the connection is always
    # released, even if reading one of the collections fails.
    with MongoClient(MONGOLINK) as client:
        db = client[db_name]

        all_data = []
        for collection_name in db.list_collection_names():
            # Fetch all documents of the collection, leaving out `_id`.
            all_data.extend(db[collection_name].find({}, {'_id': 0}))

    # `.get(...) or 0` keeps a document that lacks `create_time` (or stores
    # None) from aborting the whole sync with KeyError/TypeError.
    return sorted(
        all_data,
        key=lambda doc: doc.get('create_time') or 0,
        reverse=True,
    )
|
|
|
|
|
|
def _build_news_row(doc):
    """Map one MongoDB news document to the ``news_info`` column values.

    Args:
        doc: A document dict as produced by ``mongo()``.

    Returns:
        dict: Column name -> value, keyed to match the named placeholders of
        the INSERT statement used by ``pg``.

    Raises:
        ValueError: If ``create_datetime`` is present but not formatted as
            ``%Y-%m-%d %H:%M:%S``.
    """
    create_time_ts = doc.get('create_time')
    create_datetime_str = doc.get('create_datetime')

    return {
        'name': doc.get('title'),
        'context': doc.get('context') or '',
        'source_url': doc.get('source_url') or '',
        # NOTE(review): the original read only the key 'line', which looks
        # like a typo for 'link'. Both spellings are accepted so documents
        # using either key are synced; confirm the real field name.
        'link': doc.get('link') or doc.get('line') or '',
        'article_type': doc.get('article_type') or '',
        'article_source': doc.get('article_source') or '',
        'img_url': doc.get('img_url') or '',
        'keyword': doc.get('keyword') or '',
        'posted_date': doc.get('posted_date') or '',
        # Raw epoch value for the BIGINT column; None (SQL NULL) when absent.
        'create_time_ts': int(create_time_ts) if create_time_ts else None,
        # Naive UTC datetime derived from the epoch value.
        'create_time': (
            datetime.utcfromtimestamp(create_time_ts) if create_time_ts else None
        ),
        'create_datetime': (
            datetime.strptime(create_datetime_str, '%Y-%m-%d %H:%M:%S')
            if create_datetime_str else None
        ),
    }


def pg(sorted_data):
    """Insert MongoDB news documents into the PostgreSQL ``news_info`` table.

    A document is inserted only when no row with the same ``name`` (the
    document's ``title``) exists yet. Each insert is committed on its own.
    A failure on one document is printed and rolled back so that the
    remaining documents can still be processed.

    Args:
        sorted_data: Iterable of document dicts, expected newest first (the
            output of ``mongo()``).
    """
    db_name = 'auto'

    # Build per-call connection parameters instead of mutating the shared
    # module-level PGSQLPARAMS dict.
    conn = psycopg2.connect(**{**PGSQLPARAMS, 'database': db_name})

    check_query = "SELECT id FROM news_info WHERE name = %s;"
    insert_query = """
        INSERT INTO news_info (
            name, context, source_url, link, article_type, article_source,
            img_url, keyword, posted_date, create_time_ts, create_time,
            create_datetime
        )
        VALUES (
            %(name)s, %(context)s, %(source_url)s, %(link)s, %(article_type)s,
            %(article_source)s, %(img_url)s, %(keyword)s, %(posted_date)s,
            %(create_time_ts)s, %(create_time)s, %(create_datetime)s
        )
    """

    # Budget of already-present records: once this many duplicates have been
    # seen the sync stops, to avoid wasting resources on data that was
    # synced before.
    same_data = 500

    try:
        for doc in sorted_data:
            if same_data <= 0:
                print('相同数据计数已到最大值, 退出程序')
                break

            try:
                values = _build_news_row(doc)

                # The context manager closes the cursor on every path.
                with conn.cursor() as cur:
                    # Skip documents whose title is already stored.
                    cur.execute(check_query, (values['name'],))
                    if cur.fetchone():
                        same_data -= 1
                        continue

                    cur.execute(insert_query, values)

                conn.commit()
                print(f'已保存{values}')
            except Exception as e:
                # Best effort per document: report and carry on.
                print("Error during search: ", e)
                # Without a rollback the failed transaction stays aborted and
                # every following statement on this connection would fail.
                conn.rollback()
    finally:
        conn.close()
|
|
|
|
|
|
def create_db():
    """Ensure the ``auto`` PostgreSQL database exists, creating it if needed.

    Connects to the ``postgres`` database, looks the target database up in
    ``pg_catalog.pg_database`` and issues ``CREATE DATABASE`` (owner
    ``toor``, UTF8 encoding) when it is missing. Errors raised while
    checking or creating are printed, not propagated.
    """
    # Point the shared connection parameters at the 'postgres' database.
    PGSQLPARAMS['database'] = 'postgres'

    connection = psycopg2.connect(**PGSQLPARAMS)
    # PostgreSQL refuses CREATE DATABASE inside a transaction block, so the
    # statement must run in autocommit mode.
    connection.autocommit = True
    cursor = connection.cursor()

    new_db = 'auto'   # Database to create.
    owner = 'toor'    # Role that will own the new database.

    try:
        lookup = sql.SQL("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s")
        cursor.execute(lookup, (new_db,))

        if cursor.fetchone():
            print(f"Database '{new_db}' already exists.")
        else:
            # Identifiers are quoted through psycopg2.sql rather than
            # interpolated as plain text.
            statement = sql.SQL("CREATE DATABASE {} WITH OWNER {} ENCODING 'UTF8'").format(
                sql.Identifier(new_db),
                sql.Identifier(owner),
            )
            cursor.execute(statement)
            print(f"Database '{new_db}' created successfully.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Release the cursor and the connection on every path.
        cursor.close()
        connection.close()
|
|
|
|
|
|
def create_table():
    """Create the ``news_info`` table in the ``auto`` database if it is missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this
    function repeatedly is harmless. Errors are printed and the transaction
    is rolled back; they are not propagated to the caller.
    """
    db_name = 'auto'

    create_table_sql = """
        CREATE TABLE IF NOT EXISTS news_info (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            context TEXT,
            source_url VARCHAR(255),
            link VARCHAR(255),
            article_type VARCHAR(50),
            article_source VARCHAR(50),
            img_url VARCHAR(255),
            keyword VARCHAR(255),
            posted_date VARCHAR,
            create_time_ts BIGINT,
            create_time TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            create_datetime TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
        );
    """

    # Build per-call connection parameters instead of mutating the shared
    # module-level PGSQLPARAMS dict.
    conn = psycopg2.connect(**{**PGSQLPARAMS, 'database': db_name})

    try:
        # The cursor is opened inside the try block so the connection is
        # closed even when cursor creation itself fails.
        with conn.cursor() as cur:
            cur.execute(create_table_sql)
        conn.commit()
        # The table is 'news_info' ('auto' is the database), and because of
        # IF NOT EXISTS it may already have existed, hence "is ready".
        print("Table 'news_info' is ready.")
    except Exception as e:
        print(f"An error occurred while creating table: {e}")
        conn.rollback()  # Undo the failed transaction.
    finally:
        conn.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Prepare the PostgreSQL side first: database, then table.
    create_db()
    create_table()

    # Pull everything from MongoDB and push it to PostgreSQL; skip the
    # PostgreSQL step entirely when there is nothing to sync.
    documents = mongo()
    if documents:
        pg(documents)

    print("Done!")
|
|
|