2025-11-05-基于随机加权推断模型(IM)的参数估计算法

First Post:

Last Update:

Page View: loading...

基于随机加权推断模型(IM)的参数估计方法

不可观测的泊松过程 ,观测值 和辅助变量 ,目标是估计参数
m?

现提出基于随机加权推断模型(Inferential Model, IM)的算法通过引入随机权重处理离散分布,来构造置信区间

方法概述

随机加权 IM 方法的核心是通过引入均匀随机变量和权重,将离散泊松分布连续化,从而逆解参数
该方法假设 已知算法分为四个步骤:建立关联模型、引入随机权重连续化、逆解参数、模拟验证覆盖率

算法步骤

1: 建立关联模型

对观测值 ,其分布函数为 ,其中
引入辅助变量 ,建立关联:

对观测值 ,其分布函数为 ,引入辅助变量 ,建立关联:

2: 引入随机权重进行连续化

引入随机权重 构造方程:

是关于参数的函数,且由于泊松分布函的单调性,逆函数 存在

3: 逆解参数,使用 brentq 函数进行二分法求根

从上述方程解出

推导参数:

得到 的表达式

4: 验证覆盖率

  • 模拟目标:检查构造的置信区间覆盖真实 的概率是否接近 95%

  • 步骤:

生成观测数据:从分布生成一对观测值

对于固定 ,生成大量样本(如 个)的

计算 的候选值:对于每个样本,计算

构造置信区间:从 10000 个 q 值中取分位数,得到置信区间

检查覆盖率:重复步骤 2-5 多次( 次),每次生成新的 ,计算区间覆盖真实 的比例覆盖率

算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
import numpy as np
import pandas as pd
from scipy.stats import poisson, gamma
from scipy.optimize import brentq
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')


# 设置全局字体为 SimHei (黑体) 或其他中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

class PoissonSignalInference:
"""
基于随机加权IM方法的泊松信号推断类
用于估计背景率b、信号率q和尺度参数m
"""

def __init__(self, b_true=None, q_true=None, m_true=None):
"""
初始化真实参数值
"""
self.b_true = b_true
self.q_true = q_true
self.m_true = m_true
self.STA_true = b_true + q_true if b_true is not None and q_true is not None else None

def generate_data(self, n_samples=1):
"""

生成观测数据Y和W

当前有不可观测的泊松过程 B ∼ Poisson(b) 和 S ∼ Poisson(q)
可观测对象有 Y = B + S ∼ Poisson(b + q) 和辅助变量 W ∼ Poisson(m · b)
目标是估计参数 b、q 和可能的 m

"""
if self.b_true is None or self.q_true is None or self.m_true is None:
raise ValueError("必须先设置真实参数值")

if self.STA_true is None:
self.STA_true = self.b_true + self.q_true

Y = np.random.poisson(self.STA_true, n_samples)
W = np.random.poisson(self.m_true * self.b_true, n_samples)

return Y, W

def poisson_cdf(self, k, theta):
"""
计算泊松分布的累积分布函数
"""
return poisson.cdf(k, theta) #泊松分布的CDF公式,参数为 k 和 theta

def solve_G_inverse(self, u, Y, w1, bracket_low=0, bracket_high=100):
"""
求解方程 G(STA) = u 的逆函数,得到STA
使用二分法求解 G(STA): w_1 F_{STA}(Y-1) + (1-w_1) F_{STA}(Y) = u

计算泊松混合分布的累积分布函数值与目标值的差值

该函数计算一个泊松混合分布的CDF值,该混合分布由两个泊松分布组成,权重分别为w1和(1-w1),
然后减去目标值u,通常用于求解分位数或进行概率计算
参数:
Y (int or array-like): 观测值,泊松分布的计数变量
STA (float or array-like): 泊松分布的参数λ(均值)
w1 (float): 第一个泊松分布的权重,取值范围[0,1]
u (float): 目标概率值,用于比较或求解
返回:
"""
def equation(STA): #返回值:(float or array-like): 泊松混合分布的CDF值与目标值u的差值
if Y == 0:
# 当Y=0时,F_STA(-1)=0,方程简化为F_STA(0)=u
return self.poisson_cdf(0, STA) - u
else:
return w1 * self.poisson_cdf(Y-1, STA) + (1-w1) * self.poisson_cdf(Y, STA) - u
# 求加权泊松混合分布
# 第一项:权重w1乘以P(X <= Y-1)的概率
# 第二项:权重(1-w1)乘以P(X <= Y)的概率
# 最后减去目标值u

