# -*-coding: utf-8 -*-
"""Fetch Super Lotto (DLT) draw history, compare it with a fixed set of tickets,
e-mail the comparison and store the draws in MongoDB.

The script is meant to be run directly; see ``Luanch().main()`` at the bottom.
"""
import os
import sys
import threading
from datetime import datetime
import time

import httpx

# Make the project's ``utils`` package importable: everything up to and
# including the first 'auto' in this file's absolute path is added to sys.path.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from utils.utils_mongo_handle import MongoHandle
from utils.utils_logs_handle import LogsHandle
from utils.utils_send_email import SendEmail

# Name passed as the first argument of every ``LogsHandle.logs_write`` call.
LOG_NAME = 'auto_get_and_check_dlt'


class GetData(object):
    """Download the draw history and normalise it into plain dicts."""

    def __init__(self, get_num=9999999):
        """Prepare the request URL, logger and Mongo handle.

        Args:
            get_num: value substituted for ``pageSize`` in the request URL,
                i.e. how many draws are requested in the single page fetched.
        """
        self.get_num = get_num
        self.url = 'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=85&provinceId=0&pageSize={}&isVerify=1&pageNo=1'.format(
            get_num)
        # The original created LogsHandle() twice; one instance is enough.
        self.logs_handle = LogsHandle()
        self.email_subject = 'dlt'
        self.email_title = '超级大乐透最新一期开奖查询对比'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ('-' * 90))
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'dlt'
        collection = 'dlt_' + self.now_day
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False,
                                 del_collection=False, auto_remove=0)

    def main(self):
        """Fetch and normalise the draws.

        Returns:
            A non-empty list of draw dicts, or ``None`` when the request
            produced no usable list (so callers can test the result for
            truthiness instead of crashing inside ``data_handle``).
        """
        data_list = self.req()
        if not data_list:
            # req() already logged the reason; iterating None would raise.
            return None
        return self.data_handle(data_list)

    def req(self):
        """Request the history page and return the raw list of draws.

        Returns:
            The ``value.list`` entry of the JSON response, or ``None`` when
            the request fails, the body is not JSON, or the list is missing
            or empty.

        Raises:
            SystemExit: when the server answers with a non-200 status code
                (exit status 0, as before).
        """
        try:
            resp = httpx.get(self.url)
        except httpx.HTTPError as e:
            # Connection / timeout problems: report them instead of crashing.
            log_detail = '访问失败, 请求异常:{},url:{}'.format(e, self.url)
            self.logs_handle.logs_write(LOG_NAME, log_detail, 'error', False)
            return None

        if resp.status_code != 200:
            print('state code: {}'.format(resp.status_code))
            log_detail = '访问失败, 状态码:{},url:{}'.format(resp.status_code, self.url)
            self.logs_handle.logs_write(LOG_NAME, log_detail, 'error', False)
            # sys.exit instead of the site-provided exit(); status kept at 0
            # so existing schedulers see the same exit code.
            sys.exit(0)

        try:
            resp_json = resp.json()
        except ValueError:
            # A body that is not valid JSON is treated like an empty answer.
            resp_json = {}

        # ``get`` (not ``setdefault``) so the payload is not mutated, and a
        # missing 'value' no longer raises AttributeError on None.
        value = resp_json.get('value') or {}
        data_list = value.get('list')
        if not data_list:
            self.logs_handle.logs_write(LOG_NAME, '返回的数据为空, 获取数据失败', 'error', False)
            return None

        print('已获取数据')
        return data_list

    def data_handle(self, data_list):
        """Convert raw draw records into flat dicts.

        Each record's ``lotteryUnsortDrawresult`` is split on single spaces;
        the first five items are the red balls, the rest the blue balls, and
        each group is sorted as strings. Records with fewer than seven items
        or a non-string result are skipped.

        Args:
            data_list: iterable of dicts as returned by ``req``.

        Returns:
            A non-empty list of dicts with the keys ``serial``, ``red1`` ..
            ``red5``, ``blue1``, ``blue2``, ``drawPdfUrl``, ``date``, ``pool``.

        Raises:
            SystemExit: when no record could be converted (exit status 0).
        """
        result_data = []
        for d in data_list:
            numbers = d.get('lotteryUnsortDrawresult')
            try:
                balls = numbers.split(' ')
            except AttributeError as e:
                # Missing or non-string result for this draw.
                print('numbers: {}, err: {}'.format(numbers, e))
                continue
            if len(balls) < 7:
                continue

            # Split into red and blue balls. The length check above
            # guarantees five reds and at least two blues.
            red_list = sorted(balls[:5])
            blue_list = sorted(balls[5:])
            red1, red2, red3, red4, red5 = red_list
            blue1, blue2 = blue_list[0], blue_list[1]

            result_data.append({
                'serial': d.get('lotteryDrawNum'),
                'red1': red1 or '',
                'red2': red2 or '',
                'red3': red3 or '',
                'red4': red4 or '',
                'red5': red5 or '',
                'blue1': blue1 or '',
                'blue2': blue2 or '',
                'drawPdfUrl': d.get('drawPdfUrl'),
                'date': d.get('lotteryDrawTime'),
                'pool': d.get('poolBalanceAfterdraw')
            })

        if result_data:
            return result_data

        self.logs_handle.logs_write(LOG_NAME, '返回的数据为空, 获取数据失败', 'error', False)
        sys.exit(0)


