Can someone help me figure out how to correctly join two streams where both the key and the value are a GenericRecord?
First, as you can see, I create two topics using AVRO schemas for both the key and the value. Then I join the two streams and, for the output topic, build a new GenericRecord (a so-called projection record, with the projection schema), and I get the exception shown after the code snippet:
@Test
public void joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord() throws Exception {
    String methodName = new Object() {
    }.getClass().getEnclosingMethod().getName();
    long timestamp = new Date().getTime();
    String firstTopic = String.format("%1$s_1_%2$s", methodName, timestamp);
    String secondTopic = String.format("%1$s_2_%2$s", methodName, timestamp);
    String outputTopic = String.format("%1$s_output_%2$s", methodName, timestamp);
    String firstStorage = String.format("%1$s_store_1_%2$s", methodName, timestamp);
    String secondStorage = String.format("%1$s_store_2_%2$s", methodName, timestamp);
    String appIdConfig = String.format("%1$s_app_id_%2$s", methodName, timestamp);
    String groupIdConfig = String.format("%1$s_group_id_%2$s", methodName, timestamp);
    String schemaIdNamespace = String.format("%1$s_id_ns_%2$s", methodName, timestamp);
    String schemaNameNamespace = String.format("%1$s_name_ns_%2$s", methodName, timestamp);
    String schemaScopeNamespace = String.format("%1$s_scope_ns_%2$s", methodName, timestamp);
    String schemaProjectionNamespace = String.format("%1$s_proj_ns_%2$s", methodName, timestamp);
    String schemaIdRecord = String.format("%1$s_id_rec_%2$s", methodName, timestamp);
    String schemaNameRecord = String.format("%1$s_name_rec_%2$s", methodName, timestamp);
    String schemaScopeRecord = String.format("%1$s_scope_rec_%2$s", methodName, timestamp);
    String schemaProjectionRecord = String.format("%1$s_proj_rec_%2$s", methodName, timestamp);
    try {
        Integer partitions = 1;
        Integer replication = 1;
        Properties topicConfig = new Properties();
        RestUtils.createTopic(firstTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(secondTopic, partitions, replication, topicConfig);
        RestUtils.createTopic(outputTopic, partitions, replication, topicConfig);
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/"); //TestUtils.tempDirectory().getAbsolutePath());
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        Serializer kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(streamsConfiguration, false);
        Deserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
        kafkaAvroDeserializer.configure(streamsConfiguration, false);
        Serde<GenericRecord> avroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);
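        // NB: this single serde instance is configured with isKey = false and is used below
        // for BOTH keys and values. As far as I understand the Confluent serializer, the
        // isKey flag of configure(...) decides whether schemas are registered under the
        // "<topic>-key" or the "<topic>-value" subject in the Schema Registry.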
        //-----
        Schema idSchema = SchemaBuilder.record(schemaIdRecord).namespace(schemaIdNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .endRecord();
        Schema nameSchema = SchemaBuilder.record(schemaNameRecord).namespace(schemaNameNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .endRecord();
        Schema scopeSchema = SchemaBuilder.record(schemaScopeRecord).namespace(schemaScopeNamespace).fields()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();
        Schema projectionSchema = SchemaBuilder.record(schemaProjectionRecord).namespace(schemaProjectionNamespace).fields()
                .name("Id").type().nullable().intType().noDefault()
                .name("Name").type().nullable().stringType().noDefault()
                .name("Scope").type().nullable().stringType().noDefault()
                .endRecord();
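        // For reference, nameSchema resolves to the Avro JSON that also appears in the
        // stack trace below:
        // {"type":"record","name":"<method>_name_rec_<timestamp>",
        //  "namespace":"<method>_name_ns_<timestamp>",
        //  "fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}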
        GenericRecord idRecord1 = new GenericData.Record(idSchema);
        idRecord1.put("Id", 1);
        GenericRecord idRecord2 = new GenericData.Record(idSchema);
        idRecord2.put("Id", 2);
        GenericRecord idRecord3 = new GenericData.Record(idSchema);
        idRecord3.put("Id", 3);
        GenericRecord idRecord4 = new GenericData.Record(idSchema);
        idRecord4.put("Id", 4);
        GenericRecord nameRecord1 = new GenericData.Record(nameSchema);
        nameRecord1.put("Id", 1);
        nameRecord1.put("Name", "Bruce Eckel");
        GenericRecord nameRecord2 = new GenericData.Record(nameSchema);
        nameRecord2.put("Id", 2);
        nameRecord2.put("Name", "Robert Lafore");
        GenericRecord nameRecord3 = new GenericData.Record(nameSchema);
        nameRecord3.put("Id", 3);
        nameRecord3.put("Name", "Andrew Tanenbaum");
        GenericRecord nameRecord4 = new GenericData.Record(nameSchema);
        nameRecord4.put("Id", 4);
        nameRecord4.put("Name", "Programming in Scala");
        GenericRecord scopeRecord1 = new GenericData.Record(scopeSchema);
        scopeRecord1.put("Scope", "Modern Operating System");
        GenericRecord scopeRecord2 = new GenericData.Record(scopeSchema);
        scopeRecord2.put("Scope", "Thinking in Java");
        GenericRecord scopeRecord3 = new GenericData.Record(scopeSchema);
        scopeRecord3.put("Scope", "Computer Architecture");
        GenericRecord scopeRecord4 = new GenericData.Record(scopeSchema);
        scopeRecord4.put("Scope", "Programming in Scala");
        List<KeyValue<GenericRecord, GenericRecord>> list1 = Arrays.asList(
                new KeyValue<>(idRecord1, nameRecord1),
                new KeyValue<>(idRecord2, nameRecord2),
                new KeyValue<>(idRecord3, nameRecord3)
        );
        List<KeyValue<GenericRecord, GenericRecord>> list2 = Arrays.asList(
                new KeyValue<>(idRecord3, scopeRecord1),
                new KeyValue<>(idRecord1, scopeRecord2),
                new KeyValue<>(idRecord3, scopeRecord3),
                new KeyValue<>(idRecord4, scopeRecord4)
        );
        GenericRecord projectionRecord1 = new GenericData.Record(projectionSchema);
        projectionRecord1.put("Id", nameRecord1.get("Id"));
        projectionRecord1.put("Name", nameRecord1.get("Name"));
        projectionRecord1.put("Scope", scopeRecord1.get("Scope"));
        GenericRecord projectionRecord2 = new GenericData.Record(projectionSchema);
        projectionRecord2.put("Id", nameRecord2.get("Id"));
        projectionRecord2.put("Name", nameRecord2.get("Name"));
        projectionRecord2.put("Scope", scopeRecord2.get("Scope"));
        GenericRecord projectionRecord3 = new GenericData.Record(projectionSchema);
        projectionRecord3.put("Id", nameRecord3.get("Id"));
        projectionRecord3.put("Name", nameRecord3.get("Name"));
        projectionRecord3.put("Scope", scopeRecord3.get("Scope"));
        List<KeyValue<GenericRecord, GenericRecord>> expectedResults = Arrays.asList(
                new KeyValue<>(idRecord3, projectionRecord3),
                new KeyValue<>(idRecord1, projectionRecord1),
                new KeyValue<>(idRecord3, projectionRecord3)
        );
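        // Three joined records are expected: id 1 matches once, id 3 matches twice
        // (two right-side records inside the join window), ids 2 and 4 have no counterpart.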
        //-----
        KStreamBuilder builder = new KStreamBuilder();
        KStream<GenericRecord, GenericRecord> firstStream = builder.stream(avroSerde, avroSerde, firstTopic);
        KStream<GenericRecord, GenericRecord> secondStream = builder.stream(avroSerde, avroSerde, secondTopic);
        KStream<GenericRecord, GenericRecord> outputStream = firstStream.join(secondStream,
                new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                    @Override
                    public GenericRecord apply(GenericRecord l, GenericRecord r) {
                        GenericRecord projectionRecord = new GenericData.Record(projectionSchema);
                        projectionRecord.put("Id", l.get("Id"));
                        projectionRecord.put("Name", l.get("Name"));
                        projectionRecord.put("Scope", r.get("Scope"));
                        return projectionRecord;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), avroSerde, avroSerde, avroSerde);
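        // Judging by the stack trace, the join buffers both input streams in RocksDB
        // window stores, and the failing schema registration happens while those stores
        // serialize records for their changelog (RocksDBWindowStore.put ->
        // KafkaAvroSerializer.serialize), not while writing to my input topics.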
        outputStream.to(avroSerde, avroSerde, outputTopic);
        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg1.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, list1, cfg1);
        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        cfg2.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, list2, cfg2);
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
        List<KeyValue<GenericRecord, GenericRecord>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, outputTopic, expectedResults.size());
        streams.close();
        //-----
        assertThat(actualResults).containsExactlyElementsOf(expectedResults);
        //-----
    } finally {
        RestUtils.deleteTopics(firstTopic, secondTopic, outputTopic);
    }
}
Stack trace:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_1_1490264134172, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_rec_1490264134172","namespace":"joinKStreamToKStreamWhereKeyValueIsGenericRecordGenericRecord_name_ns_1490264134172","fields":[{"name":"Id","type":["int","null"]},{"name":"Name","type":["string","null"]}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:63)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:166)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:548)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:489)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:426)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:499)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:369)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:391)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:154)
... 44 more
; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
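For reference, a variant of the serde setup with one serde per role would look like the sketch below, reusing the streamsConfiguration and builder from the test above. As far as I understand it, the isKey flag of configure(...) selects the "-key" vs. "-value" subject suffix under which schemas are registered; the variable names here are mine:

// Sketch only: dedicated key serde, so key schemas are registered under "-key" subjects.
Serializer kafkaAvroKeySerializer = new KafkaAvroSerializer();
kafkaAvroKeySerializer.configure(streamsConfiguration, true);    // isKey = true
Deserializer kafkaAvroKeyDeserializer = new KafkaAvroDeserializer();
kafkaAvroKeyDeserializer.configure(streamsConfiguration, true);  // isKey = true
Serde<GenericRecord> keyAvroSerde = Serdes.serdeFrom(kafkaAvroKeySerializer, kafkaAvroKeyDeserializer);

// Dedicated value serde, so value schemas are registered under "-value" subjects.
Serializer kafkaAvroValueSerializer = new KafkaAvroSerializer();
kafkaAvroValueSerializer.configure(streamsConfiguration, false);   // isKey = false
Deserializer kafkaAvroValueDeserializer = new KafkaAvroDeserializer();
kafkaAvroValueDeserializer.configure(streamsConfiguration, false); // isKey = false
Serde<GenericRecord> valueAvroSerde = Serdes.serdeFrom(kafkaAvroValueSerializer, kafkaAvroValueDeserializer);

// The topology would then pass keyAvroSerde wherever a key serde is expected and
// valueAvroSerde wherever a value serde is expected, e.g.:
KStream<GenericRecord, GenericRecord> firstStream = builder.stream(keyAvroSerde, valueAvroSerde, firstTopic);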