515 lines
18 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from typing import List, Optional, Dict, Any, Tuple
from datetime import date, datetime
import pandas as pd
from app.models.lottery import SSQLottery, DLTLottery
from app.schemas.lottery import SSQLotteryCreate, DLTLotteryCreate, LotteryQuery
class LotteryService:
@staticmethod
def create_ssq_lottery(db: Session, lottery: SSQLotteryCreate) -> SSQLottery:
db_lottery = SSQLottery(**lottery.model_dump())
db.add(db_lottery)
db.commit()
db.refresh(db_lottery)
return db_lottery
@staticmethod
def create_dlt_lottery(db: Session, lottery: DLTLotteryCreate) -> DLTLottery:
db_lottery = DLTLottery(**lottery.model_dump())
db.add(db_lottery)
db.commit()
db.refresh(db_lottery)
return db_lottery
@staticmethod
def get_ssq_lotteries(
db: Session,
query: LotteryQuery
) -> Tuple[List[SSQLottery], int]:
"""获取双色球开奖记录"""
db_query = db.query(SSQLottery)
# 应用过滤条件
if query.issue:
db_query = db_query.filter(SSQLottery.issue == query.issue)
if query.start_date:
try:
start_date = datetime.strptime(
query.start_date, "%Y-%m-%d").date()
db_query = db_query.filter(SSQLottery.open_time >= start_date)
except ValueError:
pass
if query.end_date:
try:
end_date = datetime.strptime(query.end_date, "%Y-%m-%d").date()
db_query = db_query.filter(SSQLottery.open_time <= end_date)
except ValueError:
pass
# 获取总记录数
total = db_query.count()
# 应用分页
page = query.page or 1
page_size = query.page_size or 20
db_query = db_query.order_by(desc(SSQLottery.open_time)) \
.offset((page - 1) * page_size) \
.limit(page_size)
return db_query.all(), total
@staticmethod
def get_dlt_lotteries(
db: Session,
query: LotteryQuery
) -> Tuple[List[DLTLottery], int]:
"""获取大乐透开奖记录"""
db_query = db.query(DLTLottery)
# 应用过滤条件
if query.issue:
db_query = db_query.filter(DLTLottery.issue == query.issue)
if query.start_date:
try:
start_date = datetime.strptime(
query.start_date, "%Y-%m-%d").date()
db_query = db_query.filter(DLTLottery.open_time >= start_date)
except ValueError:
pass
if query.end_date:
try:
end_date = datetime.strptime(query.end_date, "%Y-%m-%d").date()
db_query = db_query.filter(DLTLottery.open_time <= end_date)
except ValueError:
pass
# 获取总记录数
total = db_query.count()
# 应用分页
page = query.page or 1
page_size = query.page_size or 20
db_query = db_query.order_by(desc(DLTLottery.open_time)) \
.offset((page - 1) * page_size) \
.limit(page_size)
return db_query.all(), total
@staticmethod
def get_ssq_statistics(db: Session):
# 红球统计
red_freq = []
for i in range(1, 7):
col = getattr(SSQLottery, f"red_ball_{i}")
result = db.query(col, func.count().label(
'count')).group_by(col).all()
for number, count in result:
red_freq.append((number, count))
# 汇总红球频率
red_counter = {}
for number, count in red_freq:
red_counter[number] = red_counter.get(number, 0) + count
red_balls = sorted([(k, v)
for k, v in red_counter.items()], key=lambda x: x[0])
# 蓝球统计
blue_freq = db.query(SSQLottery.blue_ball, func.count().label(
'count')).group_by(SSQLottery.blue_ball).all()
blue_balls = sorted([(k, v) for k, v in blue_freq], key=lambda x: x[0])
return {
"red_balls": red_balls,
"blue_balls": blue_balls
}
@staticmethod
def get_dlt_statistics(db: Session):
# 前区统计
front_freq = []
for i in range(1, 6):
col = getattr(DLTLottery, f"front_ball_{i}")
result = db.query(col, func.count().label(
'count')).group_by(col).all()
for number, count in result:
front_freq.append((number, count))
front_counter = {}
for number, count in front_freq:
front_counter[number] = front_counter.get(number, 0) + count
front_balls = sorted(
[(k, v) for k, v in front_counter.items()], key=lambda x: x[0])
# 后区统计
back_freq = []
for i in range(1, 3):
col = getattr(DLTLottery, f"back_ball_{i}")
result = db.query(col, func.count().label(
'count')).group_by(col).all()
for number, count in result:
back_freq.append((number, count))
back_counter = {}
for number, count in back_freq:
back_counter[number] = back_counter.get(number, 0) + count
back_balls = sorted(
[(k, v) for k, v in back_counter.items()], key=lambda x: x[0])
return {
"front_balls": front_balls,
"back_balls": back_balls
}
@staticmethod
def import_ssq_data(db: Session, file_path: str) -> int:
import pandas as pd
from app.models.lottery import SSQLottery
df = pd.read_json(file_path)
# 先查出所有已存在的期号
existing_issues = set(
i[0] for i in db.query(SSQLottery.issue).filter(SSQLottery.issue.in_(df['issue'].astype(str).tolist())).all()
)
# pandas 端彻底去重
new_rows = df[~df['issue'].astype(str).isin(
existing_issues)].drop_duplicates(subset=['issue'])
# 按open_time升序排序
new_rows = new_rows.sort_values(by='open_time', ascending=True)
objs = [
SSQLottery(
issue=str(row['issue']),
open_time=row['open_time'],
red_ball_1=int(row['red_ball_1']),
red_ball_2=int(row['red_ball_2']),
red_ball_3=int(row['red_ball_3']),
red_ball_4=int(row['red_ball_4']),
red_ball_5=int(row['red_ball_5']),
red_ball_6=int(row['red_ball_6']),
blue_ball=int(row['blue_ball'])
)
for _, row in new_rows.iterrows()
]
if objs:
try:
db.bulk_save_objects(objs)
db.commit()
except Exception as e:
db.rollback()
print(f"Bulk insert error: {e}")
return len(objs)
@staticmethod
def import_dlt_data(db: Session, file_path: str) -> int:
import pandas as pd
from app.models.lottery import DLTLottery
df = pd.read_json(file_path)
# 先查出所有已存在的期号
existing_issues = set(
i[0] for i in db.query(DLTLottery.issue).filter(DLTLottery.issue.in_(df['issue'].astype(str).tolist())).all()
)
# pandas 端彻底去重
new_rows = df[~df['issue'].astype(str).isin(
existing_issues)].drop_duplicates(subset=['issue'])
# 按open_time升序排序
new_rows = new_rows.sort_values(by='open_time', ascending=True)
objs = [
DLTLottery(
issue=str(row['issue']),
open_time=row['open_time'],
front_ball_1=int(row['front_ball_1']),
front_ball_2=int(row['front_ball_2']),
front_ball_3=int(row['front_ball_3']),
front_ball_4=int(row['front_ball_4']),
front_ball_5=int(row['front_ball_5']),
back_ball_1=int(row['back_ball_1']),
back_ball_2=int(row['back_ball_2'])
)
for _, row in new_rows.iterrows()
]
if objs:
try:
db.bulk_save_objects(objs)
db.commit()
except Exception as e:
db.rollback()
print(f"Bulk insert error: {e}")
return len(objs)
@staticmethod
def get_latest_ssq(db: Session) -> Optional[SSQLottery]:
"""获取最新一期双色球开奖记录"""
return db.query(SSQLottery).order_by(desc(SSQLottery.open_time)).first()
@staticmethod
def get_latest_dlt(db: Session) -> Optional[DLTLottery]:
"""获取最新一期大乐透开奖记录"""
return db.query(DLTLottery).order_by(desc(DLTLottery.open_time)).first()
@staticmethod
def update_ssq_data(db: Session) -> Dict[str, Any]:
"""更新双色球数据"""
import requests
import logging
from datetime import datetime
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
# API配置
api_key = "7a4beb6175a2c4dacf6cf9cab43bfe6f"
api_url = "http://apis.juhe.cn/lottery/history"
# 配置请求会话
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 获取数据库中所有已存在的开奖日期
existing_open_times = set(
row.open_time.strftime('%Y-%m-%d')
for row in db.query(SSQLottery.open_time).all()
)
logger.info(f"数据库已存在{len(existing_open_times)}个开奖日期")
# 从API获取数据
params = {
'key': api_key,
'lottery_id': 'ssq',
'page': 1,
'page_size': 50
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = session.get(
api_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data.get('error_code') != 0:
raise Exception(f"API返回错误: {data.get('reason', '未知错误')}")
if not data.get('result') or not data['result'].get('lotteryResList'):
raise Exception("API返回数据格式不正确")
# 处理数据
new_data = []
lottery_list = sorted(
data['result']['lotteryResList'],
key=lambda x: x['lottery_date'],
reverse=True
)
for item in lottery_list:
# 验证数据
try:
draw_date = datetime.strptime(
item['lottery_date'], '%Y-%m-%d')
if draw_date > datetime.now():
continue
if not item['lottery_no'].isdigit():
continue
numbers = item['lottery_res'].split(',')
if len(numbers) != 7:
continue
if not all(1 <= int(n) <= 33 for n in numbers[:6]) or not 1 <= int(numbers[6]) <= 16:
continue
if item['lottery_date'] in existing_open_times:
continue
new_data.append({
'issue': item['lottery_no'],
'open_time': draw_date.date(),
'red_ball_1': int(numbers[0]),
'red_ball_2': int(numbers[1]),
'red_ball_3': int(numbers[2]),
'red_ball_4': int(numbers[3]),
'red_ball_5': int(numbers[4]),
'red_ball_6': int(numbers[5]),
'blue_ball': int(numbers[6])
})
except Exception as e:
logger.warning(f"处理数据项失败: {e}")
continue
if not new_data:
return {"new_count": 0, "message": "数据已是最新"}
# 按开奖日期升序排序
new_data = sorted(new_data, key=lambda x: x['open_time'])
# 保存到数据库
for item in new_data:
lottery = SSQLottery(**item)
db.add(lottery)
db.commit()
return {
"new_count": len(new_data),
"message": f"成功更新双色球数据,新增{len(new_data)}条记录",
"latest_issue": new_data[-1]['issue'] if new_data else None,
"latest_date": new_data[-1]['open_time'].strftime('%Y-%m-%d') if new_data else None
}
except Exception as e:
db.rollback()
logger.error(f"更新双色球数据失败: {str(e)}")
raise e
@staticmethod
def update_dlt_data(db: Session) -> Dict[str, Any]:
"""更新大乐透数据"""
import requests
import logging
from datetime import datetime
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
# API配置
api_key = "7a4beb6175a2c4dacf6cf9cab43bfe6f"
api_url = "http://apis.juhe.cn/lottery/history"
# 配置请求会话
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 获取数据库中所有已存在的开奖日期
existing_open_times = set(
row.open_time.strftime('%Y-%m-%d')
for row in db.query(DLTLottery.open_time).all()
)
logger.info(f"数据库已存在{len(existing_open_times)}个开奖日期")
# 从API获取数据
params = {
'key': api_key,
'lottery_id': 'dlt',
'page': 1,
'page_size': 50
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = session.get(
api_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data.get('error_code') != 0:
raise Exception(f"API返回错误: {data.get('reason', '未知错误')}")
if not data.get('result') or not data['result'].get('lotteryResList'):
raise Exception("API返回数据格式不正确")
# 处理数据
new_data = []
lottery_list = sorted(
data['result']['lotteryResList'],
key=lambda x: x['lottery_date'],
reverse=True
)
for item in lottery_list:
# 验证数据
try:
draw_date = datetime.strptime(
item['lottery_date'], '%Y-%m-%d')
if draw_date > datetime.now():
continue
if not item['lottery_no'].isdigit():
continue
numbers = item['lottery_res'].split(',')
if len(numbers) != 7:
continue
if not all(1 <= int(n) <= 35 for n in numbers[:5]) or not all(1 <= int(n) <= 12 for n in numbers[5:]):
continue
if item['lottery_date'] in existing_open_times:
continue
new_data.append({
'issue': item['lottery_no'],
'open_time': draw_date.date(),
'front_ball_1': int(numbers[0]),
'front_ball_2': int(numbers[1]),
'front_ball_3': int(numbers[2]),
'front_ball_4': int(numbers[3]),
'front_ball_5': int(numbers[4]),
'back_ball_1': int(numbers[5]),
'back_ball_2': int(numbers[6])
})
except Exception as e:
logger.warning(f"处理数据项失败: {e}")
continue
if not new_data:
return {"new_count": 0, "message": "数据已是最新"}
# 按开奖日期升序排序
new_data = sorted(new_data, key=lambda x: x['open_time'])
# 保存到数据库
for item in new_data:
lottery = DLTLottery(**item)
db.add(lottery)
db.commit()
return {
"new_count": len(new_data),
"message": f"成功更新大乐透数据,新增{len(new_data)}条记录",
"latest_issue": new_data[-1]['issue'] if new_data else None,
"latest_date": new_data[-1]['open_time'].strftime('%Y-%m-%d') if new_data else None
}
except Exception as e:
db.rollback()
logger.error(f"更新大乐透数据失败: {str(e)}")
raise e
@staticmethod
def update_all_lottery_data(db: Session) -> Dict[str, Any]:
"""更新所有彩票数据"""
try:
ssq_result = LotteryService.update_ssq_data(db)
dlt_result = LotteryService.update_dlt_data(db)
return {
"ssq": ssq_result,
"dlt": dlt_result,
"total_new": ssq_result.get("new_count", 0) + dlt_result.get("new_count", 0)
}
except Exception as e:
raise e