1

上下文: 我按照这个链接设置 AWS MSK 并测试生产者和消费者,它已设置并正常工作。我能够通过 2 个单独的 EC2 实例发送和接收消息,它们都使用相同的 Kafka 集群(我的 MSK 集群)。现在,我想建立一个从 Eventhubs 到 AWS Firehose 的数据管道,其形式如下:

Azure Eventthub -> Eventhub-to-Kafka Camel 连接器-> AWS MSK -> Kafka-to-Kinesis-Firehose Camel 连接器-> AWS Kinesis Firehose

我能够在不使用 MSK(通过常规的旧 Kafka)的情况下成功地做到这一点,但由于未说明的原因,现在需要使用 MSK,但我无法让它工作。

问题: 尝试启动AWS MSK 和我正在使用的两个 Camel 连接器之间的连接器时,我收到以下错误:

漏洞

这些是有问题的两个连接器:

  1. AWS Kinesis Firehose 到 Kafka 连接器(Kafka -> 消费者)
  2. Azure Eventhubs 到 Kafka 连接器(生产者 -> Kafka)

目标:让这些连接器与 MSK 一起使用,就像他们在直接使用 Kafka 时所做的那样。

这是 Firehose 的问题:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

这是 Azure 的一个:

[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)
4

2 回答 2

1

MSK 不提供 Kafka Connect 服务。您需要将其安装在您自己的计算机或其他 AWS 计算资源上。从那里,您需要安装 Camel 连接器插件

于 2021-05-04T12:53:11.503 回答
0

Kafka Connect 是一个与 Kafka(MSK、开源或任何其他 kafka 发行版)一起使用的框架。但是,它没有任何连接器。Kafka Connect 与开源 kafka 捆绑在一起。

作为最佳实践,永远不要在与代理节点相同的服务器上运行 kafka connect。因为它们共享二进制文件。调整代理可能会导致 kafka 代理出现意外问题。此外,Kafka Connect 应用程序是应用程序,您不会在同一节点上运行您的 kafka 消费者或生产者应用程序。所以创建一个 EC2 实例并在那里部署 kafka 连接。

使用 TLS - 如果您启用客户端 TLS 身份验证 - 您需要查找 boostrap_broker_tls。

于 2021-06-10T09:35:49.283 回答