try:
"""
使用二分法求解 G(STA) = u 方程中的 STA 值
"""
# 设置上下界来初始化搜索区间
low = max(0.00001, bracket_low) #bracket_low下界()
high = max(10 * Y, bracket_high) if Y > 0 else bracket_high #bracket_high上界,根据观测值 Y 调整上界

# 检查边界值
f_low = equation(low)
f_high = equation(high)


if f_low * f_high > 0:
# 如果 f_low * f_high > 0(同号),则自动扩大搜索区间
attempts = 0
while f_low * f_high > 0 and attempts < 10:
high *= 2 #最多尝试10次,每次将上界翻倍:high *= 2
f_high = equation(high)
attempts += 1

STA_solution = brentq(equation, low, high, xtol=1e-6)
"""
参数为要求解的方程(加权泊松混合分布),上下区间,精度
使用 brentq 函数进行二分法求根
求解 w1 * self.poisson_cdf(Y-1, STA) + (1-w1) * self.poisson_cdf(Y, STA) - u = 0

即求解方程 G(STA) = u
得到STA

"""

return STA_solution #返回求解得到的 STA 值
except (ValueError, RuntimeError):
# 如果求解失败,返回保守估计
print("ERR solve_G_inverse 求解失败,返回保守估计")
return max(0.001, Y)

def solve_H_inverse(self, v, W, w2, m, bracket_low=0, bracket_high=100):
"""
求解方程 H(mb) = v 的逆函数,得到mb

H(mb): w_2 F_{mb}(W-1) + (1-w_2) F_{mb}(W) = v

"""
def equation(mb):
if W == 0:
return self.poisson_cdf(0, mb) - v
else:
return w2 * self.poisson_cdf(W-1, mb) + (1-w2) * self.poisson_cdf(W, mb) - v

try:
low = max(0.001, bracket_low)
high = max(10 * W, bracket_high) if W > 0 else bracket_high

f_low = equation(low)
f_high = equation(high)

if f_low * f_high > 0:
attempts = 0
while f_low * f_high > 0 and attempts < 10:
high *= 2
f_high = equation(high)
attempts += 1

mb_solution = brentq(equation, low, high, xtol=1e-6)
return mb_solution
except (ValueError, RuntimeError):
return max(0.001, W)

def estimate_parameters_IM(self, Y_obs, W_obs, m_known=True, n_inner_samples=10000):
"""
使用随机加权IM方法估计三个参数q、b和STA
Y_obs:观测值Y
W_obs:观测值W
m_known:一个布尔值,表示参数m是否已知
n_inner_samples:内部采样次数,默认为10000

"""
if not m_known:
raise NotImplementedError("m未知")

# 创建三个列表存储候选参数值
q_candidates = []
b_candidates = []
STA_candidates = []

for _ in range(n_inner_samples):
# 生成4个均匀分布的随机权重(u, v, w1, w2)
u = np.random.uniform(0, 1)
v = np.random.uniform(0, 1)
w1 = np.random.uniform(0, 1)
w2 = np.random.uniform(0, 1)

