mottery/backend/app/services/analysis_service.py

350 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List, Dict, Tuple, Optional
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from ..models.lottery import SSQLottery, DLTLottery
class LotteryAnalysisService:
def __init__(self, db: Session):
self.db = db
def get_hot_cold_numbers(self, lottery_type: str, periods: int = 50) -> Dict:
"""分析热号和冷号
Args:
lottery_type: 彩票类型 ('ssq''dlt')
periods: 分析期数默认50期
Returns:
Dict: 包含热号和冷号的字典
"""
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
recent_draws = self.db.query(model).order_by(
model.open_time.desc()).limit(periods).all()
# 初始化号码频率统计
red_freq = defaultdict(int)
blue_freq = defaultdict(int)
# 统计号码出现频率
for draw in recent_draws:
if lottery_type == 'ssq':
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
for num in red_numbers:
red_freq[num] += 1
blue_freq[draw.blue_ball] += 1
else:
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
draw.front_ball_4, draw.front_ball_5]
for num in red_numbers:
red_freq[num] += 1
blue_freq[draw.back_ball_1] += 1
blue_freq[draw.back_ball_2] += 1
# 计算平均出现次数
red_avg = sum(red_freq.values()) / len(red_freq)
blue_avg = sum(blue_freq.values()) / len(blue_freq)
# 定义热号和冷号
hot_reds = [num for num, freq in red_freq.items() if freq > red_avg]
cold_reds = [num for num, freq in red_freq.items() if freq < red_avg]
hot_blues = [num for num, freq in blue_freq.items() if freq > blue_avg]
cold_blues = [num for num, freq in blue_freq.items()
if freq < blue_avg]
return {
'hot_reds': sorted(hot_reds),
'cold_reds': sorted(cold_reds),
'hot_blues': sorted(hot_blues),
'cold_blues': sorted(cold_blues),
'red_frequencies': dict(red_freq),
'blue_frequencies': dict(blue_freq)
}
def analyze_number_distribution(self, lottery_type: str, periods: int = 100) -> Dict:
"""分析号码分布(分区统计、奇偶比)
Args:
lottery_type: 彩票类型 ('ssq''dlt')
periods: 分析期数默认100期
Returns:
Dict: 包含各种分布统计的字典
"""
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
recent_draws = self.db.query(model).order_by(
model.open_time.desc()).limit(periods).all()
# 初始化统计数据
zone_stats = defaultdict(int)
odd_count = 0
even_count = 0
total_numbers = 0
for draw in recent_draws:
if lottery_type == 'ssq':
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
else:
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
draw.front_ball_4, draw.front_ball_5]
# 分区统计
for num in red_numbers:
# 对于双色球每5个数字一个区最后一个区是31-33
# 对于大乐透每5个数字一个区最后一个区是31-35
zone = (num - 1) // 5 + 1
zone_stats[zone] += 1
# 奇偶统计
if num % 2 == 0:
even_count += 1
else:
odd_count += 1
total_numbers += 1
# 计算比例
zone_distribution = {str(zone): count for zone,
count in zone_stats.items()}
odd_even_distribution = {
'odd': odd_count,
'even': even_count
}
return {
'zone_distribution': zone_distribution,
'odd_even_distribution': odd_even_distribution
}
def analyze_consecutive_numbers(self, lottery_type: str, periods: int = 100) -> Dict:
"""分析连号和重复号
Args:
lottery_type: 彩票类型 ('ssq''dlt')
periods: 分析期数默认100期
Returns:
Dict: 包含连号和重复号分析的字典
"""
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
recent_draws = self.db.query(model).order_by(
model.open_time.desc()).limit(periods).all()
consecutive_stats = defaultdict(int)
repeat_stats = defaultdict(int)
prev_numbers = None
for draw in recent_draws:
if lottery_type == 'ssq':
current_numbers = sorted([
draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6
])
else:
current_numbers = sorted([
draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
draw.front_ball_4, draw.front_ball_5
])
# 分析连号
consecutive_count = 0
for i in range(len(current_numbers)-1):
if current_numbers[i+1] - current_numbers[i] == 1:
consecutive_count += 1
consecutive_stats[consecutive_count] += 1
# 分析重复号
if prev_numbers:
repeat_count = len(set(current_numbers) & set(prev_numbers))
repeat_stats[repeat_count] += 1
prev_numbers = current_numbers
total_draws = len(recent_draws)
return {
'consecutive_distribution': {k: v/total_draws for k, v in consecutive_stats.items()},
'repeat_distribution': {k: v/(total_draws-1) for k, v in repeat_stats.items()}
}
def analyze_mathematical_stats(self, lottery_type: str, periods: int = 100) -> Dict:
"""分析数学统计特征
Args:
lottery_type: 彩票类型 ('ssq''dlt')
periods: 分析期数默认100期
Returns:
Dict: 包含数学统计特征的字典
"""
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
recent_draws = self.db.query(model).order_by(
model.open_time.desc()).limit(periods).all()
sums = []
for draw in recent_draws:
if lottery_type == 'ssq':
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
else:
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
draw.front_ball_4, draw.front_ball_5]
sums.append(sum(red_numbers))
mean = np.mean(sums)
std = np.std(sums)
return {
'sum_mean': float(mean),
'sum_std': float(std),
'sum_range': {
'min': float(mean - 2*std),
'max': float(mean + 2*std)
}
}
def get_missing_periods(self, lottery_type: str, periods: int = 200) -> Dict:
"""分析号码遗漏值仅分析最近periods期避免全表遍历
Args:
lottery_type: 彩票类型 ('ssq''dlt')
periods: 分析期数默认200期
Returns:
Dict: 包含各号码当前遗漏值的字典
"""
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
max_red = 33 if lottery_type == 'ssq' else 35
max_blue = 16 if lottery_type == 'ssq' else 12
# 一次性查出最近periods期数据
draws = self.db.query(model).order_by(
model.open_time.desc()).limit(periods).all()
# 初始化遗漏值字典
red_missing = {i: 0 for i in range(1, max_red + 1)}
blue_missing = {i: 0 for i in range(1, max_blue + 1)}
# 标记号码是否已出现
red_found = {i: False for i in range(1, max_red + 1)}
blue_found = {i: False for i in range(1, max_blue + 1)}
for idx, draw in enumerate(draws):
if lottery_type == 'ssq':
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
blue_numbers = [draw.blue_ball]
else:
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
draw.front_ball_4, draw.front_ball_5]
blue_numbers = [draw.back_ball_1, draw.back_ball_2]
# 红球遗漏统计
for i in range(1, max_red + 1):
if not red_found[i]:
if i in red_numbers:
red_found[i] = True
red_missing[i] = idx
# 蓝球遗漏统计
for i in range(1, max_blue + 1):
if not blue_found[i]:
if i in blue_numbers:
blue_found[i] = True
blue_missing[i] = idx
# 未出现过的号码遗漏值为periods
for i in range(1, max_red + 1):
if not red_found[i]:
red_missing[i] = periods
for i in range(1, max_blue + 1):
if not blue_found[i]:
blue_missing[i] = periods
return {
'red_missing': red_missing,
'blue_missing': blue_missing
}
def generate_smart_numbers(self, lottery_type: str, strategy: str = 'balanced', count: int = 1, periods: int = 100) -> List[Dict]:
"""智能选号
Args:
lottery_type: 彩票类型 ('ssq''dlt')
strategy: 选号策略 ('balanced', 'hot', 'cold', 'missing')
count: 生成注数
periods: 分析期数默认100期
Returns:
List[Dict]: 生成的号码列表
"""
import random
# 获取分析数据
hot_cold = self.get_hot_cold_numbers(lottery_type, periods)
distribution = self.analyze_number_distribution(lottery_type, periods)
missing = self.get_missing_periods(lottery_type, periods)
max_red = 33 if lottery_type == 'ssq' else 35
max_blue = 16 if lottery_type == 'ssq' else 12
red_count = 6 if lottery_type == 'ssq' else 5
blue_count = 1 if lottery_type == 'ssq' else 2
results = []
for _ in range(count):
# 根据策略选择红球
if strategy == 'hot':
red_pool = hot_cold['hot_reds']
elif strategy == 'cold':
red_pool = hot_cold['cold_reds']
elif strategy == 'missing':
red_pool = sorted(missing['red_missing'].items(),
key=lambda x: x[1], reverse=True)
red_pool = [num for num, _ in red_pool[:max_red//2]]
else: # balanced
red_pool = list(range(1, max_red + 1))
# 确保红球池不为空且数量足够
if not red_pool or len(red_pool) < red_count:
red_pool = list(range(1, max_red + 1))
# 选择红球
try:
red_numbers = sorted(random.sample(red_pool, red_count))
except ValueError:
# 如果采样失败,使用全范围随机
red_numbers = sorted(random.sample(
range(1, max_red + 1), red_count))
# 选择蓝球
if strategy == 'hot':
blue_pool = hot_cold['hot_blues']
elif strategy == 'cold':
blue_pool = hot_cold['cold_blues']
elif strategy == 'missing':
blue_pool = sorted(missing['blue_missing'].items(),
key=lambda x: x[1], reverse=True)
blue_pool = [num for num, _ in blue_pool[:max_blue//2]]
else: # balanced
blue_pool = list(range(1, max_blue + 1))
# 确保蓝球池不为空且数量足够
if not blue_pool or len(blue_pool) < blue_count:
blue_pool = list(range(1, max_blue + 1))
# 选择蓝球
try:
blue_numbers = sorted(random.sample(blue_pool, blue_count))
except ValueError:
# 如果采样失败,使用全范围随机
blue_numbers = sorted(random.sample(
range(1, max_blue + 1), blue_count))
results.append({
'red_numbers': red_numbers,
'blue_numbers': blue_numbers
})
return results