mottery/backend/update_lottery.py

261 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
from datetime import datetime
import os
from typing import Dict, List, Optional
import logging
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from sqlalchemy import create_engine, desc
from sqlalchemy.orm import sessionmaker
from app.models.lottery import SSQLottery, DLTLottery
from app.core.database import engine, SessionLocal
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class LotteryUpdater:
def __init__(self):
self.api_key = "7a4beb6175a2c4dacf6cf9cab43bfe6f"
self.api_url = "http://apis.juhe.cn/lottery/history"
self.lottery_types = {
"ssq": {
"name": "双色球",
"model": SSQLottery
},
"dlt": {
"name": "超级大乐透",
"model": DLTLottery
}
}
# 配置数据库连接
self.SessionLocal = SessionLocal
# 配置请求会话
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def get_latest_draw_info(self, lottery_type: str) -> tuple:
"""获取数据库中最新一期的开奖信息"""
try:
db = self.SessionLocal()
model = self.lottery_types[lottery_type]["model"]
latest = db.query(model).order_by(desc(model.issue)).first()
if latest:
return latest.open_time.strftime('%Y-%m-%d'), latest.issue
return None, None
except Exception as e:
logger.error(f"获取最新开奖信息失败: {str(e)}")
return None, None
finally:
db.close()
def fetch_lottery_data(self, lottery_id: str) -> Dict:
"""从API获取最新一页的开奖数据"""
try:
params = {
'key': self.api_key,
'lottery_id': lottery_id,
'page': 1, # 只获取第一页
'page_size': 50 # 获取最大数量
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = self.session.get(
self.api_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data.get('error_code') != 0:
error_msg = data.get('reason', '未知错误')
logger.error(f"API返回错误: {error_msg}")
return None
if not data.get('result') or not data['result'].get('lotteryResList'):
logger.error("API返回数据格式不正确")
return None
return data
except Exception as e:
logger.error(f"获取开奖数据失败: {str(e)}")
return None
def validate_lottery_data(self, item: Dict, lottery_type: str) -> bool:
"""验证开奖数据的有效性"""
try:
# 验证日期格式
draw_date = datetime.strptime(item['lottery_date'], '%Y-%m-%d')
if draw_date > datetime.now():
logger.warning(f"开奖日期 {item['lottery_date']} 在未来")
return False
# 验证期号格式
if not item['lottery_no'].isdigit():
logger.warning(f"期号 {item['lottery_no']} 格式不正确")
return False
# 验证开奖号码
numbers = item['lottery_res'].split(',')
if lottery_type == 'ssq':
if len(numbers) != 7:
logger.warning(f"双色球开奖号码数量不正确: {item['lottery_res']}")
return False
# 验证红球范围(1-33)和蓝球范围(1-16)
if not all(1 <= int(n) <= 33 for n in numbers[:6]) or not 1 <= int(numbers[6]) <= 16:
logger.warning(f"双色球号码范围不正确: {item['lottery_res']}")
return False
else: # dlt
if len(numbers) != 7:
logger.warning(f"大乐透开奖号码数量不正确: {item['lottery_res']}")
return False
# 验证前区范围(1-35)和后区范围(1-12)
if not all(1 <= int(n) <= 35 for n in numbers[:5]) or not all(1 <= int(n) <= 12 for n in numbers[5:]):
logger.warning(f"大乐透号码范围不正确: {item['lottery_res']}")
return False
return True
except Exception as e:
logger.error(f"验证开奖数据失败: {str(e)}")
return False
def get_all_open_times_in_db(self, lottery_type: str) -> set:
"""获取数据库中所有已存在的开奖日期(字符串集合)"""
db = self.SessionLocal()
model = self.lottery_types[lottery_type]["model"]
open_times = set(row.open_time.strftime('%Y-%m-%d')
for row in db.query(model.open_time).all())
db.close()
return open_times
def process_lottery_data(self, lottery_data: Dict, lottery_type: str, existing_open_times: set) -> List[Dict]:
"""处理API返回的开奖数据只返回数据库中没有的开奖日期"""
processed_data = []
try:
lottery_list = sorted(
lottery_data['result']['lotteryResList'],
key=lambda x: x['lottery_date'],
reverse=True
)
for item in lottery_list:
if not self.validate_lottery_data(item, lottery_type):
continue
if item['lottery_date'] in existing_open_times:
logger.info(f"开奖日期 {item['lottery_date']} 已存在,跳过")
continue
numbers = item['lottery_res'].split(',')
if lottery_type == 'ssq':
processed_item = {
'issue': item['lottery_no'],
'open_time': datetime.strptime(item['lottery_date'], '%Y-%m-%d').date(),
'red_ball_1': int(numbers[0]),
'red_ball_2': int(numbers[1]),
'red_ball_3': int(numbers[2]),
'red_ball_4': int(numbers[3]),
'red_ball_5': int(numbers[4]),
'red_ball_6': int(numbers[5]),
'blue_ball': int(numbers[6])
}
else: # dlt
processed_item = {
'issue': item['lottery_no'],
'open_time': datetime.strptime(item['lottery_date'], '%Y-%m-%d').date(),
'front_ball_1': int(numbers[0]),
'front_ball_2': int(numbers[1]),
'front_ball_3': int(numbers[2]),
'front_ball_4': int(numbers[3]),
'front_ball_5': int(numbers[4]),
'back_ball_1': int(numbers[5]),
'back_ball_2': int(numbers[6])
}
processed_data.append(processed_item)
if processed_data:
logger.info(
f"新数据开奖日期范围: {processed_data[0]['open_time']} - {processed_data[-1]['open_time']}")
logger.info(f"共发现 {len(processed_data)} 条新数据")
except Exception as e:
logger.error(f"处理开奖数据失败: {str(e)}")
return processed_data
def update_lottery_data(self, lottery_type: str):
"""更新彩票数据"""
try:
# 获取数据库所有已存在的开奖日期
existing_open_times = self.get_all_open_times_in_db(lottery_type)
logger.info(f"数据库已存在{len(existing_open_times)}个开奖日期")
# 获取API数据
api_data = self.fetch_lottery_data(lottery_type)
if not api_data:
logger.error(f"获取{lottery_type}数据失败")
return
# 处理数据
new_data = self.process_lottery_data(
api_data, lottery_type, existing_open_times)
if not new_data:
logger.info(f"{lottery_type}数据已是最新")
return
# 按open_time升序排序确保旧数据先插入
new_data = sorted(new_data, key=lambda x: x['open_time'])
# 将新数据添加到数据库
db = self.SessionLocal()
try:
model = self.lottery_types[lottery_type]["model"]
for item in new_data:
lottery = model(**item)
db.add(lottery)
db.commit()
logger.info(f"成功更新{lottery_type}数据,新增{len(new_data)}条记录")
# 打印最新一期数据
if new_data:
latest = new_data[-1]
logger.info(f"最新一期数据:")
logger.info(f"期号:{latest['issue']}")
logger.info(f"开奖日期:{latest['open_time']}")
if lottery_type == 'ssq':
logger.info(
f"开奖号码:{latest['red_ball_1']} {latest['red_ball_2']} {latest['red_ball_3']} {latest['red_ball_4']} {latest['red_ball_5']} {latest['red_ball_6']} + {latest['blue_ball']}")
else:
logger.info(
f"开奖号码:{latest['front_ball_1']} {latest['front_ball_2']} {latest['front_ball_3']} {latest['front_ball_4']} {latest['front_ball_5']} + {latest['back_ball_1']} {latest['back_ball_2']}")
except Exception as e:
db.rollback()
logger.error(f"保存数据到数据库失败: {str(e)}")
finally:
db.close()
except Exception as e:
logger.error(f"更新{lottery_type}数据失败: {str(e)}")
logger.exception(e) # 打印完整的错误堆栈
def update_all_lottery_data(self):
"""更新所有彩票数据"""
for lottery_type in self.lottery_types:
logger.info(f"开始更新{self.lottery_types[lottery_type]['name']}数据...")
self.update_lottery_data(lottery_type)
if __name__ == "__main__":
updater = LotteryUpdater()
updater.update_all_lottery_data()