# -*- coding: utf-8 -*-
"""
Fetch Super Lotto (大乐透) draw results and match them against the numbers
I bought. Meant to be run on a schedule.
"""
import sys
import os
import asyncio
import httpx

# Append everything up to (and including) the first 'manual' segment of this
# file's absolute path to sys.path; presumably this is what makes the
# project-local ``utils`` package below importable.
sys.path.append(os.path.join(os.path.abspath(__file__).split('manual')[0] + 'manual'))
from utils.utils_send_gotify import GotifyNotifier


class CheckDlt:
    """Download recent draw results and compare them with ``self.my_dlt``."""

    def __init__(self):
        """Set up the request headers and the list of purchased tickets."""
        # NOTE(review): these browser-like headers are built here but req()
        # does not pass them to the HTTP client. Confirm whether they should
        # be sent; mind that advertising br/zstd needs decoder support.
        self.headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Cache-Control": "max-age=0",
            "Priority": "u=0, i",
            "Sec-CH-UA": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
            "Sec-CH-UA-Mobile": "?0",
            "Sec-CH-UA-Platform": '"macOS"',
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"
        }
        # Each ticket is seven two-digit strings: data_compare() treats the
        # first five as red balls and the last two as blue balls.
        self.my_dlt = [
            ['10', '11', '16', '17', '18', '11', '12'],
            ['02', '03', '11', '12', '23', '05', '06'],
            ['07', '09', '15', '17', '22', '09', '11'],
            ['05', '06', '07', '34', '35', '02', '09'],
            ['09', '10', '11', '21', '22', '04', '05']
        ]

    async def req(self, url):
        """GET ``url`` and return the decoded JSON body.

        Args:
            url: Address to request (5 second timeout).

        Returns:
            The parsed JSON payload, or ``None`` when the request fails, the
            status code is not 200, or the body is not valid JSON. Failures
            are reported on stdout rather than raised.
        """
        async with httpx.AsyncClient() as client:
            try:
                resp = await client.get(url, timeout=5)
            except (httpx.HTTPError, httpx.InvalidURL) as e:
                print(f'请求失败 {e}')
                return None

        if resp.status_code != 200:
            print('state code: {}'.format(resp.status_code))
            log_detail = '访问失败, 状态码:{},url:{}'.format(resp.status_code, url)
            print(log_detail)
            return None

        # A 200 response can still carry a non-JSON body (e.g. an HTML error
        # page); treat that like any other failed fetch instead of crashing.
        try:
            return resp.json()
        except ValueError as e:
            print(f'响应不是合法的JSON {e}')
            return None

    def data_handle(self, data):
        """Turn the raw API payload into a list of per-draw dicts.

        Args:
            data: Decoded JSON; read as ``data['value']['list']``, a list of
                draw records.

        Returns:
            A non-empty list of dicts with keys ``serial``, ``red1``..``red5``,
            ``blue1``, ``blue2``, ``drawPdfUrl``, ``date`` and ``pool``, or
            ``None`` when nothing usable was found. Red and blue numbers are
            each sorted as strings.
        """
        value = data.get('value') if isinstance(data, dict) else None
        data_list = value.get('list') if isinstance(value, dict) else None
        if not data_list:
            print('获取数据为空')
            return None

        result_data = []
        for d in data_list:
            if not isinstance(d, dict):
                continue
            numbers = d.get('lotteryUnsortDrawresult')
            if not isinstance(numbers, str):
                print('numbers: {}, err: {}'.format(numbers, 'not a string'))
                continue

            # split() without an argument tolerates repeated whitespace.
            balls = numbers.split()
            if len(balls) < 7:
                continue

            # First five tokens are the red balls, the rest are blue balls.
            red1, red2, red3, red4, red5 = sorted(balls[:5])
            blue1, blue2 = sorted(balls[5:])[:2]

            result_data.append({
                'serial': d.get('lotteryDrawNum'),
                'red1': red1,
                'red2': red2,
                'red3': red3,
                'red4': red4,
                'red5': red5,
                'blue1': blue1,
                'blue2': blue2,
                'drawPdfUrl': d.get('drawPdfUrl'),
                'date': d.get('lotteryDrawTime'),
                'pool': d.get('poolBalanceAfterdraw')
            })

        if not result_data:
            print('返回的数据为空, 获取数据失败')
            return None
        return result_data

    def data_compare(self, all_data):
        """Build a text report comparing the first draw with every ticket.

        Args:
            all_data: Output of :meth:`data_handle`; only ``all_data[0]`` is
                used.

        Returns:
            A ``(text, subject)`` tuple, where ``subject`` is the draw serial
            as a string. Returns ``('', '')`` when ``all_data`` is empty.
        """
        if not all_data:
            return '', ''

        data = all_data[0]
        red_list = [data['red1'], data['red2'], data['red3'], data['red4'], data['red5']]
        blue_list = [data['blue1'], data['blue2']]

        # Draw serial number.
        subject = '{}'.format(data['serial'])

        # Header describing the draw itself.
        parts = [
            'serial: {}\t\tlottery draw date: {}\nbonus pool: {} RMB\n{}\nlottery draw num: {} + {}\n\n'.format(
                data['serial'], data['date'], data['pool'], '*' * 90, red_list, blue_list)
        ]

        for my_num in self.my_dlt:
            my_red_list = my_num[:5]
            my_blue_list = my_num[5:]
            my_reds = set(my_red_list)
            my_blues = set(my_blue_list)

            # Count drawn numbers that also appear on this ticket.
            red_equal_count = sum(1 for ball in red_list if ball in my_reds)
            blue_equal_count = sum(1 for ball in blue_list if ball in my_blues)

            parts.append('my nums: {} + {}\nred hit: {}\nblue hit: {}\n\n'.format(
                my_red_list, my_blue_list, red_equal_count, blue_equal_count))

        parts.append('{}\n\n\n\n'.format('*' * 90))
        return ''.join(parts), subject

    async def main(self):
        """Fetch result pages 3, 2 and 1 (one draw per page) and print each report."""
        for no in range(3, 0, -1):
            url = f'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=85&provinceId=0&pageSize=1&isVerify=1&pageNo={no}'
            data_list = await self.req(url)
            result_data = self.data_handle(data_list)
            if not result_data:
                continue
            text, _subject = self.data_compare(result_data)
            print(text)


if __name__ == '__main__':
    asyncio.run(CheckDlt().main())