
I have a requirement to consume messages from an IBM MHub topic into IBM Object Storage.

I got it working with a local Kafka server and the Confluent Kafka Connect S3 plugin running as a standalone worker, sinking to both an Amazon S3 bucket and a file. Both were a success.

If I configure Confluent Kafka Connect S3 as a distributed worker against the IBM MHub cluster, I get no errors, but still no messages end up in the Amazon S3 bucket. I tried the file sink as well, with no luck either.

Is it possible at all?


2 Answers


From: https://kafka.apache.org/documentation/#connect_running

The parameters configured here are intended for the producers and consumers used by Kafka Connect to access the configuration, offset and status topics. For the configuration of Kafka source and Kafka sink tasks, the same parameters can be used, but they need to be prefixed with consumer. and producer. respectively. The only parameter inherited from the worker configuration is bootstrap.servers, which in most cases is sufficient, since the same cluster is often used for all purposes. A notable exception is a secured cluster, which requires extra parameters to allow connections. These parameters may need to be set up to three times in the worker configuration: once for management access, once for Kafka sinks, and once for Kafka sources.

So the solution was to add duplicate configuration, prefixed with consumer., to the worker configuration, so that the sink consumer gets the required SASL_SSL settings instead of the defaults.
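
A minimal sketch of what such a distributed worker configuration could look like, assuming Message Hub uses SASL_SSL with the PLAIN mechanism; the broker list, user name, API key and internal topic names below are placeholders, not values from the original setup:

# connect-distributed.properties (sketch; all values in angle brackets are placeholders)
bootstrap.servers=<broker list from your Message Hub service credentials>
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Security settings used by the worker's own clients (management access)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<api_key>";

# The same settings duplicated with the consumer. prefix so the sink tasks can connect
# (a source connector would need them again with the producer. prefix)
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<api_key>";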

IBM Cloud Object Storage also works. Credentials are needed, e.g. as environment variables: AWS_ACCESS_KEY_ID="see cos credentials" & AWS_SECRET_ACCESS_KEY="see cos credentials"
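
As a sketch (assuming HMAC-style credentials were created for the COS bucket and the worker is started from the Kafka distribution directory), the variables can be exported before starting the distributed worker:

export AWS_ACCESS_KEY_ID="<access_key_id from the COS credentials>"
export AWS_SECRET_ACCESS_KEY="<secret_access_key from the COS credentials>"
bin/connect-distributed.sh config/connect-distributed.properties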

Connector configuration:

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "5",
    "topics": "your-topic",
    "s3.region": "eu-central-1",
    "store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
    "s3.bucket.name": "your-bucket",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "name": "s3-sink"
  }
}
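
In distributed mode the connector configuration above is then submitted through the Connect REST API, for example like this (assuming the worker listens on the default port 8083 and the JSON is saved as s3-sink.json):

curl -X POST -H "Content-Type: application/json" \
     --data @s3-sink.json \
     http://localhost:8083/connectors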

answered 2018-09-20T07:28:23.197

You could try the Message Hub (now called Event Streams) Cloud Object Storage bridge: https://console.bluemix.net/docs/services/MessageHub/messagehub115.html#cloud_object_storage_bridge

It seems to match your requirement?

answered 2018-09-19T12:03:52.740