# 使用solve_G_inverse和solve_H_inverse方法计算STA和mb的估计值
STA_est = self.solve_G_inverse(u, Y_obs, w1)
mb_est = self.solve_H_inverse(v, W_obs, w2, self.m_true)
# G(STA): w_1 F_{STA}(Y-1) + (1-w_1) F_{STA}(Y) = u ==> 求解STA
# H(mb): w_2 F_{mb}(W-1) + (1-w_2) F_{mb}(W) = v ==> 求解mb
# 可以得到
# STA = G⁻¹(u), mb = H⁻¹(v)
# 计算b和q
# b = H⁻¹(v)/m
# q = STA - b = G⁻¹(u) - H⁻¹(v)/m
if self.m_true is None:
raise ValueError("参数 m_true 未设置,无法计算 b_est")
b_est = mb_est / self.m_true
q_est = STA_est - b_est

# 确保非负
q_est = max(0, q_est)
b_est = max(0, b_est)

#每次计算出新的 q_est 值后,会将其添加到 q_candidates 列表中
q_candidates.append(q_est)
b_candidates.append(b_est)
STA_candidates.append(STA_est)

return {
'q_candidates': np.array(q_candidates),
'b_candidates': np.array(b_candidates),
'STA_candidates': np.array(STA_candidates)
}

def calculate_confidence_intervals(self, candidates_dict, alpha=0.05):
"""
计算参数的置信区间
"""
intervals = {}

for param, values in candidates_dict.items():
# 移除'_candidates'后缀,使键名简化
simple_param = param.replace('_candidates', '')
lower = np.quantile(values, alpha/2)
upper = np.quantile(values, 1 - alpha/2)
intervals[simple_param] = (lower, upper)

return intervals

def coverage_simulation(self, n_outer_trials=1000, n_inner_samples=1000, alpha=0.05):
"""
进行覆盖率模拟验证
"""
coverage_results = {
'q': 0.0,
'b': 0.0,
'STA': 0.0
}

interval_lengths = {
'q': [],
'b': [],
'STA': []
}

print("正在进行覆盖率模拟...")
for i in tqdm(range(n_outer_trials)):
# 生成新的观测数据
Y_obs, W_obs = self.generate_data(1)
Y_obs, W_obs = Y_obs[0], W_obs[0]

# 估计参数
candidates_dict = self.estimate_parameters_IM(Y_obs, W_obs,
n_inner_samples=n_inner_samples)

# 计算置信区间
intervals = self.calculate_confidence_intervals(candidates_dict, alpha)

# 检查覆盖率
if intervals['q'][0] <= self.q_true <= intervals['q'][1]:
coverage_results['q'] += 1
if intervals['b'][0] <= self.b_true <= intervals['b'][1]:
coverage_results['b'] += 1
if intervals['STA'][0] <= self.STA_true <= intervals['STA'][1]:
coverage_results['STA'] += 1

# 记录区间长度
interval_lengths['q'].append(intervals['q'][1] - intervals['q'][0])
interval_lengths['b'].append(intervals['b'][1] - intervals['b'][0])
interval_lengths['STA'].append(intervals['STA'][1] - intervals['STA'][0])

# 计算覆盖率
for param in coverage_results:
coverage_results[param] = coverage_results[param] / n_outer_trials

return coverage_results, interval_lengths

