# -*-coding: utf-8 -*- import pymongo from pymongo import errors import time import sys import os sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto')) from base.base_load_config import load_config, get_base_path config_json = load_config() base_project = get_base_path() DB_USER = config_json.get('DB_USER') DB_PASSWORD = config_json.get('DB_PASSWORD') DB_IP = config_json.get('DB_IP') DB_PORT = config_json.get('DB_PORT') MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/' class MongoHandle(object): def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0): self.client = pymongo.MongoClient(MONGO_LINK) self.db = db self.collection = collection if del_db and db: # 检查数据库是否存在 if db in self.client.list_database_names(): # 删除数据库 self.client.drop_database(db) self.db = self.client[db] if del_collection and self.collection: # 检查集合是否存在 if self.collection in self.db.list_collection_names(): # 删除集合 self.db.drop_collection(collection) self.collection = self.db[collection] if auto_remove: self.auto_remove_data(auto_remove) def write_data(self, data): self.collection.insert_one(data) def load_data(self): # MongoDB 会在第一次写入时自动创建数据库和集合 return list(self.collection.find({}, {'_id': False})) def auto_remove_data(self, day): for data in self.collection.find({'create_time': {'$lt': int(time.time()) - day * 24 * 60 * 60}}): self.collection.delete_one({'_id': data['_id']}) # if __name__ == '__main__': # mongo = MongoHandle('test_db', 'test_collection', False, False, 0) # mongo.collection.insert_one({'name': 'test'}) # mongo.collection.insert_many([{'name': 'test1'}, {'name': 'test2'}]) # print(mongo.collection.find_one()) # print(mongo.collection.find()) # print('done!')