1

我使用 Spark 2.1.0 和 Kafka 0.10.2.1。

我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。

代码如下:

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MLP {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("MLP")
            .getOrCreate();

        Dataset<Row> df = spark
            .read()
            .format("kafka")
            .option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
            .option("subscribe", "resultsTopic")
            .load();
        df.show();
        spark.stop();
    }
}

我的部署脚本如下:

spark-submit \
  --verbose \
  --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
  --class com.**** \
  --master (Spark Master URL) /path/to/jar 

但是我得到了错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;

我尝试将同一应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也尝试在客户端模式下使用纱线,我得到了同样的错误。

4

2 回答 2

3

Kafka 作为非流 DataFrame 的数据源 - 数据集将从 Spark 2.2 提供,参考 Spark JIRA上的这个问题

正如@JacekLaskowski 提到的,将包更改为(修改Jacek 的版本以使用2.2):

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

更重要的是,用于readStream读取数据流。

您不能使用show流式数据源,而是使用console格式。

StreamingQuery query = df.writeStream()
  .outputMode("append")
  .format("console")
  .start();

query.awaitTermination();

看到这个链接

于 2017-06-29T12:02:01.543 回答
1

首先,您应该用--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10以下内容替换(我怀疑它是否有效):

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1

我不认为该版本2.10可用。您可能已经想到2.1.0,如果您使用2.1.0(not 2.10),这可能会奏效。

其次,删除--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',')除了一些额外的 jars,比如 Kafka 源文件之外的 Spark 加载。

这应该使您可以访问kafka源格式。

于 2017-06-29T13:49:54.997 回答