    // Asynchronous response from Message Hub / Kafka.
    kafkaProducer.send(record,
        new Callback() {
            public void onCompletion(RecordMetadata m, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    log.debug(" **** Message sent, offset: " + m.offset() +
                              " @ partition " + m.partition());
                    log.debug(" <<<< " +
                              " document_id " + key +
                              " @ " + account.getActivityId());
                }
            }
        });

When we try to publish a message to Message Hub with the code above, we always get the following error.

    2016-06-21 18:38:22-[INFO] com.ibm.cloudant.streaming.messageHub.Client.send(476): >>>> sending document_id julia30 @ my_database
    2016-06-21 18:38:22-[DEBUG] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(623): Initialize connection to node -1 for sending metadata request
    2016-06-21 18:38:22-[DEBUG] org.apache.kafka.clients.NetworkClient.initiateConnect(487): Initiating connection to node -1 at kafka01-prod01.messagehub.services.us-south.bluemix.net:9093.
    2016-06-21 18:38:22-[DEBUG] org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(105): Creating SaslClient: client=multiuser-adapter@multiuser.messagehub.ibm.com;service=kafka;serviceHostname=kafka01-prod01.messagehub.services.us-south.bluemix.net;mechs=[GSSAPI]
    2016-06-21 18:38:22-[DEBUG] com.

The producer is configured as follows:
```
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9093]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = [hidden]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = kafka01-prod01.messagehub.services.us-south.bluemix.net%3A9093_8qp87X32V6PK5epv.1
ssl.endpoint.identification.algorithm = HTTPS
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
acks = -1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
retries = 1
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = /Users/jiangph/tools/liberty/usr/shared/resources/keystore.jks
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
linger.ms = 0
```
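
For readers who want to reproduce the setup, here is a minimal sketch of how a subset of the entries in the dump above could be expressed as `java.util.Properties` before being passed to a producer. The class name `ProducerSettingsSketch`, the method `build`, and the `truststorePassword` parameter are hypothetical and not taken from the original code. The real truststore password is hidden in the dump, so a placeholder is used here.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper: the class and method names are illustrative only.
class ProducerSettingsSketch {

    // Collects a subset of the settings shown in the configuration dump.
    // The truststore password is passed in because the dump hides it.
    static Properties build(String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("ssl.protocol", "TLSv1.2");
        props.setProperty("ssl.enabled.protocols", "TLSv1.2");
        props.setProperty("ssl.endpoint.identification.algorithm", "HTTPS");
        props.setProperty("ssl.truststore.location",
                "/Users/jiangph/tools/liberty/usr/shared/resources/keystore.jks");
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("acks", "-1");
        props.setProperty("retries", "1");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder password, an assumption for illustration only.
        Properties props = build("placeholder-password");
        // Print the keys in sorted order, skipping the password value.
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            if (!name.endsWith(".password")) {
                System.out.println(name + " = " + props.getProperty(name));
            }
        }
    }
}
```

The resulting `Properties` object would then be handed to the `KafkaProducer` constructor. Note that the dump lists no `sasl.mechanism` entry, and the log above reports `mechs=[GSSAPI]`.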
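With the settings above, creating a topic through the Message Hub REST API works without any problem. The failure only occurs when we try to publish a message.
Any ideas would be greatly appreciated.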