feat: 幽默风格中奖播报,推送内容更有趣味性,号码结果一行展示,结尾自动心情表达

This commit is contained in:
Mars 2025-07-02 16:15:23 +08:00
parent 4afe401568
commit 2601246be5

View File

@ -4,7 +4,10 @@ import logging
from datetime import datetime
from update_lottery import LotteryUpdater
from app.services.prediction_service import PredictionService
from app.services.telegram_bot import send_message
from app.core.database import SessionLocal
from app.models.lottery import SSQLotteryBetRecord, DLTLotteryBetRecord, SSQLottery, DLTLottery
import random
# 配置日志
logging.basicConfig(
@ -76,6 +79,77 @@ class LotteryScheduler:
)
logger.info(f"大乐透拼盘推荐生成完成,共生成 {len(dlt_recommendations)}")
# 推送到Telegram
try:
logger.info("开始推送到Telegram...")
# 固定投注号码参照prediction.py
FIXED_SSQ = {'red': [4, 6, 7, 12, 18, 19], 'blue': [9]}
FIXED_DLT = {'red': [4, 6, 9, 18, 19], 'blue': [7, 12]}
# 组装推送内容参照prediction.py的格式
msg_lines = []
# 根据当前时间判断彩种参照prediction.py逻辑
weekday = datetime.now().isoweekday()
if weekday in [1, 3, 6]: # 周一、三、六
lottery_type_push = 'dlt'
fixed = FIXED_DLT
title = "你好,帮忙买下大乐透。"
dlt_append = True
else: # 周二、四、日
lottery_type_push = 'ssq'
fixed = FIXED_SSQ
title = "你好,帮忙买下双色球。"
dlt_append = False
msg_lines.append(title)
# 固定投注
msg_lines.append(f"红球:{' '.join(f'{n:02d}' for n in fixed['red'])} 蓝球:{
' '.join(f'{n:02d}' for n in fixed['blue'])}")
# 推荐号码(双色球)
if ssq_recommendations:
for rec in ssq_recommendations[:4]:
red_numbers = rec.get('red_numbers', [])
blue_numbers = rec.get('blue_numbers', [])
red_str = ' '.join(f"{n:02d}" for n in red_numbers)
blue_str = ' '.join(
f"{n:02d}" for n in blue_numbers)
msg_lines.append(f"红球:{red_str} 蓝球:{blue_str}")
# 推荐号码(大乐透)
if dlt_recommendations:
for rec in dlt_recommendations[:4]:
red_numbers = rec.get('red_numbers', [])
blue_numbers = rec.get('blue_numbers', [])
red_str = ' '.join(f"{n:02d}" for n in red_numbers)
blue_str = ' '.join(
f"{n:02d}" for n in blue_numbers)
msg_lines.append(f"红球:{red_str} 蓝球:{blue_str}")
if dlt_append:
msg_lines.append("都追加,谢谢。")
# 快乐8推送3注十选合并到主消息
msg_lines.append("\n快乐8单式选十")
for _ in range(3):
nums = sorted(random.sample(range(1, 81), 10))
msg_lines.append(' '.join(f"{n:02d}" for n in nums))
# 发送消息
message = '\n'.join(msg_lines)
result = send_message(message)
if result:
logger.info("Telegram推送成功")
else:
logger.warning("Telegram推送失败")
except Exception as e:
logger.error(f"Telegram推送失败: {str(e)}")
logger.info("下午5点生成拼盘推荐任务完成")
finally:
@ -84,17 +158,184 @@ class LotteryScheduler:
except Exception as e:
logger.error(f"下午5点生成拼盘推荐任务失败: {str(e)}")
def check_previous_day_wins_job(self):
"""每天早上9点半幽默播报昨日号码命运"""
try:
logger.info("开始执行早上9点半幽默播报任务...")
from datetime import timedelta
yesterday = datetime.now() - timedelta(days=1)
target_date = yesterday.strftime('%Y-%m-%d')
current_weekday = datetime.now().isoweekday()
if current_weekday == 6: # 周六
logger.info("周六休息,不推送中奖检查")
return
if current_weekday in [1, 3, 5, 7]:
lottery_type = 'ssq'
BetModel = SSQLotteryBetRecord
LotteryModel = SSQLottery
fixed_numbers = {'red': [4, 6, 7, 12, 18, 19], 'blue': [9]}
else:
lottery_type = 'dlt'
BetModel = DLTLotteryBetRecord
LotteryModel = DLTLottery
fixed_numbers = {'red': [4, 6, 9, 18, 19], 'blue': [7, 12]}
db = SessionLocal()
try:
target_batch_id_pattern = f"{yesterday.strftime('%Y%m%d')}%"
bet_records = db.query(BetModel).filter(
BetModel.batch_id.like(target_batch_id_pattern)
).all()
self.updater.check_and_update_bet_wins(lottery_type, db)
updated_records = db.query(BetModel).filter(
BetModel.batch_id.like(target_batch_id_pattern)
).all()
last_draw = db.query(LotteryModel).filter(
LotteryModel.open_time == yesterday.date()).first()
open_code_str = ""
if last_draw:
if lottery_type == 'ssq':
open_red = [last_draw.red_ball_1, last_draw.red_ball_2, last_draw.red_ball_3,
last_draw.red_ball_4, last_draw.red_ball_5, last_draw.red_ball_6]
open_blue = [last_draw.blue_ball]
open_code_str = f"{' '.join(f'{n:02d}' for n in open_red)} | {
' '.join(f'{n:02d}' for n in open_blue)}"
else:
open_red = [last_draw.front_ball_1, last_draw.front_ball_2,
last_draw.front_ball_3, last_draw.front_ball_4, last_draw.front_ball_5]
open_blue = [last_draw.back_ball_1,
last_draw.back_ball_2]
open_code_str = f"{' '.join(f'{n:02d}' for n in open_red)} | {
' '.join(f'{n:02d}' for n in open_blue)}"
# 固定号码开奖比对
fixed_result = None
if last_draw:
bet_red = fixed_numbers['red']
bet_blue = fixed_numbers['blue']
red_hit = len(set(bet_red) & set(open_red))
blue_hit = len(set(bet_blue) & set(open_blue))
win_level, win_amount = self.updater.calc_prize(
lottery_type, red_hit, blue_hit, bet_red, bet_blue, open_red, open_blue)
fixed_result = {
'red': bet_red,
'blue': bet_blue,
'win_level': win_level,
'win_amount': win_amount
}
# 幽默短语
miss_phrases = [
"擦肩而过",
"与大奖无缘",
"继续努力",
"还差亿点点",
"中奖是种信仰",
"大奖说:明天见!",
"陪跑小能手上线",
"差点就中啦!",
"梦想还在路上",
"运气在加载中..."
]
win_phrases = [
"欧气爆棚!",
"今晚加鸡腿!",
"终于等到你!",
"天选之子!",
"请我吃饭吧!",
"这波可以吹一年!"
]
# 组装推送消息
msg_lines = [
random.choice([
"昨晚的梦想,今早的现实播报!",
"新鲜出炉的号码命运播报来啦!",
"昨夜许愿,今晨揭晓!",
"号码的奇幻漂流记:",
"昨晚的神秘数字,今早的答案!"
])
]
msg_lines.append(
f"昨晚的神秘数字是:{open_code_str if open_code_str else '(暂无)'}\n")
msg_lines.append(random.choice([
"我悄悄许下的愿望号码:",
"我的小目标们:",
"昨夜的希望清单:",
"这些号码陪我走过了一个夜晚:"
]))
# 固定号码
red_str = ' '.join(f"{n:02d}" for n in fixed_numbers['red'])
blue_str = ' '.join(f"{n:02d}" for n in fixed_numbers['blue'])
if fixed_result:
if fixed_result['win_level']:
win_text = f"{fixed_result['win_level']},奖金{fixed_result['win_amount']}元!{random.choice(win_phrases)}"
else:
win_text = random.choice(miss_phrases)
msg_lines.append(f"{red_str} | {blue_str}{win_text}")
else:
msg_lines.append(f"{red_str} | {blue_str} → 暂无开奖数据")
# 推荐号码
win_count = 0
for rec in updated_records:
numbers_parts = rec.numbers.split('|')
red_numbers = [int(n) for n in numbers_parts[0].split(
',')] if numbers_parts[0] else []
blue_numbers = [int(n) for n in numbers_parts[1].split(',')] if len(
numbers_parts) > 1 and numbers_parts[1] else []
red_str = ' '.join(f"{n:02d}" for n in red_numbers)
blue_str = ' '.join(f"{n:02d}" for n in blue_numbers)
if rec.is_winner == 1:
win_count += 1
win_text = f"{rec.win_level},奖金{rec.win_amount}元!{random.choice(win_phrases)}"
else:
win_text = random.choice(miss_phrases)
msg_lines.append(f"{red_str} | {blue_str}{win_text}")
# 结尾心情
if (fixed_result and fixed_result['win_level']) or win_count > 0:
mood = random.choice([
"🎉 好运终于眷顾了我,今晚加鸡腿!",
"🥳 好运爆棚,今晚请我吃饭!",
"😎 中奖的感觉真不错!",
"🍻 好运挡不住,继续冲!"
])
else:
mood = random.choice([
"😅 今天又是陪跑小能手,大奖说'明天见'",
"😂 全军覆没,明天继续做梦!",
"😭 梦想还在,大奖还远!",
"🤡 继续修炼欧气,明天再战!"
])
msg_lines.append("")
msg_lines.append(mood)
# 推送到Telegram
try:
logger.info("开始推送幽默中奖播报到Telegram...")
message = '\n'.join(msg_lines)
result = send_message(message)
if result:
logger.info("幽默中奖播报推送成功")
else:
logger.warning("幽默中奖播报推送失败")
except Exception as e:
logger.error(f"幽默中奖播报推送失败: {str(e)}")
logger.info(f"早上9点半幽默播报任务完成")
finally:
db.close()
except Exception as e:
logger.error(f"早上9点半幽默播报任务失败: {str(e)}")
def start_scheduler(self):
"""启动定时任务调度器"""
# 每天早上6点更新开奖数据
schedule.every().day.at("06:00").do(self.update_lottery_data_job)
# 每天早上9点半检查前天中奖情况
schedule.every().day.at("09:30").do(self.check_previous_day_wins_job)
# 每天下午5点生成拼盘推荐
schedule.every().day.at("17:00").do(self.generate_daily_recommendations_job)
logger.info("定时任务调度器已启动")
logger.info("已设置任务:")
logger.info("- 每天早上6点: 更新开奖数据")
logger.info("- 每天早上9点半: 检查前天中奖情况")
logger.info("- 每天下午5点: 生成拼盘推荐")
# 运行调度器