我有两个 Ktable,我需要做 leftJoin。这个左连接桅杆是ManyToOne。我有 leftJoin 的下一个代码:
KTable<String, GenericRecord> right = createKTable("topic_1");
KTable<String, GenericRecord> left = createKTable("topic_2");
KTable<String, GenericRecord> joined = right.leftJoin(left, (doc, infoReg) -> {
List<Schema.Field> fields = doc.getSchema().getFields();
GenericRecord resultRecords = new GenericData.Record(valueSchema);
for (Schema.Field field : fields) {
resultRecords.put(field.name(), doc.get(field.name()));
}
Schema schema = valueSchema.getField("InfoReg").schema();
GenericArray<GenericRecord> list = new GenericData.Array<>(0, schema);
list.add(infoReg);
resultRecords.put("InfoReg", list);
return resultRecords;
});
但它就像 OneToOne 一样工作。如何更改 ManyToOne 的代码?