
为了解决一个二分类任务,我有 100 个样本,每个样本都有唯一的 ID、所属的受试者(subject)和标签(1 或 0)。



我可以使用 sklearn 中的 GroupKFold 简单地做到这一点,但我还希望每一折中的类别是平衡的(或至少接近平衡)。


n_shuffles = 2
group_k_fold = GroupKFold(n_splits=5)

# Repeat the grouped 5-fold split on differently shuffled copies of the data;
# the loop index doubles as the shuffle seed so each repetition is reproducible.
for i in range(n_shuffles):
    X_shuffled, y_shuffled, groups_shuffled = shuffle(idx, labels, subjects, random_state=i)
    splits = group_k_fold.split(X_shuffled, y_shuffled, groups_shuffled)

    for train_idx, val_idx in splits:
        # NOTE(review): indexing X_shuffled with an index array assumes it is
        # array-like (e.g. a NumPy array), not a plain Python list -- confirm.
        # Each row mask is built once and reused for both features and labels.
        in_train = perezDataFrame['ID'].isin(X_shuffled[train_idx])
        in_val = perezDataFrame['ID'].isin(X_shuffled[val_idx])

        # Training features (L2-normalised) and labels for this fold.
        X = perezDataFrame.loc[in_train, AU_names].values
        X = preprocessing.normalize(X, norm='l2')
        y = perezDataFrame.loc[in_train, 'label'].values

        # Validation features (L2-normalised) and labels for this fold.
        XTest = perezDataFrame.loc[in_val, AU_names].values
        XTest = preprocessing.normalize(XTest, norm='l2')
        yTest = perezDataFrame.loc[in_val, 'label'].values

其中 idx、subjects 和 labels 分别是 ID、受试者和标签的列表。



# Alternative attempt: five repetitions, each seeded with the loop index i and
# configured for 10 splits with a 0.80 / 0.20 train / test proportion.
for i in range(5):
    GSP = GroupShuffleSplit(n_splits =10, test_size =0.20, train_size=0.80 ,random_state=i)
    # `subjects` is passed in the third position, presumably as the grouping
    # variable -- confirm against the GroupShuffleSplit.split signature.
    splits = GSP.split(idx, labels, subjects)
    # NOTE(review): the body of the loop below is not shown in this excerpt.
    for train_idx, test_idx in splits:

但这不是 K 折交叉验证(KFold),所以我无法保证同一个样本只出现在一个测试折中。


1 回答 1


所以我不认为有一个默认的 scikit-learn 交叉验证器可以实现你想要的,但应该可以创建一个。



import pandas as pd
import numpy as np

n_subjects = 50
n_observations = 100
n_positives = 15

# Draw the subjects whose observations are labelled positive. randint samples
# with replacement, so there may be fewer than n_positives distinct subjects.
positive_subjects = np.random.randint(0, n_subjects, n_positives)

# One row per observation: a random subject id plus a boolean target that is
# True exactly when the row's subject is one of the positive subjects.
data = pd.DataFrame({
    'subject': np.random.randint(0, n_subjects, n_observations),
}).assign(
    target=lambda d: d['subject'].isin(positive_subjects),
)

subject target
0   14  False
1   12  True
2   10  False
3   36  False
4   21  False


def target_rate_improvements(data, subjects, extra):
    """Return how much adding ``extra`` to a fold improves its positive rate.

    The fold's error is the squared distance between the mean of ``target``
    over the fold's rows and the mean of ``target`` over all of ``data``.
    The result is the error before adding ``extra`` minus the error after,
    so a positive value means the fold moved closer to the overall rate.

    Args:
        data: DataFrame with a ``subject`` column and a ``target`` column.
        subjects: list of subjects already assigned to the fold.
        extra: the candidate subject to add to the fold.

    Returns:
        The reduction in squared error obtained by adding ``extra``.
    """
    overall_rate = data['target'].mean()

    def positive_rate(members):
        # Mean of `target` over the rows whose subject is in `members`.
        in_fold = data['subject'].isin(members)
        return data.loc[in_fold, 'target'].mean()

    before = positive_rate(subjects)
    after = positive_rate(subjects + [extra])

    # An empty fold has an undefined (NaN) rate; it is treated as a rate of 0.
    if np.isnan(before):
        before = 0

    error_before = (before - overall_rate) ** 2
    error_after = (after - overall_rate) ** 2
    return error_before - error_after

def size_improvement(data, subjects, n_folds):
    """Return, per fold, the squared gap between its size and the ideal size.

    The ideal size is ``len(data) / n_folds``; a fold's size is the number of
    rows of ``data`` whose ``subject`` belongs to that fold.

    NOTE(review): because the gap is squared, a fold that is larger than the
    ideal size also receives a high value, not only an under-filled one.

    Args:
        data: DataFrame with a ``subject`` column.
        subjects: dict mapping each fold to the list of subjects assigned to it.
        n_folds: number of folds used to compute the ideal fold size.

    Returns:
        A list with one squared gap per entry of ``subjects``, in dict order.
    """
    expected_size = len(data) / n_folds

    squared_gaps = []
    for fold_members in subjects.values():
        fold_rows = data.loc[data['subject'].isin(fold_members)]
        squared_gaps.append((expected_size - len(fold_rows)) ** 2)
    return squared_gaps

