0

我正在尝试KTable通过使用主题来创建一个。之后我想用它KafkaStreams来构建我的拓扑(代码下来)

    Properties streamsProperties = new Properties();

    streamsProperties.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "k-table-application");
    streamsProperties.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "c-kafka-qa4.copart.com:9092");
    streamsProperties.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
    streamsProperties.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsProperties.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsProperties.putIfAbsent("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test-qa-consumer\" password=\"asdasat\";");
    streamsProperties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsProperties.put(ConsumerConfig.CHECK_CRCS_CONFIG, "TRUE");
    streamsProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "60000");
    streamsProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "FALSE");
    streamsProperties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "TRUE");
    streamsProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
    streamsProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    streamsProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
    streamsProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer2");
    streamsProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "null");
    streamsProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    streamsProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "[]");
    streamsProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
    streamsProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class.getName()");
    streamsProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");
    streamsProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    streamsProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    streamsProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "class org.apache.kafka.clients.consumer.RangeAssignor]");
    streamsProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class.getName()");

    KTable<String, String> kTable = builder.table("testqa2",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryableStoreName"));

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
streams.start();

但是这一会引发错误 - KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);

2021-07-27 17:46:35.518  INFO 4196 --- [ test-consumer2] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer2-2, groupId=test-consumer2] Group coordinator rnqckf402.corp.copart.com:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-07-27 17:46:37.579  INFO 4196 --- [ test-consumer2] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer2-2, groupId=test-consumer2] Discovered group coordinator rnqckf402.corp.copart.com:9092 (id: 2147483645 rack: null)
2021-07-27 17:46:41.977 ERROR 4196 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Stopping container due to an Error

java.lang.NoSuchMethodError: 'void org.apache.kafka.common.metrics.Sensor.add(org.apache.kafka.common.MetricName, org.apache.kafka.common.metrics.MeasurableStat)'
    at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.<init>(StreamThread.java:505) ~[kafka-streams-1.0.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:653) ~[kafka-streams-1.0.0.jar:na]
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652) ~[kafka-streams-1.0.0.jar:na]
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:506) ~[kafka-streams-1.0.0.jar:na]
    at com.copart.mwa.service.TwilioEventsListener.twilioKTable(TwilioEventsListener.java:134) ~[classes/:na]
    at com.copart.mwa.service.TwilioEventsListener.processTwilioEvents(TwilioEventsListener.java:83) ~[classes/:na]
    at com.copart.mwa.service.TwilioEventsListener$$FastClassBySpringCGLIB$$98f1412b.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at com.copart.mwa.aop.LoggerAspect.logRequestsForData(LoggerAspect.java:41) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]

请问你能指导我吗?我被困住了。

4

1 回答 1

0

修复以下这些行,然后重试并发布更新,

 streamsProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
 
   
 streamsProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RangeAssignor.class.getName());
 
   
 streamsProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
于 2021-07-27T22:21:05.317 回答