0

我有一个自定义scorer函数,其输入取决于特定的训练和验证折叠,此外,.predict_survival_function还需要估计器的输出。举一个更具体的例子:

我正在尝试使用综合 Brier 分数(IBS)作为方法GridSearch运行随机生存森林scikit-survival包)。挑战在于 IBS 的域是数据特定的(因此是折叠的),因为它在某些时候依赖于 Kaplan-Meyer 估计。此外,在评分评估步骤期间每次都需要调用该方法,而不仅仅是在结束时调用。scoring.predict_survival_function

看来我设法通过创建以下函数来处理第一个问题:

def IB_time_interval(y_train, y_test):
    y_times_tr = [i[2] for i in y_train]
    y_times_te = [i[2] for i in y_test]

    T1 = np.percentile(y_times_tr, 5, interpolation='higher')
    T2 = np.percentile(y_times_tr, 95, interpolation='lower')
    T3 = np.percentile(y_times_te, 5, interpolation='higher')
    T4 = np.percentile(y_times_te, 95, interpolation='lower')
    
    return np.linspace(np.maximum(T1,T3), np.minimum(T2, T4))

这足够强大,可以在所有折叠中工作。但是,我无法网格搜索阶段检索估计器的预测,因为每次调用自定义记分器函数时,它的未拟合副本似乎都会传递。II 尝试的解决方法是在评分函数中重新拟合估计器,但这不仅在概念上是错误的,而且还会引发错误。

自定义记分器函数如下所示:

def IB_scorer(y_true, y_pred, times=times_linspace, y=y, clf=rsf):
    
    rsf.fit(X_train,y_train) #<--- = conceptually wrong
    survs_test = rsf.predict_survival_function(X_test, return_array=False) #<---
    T1, T2 = survs_test[0].x.min(), survs_test[0].x.max()
    mask2 = np.logical_or(times >= T2, times < T1) # mask outer interval            
    times = times[~mask2]
    #preds has shape (n_y-s, n_times)
    preds_test = np.asarray([[fn(t) for t in times] for fn in survs_test])
    return integrated_brier_score(y, y_true, preds_test, times)

scoring然后我立即创建对象:

trial_IB_scorer = make_scorer(IB_scorer, greater_is_better=False)

有什么建议么?能够将 GridSearch 与更复杂的评分功能一起使用会很棒,尤其是对于生存分析案例!

PS。我将在此处粘贴其余的最小工作代码:

import numpy as np
from sksurv.ensemble import RandomSurvivalForest
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sksurv.metrics import integrated_brier_score


from sksurv.datasets import load_breast_cancer
X, y = load_breast_cancer()
X = X.drop(["er", "grade"], axis=1)
y_cens = np.array([i[0] for i in y]) #censoring status 1 or 0

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size= 0.2,
                                                    shuffle=True,
                                                    random_state=0,
                                                    stratify = y_cens)

param_grid = {
            'max_depth': [4, 20, None],
            'max_features': ["sqrt", None]
            }

rsf = RandomSurvivalForest(n_jobs=1, random_state=0)

times_linspace = IB_time_interval(y_train, y_test)


clf = GridSearchCV(rsf, param_grid, refit=True, n_jobs=1, 
                    scoring=trial_IB_scorer)

clf.fit(X_train, y_train)
print("final score clf", clf.score(X_train, y_train))
print(clf.best_params_)
4

0 回答 0