250 lines
8.7 KiB
Python
250 lines
8.7 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
from datetime import date, datetime
|
|
import pandas as pd
|
|
from app.models.lottery import SSQLottery, DLTLottery
|
|
from app.schemas.lottery import SSQLotteryCreate, DLTLotteryCreate, LotteryQuery
|
|
|
|
|
|
class LotteryService:
|
|
@staticmethod
|
|
def create_ssq_lottery(db: Session, lottery: SSQLotteryCreate) -> SSQLottery:
|
|
db_lottery = SSQLottery(**lottery.model_dump())
|
|
db.add(db_lottery)
|
|
db.commit()
|
|
db.refresh(db_lottery)
|
|
return db_lottery
|
|
|
|
@staticmethod
|
|
def create_dlt_lottery(db: Session, lottery: DLTLotteryCreate) -> DLTLottery:
|
|
db_lottery = DLTLottery(**lottery.model_dump())
|
|
db.add(db_lottery)
|
|
db.commit()
|
|
db.refresh(db_lottery)
|
|
return db_lottery
|
|
|
|
@staticmethod
|
|
def get_ssq_lotteries(
|
|
db: Session,
|
|
query: LotteryQuery
|
|
) -> Tuple[List[SSQLottery], int]:
|
|
"""获取双色球开奖记录"""
|
|
db_query = db.query(SSQLottery)
|
|
|
|
# 应用过滤条件
|
|
if query.issue:
|
|
db_query = db_query.filter(SSQLottery.issue == query.issue)
|
|
if query.start_date:
|
|
try:
|
|
start_date = datetime.strptime(
|
|
query.start_date, "%Y-%m-%d").date()
|
|
db_query = db_query.filter(SSQLottery.open_time >= start_date)
|
|
except ValueError:
|
|
pass
|
|
if query.end_date:
|
|
try:
|
|
end_date = datetime.strptime(query.end_date, "%Y-%m-%d").date()
|
|
db_query = db_query.filter(SSQLottery.open_time <= end_date)
|
|
except ValueError:
|
|
pass
|
|
|
|
# 获取总记录数
|
|
total = db_query.count()
|
|
|
|
# 应用分页
|
|
page = query.page or 1
|
|
page_size = query.page_size or 20
|
|
db_query = db_query.order_by(desc(SSQLottery.open_time)) \
|
|
.offset((page - 1) * page_size) \
|
|
.limit(page_size)
|
|
|
|
return db_query.all(), total
|
|
|
|
@staticmethod
|
|
def get_dlt_lotteries(
|
|
db: Session,
|
|
query: LotteryQuery
|
|
) -> Tuple[List[DLTLottery], int]:
|
|
"""获取大乐透开奖记录"""
|
|
db_query = db.query(DLTLottery)
|
|
|
|
# 应用过滤条件
|
|
if query.issue:
|
|
db_query = db_query.filter(DLTLottery.issue == query.issue)
|
|
if query.start_date:
|
|
try:
|
|
start_date = datetime.strptime(
|
|
query.start_date, "%Y-%m-%d").date()
|
|
db_query = db_query.filter(DLTLottery.open_time >= start_date)
|
|
except ValueError:
|
|
pass
|
|
if query.end_date:
|
|
try:
|
|
end_date = datetime.strptime(query.end_date, "%Y-%m-%d").date()
|
|
db_query = db_query.filter(DLTLottery.open_time <= end_date)
|
|
except ValueError:
|
|
pass
|
|
|
|
# 获取总记录数
|
|
total = db_query.count()
|
|
|
|
# 应用分页
|
|
page = query.page or 1
|
|
page_size = query.page_size or 20
|
|
db_query = db_query.order_by(desc(DLTLottery.open_time)) \
|
|
.offset((page - 1) * page_size) \
|
|
.limit(page_size)
|
|
|
|
return db_query.all(), total
|
|
|
|
@staticmethod
|
|
def get_ssq_statistics(db: Session):
|
|
# 红球统计
|
|
red_freq = []
|
|
for i in range(1, 7):
|
|
col = getattr(SSQLottery, f"red_ball_{i}")
|
|
result = db.query(col, func.count().label(
|
|
'count')).group_by(col).all()
|
|
for number, count in result:
|
|
red_freq.append((number, count))
|
|
# 汇总红球频率
|
|
red_counter = {}
|
|
for number, count in red_freq:
|
|
red_counter[number] = red_counter.get(number, 0) + count
|
|
red_balls = sorted([(k, v)
|
|
for k, v in red_counter.items()], key=lambda x: x[0])
|
|
|
|
# 蓝球统计
|
|
blue_freq = db.query(SSQLottery.blue_ball, func.count().label(
|
|
'count')).group_by(SSQLottery.blue_ball).all()
|
|
blue_balls = sorted([(k, v) for k, v in blue_freq], key=lambda x: x[0])
|
|
|
|
return {
|
|
"red_balls": red_balls,
|
|
"blue_balls": blue_balls
|
|
}
|
|
|
|
@staticmethod
|
|
def get_dlt_statistics(db: Session):
|
|
# 前区统计
|
|
front_freq = []
|
|
for i in range(1, 6):
|
|
col = getattr(DLTLottery, f"front_ball_{i}")
|
|
result = db.query(col, func.count().label(
|
|
'count')).group_by(col).all()
|
|
for number, count in result:
|
|
front_freq.append((number, count))
|
|
front_counter = {}
|
|
for number, count in front_freq:
|
|
front_counter[number] = front_counter.get(number, 0) + count
|
|
front_balls = sorted(
|
|
[(k, v) for k, v in front_counter.items()], key=lambda x: x[0])
|
|
|
|
# 后区统计
|
|
back_freq = []
|
|
for i in range(1, 3):
|
|
col = getattr(DLTLottery, f"back_ball_{i}")
|
|
result = db.query(col, func.count().label(
|
|
'count')).group_by(col).all()
|
|
for number, count in result:
|
|
back_freq.append((number, count))
|
|
back_counter = {}
|
|
for number, count in back_freq:
|
|
back_counter[number] = back_counter.get(number, 0) + count
|
|
back_balls = sorted(
|
|
[(k, v) for k, v in back_counter.items()], key=lambda x: x[0])
|
|
|
|
return {
|
|
"front_balls": front_balls,
|
|
"back_balls": back_balls
|
|
}
|
|
|
|
@staticmethod
|
|
def import_ssq_data(db: Session, file_path: str) -> int:
|
|
import pandas as pd
|
|
from app.models.lottery import SSQLottery
|
|
|
|
df = pd.read_json(file_path)
|
|
# 先查出所有已存在的期号
|
|
existing_issues = set(
|
|
i[0] for i in db.query(SSQLottery.issue).filter(SSQLottery.issue.in_(df['issue'].astype(str).tolist())).all()
|
|
)
|
|
# pandas 端彻底去重
|
|
new_rows = df[~df['issue'].astype(str).isin(
|
|
existing_issues)].drop_duplicates(subset=['issue'])
|
|
# 按open_time升序排序
|
|
new_rows = new_rows.sort_values(by='open_time', ascending=True)
|
|
|
|
objs = [
|
|
SSQLottery(
|
|
issue=str(row['issue']),
|
|
open_time=row['open_time'],
|
|
red_ball_1=int(row['red_ball_1']),
|
|
red_ball_2=int(row['red_ball_2']),
|
|
red_ball_3=int(row['red_ball_3']),
|
|
red_ball_4=int(row['red_ball_4']),
|
|
red_ball_5=int(row['red_ball_5']),
|
|
red_ball_6=int(row['red_ball_6']),
|
|
blue_ball=int(row['blue_ball'])
|
|
)
|
|
for _, row in new_rows.iterrows()
|
|
]
|
|
if objs:
|
|
try:
|
|
db.bulk_save_objects(objs)
|
|
db.commit()
|
|
except Exception as e:
|
|
db.rollback()
|
|
print(f"Bulk insert error: {e}")
|
|
return len(objs)
|
|
|
|
@staticmethod
|
|
def import_dlt_data(db: Session, file_path: str) -> int:
|
|
import pandas as pd
|
|
from app.models.lottery import DLTLottery
|
|
|
|
df = pd.read_json(file_path)
|
|
# 先查出所有已存在的期号
|
|
existing_issues = set(
|
|
i[0] for i in db.query(DLTLottery.issue).filter(DLTLottery.issue.in_(df['issue'].astype(str).tolist())).all()
|
|
)
|
|
# pandas 端彻底去重
|
|
new_rows = df[~df['issue'].astype(str).isin(
|
|
existing_issues)].drop_duplicates(subset=['issue'])
|
|
# 按open_time升序排序
|
|
new_rows = new_rows.sort_values(by='open_time', ascending=True)
|
|
|
|
objs = [
|
|
DLTLottery(
|
|
issue=str(row['issue']),
|
|
open_time=row['open_time'],
|
|
front_ball_1=int(row['front_ball_1']),
|
|
front_ball_2=int(row['front_ball_2']),
|
|
front_ball_3=int(row['front_ball_3']),
|
|
front_ball_4=int(row['front_ball_4']),
|
|
front_ball_5=int(row['front_ball_5']),
|
|
back_ball_1=int(row['back_ball_1']),
|
|
back_ball_2=int(row['back_ball_2'])
|
|
)
|
|
for _, row in new_rows.iterrows()
|
|
]
|
|
if objs:
|
|
try:
|
|
db.bulk_save_objects(objs)
|
|
db.commit()
|
|
except Exception as e:
|
|
db.rollback()
|
|
print(f"Bulk insert error: {e}")
|
|
return len(objs)
|
|
|
|
@staticmethod
|
|
def get_latest_ssq(db: Session) -> Optional[SSQLottery]:
|
|
"""获取最新一期双色球开奖记录"""
|
|
return db.query(SSQLottery).order_by(desc(SSQLottery.open_time)).first()
|
|
|
|
@staticmethod
|
|
def get_latest_dlt(db: Session) -> Optional[DLTLottery]:
|
|
"""获取最新一期大乐透开奖记录"""
|
|
return db.query(DLTLottery).order_by(desc(DLTLottery.open_time)).first()
|