7

我使用 Spark 2.1

我正在尝试使用 Spark Structured Streaming 从 Kafka 读取记录,对它们进行反序列化并在之后应用聚合。

我有以下代码:

SparkSession spark = SparkSession
        .builder()
        .appName("Statistics")
        .getOrCreate();

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUri)
        .option("subscribe", "Statistics")
        .option("startingOffsets", "earliest")
        .load();

df.selectExpr("CAST(value AS STRING)")

我想要的是将该value字段反序列化到我的对象中,而不是强制转换为String.

我有一个自定义的反序列化器。

public StatisticsRecord deserialize(String s, byte[] bytes)

我怎样才能在 Java 中做到这一点?


我发现的唯一相关链接是这个https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2 .html,但这是针对 Scala 的。

4

2 回答 2

4

为您的 JSON 消息定义架构。

StructType schema = DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("Id", DataTypes.IntegerType, false),
                DataTypes.createStructField("Name", DataTypes.StringType, false),
                DataTypes.createStructField("DOB", DataTypes.DateType, false) });

现在阅读如下消息。MessageData 是 JSON 消息的 JavaBean。

Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"),schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));  
于 2017-05-12T12:30:44.713 回答
2

如果您在 Java 中为您的数据使用自定义反序列化器,请在load.

df.select("value")

该行只为您Dataset<Row>提供一列value


我专门使用 Scala 的 Spark API,所以我会在 Scala 中执行以下操作来处理“反序列化”情况:

import org.apache.spark.sql.Encoders
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord]
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) }
df.select(myDeserializerUDF($"value") as "value_des")

那应该给你你想要的……在 Scala 中。将其转换为 Java 是您的家庭练习 :)

请注意,您的自定义对象必须有可用的编码器,否则 Spark SQL 将拒绝将其对象放入数据集中。

于 2017-05-12T14:44:31.843 回答