3

将以下 colab python 代码(请参见下面的链接)部署到 Google Cloud 上的 Dataproc,它仅在input_list是一个包含一个项目的数组时才有效,当input_list有两个项目时,PySpark 作业在“for r”行终止并出现以下错误在下面的 get_similarity 方法中的 result.collect()" 中:

java.io.IOException: Premature EOF from inputStream
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:739)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
        at java.lang.Thread.run(Thread.java:745)
input_list=["no error"]                 <---- works
input_list=["this", "throws EOF error"] <---- does not work

使用 spark-nlp 链接到 colab 的句子相似性: https ://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/streamlit_notebooks/SENTENCE_SIMILARITY.ipynb#scrollTo=6E0Y5wtunFi4

def get_similarity(input_list):
    df = spark.createDataFrame(pd.DataFrame({'text': input_list}))
    result = light_pipeline.transform(df)
    embeddings = []
    for r in result.collect():
        embeddings.append(r.sentence_embeddings[0].embeddings)
    embeddings_matrix = np.array(embeddings)
    return np.matmul(embeddings_matrix, embeddings_matrix.transpose())

我尝试在 hadoop 集群配置中更改为,但仍然没有运气dfs.datanode.max.transfer.threads8192

hadoop_config.set('dfs.datanode.max.transfer.threads', "8192")

当input_list在数组中有多个项目时,如何使此代码正常工作?

4

1 回答 1

0

java.io.IOException: Premature EOF from inputStream可能指向磁盘带宽不足、HDFS DataNodes 过载或许多其他问题:Hadoop MapReduce job I/O Exception due to early EOF from inputStream

Spark 应用程序中不断增加的 DataNode 传输线程数并没有改变任何东西,因为您需要在每个集群工作人员的 HDFS 配置中更改此属性并在每个工作人员上重新启动 DataNode 服务。hdfs:dfs.datanode.max.transfer.threads=8192 最简单的方法是使用cluster property重新创建一个集群。

请注意,如果问题的根本原因是磁盘带宽不足,那么增加 DataNodes 中的传输线程数量只会夸大它,而不是修复它。

您有多种选择来尝试解决此问题:

  1. 要增加本地磁盘带宽,请在创建集群时在工作节点上使用PD-SSDLocal SSD 。

  2. 如果您使用具有少量工作人员的集群,则可能是 HDFS 数据节点(每个工作人员 1 个)无法处理负载,作为一种解决方法,您可以增加集群中工作人员的数量或使用如果您使用具有超过 4 个 CPU 内核的工作程序,则容量相同但较小的工作程序数量更多。

  3. 使用 Google Cloud Storage(gs://架构)而不是 HDFS 来存储您处理的数据 - Google Cloud Storage 的扩展性比 HDFS 好得多,并且应该开箱即用。

于 2021-01-31T17:49:31.330 回答