You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
auto/utils/utils_mongo_handle.py

62 lines
2.1 KiB

# -*-coding: utf-8 -*-
import os
import sys
import time
from urllib.parse import quote_plus

import pymongo
from pymongo import errors
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from base.base_load_config import load_config, get_base_path
# Connection settings are read once, at import time, from the project config.
config_json = load_config()
base_project = get_base_path()
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
# Percent-escape the credentials before embedding them in the URI: reserved
# characters such as '@', ':', '/' or '%' in a user name or password would
# otherwise produce a connection string that cannot be parsed correctly.
# str() keeps the previous rendering for values that are not strings.
MONGO_LINK = (
    f'mongodb://{quote_plus(str(DB_USER))}:{quote_plus(str(DB_PASSWORD))}'
    f'@{DB_IP}:{DB_PORT}/'
)
class MongoHandle(object):
    """Small convenience wrapper around a single MongoDB collection.

    Attributes:
        client: The ``pymongo.MongoClient`` created from ``MONGO_LINK``.
        db: The database object selected from ``client``.
        collection: The collection object selected from ``db``.

    The instance can be used as a context manager; leaving the ``with``
    block closes the underlying client.
    """

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        """Connect and select ``db`` / ``collection``.

        Args:
            db: Name of the database to use.
            collection: Name of the collection to use.
            del_db: When true, drop the database first if it already exists.
            del_collection: When true, drop the collection first if it
                already exists.
            auto_remove: When non-zero, immediately purge documents older
                than this many days (see ``auto_remove_data``).
        """
        self.client = pymongo.MongoClient(MONGO_LINK)
        if del_db and db:
            # Check that the database exists before dropping it.
            if db in self.client.list_database_names():
                self.client.drop_database(db)
        self.db = self.client[db]
        if del_collection and collection:
            # Check that the collection exists before dropping it.
            if collection in self.db.list_collection_names():
                self.db.drop_collection(collection)
        self.collection = self.db[collection]
        if auto_remove:
            self.auto_remove_data(auto_remove)

    def __enter__(self):
        """Return the handle itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def close(self):
        """Close the underlying client connection."""
        self.client.close()

    def write_data(self, data):
        """Insert the single document ``data`` into the collection."""
        self.collection.insert_one(data)

    def load_data(self):
        """Return every document in the collection as a list, without ``_id``."""
        # MongoDB creates the database and collection automatically on first write.
        return list(self.collection.find({}, {'_id': False}))

    def auto_remove_data(self, day):
        """Delete documents whose ``create_time`` is more than ``day`` days old.

        ``create_time`` is compared against ``int(time.time())``, so it is
        expected to hold a Unix timestamp in seconds.

        Args:
            day: Age threshold in days.
        """
        cutoff = int(time.time()) - day * 24 * 60 * 60
        # One server-side bulk delete instead of a find followed by one
        # delete_one round trip per matching document.
        self.collection.delete_many({'create_time': {'$lt': cutoff}})
# if __name__ == '__main__':
# mongo = MongoHandle('test_db', 'test_collection', False, False, 0)
# mongo.collection.insert_one({'name': 'test'})
# mongo.collection.insert_many([{'name': 'test1'}, {'name': 'test2'}])
# print(mongo.collection.find_one())
# print(mongo.collection.find())
# print('done!')