-2

我正在运行 spark 提供的默认示例来计算来自 Kafka 流的单词。

这是我正在运行的代码:

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import java.util.Map;
import java.util.HashMap;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaWordCount")
                .getOrCreate();
        Dataset<Row> lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "TutorialTopic")
                .option("startingOffsets", "latest")
                .load();
        lines.selectExpr("CAST key AS STRING", "CAST value AS STRING");
        Dataset<String> words = lines
                .as(Encoders.STRING())
                .flatMap(
                        new FlatMapFunction<String, String>() {
                            @Override
                            public Iterator<String> call(String x) {
                                return Arrays.asList(x.split(" ")).iterator();
                            }
                        }, Encoders.STRING());
        Dataset<Row> wordCounts = words.groupBy("value").count();
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}

在我的 pom.xml 文件中,我添加了以下依赖项:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <version>2.1.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

我使用以下命令提交代码以触发:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.2 --class "JavaWordCount" --master local[4] target/spark-sql-kafka-0-10_2.11-1.0.jar

运行时出现以下异常:

lines.selectExpr("CAST key AS STRING","CAST value AS STRING");

例外:

Try to map struct<key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,timestampType:int> to Tuple1, but failed as the number of fields does not line up. 

请帮助我解决此异常。谢谢!

4

1 回答 1

3

问题出在这一行lines.as(Encoders.STRING())

你可以改变

    lines.selectExpr("CAST key AS STRING", "CAST value AS STRING");
    Dataset<String> words = lines
            .as(Encoders.STRING())

    Dataset<String> words = lines.selectExpr("CAST value AS STRING")
            .as(Encoders.STRING())

您需要使用的返回值lines.selectExpr。这种方法本身不会改变lines。而且既然你在用.as(Encoders.STRING()),我想你只需要value.

于 2017-06-16T18:03:20.240 回答