我使用 Spark 2.1.0 和 Kafka 0.10.2.1。
我编写了一个从 Kafka 主题读取数据集的 Spark 应用程序。
代码如下:
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class MLP {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("MLP")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
.option("subscribe", "resultsTopic")
.load();
df.show();
spark.stop();
}
}
我的部署脚本如下:
spark-submit \
--verbose \
--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
--class com.**** \
--master (Spark Master URL) /path/to/jar
但是我得到了错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;
我尝试将同一应用程序与非 Jafka 数据源一起使用,并且数据框已正确创建。我也尝试在客户端模式下使用纱线,我得到了同样的错误。