0

我正在使用 johnsnowlabs 的 SparkNLP 从我的文本数据中提取嵌入,下面是管道。模型保存到hdfs后大小为1.8g

embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
      .setInputCols("sentence") \
      .setOutputCol("sentence_embeddings")
nlp_pipeline = Pipeline(stages=[document_assembler, sentence_detector, embeddings])
pipeline_model = nlp_pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))

我保存pipeline_modelHDFS使用pipeline_model.save("hdfs:///<path>").

以上只执行了一次

在另一个脚本中,我正在HDFS使用pipeline_model = PretrainedPipeline.from_disk("hdfs:///<path>").

上面的代码加载了模型,但是占用了太多。我在 spark 本地模型(无集群)上对其进行了测试,但我拥有 94g RAM、32 核的高资源。

后来,我用 12 个 Executor 将脚本部署在 yarn 上,每个 Executor 有 3 个内核和 7g ram。我分配了 10g 的驱动程序内存。

该脚本再次花费太多时间从 HDFS 加载保存的模型。

当火花到达这一点时,需要太多时间

当火花到达这一点时(见上面的截图),需要太多时间

我想到了一个方法

预加载

我认为的方法是以某种方式将模型预加载到内存中,当脚本想要对数据帧应用转换时,我可以以某种方式调用对预训练管道的引用并在旅途中使用它,而无需执行任何磁盘 i /o. 我搜索了,但它导致无处可去。

请让我知道您对此解决方案的看法以及实现此目标的最佳方法是什么。

YARN 资源

节点名称 数数 内存(每个) 核心(每个)
主节点 1 38克 8
辅助节点 1 38 克 8
工作节点 4 24 克 4
全部的 6 172克 32

谢谢

4

1 回答 1

1

正如评论中所讨论的,这是一个基于 PyTorch 而不是 SparkNLP 的解决方案。简化代码:

# labse_spark.py

LABSE_MODEL, LABSE_TOKENIZER = None


def transform(spark, df, input_col='text', output_col='output'):
    spark.sparkContext.addFile('hdfs:///path/to/labse_model')
    output_schema = T.StructType(df.schema.fields + [T.StructField(output_col, T.ArrayType(T.FloatType()))])

    rdd = df.rdd.mapPartitions(_map_partitions_func(input_col, output_col))
    res = spark.createDataFrame(data=rdd, schema=output_schema)
    return res


def _map_partitions_func(input_col, output_col):
    def executor_func(rows):
        # load everything to memory (partitions should be small, ~1k rows per partition):
        pandas_df = pd.DataFrame([r.asDict() for r in rows])
        global LABSE_MODEL, LABSE_TOKENIZER
        if not (LABSE_TOKENIZER or LABSE_MODEL):  # should happen once per executor core
            LABSE_TOKENIZER = AutoTokenizer.from_pretrained(SparkFiles.get('labse_model'))
            LABSE_MODEL = AutoModel.from_pretrained(SparkFiles.get('labse_model'))
        
        # copied from HF model card:
        encoded_input = LABSE_TOKENIZER(
            pandas_df[input_col].tolist(), padding=True, truncation=True, max_length=64, return_tensors='pt')
        with torch.no_grad():
            model_output = LABSE_MODEL(**encoded_input)
        embeddings = model_output.pooler_output
        embeddings = torch.nn.functional.normalize(embeddings)

        pandas_df[output_col] = pd.Series(embeddings.tolist())
        return pandas_df.to_dict('records')

    return executor_func
于 2021-06-15T06:00:45.320 回答