n_folds = 5
# Fold number -> subjects held out in that fold's test set (filled in below).
test_subjects_per_fold = dict((fold, []) for fold in range(n_folds))
# NOTE(review): not read anywhere in the visible code.
subjects_to_assign = list(range(100))

# Greedy assignment: visit the subjects in order of first appearance and put
# each one into the fold with the highest combined score.
for subject in data['subject'].unique():
    rate_scores = np.array([
        target_rate_improvements(data, fold_members, subject)
        for fold_members in test_subjects_per_fold.values()
    ])
    # The size term is scaled by 0.001 so that it is small relative to the
    # positive-rate term.
    size_scores = np.array(size_improvement(data, test_subjects_per_fold, n_folds)) * 0.001
    combined_scores = rate_scores + size_scores
    best_fold = np.argmax(combined_scores)
    test_subjects_per_fold[best_fold].append(subject)


# Report the positive rate and the size of the train and test parts of each fold.
for fold, subjects in test_subjects_per_fold.items():
    print(f'for fold {fold}')

    # Rows belonging to this fold's subjects form the test set; the rest train.
    held_out = data['subject'].isin(subjects)
    test_data = data.loc[held_out]
    train_data = data.loc[~held_out]

    train_rate, train_size = train_data['target'].mean(), len(train_data)
    print('train - pos rate:', train_rate, 'size:', train_size)
    test_rate, test_size = test_data['target'].mean(), len(test_data)
    print('test - pos rate:', test_rate, 'size:', test_size)

for fold 0
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
for fold 1
train - pos rate: 0.3037974683544304 size: 79
test - pos rate: 0.2857142857142857 size: 21
for fold 2
train - pos rate: 0.2962962962962963 size: 81
test - pos rate: 0.3157894736842105 size: 19
for fold 3
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
for fold 4
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20


在 scikit-learn 兼容的交叉验证器中实现这一点看起来像这样,尽管它需要更多的重新设计。

class StratifiedGroupKFold(_BaseKFold):
    """K-fold splitter that keeps groups intact and balances the positive rate.

    Every group (e.g. subject) is assigned to exactly one test fold. Groups
    are assigned greedily, in order of first appearance, to the fold with the
    best score, where the score combines

    * how much adding the group moves the fold's positive rate towards the
      overall positive rate of ``y``, and
    * a small term based on the squared gap between the fold's current size
      and the ideal size ``len(y) / n_splits``.

    NOTE(review): assumes ``_BaseKFold`` is scikit-learn's private base class,
    which stores the number of folds in ``self.n_splits`` and turns the output
    of ``_iter_test_indices`` into (train, test) index pairs -- confirm
    against the installed scikit-learn version.
    """

    # Weight of the fold-size term relative to the positive-rate term.
    SIZE_WEIGHT = 0.001

    def _iter_test_indices(self, X, y, groups):
        """Yield, for each fold, the integer indices of its test samples.

        Args:
            X: unused; present for interface compatibility.
            y: binary labels (0/1 or bool), one per sample.
            groups: group label of each sample; a group never spans two folds.

        Yields:
            One 1-D integer array of test indices per fold.

        Raises:
            ValueError: if ``y`` or ``groups`` is missing.
        """
        if y is None or groups is None:
            raise ValueError(
                "StratifiedGroupKFold requires both 'y' and 'groups'."
            )

        y = np.asarray(y, dtype=float)
        groups = np.asarray(groups)
        n_folds = self.n_splits

        overall_rate = y.mean()
        target_size = len(y) / n_folds

        # Per-group sample counts and positive counts, computed once up front.
        unique_groups, first_seen, group_ids = np.unique(
            groups, return_index=True, return_inverse=True
        )
        group_sizes = np.bincount(group_ids)
        group_positives = np.bincount(group_ids, weights=y)

        fold_sizes = np.zeros(n_folds)
        fold_positives = np.zeros(n_folds)
        fold_of_group = np.empty(len(unique_groups), dtype=int)

        # Visit groups in order of first appearance in `groups`.
        for group in np.argsort(first_seen, kind="stable"):
            # Positive rate of each fold before adding the group; an empty
            # fold is treated as having a rate of 0.
            rate_before = np.divide(
                fold_positives,
                fold_sizes,
                out=np.zeros(n_folds),
                where=fold_sizes > 0,
            )
            rate_after = (fold_positives + group_positives[group]) / (
                fold_sizes + group_sizes[group]
            )
            rate_gain = (rate_before - overall_rate) ** 2 - (
                rate_after - overall_rate
            ) ** 2
            size_score = (target_size - fold_sizes) ** 2 * self.SIZE_WEIGHT

            best_fold = int(np.argmax(rate_gain + size_score))
            fold_of_group[group] = best_fold
            fold_sizes[best_fold] += group_sizes[group]
            fold_positives[best_fold] += group_positives[group]

        # Map every sample to the fold of its group and emit test indices.
        fold_of_sample = fold_of_group[group_ids]
        for fold in range(n_folds):
            yield np.flatnonzero(fold_of_sample == fold)
于 2020-01-01T12:18:33.243 回答