我正在使用 Kafka 2.3.0 和 Spark 2.3.4。我已经构建了一个 Kafka 连接器,它读取 CSV 文件并将 CSV 中的一行发布到相关的 Kafka 主题。这条线是这样的:“201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect”。CSV 包含 1000 条这样的行。连接器能够成功地将它们发布在主题上,我也能够在 Spark 中获取消息。我不确定如何将该消息反序列化到我的架构中?请注意,消息是无标题的,因此 kafka 消息中的关键部分为空。值部分包括上面的完整CSV 字符串。我的代码如下。
我看了这个 -如何使用 Java 中的结构化流从 Kafka 反序列化记录?但无法将其移植到我的 csv 案例中。此外,我尝试了其他 spark sql 机制来尝试从“值”列中检索单个行,但无济于事。如果我确实设法获得了编译版本(例如,indivValues 数据集或 dsRawData 上的映射),我会收到类似于以下内容的错误:“org.apache.spark.sql.AnalysisException: cannot resolve ' IC
' given input columns: [value];” . 如果我理解正确,那是因为 value 是一个逗号分隔的字符串,如果我不做“某事”,spark 并不会真正为我神奇地映射它。
//build the spark session
SparkSession sparkSession = SparkSession.builder()
.appName(seCfg.arg0AppName)
.config("spark.cassandra.connection.host",config.arg2CassandraIp)
.getOrCreate();
...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("timeOfOrigin", DataTypes.TimestampType, true),
DataTypes.createStructField("cName", DataTypes.StringType, true),
DataTypes.createStructField("cRole", DataTypes.StringType, true),
DataTypes.createStructField("bName", DataTypes.StringType, true),
DataTypes.createStructField("stage", DataTypes.StringType, true),
DataTypes.createStructField("intId", DataTypes.IntegerType, true),
DataTypes.createStructField("intName", DataTypes.StringType, true),
DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
DataTypes.createStructField("catName", DataTypes.StringType, true),
DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
DataTypes.createStructField("opType", DataTypes.StringType, true),
DataTypes.createStructField("opName", DataTypes.StringType, true)
});
...
Dataset<Row> dsRawData = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
.option("subscribe", config.arg1TopicName)
.option("failOnDataLoss", "false")
.load();
//getting individual terms like '201310', 'XYZ001'.. from "values"
Dataset<String> indivValues = dsRawData
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING())
.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(), Encoders.STRING());
//indivValues when printed to console looks like below which confirms that //I receive the data correctly and completely
/*
When printed on console, looks like this:
+--------------------+
| value|
+--------------------+
| 201310|
| XYZ001|
| Sup|
| XYZ|
| A|
| 0|
| Presales|
| 6|
| Callout|
| 0|
| 0|
| 1|
| N|
| Prospect|
+--------------------+
*/
StreamingQuery sq = indivValues.writeStream()
.outputMode("append")
.format("console")
.start();
//await termination
sq.awaitTermination();
- 我需要将数据键入为上面显示的自定义模式,因为我将对其进行数学计算(对于每个新行与一些旧行相结合)。
- 在将它们推送到主题之前,在 Kafka 连接器源任务中合成标题是否更好?有标题会使这个问题的解决更简单吗?
谢谢!