def plot_results(self, candidates_dict, intervals, Y_obs, W_obs):
"""
可视化结果
"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# q的分布
axes[0, 0].hist(candidates_dict['q_candidates'], bins=50, alpha=0.7, density=True)
axes[0, 0].axvline(self.q_true, color='red', linestyle='--', linewidth=2, label=f'真实 q = {self.q_true}')
axes[0, 0].axvline(intervals['q'][0], color='green', linestyle='--', linewidth=1, label=f'{100*(1-0.05)}% CI')
axes[0, 0].axvline(intervals['q'][1], color='green', linestyle='--', linewidth=1)
axes[0, 0].set_xlabel('q')
axes[0, 0].set_ylabel('密度')
axes[0, 0].set_title('q的后验分布')
axes[0, 0].legend()

# b的分布
axes[0, 1].hist(candidates_dict['b_candidates'], bins=50, alpha=0.7, density=True)
axes[0, 1].axvline(self.b_true, color='red', linestyle='--', linewidth=2, label=f'真实 b = {self.b_true}')
axes[0, 1].axvline(intervals['b'][0], color='green', linestyle='--', linewidth=1, label=f'{100*(1-0.05)}% CI')
axes[0, 1].axvline(intervals['b'][1], color='green', linestyle='--', linewidth=1)
axes[0, 1].set_xlabel('b')
axes[0, 1].set_ylabel('密度')
axes[0, 1].set_title('b的后验分布')
axes[0, 1].legend()

# STA的分布
axes[1, 0].hist(candidates_dict['STA_candidates'], bins=50, alpha=0.7, density=True)
axes[1, 0].axvline(self.STA_true, color='red', linestyle='--', linewidth=2, label=f'真实 STA = {self.STA_true}')
axes[1, 0].axvline(intervals['STA'][0], color='green', linestyle='--', linewidth=1, label=f'{100*(1-0.05)}% CI')
axes[1, 0].axvline(intervals['STA'][1], color='green', linestyle='--', linewidth=1)
axes[1, 0].set_xlabel('STA')
axes[1, 0].set_ylabel('密度')
axes[1, 0].set_title('STA的后验分布')
axes[1, 0].legend()

# 观测数据信息
axes[1, 1].axis('off')
text_str = f'观测数据:\nY = {Y_obs}\nW = {W_obs}\n\n真实参数:\nb = {self.b_true}\nq = {self.q_true}\nm = {self.m_true}\nSTA = {self.STA_true}'
axes[1, 1].text(0.1, 0.9, text_str, transform=axes[1, 1].transAxes, fontsize=12,
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

plt.tight_layout()
plt.show()

def main():
"""
主函数:演示完整的推断流程
"""
# 设置真实参数
b_true = 3.0 # b
q_true = 2.0 # q
m_true = 0.8 # 尺度参数(已知)
STA_true = b_true + q_true

print("=== 泊松推断演示 ===")
print(f"真实参数: b={b_true}, q={q_true}, m={m_true}, STA={STA_true}")

# 初始化推断器
inference = PoissonSignalInference(b_true, q_true, m_true)

# 生成单次观测数据
Y_obs, W_obs = inference.generate_data(1)
Y_obs, W_obs = Y_obs[0], W_obs[0]
print(f"\n生成的观测数据: Y={Y_obs}, W={W_obs}")

# 使用IM方法进行参数估计
print("\n正在进行参数估计...")
candidates_dict = inference.estimate_parameters_IM(Y_obs, W_obs, n_inner_samples=10000)

# 计算置信区间
alpha = 0.05 # 0.95置信区间
intervals = inference.calculate_confidence_intervals(candidates_dict, alpha)

print("\n=== 估计结果 ===")
for param, (lower, upper) in intervals.items():
true_value = getattr(inference, f"{param.replace('_candidates', '')}_true", None)
if true_value is not None:
print(f"{param}: [{lower:.3f}, {upper:.3f}] (真实值: {true_value:.3f})")
else:
print(f"{param}: [{lower:.3f}, {upper:.3f}]")

# 可视化结果
print("\n生成可视化图表...")
inference.plot_results(candidates_dict, intervals, Y_obs, W_obs)

# 进行覆盖率模拟(样本量较小以加快演示速度)
print("\n正在进行覆盖率模拟...")
coverage_results, interval_lengths = inference.coverage_simulation(
n_outer_trials=500, # 减少试验次数以加快演示
n_inner_samples=1000, # 减少内部样本数
alpha=0.05
)

print("\n=== 覆盖率结果 ===")
for param, coverage in coverage_results.items():
print(f"{param}的覆盖率: {coverage:.3f} (目标: {1-alpha:.3f})")

# 绘制区间长度分布
plt.figure(figsize=(12, 4))
for i, (param, lengths) in enumerate(interval_lengths.items()):
plt.subplot(1, 3, i+1)
plt.hist(lengths, bins=30, alpha=0.7)
plt.xlabel(f'{param}区间长度')
plt.ylabel('频数')
plt.title(f'{param}置信区间长度分布')
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main()