350 lines
13 KiB
Python
350 lines
13 KiB
Python
from typing import List, Dict, Tuple, Optional
|
||
import numpy as np
|
||
from collections import defaultdict
|
||
from datetime import datetime, timedelta
|
||
from sqlalchemy.orm import Session
|
||
from ..models.lottery import SSQLottery, DLTLottery
|
||
|
||
|
||
class LotteryAnalysisService:
|
||
def __init__(self, db: Session):
|
||
self.db = db
|
||
|
||
def get_hot_cold_numbers(self, lottery_type: str, periods: int = 50) -> Dict:
|
||
"""分析热号和冷号
|
||
|
||
Args:
|
||
lottery_type: 彩票类型 ('ssq' 或 'dlt')
|
||
periods: 分析期数,默认50期
|
||
|
||
Returns:
|
||
Dict: 包含热号和冷号的字典
|
||
"""
|
||
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
|
||
recent_draws = self.db.query(model).order_by(
|
||
model.open_time.desc()).limit(periods).all()
|
||
|
||
# 初始化号码频率统计
|
||
red_freq = defaultdict(int)
|
||
blue_freq = defaultdict(int)
|
||
|
||
# 统计号码出现频率
|
||
for draw in recent_draws:
|
||
if lottery_type == 'ssq':
|
||
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
|
||
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
|
||
for num in red_numbers:
|
||
red_freq[num] += 1
|
||
blue_freq[draw.blue_ball] += 1
|
||
else:
|
||
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
|
||
draw.front_ball_4, draw.front_ball_5]
|
||
for num in red_numbers:
|
||
red_freq[num] += 1
|
||
blue_freq[draw.back_ball_1] += 1
|
||
blue_freq[draw.back_ball_2] += 1
|
||
|
||
# 计算平均出现次数
|
||
red_avg = sum(red_freq.values()) / len(red_freq)
|
||
blue_avg = sum(blue_freq.values()) / len(blue_freq)
|
||
|
||
# 定义热号和冷号
|
||
hot_reds = [num for num, freq in red_freq.items() if freq > red_avg]
|
||
cold_reds = [num for num, freq in red_freq.items() if freq < red_avg]
|
||
hot_blues = [num for num, freq in blue_freq.items() if freq > blue_avg]
|
||
cold_blues = [num for num, freq in blue_freq.items()
|
||
if freq < blue_avg]
|
||
|
||
return {
|
||
'hot_reds': sorted(hot_reds),
|
||
'cold_reds': sorted(cold_reds),
|
||
'hot_blues': sorted(hot_blues),
|
||
'cold_blues': sorted(cold_blues),
|
||
'red_frequencies': dict(red_freq),
|
||
'blue_frequencies': dict(blue_freq)
|
||
}
|
||
|
||
def analyze_number_distribution(self, lottery_type: str, periods: int = 100) -> Dict:
|
||
"""分析号码分布(分区统计、奇偶比)
|
||
|
||
Args:
|
||
lottery_type: 彩票类型 ('ssq' 或 'dlt')
|
||
periods: 分析期数,默认100期
|
||
|
||
Returns:
|
||
Dict: 包含各种分布统计的字典
|
||
"""
|
||
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
|
||
recent_draws = self.db.query(model).order_by(
|
||
model.open_time.desc()).limit(periods).all()
|
||
|
||
# 初始化统计数据
|
||
zone_stats = defaultdict(int)
|
||
odd_count = 0
|
||
even_count = 0
|
||
total_numbers = 0
|
||
|
||
for draw in recent_draws:
|
||
if lottery_type == 'ssq':
|
||
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
|
||
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
|
||
else:
|
||
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
|
||
draw.front_ball_4, draw.front_ball_5]
|
||
|
||
# 分区统计
|
||
for num in red_numbers:
|
||
# 对于双色球:每5个数字一个区,最后一个区是31-33
|
||
# 对于大乐透:每5个数字一个区,最后一个区是31-35
|
||
zone = (num - 1) // 5 + 1
|
||
zone_stats[zone] += 1
|
||
|
||
# 奇偶统计
|
||
if num % 2 == 0:
|
||
even_count += 1
|
||
else:
|
||
odd_count += 1
|
||
total_numbers += 1
|
||
|
||
# 计算比例
|
||
zone_distribution = {str(zone): count for zone,
|
||
count in zone_stats.items()}
|
||
odd_even_distribution = {
|
||
'odd': odd_count,
|
||
'even': even_count
|
||
}
|
||
|
||
return {
|
||
'zone_distribution': zone_distribution,
|
||
'odd_even_distribution': odd_even_distribution
|
||
}
|
||
|
||
def analyze_consecutive_numbers(self, lottery_type: str, periods: int = 100) -> Dict:
|
||
"""分析连号和重复号
|
||
|
||
Args:
|
||
lottery_type: 彩票类型 ('ssq' 或 'dlt')
|
||
periods: 分析期数,默认100期
|
||
|
||
Returns:
|
||
Dict: 包含连号和重复号分析的字典
|
||
"""
|
||
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
|
||
recent_draws = self.db.query(model).order_by(
|
||
model.open_time.desc()).limit(periods).all()
|
||
|
||
consecutive_stats = defaultdict(int)
|
||
repeat_stats = defaultdict(int)
|
||
|
||
prev_numbers = None
|
||
for draw in recent_draws:
|
||
if lottery_type == 'ssq':
|
||
current_numbers = sorted([
|
||
draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
|
||
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6
|
||
])
|
||
else:
|
||
current_numbers = sorted([
|
||
draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
|
||
draw.front_ball_4, draw.front_ball_5
|
||
])
|
||
|
||
# 分析连号
|
||
consecutive_count = 0
|
||
for i in range(len(current_numbers)-1):
|
||
if current_numbers[i+1] - current_numbers[i] == 1:
|
||
consecutive_count += 1
|
||
consecutive_stats[consecutive_count] += 1
|
||
|
||
# 分析重复号
|
||
if prev_numbers:
|
||
repeat_count = len(set(current_numbers) & set(prev_numbers))
|
||
repeat_stats[repeat_count] += 1
|
||
|
||
prev_numbers = current_numbers
|
||
|
||
total_draws = len(recent_draws)
|
||
return {
|
||
'consecutive_distribution': {k: v/total_draws for k, v in consecutive_stats.items()},
|
||
'repeat_distribution': {k: v/(total_draws-1) for k, v in repeat_stats.items()}
|
||
}
|
||
|
||
def analyze_mathematical_stats(self, lottery_type: str, periods: int = 100) -> Dict:
|
||
"""分析数学统计特征
|
||
|
||
Args:
|
||
lottery_type: 彩票类型 ('ssq' 或 'dlt')
|
||
periods: 分析期数,默认100期
|
||
|
||
Returns:
|
||
Dict: 包含数学统计特征的字典
|
||
"""
|
||
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
|
||
recent_draws = self.db.query(model).order_by(
|
||
model.open_time.desc()).limit(periods).all()
|
||
|
||
sums = []
|
||
for draw in recent_draws:
|
||
if lottery_type == 'ssq':
|
||
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
|
||
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
|
||
else:
|
||
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
|
||
draw.front_ball_4, draw.front_ball_5]
|
||
sums.append(sum(red_numbers))
|
||
|
||
mean = np.mean(sums)
|
||
std = np.std(sums)
|
||
|
||
return {
|
||
'sum_mean': float(mean),
|
||
'sum_std': float(std),
|
||
'sum_range': {
|
||
'min': float(mean - 2*std),
|
||
'max': float(mean + 2*std)
|
||
}
|
||
}
|
||
|
||
def get_missing_periods(self, lottery_type: str, periods: int = 200) -> Dict:
|
||
"""分析号码遗漏值(仅分析最近periods期,避免全表遍历)
|
||
|
||
Args:
|
||
lottery_type: 彩票类型 ('ssq' 或 'dlt')
|
||
periods: 分析期数,默认200期
|
||
|
||
Returns:
|
||
Dict: 包含各号码当前遗漏值的字典
|
||
"""
|
||
model = SSQLottery if lottery_type == 'ssq' else DLTLottery
|
||
max_red = 33 if lottery_type == 'ssq' else 35
|
||
max_blue = 16 if lottery_type == 'ssq' else 12
|
||
|
||
# 一次性查出最近periods期数据
|
||
draws = self.db.query(model).order_by(
|
||
model.open_time.desc()).limit(periods).all()
|
||
|
||
# 初始化遗漏值字典
|
||
red_missing = {i: 0 for i in range(1, max_red + 1)}
|
||
blue_missing = {i: 0 for i in range(1, max_blue + 1)}
|
||
|
||
# 标记号码是否已出现
|
||
red_found = {i: False for i in range(1, max_red + 1)}
|
||
blue_found = {i: False for i in range(1, max_blue + 1)}
|
||
|
||
for idx, draw in enumerate(draws):
|
||
if lottery_type == 'ssq':
|
||
red_numbers = [draw.red_ball_1, draw.red_ball_2, draw.red_ball_3,
|
||
draw.red_ball_4, draw.red_ball_5, draw.red_ball_6]
|
||
blue_numbers = [draw.blue_ball]
|
||
else:
|
||
red_numbers = [draw.front_ball_1, draw.front_ball_2, draw.front_ball_3,
|
||
draw.front_ball_4, draw.front_ball_5]
|
||
blue_numbers = [draw.back_ball_1, draw.back_ball_2]
|
||
|
||
# 红球遗漏统计
|
||
for i in range(1, max_red + 1):
|
||
if not red_found[i]:
|
||
if i in red_numbers:
|
||
red_found[i] = True
|
||
red_missing[i] = idx
|
||
# 蓝球遗漏统计
|
||
for i in range(1, max_blue + 1):
|
||
if not blue_found[i]:
|
||
if i in blue_numbers:
|
||
blue_found[i] = True
|
||
blue_missing[i] = idx
|
||
|
||
# 未出现过的号码,遗漏值为periods
|
||
for i in range(1, max_red + 1):
|
||
if not red_found[i]:
|
||
red_missing[i] = periods
|
||
for i in range(1, max_blue + 1):
|
||
if not blue_found[i]:
|
||
blue_missing[i] = periods
|
||
|
||
return {
|
||
'red_missing': red_missing,
|
||
'blue_missing': blue_missing
|
||
}
|
||
|
||
def generate_smart_numbers(self, lottery_type: str, strategy: str = 'balanced', count: int = 1, periods: int = 100) -> List[Dict]:
|
||
"""智能选号
|
||
|
||
Args:
|
||
lottery_type: 彩票类型 ('ssq' 或 'dlt')
|
||
strategy: 选号策略 ('balanced', 'hot', 'cold', 'missing')
|
||
count: 生成注数
|
||
periods: 分析期数,默认100期
|
||
|
||
Returns:
|
||
List[Dict]: 生成的号码列表
|
||
"""
|
||
import random
|
||
|
||
# 获取分析数据
|
||
hot_cold = self.get_hot_cold_numbers(lottery_type, periods)
|
||
distribution = self.analyze_number_distribution(lottery_type, periods)
|
||
missing = self.get_missing_periods(lottery_type, periods)
|
||
|
||
max_red = 33 if lottery_type == 'ssq' else 35
|
||
max_blue = 16 if lottery_type == 'ssq' else 12
|
||
red_count = 6 if lottery_type == 'ssq' else 5
|
||
blue_count = 1 if lottery_type == 'ssq' else 2
|
||
|
||
results = []
|
||
for _ in range(count):
|
||
# 根据策略选择红球
|
||
if strategy == 'hot':
|
||
red_pool = hot_cold['hot_reds']
|
||
elif strategy == 'cold':
|
||
red_pool = hot_cold['cold_reds']
|
||
elif strategy == 'missing':
|
||
red_pool = sorted(missing['red_missing'].items(),
|
||
key=lambda x: x[1], reverse=True)
|
||
red_pool = [num for num, _ in red_pool[:max_red//2]]
|
||
else: # balanced
|
||
red_pool = list(range(1, max_red + 1))
|
||
|
||
# 确保红球池不为空且数量足够
|
||
if not red_pool or len(red_pool) < red_count:
|
||
red_pool = list(range(1, max_red + 1))
|
||
|
||
# 选择红球
|
||
try:
|
||
red_numbers = sorted(random.sample(red_pool, red_count))
|
||
except ValueError:
|
||
# 如果采样失败,使用全范围随机
|
||
red_numbers = sorted(random.sample(
|
||
range(1, max_red + 1), red_count))
|
||
|
||
# 选择蓝球
|
||
if strategy == 'hot':
|
||
blue_pool = hot_cold['hot_blues']
|
||
elif strategy == 'cold':
|
||
blue_pool = hot_cold['cold_blues']
|
||
elif strategy == 'missing':
|
||
blue_pool = sorted(missing['blue_missing'].items(),
|
||
key=lambda x: x[1], reverse=True)
|
||
blue_pool = [num for num, _ in blue_pool[:max_blue//2]]
|
||
else: # balanced
|
||
blue_pool = list(range(1, max_blue + 1))
|
||
|
||
# 确保蓝球池不为空且数量足够
|
||
if not blue_pool or len(blue_pool) < blue_count:
|
||
blue_pool = list(range(1, max_blue + 1))
|
||
|
||
# 选择蓝球
|
||
try:
|
||
blue_numbers = sorted(random.sample(blue_pool, blue_count))
|
||
except ValueError:
|
||
# 如果采样失败,使用全范围随机
|
||
blue_numbers = sorted(random.sample(
|
||
range(1, max_blue + 1), blue_count))
|
||
|
||
results.append({
|
||
'red_numbers': red_numbers,
|
||
'blue_numbers': blue_numbers
|
||
})
|
||
|
||
return results
|