上下文: 我按照这个链接设置 AWS MSK 并测试生产者和消费者,它已设置并正常工作。我能够通过 2 个单独的 EC2 实例发送和接收消息,它们都使用相同的 Kafka 集群(我的 MSK 集群)。现在,我想建立一个从 Eventhubs 到 AWS Firehose 的数据管道,其形式如下:
Azure Eventthub -> Eventhub-to-Kafka Camel 连接器-> AWS MSK -> Kafka-to-Kinesis-Firehose Camel 连接器-> AWS Kinesis Firehose
我能够在不使用 MSK(通过常规的旧 Kafka)的情况下成功地做到这一点,但由于未说明的原因,现在需要使用 MSK,但我无法让它工作。
问题: 尝试启动AWS MSK 和我正在使用的两个 Camel 连接器之间的连接器时,我收到以下错误:
这些是有问题的两个连接器:
目标:让这些连接器与 MSK 一起使用,就像他们在直接使用 Kafka 时所做的那样。
这是 Firehose 的问题:
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid
这是 Azure 的一个:
[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)