
I am trying to run Spark streaming against a Kafka queue containing Avro messages.

According to https://spark.apache.org/docs/latest/sql-data-sources-avro.html, I should be able to use from_avro to convert the column values into a Dataset<Row>.

However, the project does not compile: it complains that from_avro cannot be found, even though I can see the method declared in the dependency's package.class.

How can I use the from_avro method from org.apache.spark.sql.avro in my local Java code?

import java.io.IOException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.avro.*;


public class AvroStreamTest {
    public static void main(String[] args) throws IOException, InterruptedException {

     // Creating local sparkSession here...

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host:port")
                .option("subscribe", "avro_queue")
                .load();

        // Cannot resolve method 'from_avro'...
        df.select(from_avro(col("value"), jsonFormatSchema)).writeStream().format("console")
                .outputMode("update")
                .start();


    }
}
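For context, jsonFormatSchema in the snippet above is the Avro schema as a JSON string. It is not shown in the question; a common approach (an assumption here, not part of the original code) is to load it from an .avsc file. A minimal, self-contained sketch:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class SchemaLoader {
    // Hypothetical helper: read an Avro schema (.avsc) file into the JSON
    // string that from_avro expects as its second argument.
    public static String loadSchema(Path avscPath) throws IOException {
        return new String(Files.readAllBytes(avscPath), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Write a minimal sample schema so this example is self-contained.
        Path tmp = Files.createTempFile("user", ".avsc");
        String schema = "{\"type\":\"record\",\"name\":\"User\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
        Files.write(tmp, schema.getBytes(StandardCharsets.UTF_8));

        String jsonFormatSchema = SchemaLoader.loadSchema(tmp);
        System.out.println(jsonFormatSchema);
    }
}
```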

pom.xml:

<dependencies>
    <dependency> 
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
  <!-- more dependencies below -->

</dependencies>

It seems that Java cannot pick up the from_avro method from sql.avro.package.class.


2 Answers


This is because of the class name that the Scala compiler generates for the package object. Import it with import org.apache.spark.sql.avro.package$; and then calling package$.MODULE$.from_avro(...) should work.
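Applied to the question's code, the corrected call would look roughly like this (a sketch only: host:port, avro_queue, and the jsonFormatSchema placeholder are taken from the question and must be replaced with real values):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// The Scala package object compiles to a class literally named package$;
// its members are reached through the MODULE$ singleton field.
import org.apache.spark.sql.avro.package$;

import static org.apache.spark.sql.functions.col;

public class AvroStreamFixed {
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = SparkSession.builder()
                .master("local[*]")
                .appName("AvroStreamTest")
                .getOrCreate();

        // The Avro schema as a JSON string; elided in the question.
        String jsonFormatSchema = "...";

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host:port")
                .option("subscribe", "avro_queue")
                .load();

        // Invoke from_avro through the package object's singleton instance.
        df.select(package$.MODULE$.from_avro(col("value"), jsonFormatSchema))
                .writeStream()
                .format("console")
                .outputMode("update")
                .start()
                .awaitTermination();
    }
}
```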

Answered 2019-03-06T16:49:09.153

You need to include spark-sql-avro in your pom.xml, which is available at

https://mvnrepository.com/artifact/org.apache.spark/spark-sql-avro_2.11/2.4.0-palantir.28-1-gdf34e2d

Answered 2019-03-06T16:36:52.733