我使用 Spark 2.1。
我正在尝试使用 Spark Structured Streaming 从 Kafka 读取记录,对它们进行反序列化并在之后应用聚合。
我有以下代码:
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
我想要的是将该value
字段反序列化到我的对象中,而不是强制转换为String
.
我有一个自定义的反序列化器。
public StatisticsRecord deserialize(String s, byte[] bytes)
我怎样才能在 Java 中做到这一点?
我发现的唯一相关链接是这个https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2 .html,但这是针对 Scala 的。