import requests import json from datetime import datetime import os from typing import Dict, List, Optional import logging import time from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from sqlalchemy import create_engine, desc from sqlalchemy.orm import sessionmaker from app.models.lottery import SSQLottery, DLTLottery, SSQLotteryBetRecord, DLTLotteryBetRecord from app.core.database import engine, SessionLocal # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class LotteryUpdater: def __init__(self): self.api_key = "7a4beb6175a2c4dacf6cf9cab43bfe6f" self.api_url = "http://apis.juhe.cn/lottery/history" self.lottery_types = { "ssq": { "name": "双色球", "model": SSQLottery }, "dlt": { "name": "超级大乐透", "model": DLTLottery } } # 配置数据库连接 self.SessionLocal = SessionLocal # 配置请求会话 self.session = requests.Session() retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504] ) adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) def get_latest_draw_info(self, lottery_type: str) -> tuple: """获取数据库中最新一期的开奖信息""" try: db = self.SessionLocal() model = self.lottery_types[lottery_type]["model"] latest = db.query(model).order_by(desc(model.issue)).first() if latest: return latest.open_time.strftime('%Y-%m-%d'), latest.issue return None, None except Exception as e: logger.error(f"获取最新开奖信息失败: {str(e)}") return None, None finally: db.close() def fetch_lottery_data(self, lottery_id: str) -> Dict: """从API获取最新一页的开奖数据""" try: params = { 'key': self.api_key, 'lottery_id': lottery_id, 'page': 1, # 只获取第一页 'page_size': 50 # 获取最大数量 } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = self.session.get( self.api_url, params=params, headers=headers, timeout=10) response.raise_for_status() data = response.json() if data.get('error_code') != 0: error_msg = data.get('reason', '未知错误') logger.error(f"API返回错误: {error_msg}") return None if not data.get('result') or not data['result'].get('lotteryResList'): logger.error("API返回数据格式不正确") return None return data except Exception as e: logger.error(f"获取开奖数据失败: {str(e)}") return None def validate_lottery_data(self, item: Dict, lottery_type: str) -> bool: """验证开奖数据的有效性""" try: # 验证日期格式 draw_date = datetime.strptime(item['lottery_date'], '%Y-%m-%d') if draw_date > datetime.now(): logger.warning(f"开奖日期 {item['lottery_date']} 在未来") return False # 验证期号格式 if not item['lottery_no'].isdigit(): logger.warning(f"期号 {item['lottery_no']} 格式不正确") return False # 验证开奖号码 numbers = item['lottery_res'].split(',') if lottery_type == 'ssq': if len(numbers) != 7: logger.warning(f"双色球开奖号码数量不正确: {item['lottery_res']}") return False # 验证红球范围(1-33)和蓝球范围(1-16) if not all(1 <= int(n) <= 33 for n in numbers[:6]) or not 1 <= int(numbers[6]) <= 16: logger.warning(f"双色球号码范围不正确: {item['lottery_res']}") return False else: # dlt if len(numbers) != 7: logger.warning(f"大乐透开奖号码数量不正确: {item['lottery_res']}") return False # 验证前区范围(1-35)和后区范围(1-12) if not all(1 <= int(n) <= 35 for n in numbers[:5]) or not all(1 <= int(n) <= 12 for n in numbers[5:]): logger.warning(f"大乐透号码范围不正确: {item['lottery_res']}") return False return True except Exception as e: logger.error(f"验证开奖数据失败: {str(e)}") return False def get_all_open_times_in_db(self, lottery_type: str) -> set: """获取数据库中所有已存在的开奖日期(字符串集合)""" db = self.SessionLocal() model = self.lottery_types[lottery_type]["model"] open_times = set(row.open_time.strftime('%Y-%m-%d') for row in db.query(model.open_time).all()) db.close() return open_times def process_lottery_data(self, lottery_data: Dict, lottery_type: str, existing_open_times: set) -> List[Dict]: """处理API返回的开奖数据,只返回数据库中没有的开奖日期""" processed_data = [] try: lottery_list = sorted( lottery_data['result']['lotteryResList'], key=lambda x: x['lottery_date'], reverse=True ) for item in lottery_list: if not self.validate_lottery_data(item, lottery_type): continue if item['lottery_date'] in existing_open_times: logger.info(f"开奖日期 {item['lottery_date']} 已存在,跳过") continue numbers = item['lottery_res'].split(',') if lottery_type == 'ssq': processed_item = { 'issue': item['lottery_no'], 'open_time': datetime.strptime(item['lottery_date'], '%Y-%m-%d').date(), 'red_ball_1': int(numbers[0]), 'red_ball_2': int(numbers[1]), 'red_ball_3': int(numbers[2]), 'red_ball_4': int(numbers[3]), 'red_ball_5': int(numbers[4]), 'red_ball_6': int(numbers[5]), 'blue_ball': int(numbers[6]) } else: # dlt processed_item = { 'issue': item['lottery_no'], 'open_time': datetime.strptime(item['lottery_date'], '%Y-%m-%d').date(), 'front_ball_1': int(numbers[0]), 'front_ball_2': int(numbers[1]), 'front_ball_3': int(numbers[2]), 'front_ball_4': int(numbers[3]), 'front_ball_5': int(numbers[4]), 'back_ball_1': int(numbers[5]), 'back_ball_2': int(numbers[6]) } processed_data.append(processed_item) if processed_data: logger.info( f"新数据开奖日期范围: {processed_data[0]['open_time']} - {processed_data[-1]['open_time']}") logger.info(f"共发现 {len(processed_data)} 条新数据") except Exception as e: logger.error(f"处理开奖数据失败: {str(e)}") return processed_data def update_lottery_data(self, lottery_type: str): """更新彩票数据""" try: # 获取数据库所有已存在的开奖日期 existing_open_times = self.get_all_open_times_in_db(lottery_type) logger.info(f"数据库已存在{len(existing_open_times)}个开奖日期") # 获取API数据 api_data = self.fetch_lottery_data(lottery_type) if not api_data: logger.error(f"获取{lottery_type}数据失败") return # 处理数据 new_data = self.process_lottery_data( api_data, lottery_type, existing_open_times) if not new_data: logger.info(f"{lottery_type}数据已是最新") return # 按open_time升序排序,确保旧数据先插入 new_data = sorted(new_data, key=lambda x: x['open_time']) # 将新数据添加到数据库 db = self.SessionLocal() try: model = self.lottery_types[lottery_type]["model"] for item in new_data: lottery = model(**item) db.add(lottery) db.commit() logger.info(f"成功更新{lottery_type}数据,新增{len(new_data)}条记录") # 打印最新一期数据 if new_data: latest = new_data[-1] if lottery_type == 'ssq': logger.info( f"最新一期:{latest['issue']} {latest['open_time']} {latest['red_ball_1']:02d} {latest['red_ball_2']:02d} {latest['red_ball_3']:02d} {latest['red_ball_4']:02d} {latest['red_ball_5']:02d} {latest['red_ball_6']:02d} + {latest['blue_ball']:02d}") else: logger.info( f"最新一期:{latest['issue']} {latest['open_time']} {latest['front_ball_1']:02d} {latest['front_ball_2']:02d} {latest['front_ball_3']:02d} {latest['front_ball_4']:02d} {latest['front_ball_5']:02d} + {latest['back_ball_1']:02d} {latest['back_ball_2']:02d}") except Exception as e: db.rollback() logger.error(f"保存数据到数据库失败: {str(e)}") finally: db.close() # 更新投注记录的期号 self.update_bet_issues_by_date(lottery_type) # 在开奖数据入库后自动调用 db = self.SessionLocal() try: self.check_and_update_bet_wins(lottery_type, db) finally: db.close() except Exception as e: logger.error(f"更新{lottery_type}数据失败: {str(e)}") logger.exception(e) # 打印完整的错误堆栈 def update_bet_issues_by_date(self, lottery_type: str): """ 根据投注日期更新期号 查找相同日期的投注记录,将期号设置为当天开奖的期号 """ try: db = self.SessionLocal() if lottery_type == 'ssq': LotteryModel = SSQLottery BetModel = SSQLotteryBetRecord else: LotteryModel = DLTLottery BetModel = DLTLotteryBetRecord # 查找所有期号为 'PENDING' 的投注记录 pending_bets = db.query(BetModel).filter( BetModel.issue == 'PENDING').all() logger.info(f"找到 {len(pending_bets)} 条期号待定的{lottery_type}投注记录") for bet in pending_bets: # 获取投注日期 bet_date = bet.bet_time.date() # 查找相同日期的开奖记录 draw = db.query(LotteryModel).filter( LotteryModel.open_time == bet_date).first() if draw: # 更新投注记录的期号 old_issue = bet.issue bet.issue = draw.issue logger.debug( f"更新投注记录 ID {bet.id}: 期号 {old_issue} -> {draw.issue} (日期匹配: {bet_date})") else: logger.debug( f"投注记录 ID {bet.id} 的日期 {bet_date} 暂无开奖数据,期号保持 PENDING") db.commit() logger.info(f"{lottery_type} 期号更新完成") except Exception as e: db.rollback() logger.error(f"更新{lottery_type}投注期号失败: {str(e)}") finally: db.close() def update_all_lottery_data(self): """更新所有彩票数据""" for lottery_type in self.lottery_types: logger.info(f"开始更新{self.lottery_types[lottery_type]['name']}数据...") self.update_lottery_data(lottery_type) def check_and_update_bet_wins(self, lottery_type: str, db): """批量比对投注记录并更新中奖信息,支持ssq和dlt""" if lottery_type == 'ssq': LotteryModel = SSQLottery BetModel = SSQLotteryBetRecord else: LotteryModel = DLTLottery BetModel = DLTLotteryBetRecord # 查询所有已开奖的期号和开奖号码 open_draws = {d.issue: d for d in db.query(LotteryModel).all()} # 查询所有未比对或未中奖的投注记录(排除期号为PENDING的记录) bets = db.query(BetModel).filter( BetModel.is_winner == 0, BetModel.issue != 'PENDING' ).all() for bet in bets: draw = open_draws.get(bet.issue) if not draw: continue # 未开奖 # 解析投注号码 bet_red, bet_blue = [], [] try: red_str, blue_str = bet.numbers.split('|') bet_red = [int(x) for x in red_str.split(',') if x] bet_blue = [int(x) for x in blue_str.split(',') if x] except Exception: continue # 解析开奖号码 if lottery_type == 'ssq': open_red = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3, draw.red_ball_4, draw.red_ball_5, draw.red_ball_6] open_blue = [draw.blue_ball] else: open_red = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3, draw.front_ball_4, draw.front_ball_5] open_blue = [draw.back_ball_1, draw.back_ball_2] # 计算命中数 red_hit = len(set(bet_red) & set(open_red)) blue_hit = len(set(bet_blue) & set(open_blue)) # 判断中奖等级和金额 win_level, win_amount = self.calc_prize( lottery_type, red_hit, blue_hit, bet_red, bet_blue, open_red, open_blue) is_winner = 1 if win_level else 0 # 更新投注记录 bet.is_winner = is_winner bet.win_level = win_level bet.win_amount = str(win_amount) if win_amount else None bet.open_code = ','.join( [str(x) for x in open_red]) + '|' + ','.join([str(x) for x in open_blue]) bet.open_date = draw.open_time db.add(bet) db.commit() print(f"已完成{lottery_type}投注中奖比对与更新") def calc_prize(self, lottery_type, red_hit, blue_hit, bet_red, bet_blue, open_red, open_blue): """根据命中数判断中奖等级和金额,返回(等级, 金额)""" # 双色球规则 if lottery_type == 'ssq': # 中奖规则(简化版,实际可查官方) if red_hit == 6 and blue_hit == 1: return '一等奖', 10000000 elif red_hit == 6 and blue_hit == 0: return '二等奖', 5000000 elif red_hit == 5 and blue_hit == 1: return '三等奖', 3000 elif red_hit == 5 and blue_hit == 0: return '四等奖', 200 elif red_hit == 4 and blue_hit == 1: return '四等奖', 200 elif red_hit == 4 and blue_hit == 0: return '五等奖', 10 elif red_hit == 3 and blue_hit == 1: return '五等奖', 10 elif blue_hit == 1: return '六等奖', 5 else: return None, None # 大乐透规则 else: # 一等奖:投注号码与当期开奖号码全部相同(顺序不限,下同),即中奖; if red_hit == 5 and blue_hit == 2: return '一等奖', 10000000 # 二等奖:投注号码与当期开奖号码中的五个前区号码及任意一个后区号码相同,即中奖; elif red_hit == 5 and blue_hit == 1: return '二等奖', 5000000 # 三等奖:投注号码与当期开奖号码中的五个前区号码相同,即中奖; elif red_hit == 5 and blue_hit == 0: return '三等奖', 3000 # 四等奖:投注号码与当期开奖号码中的任意四个前区号码及两个后区号码相同,即中奖; elif red_hit == 4 and blue_hit == 2: return '四等奖', 200 # 五等奖:投注号码与当期开奖号码中的任意四个前区号码及任意一个后区号码相同,即中奖; elif red_hit == 4 and blue_hit == 1: return '五等奖', 10 # 六等奖:投注号码与当期开奖号码中的任意三个前区号码及两个后区号码相同,即中奖; elif red_hit == 3 and blue_hit == 2: return '六等奖', 5 # 七等奖:投注号码与当期开奖号码中的任意四个前区号码相同,即中奖; elif red_hit == 4 and blue_hit == 0: return '七等奖', 5 # 八等奖:投注号码与当期开奖号码中的任意三个前区号码及任意一个后区号码相同,或者任意两个前区号码及两个后区号码相同,即中奖; elif (red_hit == 3 and blue_hit == 1) or (red_hit == 2 and blue_hit == 2): return '八等奖', 5 # 九等奖:投注号码与当期开奖号码中的任意三个前区号码相同,或者任意一个前区号码及两个后区号码相同,或者任意两个前区号码及任意一个后区号码相同,或者两个后区号码相同,即中奖。 elif (red_hit == 3 and blue_hit == 0) or (red_hit == 1 and blue_hit == 2) or (red_hit == 2 and blue_hit == 1) or (red_hit == 0 and blue_hit == 2): return '九等奖', 5 else: return None, None if __name__ == "__main__": updater = LotteryUpdater() updater.update_all_lottery_data()