I get the following error when running pyspark3 with pandas 1.1.5 and pyarrow 2.0.0:

Spark code:

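import pyarrow  # not referenced directly; Spark uses pyarrow internally for applyInPandas
import pandas as pd
from pyspark.sql import SparkSession

# The pyspark shell already defines `spark`; getOrCreate() reuses that session.
spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
df_sp = spark.createDataFrame(df)

df_sp.cache().count()

schema = df_sp.schema

def dummy_udf(data):
    # Identity function: returns each group's pandas DataFrame unchanged.
    return data

res = df_sp.groupby('col1').applyInPandas(dummy_udf, schema=schema)

print(res.cache().count())
print(res.toPandas())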

Exception:

21/09/17 07:28:10 ERROR util.Utils: Uncaught exception in thread stdout writer for python3
java.lang.NoSuchMethodError: com.google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)I
        at org.apache.arrow.vector.types.pojo.Field.getField(Field.java:204)
        at org.apache.arrow.vector.types.pojo.Schema.getSchema(Schema.java:178)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serializeMetadata(MessageSerializer.java:187)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:165)
        at org.apache.arrow.vector.ipc.ArrowWriter.ensureStarted(ArrowWriter.java:159)
        at org.apache.arrow.vector.ipc.ArrowWriter.start(ArrowWriter.java:112)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:86)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
21/09/17 07:28:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python3,5,main]
java.lang.NoSuchMethodError: com.google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)I
        at org.apache.arrow.vector.types.pojo.Field.getField(Field.java:204)
        at org.apache.arrow.vector.types.pojo.Schema.getSchema(Schema.java:178)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serializeMetadata(MessageSerializer.java:187)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:165)
        at org.apache.arrow.vector.ipc.ArrowWriter.ensureStarted(ArrowWriter.java:159)
        at org.apache.arrow.vector.ipc.ArrowWriter.start(ArrowWriter.java:112)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:86)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
21/09/17 07:28:10 WARN storage.BlockManager: Putting block rdd_25_69 failed due to exception org.apache.spark.SparkException: Python worker exited unexpectedly (crashed).
21/09/17 07:28:10 INFO memory.MemoryStore: MemoryStore cleared
21/09/17 07:28:10 INFO storage.BlockManager: BlockManager stopped
21/09/17 07:28:10 INFO util.ShutdownHookManager: Shutdown hook called
1 Answer

Looking at the implementation of createString() in the FlatBufferBuilder.java class, there are two overloads: one takes a CharSequence and the other takes a ByteBuffer.

public int createString(CharSequence s) {
    // body omitted
}

public int createString(ByteBuffer s) {
    // body omitted
}

Looking at the implementation of getField() in the Field.java class, a String value is passed to createString() here:

public class Field {
    private final String name;

    public int getField(FlatBufferBuilder builder) {
        int nameOffset = name == null ? -1 : builder.createString(name);

    }
}
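
To make the two observations above concrete, here is a small self-contained sketch. Note that StubBuilder is a hypothetical stand-in with the same two overload shapes, not the real FlatBufferBuilder, and its methods return a label instead of an int offset so that the chosen overload is visible.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class OverloadDemo {
    // Hypothetical stand-in for FlatBufferBuilder: same overload shapes, no real logic.
    static class StubBuilder {
        String createString(CharSequence s) { return "CharSequence"; }
        String createString(ByteBuffer s) { return "ByteBuffer"; }
    }

    // A String argument is bound to the CharSequence overload at compile time,
    // because String implements CharSequence.
    static String viaString(String name) {
        return new StubBuilder().createString(name);
    }

    // Wrapping the bytes in a ByteBuffer selects the other overload instead.
    static String viaByteBuffer(String name) {
        ByteBuffer bb = ByteBuffer.wrap(name.getBytes(StandardCharsets.UTF_8));
        return new StubBuilder().createString(bb);
    }

    public static void main(String[] args) {
        System.out.println(viaString("col1"));     // prints "CharSequence"
        System.out.println(viaByteBuffer("col1")); // prints "ByteBuffer"
    }
}
```

This is also why the stack trace names createString(Ljava/lang/CharSequence;)I: the call in Field.getField() was compiled against the overload that takes a CharSequence and returns an int.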

To work around this, the name has to be passed to createString() as a ByteBuffer instead of as a String (CharSequence) inside the getField() method.

Solution:

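public int getField(FlatBufferBuilder builder) {
    // Wrap the name in a ByteBuffer so that createString(ByteBuffer) is called.
    // The null check comes before getBytes() to avoid a NullPointerException.
    int nameOffset = name == null
        ? -1
        : builder.createString(java.nio.ByteBuffer.wrap(
              name.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    .......
}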
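
Before patching Field.java, it may be worth confirming which createString() overloads the FlatBufferBuilder class loaded at runtime actually exposes. The following is only a rough sketch: MethodProbe and hasMethod are hypothetical names of mine, and it assumes the probe is run with the same classpath as the Spark JVM.

```java
import java.nio.ByteBuffer;

class MethodProbe {
    // Returns true if the named class can be loaded and has a public method
    // with exactly this name and single parameter type.
    static boolean hasMethod(String className, String methodName, Class<?> paramType) {
        try {
            Class.forName(className).getMethod(methodName, paramType);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cls = "com.google.flatbuffers.FlatBufferBuilder";
        System.out.println("createString(CharSequence): "
            + hasMethod(cls, "createString", CharSequence.class));
        System.out.println("createString(ByteBuffer): "
            + hasMethod(cls, "createString", ByteBuffer.class));
    }
}
```

A result of false for the CharSequence overload would be consistent with the NoSuchMethodError in the question. Note that false is also returned when the class cannot be found at all.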
Answered 2021-09-18T15:52:24.593