
I have two KTables that I need to combine with a leftJoin. The left join must be many-to-one. This is my current leftJoin code:

KTable<String, GenericRecord> right = createKTable("topic_1");
KTable<String, GenericRecord> left = createKTable("topic_2");

// Note: the table named "right" is the left-hand side of this join.
KTable<String, GenericRecord> joined = right.leftJoin(left, (doc, infoReg) -> {
    GenericRecord resultRecords = new GenericData.Record(valueSchema);

    // Copy every field of the doc record into the result.
    for (Schema.Field field : doc.getSchema().getFields()) {
        resultRecords.put(field.name(), doc.get(field.name()));
    }

    // Wrap the matching record in the "InfoReg" array field.
    Schema schema = valueSchema.getField("InfoReg").schema();
    GenericArray<GenericRecord> list = new GenericData.Array<>(0, schema);
    // In a leftJoin, infoReg is null when no record matches the key.
    if (infoReg != null) {
        list.add(infoReg);
    }
    resultRecords.put("InfoReg", list);

    return resultRecords;
});

But it behaves like a one-to-one join. How can I change the code to get many-to-one behavior?
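
For context, a KTable-KTable leftJoin without a key extractor matches records by primary key, so each key yields at most one pair. To fill an array such as "InfoReg", the records on the "many" side first have to be collected per document key. The sketch below shows that semantics in plain Java, without Kafka. The InfoReg class, its docId field, and the method names are hypothetical stand-ins, not part of the original code. In Kafka Streams, the grouping step would roughly correspond to KTable#groupBy followed by KGroupedTable#aggregate, and Kafka Streams 2.4+ also offers a foreign-key leftJoin overload that takes a key extractor; whether either fits depends on how the two topics are keyed, which the question does not state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ManyToOneJoinSketch {

    // Hypothetical stand-in for an InfoReg record: its own id plus the
    // key of the doc it belongs to (the foreign key).
    static final class InfoReg {
        final String id;
        final String docId;

        InfoReg(String id, String docId) {
            this.id = id;
            this.docId = docId;
        }
    }

    // Step 1: group the "many" side by its foreign key.
    static Map<String, List<InfoReg>> groupByDocId(List<InfoReg> infoRegs) {
        Map<String, List<InfoReg>> grouped = new LinkedHashMap<>();
        for (InfoReg reg : infoRegs) {
            grouped.computeIfAbsent(reg.docId, k -> new ArrayList<>()).add(reg);
        }
        return grouped;
    }

    // Step 2: left join. Every doc key is kept; a doc without any
    // matching InfoReg gets an empty list instead of being dropped.
    static Map<String, List<InfoReg>> leftJoin(List<String> docIds,
                                               Map<String, List<InfoReg>> grouped) {
        Map<String, List<InfoReg>> joined = new LinkedHashMap<>();
        for (String docId : docIds) {
            joined.put(docId, grouped.getOrDefault(docId, Collections.emptyList()));
        }
        return joined;
    }

    public static void main(String[] args) {
        List<InfoReg> infoRegs = new ArrayList<>();
        infoRegs.add(new InfoReg("r1", "d1"));
        infoRegs.add(new InfoReg("r2", "d1"));
        infoRegs.add(new InfoReg("r3", "d3"));

        List<String> docIds = new ArrayList<>();
        docIds.add("d1");
        docIds.add("d2");

        Map<String, List<InfoReg>> joined = leftJoin(docIds, groupByDocId(infoRegs));
        for (Map.Entry<String, List<InfoReg>> entry : joined.entrySet()) {
            System.out.println(entry.getKey() + " has " + entry.getValue().size() + " InfoReg record(s)");
        }
    }
}
```

The join itself still runs once per doc key; the many-to-one relationship is absorbed by the grouping step that precedes it.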