class CheckMyDLT(object):
    """Compare fetched draws with the hard-coded tickets and e-mail the result."""

    def __init__(self, data):
        """Store the draws to check.

        Args:
            data: list of draw dicts as produced by ``GetData.data_handle``.
        """
        # Each ticket: five red numbers followed by two blue numbers.
        self.my_dlt = [
            ['10', '11', '16', '17', '18', '11', '12'],
            ['02', '03', '11', '12', '23', '05', '06'],
            ['07', '09', '15', '17', '22', '09', '11'],
            ['05', '06', '07', '34', '35', '02', '09'],
            ['09', '10', '11', '21', '22', '04', '05']
        ]
        self.data = data

    def main(self):
        """Build the comparison text and send it by e-mail."""
        print('开始数据对比')
        prepare_send_text, prepare_send_subject = self.process_text()
        self.send_data(prepare_send_subject, prepare_send_text)

    def process_text(self):
        """Render the comparison of every draw against every ticket.

        Returns:
            ``(text, subject)``. ``subject`` is the draw's serial number when
            exactly one draw is being checked, otherwise ``None``.
        """
        # The subject is only shown when a single draw is queried. The
        # original tested len() of the per-draw dict (always 11 keys), so the
        # subject was never set; the number of draws is what matters.
        single_draw = len(self.data) == 1
        subject = None
        separator = '*' * 90
        parts = []

        for data in self.data:
            red_list = [data['red1'], data['red2'], data['red3'], data['red4'], data['red5']]
            blue_list = [data['blue1'], data['blue2']]

            if single_draw:
                subject = '{}'.format(data['serial'])

            # Header for this draw.
            parts.append(
                'serial: {}\t\tlottery draw date: {}\t\tbonus pool: {} RMB\n{}\nlottery draw num: {} + {}\n'.format(
                    data['serial'], data['date'], data['pool'], separator, red_list, blue_list))

            for my_num in self.my_dlt:
                my_red_list = my_num[:5]
                my_blue_list = my_num[5:]
                # Count drawn numbers that also appear on the ticket.
                red_equal_count = sum(1 for element in red_list if element in my_red_list)
                blue_equal_count = sum(1 for element in blue_list if element in my_blue_list)
                parts.append('my nums: {} + {}\t\tred hit: {}\tblue hit: {}\n'.format(
                    my_red_list, my_blue_list, red_equal_count, blue_equal_count))

            parts.append('{}\n\n\n\n'.format(separator))

        return ''.join(parts), subject

    def send_data(self, subject, text):
        """Send ``text`` by e-mail with the given ``subject``."""
        title = '超级大乐透最新一期开奖查询对比'
        SendEmail(subject, title, text).send()


class SaveToDB(object):
    """Persist the fetched draws into today's MongoDB collection."""

    def __init__(self, data):
        """Open the ``dlt`` database, collection ``dlt_<YYYY-MM-DD>``.

        Args:
            data: list of draw dicts as produced by ``GetData.data_handle``.
        """
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'dlt'
        collection = 'dlt_' + self.now_day
        # NOTE(review): del_collection=True presumably clears today's
        # collection before inserting - confirm against MongoHandle.
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False,
                                 del_collection=True, auto_remove=0)
        self.data = data

    def save_data(self):
        """Insert every draw as one document, adding creation timestamps."""
        print('开始保存数据')
        for data in self.data:
            # ``get`` rather than ``setdefault``: these dicts are read by the
            # e-mail thread at the same time and must not be mutated here.
            data_to_insert = {
                "serial": data.get('serial'),
                "red1": data.get('red1'),
                "red2": data.get('red2'),
                "red3": data.get('red3'),
                "red4": data.get('red4'),
                "red5": data.get('red5'),
                "blue1": data.get('blue1'),
                "blue2": data.get('blue2'),
                "date": data.get('date'),
                "pool": data.get('pool'),
                "drawPdfUrl": data.get('drawPdfUrl'),
                "create_time": int(time.time()),
                "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            self.mongo.collection.insert_one(data_to_insert)
        print('数据已储存, 共储存数据{}条'.format(len(self.data)))


class DLT(object):
    """Orchestrate the job: fetch, then e-mail and store in parallel threads."""

    def start(self, n):
        """Fetch the latest ``n`` draws; returns the list or ``None``."""
        return GetData(n).main()

    def check(self, data):
        """Compare ``data`` with the tickets and send the e-mail."""
        CheckMyDLT(data).main()

    def mongo(self, data):
        """Store ``data`` in MongoDB."""
        SaveToDB(data).save_data()

    def main(self):
        """Run the whole job for the latest 30 draws."""
        logs = LogsHandle()
        logs.logs_write(LOG_NAME, 'dlt任务开始', 'start', False)

        data = self.start(30)
        if not data:
            logs.logs_write(LOG_NAME, '获取数据失败', 'error', False)
            return

        # E-mailing and storing are independent, so run them side by side.
        threads = [threading.Thread(target=task, args=(data,))
                   for task in (self.check, self.mongo)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        # NOTE(review): the end-of-job entry is written with the 'start'
        # level, as in the original - confirm whether that is intended.
        logs.logs_write(LOG_NAME, 'dlt任务结束', 'start', False)
        print('done')


class Luanch(DLT):
    """Entry-point class; behaves exactly like ``DLT``.

    The original was a verbatim copy of ``DLT``. The (misspelled) name is
    kept so existing callers keep working.
    """


if __name__ == '__main__':
    Luanch().main()

    # ## Fetch the data on its own and dump it to dlt.txt
    # G = GetData()
    # data = G.main()
    # re_data = data[::-1]
    # save_txt = ''
    # for item in re_data:
    #     save_txt += f'[[{item["red1"]}, {item["red2"]}, {item["red3"]}, {item["red4"]}, {item["red5"]}], [{item["blue1"]}, {item["blue2"]}]],\n'
    #
    # with open('dlt.txt', 'w') as f:
    #     f.write(save_txt)