所以我不认为有一个默认的 scikit-learn 交叉验证器可以实现你想要的,但应该可以创建一个。
我的方法是遍历所有主题并贪婪地将它们分配到测试集中进行折叠,具体取决于分配对折叠大小的改进程度以及折叠中的目标班级率。
我已经生成了一些类似于您的问题的示例数据:
import pandas as pd
import numpy as np
n_subjects = 50
n_observations = 100
n_positives = 15
positive_subjects = np.random.randint(0, n_subjects, n_positives)
data = pd.DataFrame({
'subject': np.random.randint(0, n_subjects, n_observations)
}).assign(
target=lambda d: d['subject'].isin(positive_subjects)
)
subject target
0 14 False
1 12 True
2 10 False
3 36 False
4 21 False
然后我们可以使用以下代码片段进行分配
def target_rate_improvements(data, subjects, extra):
"""Compute the improvement in squared difference between the positive rate in each fold vs the overall positive rate in the dataset"""
target_rate = data['target'].mean()
rate_without_extra = data.loc[lambda d: d['subject'].isin(subjects), 'target'].mean()
rate_with_extra = data.loc[lambda d: d['subject'].isin(subjects + [extra]), 'target'].mean()
rate_without_extra = 0 if np.isnan(rate_without_extra) else rate_without_extra
return (rate_without_extra - target_rate)**2 - (rate_with_extra - target_rate)**2
def size_improvement(data, subjects, n_folds):
"""compute the improvement in squared difference between the number of observations in each fold vs the expected number of observations"""
target_obs_per_fold = len(data) / n_folds
return [(target_obs_per_fold - len(data.loc[lambda d: d['subject'].isin(subject)])) ** 2 for subject in subjects.values()]
n_folds = 5
test_subjects_per_fold = {fold: [] for fold in range(n_folds)}
subjects_to_assign = list(range(100))
for subject in data['subject'].unique():
target_rate_improvement = np.array([target_rate_improvements(data, test_subjects_per_fold[fold], subject) for fold in range(n_folds)])
size_improvements = np.array(size_improvement(data, test_subjects_per_fold, n_folds)) * 0.001
best_fold = np.argmax(target_rate_improvement +size_improvements)
test_subjects_per_fold[best_fold] += [subject]
并验证它是否按我们的预期工作:
for fold, subjects in test_subjects_per_fold.items():
print('-'*80)
print(f'for fold {fold}')
test_data = data.loc[lambda d: d['subject'].isin(subjects)]
train_data = data.loc[lambda d: ~d['subject'].isin(subjects)]
print('train - pos rate:', train_data['target'].mean(), 'size:', len(train_data))
print('test - pos rate:', test_data['target'].mean(), 'size:', len(test_data))
--------------------------------------------------------------------------------
for fold 0
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
--------------------------------------------------------------------------------
for fold 1
train - pos rate: 0.3037974683544304 size: 79
test - pos rate: 0.2857142857142857 size: 21
--------------------------------------------------------------------------------
for fold 2
train - pos rate: 0.2962962962962963 size: 81
test - pos rate: 0.3157894736842105 size: 19
--------------------------------------------------------------------------------
for fold 3
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
--------------------------------------------------------------------------------
for fold 4
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
变量命名可以在这里和那里改进,但总的来说我会说这种方法可以解决你的问题。
在 scikit-learn 兼容的交叉验证器中实现这一点看起来像这样,尽管它需要更多的重新设计。
class StratifiedGroupKFold(_BaseKFold):
...
def _iter_test_indices(self, X, y, groups):
test_subjects_per_fold = {fold: [] for fold in range(n_folds)}
for subject in data['subject'].unique():
target_rate_improvement = np.array([self.target_rate_improvements(X, y, test_subjects_per_fold[fold], subject) for fold in range(self.n_folds)])
size_improvements = np.array(self.size_improvement(X, y, test_subjects_per_fold, self.n_folds)) * 0.001
best_fold = np.argmax(target_rate_improvement +size_improvements)
test_subjects_per_fold[best_fold] += [subject]
for subjects in test_subjects_per_fold.values():
yield data['subject'].isin(subjects)], ~data['subject'].isin(subjects)]