1

数据 -

我拥有的训练和测试数据的大小非常大~ 150gb并且高度不平衡以及99% 的否定标签/ 1% 的 pos标签,而且我不能下采样,因为它是一个非常重要的信息,所以目前使用加权估计器。

问题 -

如果我们使用 spark 使用 sample() 函数进行拆分并保存到多个文件中,那么很有可能只有在许多文件中的一个文件中存在否定样本(例如 100 个文件中的 1 个),这会在摄取数据只有正样本被馈送到估计器,导致零损失并且模型无法学习。

此外,我在制作批处理时确实使用了随机播放,但输入函数将多个文件作为输入,因此通过从每个文件中的数据随机播放来创建批处理,这导致模型在很长一段时间内只输入负例,直到随机播放发生对于具有负样本的文件。

有没有更好的方法来确保在使用pyspark保存数据时,由 spark 保存的每个文件都有来自两个类/标签的样本(最好与整体数据 pos/neg 比率相同)?

在这些情况下,我尝试使用一个大文件来馈送和洗牌工作正常,但是当我们馈送许多文件时,它会产生零损失问题,因为只有一个类的样本被馈送到模型中。

在 tensorflow 代码中使用以下输入函数 -

def csv_input_fn(files_name_pattern, mode=tf.estimator.ModeKeys.EVAL,
             skip_header_lines=0,
             num_epochs=None,
             batch_size=1000):

shuffle = True if mode == tf.estimator.ModeKeys.TRAIN else False

num_threads = multiprocessing.cpu_count() if MULTI_THREADING else 1

print("")
print("* data input_fn:")
print("================")
print("Input file(s): {}".format(files_name_pattern))
print("Batch size: {}".format(batch_size))
print("Epoch Count: {}".format(num_epochs))
print("Mode: {}".format(mode))
print("Thread Count: {}".format(num_threads))
print("Shuffle: {}".format(shuffle))
print("================")
print("")

file_names = tf.matching_files(files_name_pattern)
dataset = data.TextLineDataset(filenames=file_names)

dataset = dataset.skip(skip_header_lines)

if shuffle:
    dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)

dataset = dataset.batch(batch_size)
dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row),
                      num_parallel_calls=num_threads)

dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()

features, target = iterator.get_next()
return features, target

任何建议,将不胜感激!谢谢

4

1 回答 1

0

找到了我自己的问题的答案,因此您可以将 buffer_size 更改为完整数据集中的元素/行数,这样我们可以确保使用 shuffle 随机分配的索引与现在的 shuffle 一样一致是使用整个数据集完成的。

代码已更改 -

   if shuffle:
    dataset = dataset.shuffle(buffer_size='total training instances size')
于 2018-12-14T22:59:10.227 回答