mottery/backend/scheduler.py

355 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import schedule
import time
import logging
from datetime import datetime
from update_lottery import LotteryUpdater
from app.services.prediction_service import PredictionService
from app.services.telegram_bot import send_message
from app.core.database import SessionLocal
from app.models.lottery import SSQLotteryBetRecord, DLTLotteryBetRecord, SSQLottery, DLTLottery
import random
import os
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='lottery_scheduler.log'
)
logger = logging.getLogger(__name__)
class LotteryScheduler:
def __init__(self):
self.updater = LotteryUpdater()
def update_lottery_data_job(self):
"""每天早上6点更新开奖数据任务"""
try:
logger.info("开始执行早上6点更新开奖数据任务...")
# 更新双色球数据
logger.info("开始更新双色球数据...")
ssq_result = self.updater.update_lottery_data('ssq')
logger.info(f"双色球数据更新完成: {ssq_result}")
# 更新大乐透数据
logger.info("开始更新大乐透数据...")
dlt_result = self.updater.update_lottery_data('dlt')
logger.info(f"大乐透数据更新完成: {dlt_result}")
# 检查中奖情况
db = SessionLocal()
try:
logger.info("开始检查双色球中奖情况...")
self.updater.check_and_update_bet_wins('ssq', db)
logger.info("双色球中奖检查完成")
logger.info("开始检查大乐透中奖情况...")
self.updater.check_and_update_bet_wins('dlt', db)
logger.info("大乐透中奖检查完成")
finally:
db.close()
logger.info("早上6点更新开奖数据任务完成")
except Exception as e:
logger.error(f"早上6点更新开奖数据任务失败: {str(e)}")
def generate_daily_recommendations_job(self):
"""每天下午5点生成拼盘推荐任务推送内容与prediction.py recommend_today接口一致"""
try:
logger.info("开始执行下午5点生成拼盘推荐任务...")
import datetime
import os
db = SessionLocal()
try:
# 判断彩种
weekday = datetime.datetime.now().isoweekday()
if weekday == 5:
logger.info("周五休息,不推送推荐")
return
FIXED_SSQ = {'red': [4, 6, 7, 12, 18, 19], 'blue': [9]}
FIXED_DLT = {'red': [4, 6, 9, 18, 19], 'blue': [7, 12]}
if weekday in [1, 3, 6]:
lottery_type = 'dlt'
fixed = FIXED_DLT
title = "你好,帮忙买下大乐透。"
dlt_append = True
else:
lottery_type = 'ssq'
fixed = FIXED_SSQ
title = "你好,帮忙买下双色球。"
dlt_append = False
# 生成batch_id
today = datetime.datetime.now().strftime('%Y%m%d')
batch_id = f"{today}-{random.randint(1000, 9999)}"
# 推荐4注集成预测+智能选号补足)
service = PredictionService(db)
pred = service.get_ensemble_prediction(
lottery_type, periods=200)
recs = pred.get('recommendations', []) if pred.get(
'success') else []
results = []
used_numbers = set()
# 先取集成预测推荐
for rec in recs:
if len(results) >= 4:
break
red = rec.get('numbers')
blue = [rec.get('blue')] if rec.get(
'blue') is not None else rec.get('blues', [])
if not red or not blue:
continue
numbers = ','.join(
f"{n:02d}" for n in red) + '|' + ','.join(f"{n:02d}" for n in blue)
if numbers in used_numbers:
continue
used_numbers.add(numbers)
results.append({'red': red, 'blue': blue})
# 补足到4注
from app.services.analysis_service import LotteryAnalysisService
analysis_service = LotteryAnalysisService(db)
while len(results) < 4:
smart = analysis_service.generate_smart_numbers(
lottery_type, strategy='balanced', count=1)
if not smart:
break
red = smart[0].get('red_numbers')
blue = smart[0].get('blue_numbers')
numbers = ','.join(
f"{n:02d}" for n in red) + '|' + ','.join(f"{n:02d}" for n in blue)
if numbers in used_numbers:
continue
used_numbers.add(numbers)
results.append({'red': red, 'blue': blue})
# 组装推送内容
msg_lines = [title]
red_str = ' '.join(f"{n:02d}" for n in fixed['red'])
blue_str = ' '.join(f"{n:02d}" for n in fixed['blue'])
msg_lines.append(f"红球:{red_str} 蓝球:{blue_str}")
for rec in results:
red_str = ' '.join(f"{n:02d}" for n in rec['red'])
blue_str = ' '.join(f"{n:02d}" for n in rec['blue'])
msg_lines.append(f"红球:{red_str} 蓝球:{blue_str}")
if dlt_append:
msg_lines.append("都追加,谢谢。")
msg_lines.append("\n快乐8单式选十")
for _ in range(3):
nums = sorted(random.sample(range(1, 81), 10))
msg_lines.append(' '.join(f"{n:02d}" for n in nums))
# 推送到Telegram
try:
logger.info("开始推送到Telegram...")
token = os.getenv("TELEGRAM_BOT_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID")
result = send_message(
'\n'.join(msg_lines), token=token, chat_id=chat_id)
if result:
logger.info("Telegram推送成功")
logger.info(f"Telegram返回: {result}")
else:
logger.warning("Telegram推送失败")
except Exception as e:
logger.error(f"Telegram推送失败: {str(e)}")
logger.info("下午5点生成拼盘推荐任务完成")
finally:
db.close()
except Exception as e:
logger.error(f"下午5点生成拼盘推荐任务失败: {str(e)}")
def check_previous_day_wins_job(self):
"""每天早上9点半幽默播报昨日号码命运"""
try:
logger.info("开始执行早上9点半幽默播报任务...")
from datetime import timedelta
yesterday = datetime.now() - timedelta(days=1)
target_date = yesterday.strftime('%Y-%m-%d')
current_weekday = datetime.now().isoweekday()
if current_weekday == 6: # 周六
logger.info("周六休息,不推送中奖检查")
return
if current_weekday in [1, 3, 5, 7]:
lottery_type = 'ssq'
BetModel = SSQLotteryBetRecord
LotteryModel = SSQLottery
fixed_numbers = {'red': [4, 6, 7, 12, 18, 19], 'blue': [9]}
else:
lottery_type = 'dlt'
BetModel = DLTLotteryBetRecord
LotteryModel = DLTLottery
fixed_numbers = {'red': [4, 6, 9, 18, 19], 'blue': [7, 12]}
db = SessionLocal()
try:
target_batch_id_pattern = f"{yesterday.strftime('%Y%m%d')}%"
bet_records = db.query(BetModel).filter(
BetModel.batch_id.like(target_batch_id_pattern)
).all()
self.updater.check_and_update_bet_wins(lottery_type, db)
updated_records = db.query(BetModel).filter(
BetModel.batch_id.like(target_batch_id_pattern)
).all()
last_draw = db.query(LotteryModel).filter(
LotteryModel.open_time == yesterday.date()).first()
open_code_str = ""
if last_draw:
if lottery_type == 'ssq':
open_red = [last_draw.red_ball_1, last_draw.red_ball_2, last_draw.red_ball_3,
last_draw.red_ball_4, last_draw.red_ball_5, last_draw.red_ball_6]
open_blue = [last_draw.blue_ball]
open_code_str = f"{' '.join(f'{n:02d}' for n in open_red)} | {
' '.join(f'{n:02d}' for n in open_blue)}"
else:
open_red = [last_draw.front_ball_1, last_draw.front_ball_2,
last_draw.front_ball_3, last_draw.front_ball_4, last_draw.front_ball_5]
open_blue = [last_draw.back_ball_1,
last_draw.back_ball_2]
open_code_str = f"{' '.join(f'{n:02d}' for n in open_red)} | {
' '.join(f'{n:02d}' for n in open_blue)}"
# 固定号码开奖比对
fixed_result = None
if last_draw:
bet_red = fixed_numbers['red']
bet_blue = fixed_numbers['blue']
red_hit = len(set(bet_red) & set(open_red))
blue_hit = len(set(bet_blue) & set(open_blue))
win_level, win_amount = self.updater.calc_prize(
lottery_type, red_hit, blue_hit, bet_red, bet_blue, open_red, open_blue)
fixed_result = {
'red': bet_red,
'blue': bet_blue,
'win_level': win_level,
'win_amount': win_amount
}
# 幽默短语
miss_phrases = [
"擦肩而过",
"与大奖无缘",
"继续努力",
"还差亿点点",
"中奖是种信仰",
"大奖说:明天见!",
"陪跑小能手上线",
"差点就中啦!",
"梦想还在路上",
"运气在加载中..."
]
win_phrases = [
"欧气爆棚!",
"今晚加鸡腿!",
"终于等到你!",
"天选之子!",
"请我吃饭吧!",
"这波可以吹一年!"
]
# 组装推送消息
msg_lines = [
random.choice([
"昨晚的梦想,今早的现实播报!",
"新鲜出炉的号码命运播报来啦!",
"昨夜许愿,今晨揭晓!",
"号码的奇幻漂流记:",
"昨晚的神秘数字,今早的答案!"
])
]
msg_lines.append(
f"昨晚的神秘数字是:{open_code_str if open_code_str else '(暂无)'}\n")
msg_lines.append(random.choice([
"我悄悄许下的愿望号码:",
"我的小目标们:",
"昨夜的希望清单:",
"这些号码陪我走过了一个夜晚:"
]))
# 固定号码
red_str = ' '.join(f"{n:02d}" for n in fixed_numbers['red'])
blue_str = ' '.join(f"{n:02d}" for n in fixed_numbers['blue'])
if fixed_result:
if fixed_result['win_level']:
win_text = f"{fixed_result['win_level']},奖金{fixed_result['win_amount']}元!{random.choice(win_phrases)}"
else:
win_text = random.choice(miss_phrases)
msg_lines.append(f"{red_str} | {blue_str}{win_text}")
else:
msg_lines.append(f"{red_str} | {blue_str} → 暂无开奖数据")
# 推荐号码
win_count = 0
for rec in updated_records:
numbers_parts = rec.numbers.split('|')
red_numbers = [int(n) for n in numbers_parts[0].split(
',')] if numbers_parts[0] else []
blue_numbers = [int(n) for n in numbers_parts[1].split(',')] if len(
numbers_parts) > 1 and numbers_parts[1] else []
red_str = ' '.join(f"{n:02d}" for n in red_numbers)
blue_str = ' '.join(f"{n:02d}" for n in blue_numbers)
if rec.is_winner == 1:
win_count += 1
win_text = f"{rec.win_level},奖金{rec.win_amount}元!{random.choice(win_phrases)}"
else:
win_text = random.choice(miss_phrases)
msg_lines.append(f"{red_str} | {blue_str}{win_text}")
# 结尾心情
if (fixed_result and fixed_result['win_level']) or win_count > 0:
mood = random.choice([
"🎉 好运终于眷顾了我,今晚加鸡腿!",
"🥳 好运爆棚,今晚请我吃饭!",
"😎 中奖的感觉真不错!",
"🍻 好运挡不住,继续冲!"
])
else:
mood = random.choice([
"😅 今天又是陪跑小能手,大奖说'明天见'",
"😂 全军覆没,明天继续做梦!",
"😭 梦想还在,大奖还远!",
"🤡 继续修炼欧气,明天再战!"
])
msg_lines.append("")
msg_lines.append(mood)
# 推送到Telegram
try:
logger.info("开始推送幽默中奖播报到Telegram...")
message = '\n'.join(msg_lines)
result = send_message(message)
if result:
logger.info("幽默中奖播报推送成功")
else:
logger.warning("幽默中奖播报推送失败")
except Exception as e:
logger.error(f"幽默中奖播报推送失败: {str(e)}")
logger.info(f"早上9点半幽默播报任务完成")
finally:
db.close()
except Exception as e:
logger.error(f"早上9点半幽默播报任务失败: {str(e)}")
def start_scheduler(self):
"""启动定时任务调度器"""
# 每天早上6点更新开奖数据
schedule.every().day.at("06:00").do(self.update_lottery_data_job)
# 每天早上9点半检查前天中奖情况
schedule.every().day.at("09:30").do(self.check_previous_day_wins_job)
# 每天下午5点生成拼盘推荐
schedule.every().day.at("17:00").do(self.generate_daily_recommendations_job)
logger.info("定时任务调度器已启动")
logger.info("已设置任务:")
logger.info("- 每天早上6点: 更新开奖数据")
logger.info("- 每天早上9点半: 检查前天中奖情况")
logger.info("- 每天下午5点: 生成拼盘推荐")
# 运行调度器
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
def main():
"""主函数"""
scheduler = LotteryScheduler()
scheduler.start_scheduler()
if __name__ == "__main__":
main()