246 lines
8.5 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from typing import List, Optional, Dict, Any, Tuple
from datetime import date, datetime
import pandas as pd
from app.models.lottery import SSQLottery, DLTLottery
from app.schemas.lottery import SSQLotteryCreate, DLTLotteryCreate, LotteryQuery
class LotteryService:
@staticmethod
def create_ssq_lottery(db: Session, lottery: SSQLotteryCreate) -> SSQLottery:
db_lottery = SSQLottery(**lottery.model_dump())
db.add(db_lottery)
db.commit()
db.refresh(db_lottery)
return db_lottery
@staticmethod
def create_dlt_lottery(db: Session, lottery: DLTLotteryCreate) -> DLTLottery:
db_lottery = DLTLottery(**lottery.model_dump())
db.add(db_lottery)
db.commit()
db.refresh(db_lottery)
return db_lottery
@staticmethod
def get_ssq_lotteries(
db: Session,
query: LotteryQuery
) -> Tuple[List[SSQLottery], int]:
"""获取双色球开奖记录"""
db_query = db.query(SSQLottery)
# 应用过滤条件
if query.issue:
db_query = db_query.filter(SSQLottery.issue == query.issue)
if query.start_date:
try:
start_date = datetime.strptime(
query.start_date, "%Y-%m-%d").date()
db_query = db_query.filter(SSQLottery.open_time >= start_date)
except ValueError:
pass
if query.end_date:
try:
end_date = datetime.strptime(query.end_date, "%Y-%m-%d").date()
db_query = db_query.filter(SSQLottery.open_time <= end_date)
except ValueError:
pass
# 获取总记录数
total = db_query.count()
# 应用分页
page = query.page or 1
page_size = query.page_size or 20
db_query = db_query.order_by(desc(SSQLottery.open_time)) \
.offset((page - 1) * page_size) \
.limit(page_size)
return db_query.all(), total
@staticmethod
def get_dlt_lotteries(
db: Session,
query: LotteryQuery
) -> Tuple[List[DLTLottery], int]:
"""获取大乐透开奖记录"""
db_query = db.query(DLTLottery)
# 应用过滤条件
if query.issue:
db_query = db_query.filter(DLTLottery.issue == query.issue)
if query.start_date:
try:
start_date = datetime.strptime(
query.start_date, "%Y-%m-%d").date()
db_query = db_query.filter(DLTLottery.open_time >= start_date)
except ValueError:
pass
if query.end_date:
try:
end_date = datetime.strptime(query.end_date, "%Y-%m-%d").date()
db_query = db_query.filter(DLTLottery.open_time <= end_date)
except ValueError:
pass
# 获取总记录数
total = db_query.count()
# 应用分页
page = query.page or 1
page_size = query.page_size or 20
db_query = db_query.order_by(desc(DLTLottery.open_time)) \
.offset((page - 1) * page_size) \
.limit(page_size)
return db_query.all(), total
@staticmethod
def get_ssq_statistics(db: Session):
# 红球统计
red_freq = []
for i in range(1, 7):
col = getattr(SSQLottery, f"red_ball_{i}")
result = db.query(col, func.count().label(
'count')).group_by(col).all()
for number, count in result:
red_freq.append((number, count))
# 汇总红球频率
red_counter = {}
for number, count in red_freq:
red_counter[number] = red_counter.get(number, 0) + count
red_balls = sorted([(k, v)
for k, v in red_counter.items()], key=lambda x: x[0])
# 蓝球统计
blue_freq = db.query(SSQLottery.blue_ball, func.count().label(
'count')).group_by(SSQLottery.blue_ball).all()
blue_balls = sorted([(k, v) for k, v in blue_freq], key=lambda x: x[0])
return {
"red_balls": red_balls,
"blue_balls": blue_balls
}
@staticmethod
def get_dlt_statistics(db: Session):
# 前区统计
front_freq = []
for i in range(1, 6):
col = getattr(DLTLottery, f"front_ball_{i}")
result = db.query(col, func.count().label(
'count')).group_by(col).all()
for number, count in result:
front_freq.append((number, count))
front_counter = {}
for number, count in front_freq:
front_counter[number] = front_counter.get(number, 0) + count
front_balls = sorted(
[(k, v) for k, v in front_counter.items()], key=lambda x: x[0])
# 后区统计
back_freq = []
for i in range(1, 3):
col = getattr(DLTLottery, f"back_ball_{i}")
result = db.query(col, func.count().label(
'count')).group_by(col).all()
for number, count in result:
back_freq.append((number, count))
back_counter = {}
for number, count in back_freq:
back_counter[number] = back_counter.get(number, 0) + count
back_balls = sorted(
[(k, v) for k, v in back_counter.items()], key=lambda x: x[0])
return {
"front_balls": front_balls,
"back_balls": back_balls
}
@staticmethod
def import_ssq_data(db: Session, file_path: str) -> int:
import pandas as pd
from app.models.lottery import SSQLottery
df = pd.read_json(file_path)
# 先查出所有已存在的期号
existing_issues = set(
i[0] for i in db.query(SSQLottery.issue).filter(SSQLottery.issue.in_(df['issue'].astype(str).tolist())).all()
)
# pandas 端彻底去重
new_rows = df[~df['issue'].astype(str).isin(
existing_issues)].drop_duplicates(subset=['issue'])
objs = [
SSQLottery(
issue=str(row['issue']),
open_time=row['open_time'],
red_ball_1=int(row['red_ball_1']),
red_ball_2=int(row['red_ball_2']),
red_ball_3=int(row['red_ball_3']),
red_ball_4=int(row['red_ball_4']),
red_ball_5=int(row['red_ball_5']),
red_ball_6=int(row['red_ball_6']),
blue_ball=int(row['blue_ball'])
)
for _, row in new_rows.iterrows()
]
if objs:
try:
db.bulk_save_objects(objs)
db.commit()
except Exception as e:
db.rollback()
print(f"Bulk insert error: {e}")
return len(objs)
@staticmethod
def import_dlt_data(db: Session, file_path: str) -> int:
import pandas as pd
from app.models.lottery import DLTLottery
df = pd.read_json(file_path)
# 先查出所有已存在的期号
existing_issues = set(
i[0] for i in db.query(DLTLottery.issue).filter(DLTLottery.issue.in_(df['issue'].astype(str).tolist())).all()
)
# pandas 端彻底去重
new_rows = df[~df['issue'].astype(str).isin(
existing_issues)].drop_duplicates(subset=['issue'])
objs = [
DLTLottery(
issue=str(row['issue']),
open_time=row['open_time'],
front_ball_1=int(row['front_ball_1']),
front_ball_2=int(row['front_ball_2']),
front_ball_3=int(row['front_ball_3']),
front_ball_4=int(row['front_ball_4']),
front_ball_5=int(row['front_ball_5']),
back_ball_1=int(row['back_ball_1']),
back_ball_2=int(row['back_ball_2'])
)
for _, row in new_rows.iterrows()
]
if objs:
try:
db.bulk_save_objects(objs)
db.commit()
except Exception as e:
db.rollback()
print(f"Bulk insert error: {e}")
return len(objs)
@staticmethod
def get_latest_ssq(db: Session) -> Optional[SSQLottery]:
"""获取最新一期双色球开奖记录"""
return db.query(SSQLottery).order_by(desc(SSQLottery.issue)).first()
@staticmethod
def get_latest_dlt(db: Session) -> Optional[DLTLottery]:
"""获取最新一期大乐透开奖记录"""
return db.query(DLTLottery).order_by(desc(DLTLottery.issue)).first()