我是 Spark 的新手,我想对现有的 protobuf 进行更改。进行更改后,我想将该 protobuf 消息映射到 Spark 数据集行。由于 protobuf 消息复杂且嵌套很深。
我不想创建模式然后复制值,这很乏味且难以编写代码
像这样的东西:
Dataset<Row> events = spark
.readStream()
.format("kafka")
.load();
//call mapper
events.mapPartitions()
....
...
//mapper code
ProtoMessage.message
//create schema
StructType SCHEMA = new StructType()
.add("value1", DataTypes.StringType, false)
.add("value2", DataTypes.StringType, false)
//create columns
Object[] columnes = {
message.getValue1(),
message.getValue2()
....
}
//create a row
Stream.<Row>of(new GenericRowWithSchema(columns, SCHEMA));
但是我不知道确切的列数(我知道,但几乎不可能手动编写所有代码)基本上我想要做的是获取 protobuf,更改一个字段,然后将整个内容转换为数据集行。
我研究了sparksql-protobuf,但我想在推断架构后也复制值。
感谢您的帮助!