我在使用 Confluent 开源平台 4.1.0 版时遇到以下问题:
[2018-05-01 03:43:33,433] ERROR Failed to initialize TopicClient: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. (io.confluent.ksql.util.KafkaTopicClient:257)
Exception in thread "main" io.confluent.ksql.util.KsqlException: Could not fetch broker information. KSQL cannot initialize AdminClient.
at io.confluent.ksql.util.KafkaTopicClientImpl.init(KafkaTopicClientImpl.java:258)
at io.confluent.ksql.util.KafkaTopicClientImpl.<init>(KafkaTopicClientImpl.java:62)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:237)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:58)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:39)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
更改侦听器端口没有帮助。我们如何解决这个问题?
EDIT1:我正在使用 kafka 代理和 ksql-server
confluent start
最初,“confluent status”显示 ksql-server 是 UP,但在上述超时后服务器宕机。
EDIT2:是的,我的 kafka 代理正在运行,这是我的 kafka server.properties:
broker.id=100
listeners=PLAINTEXT://localhost:19090
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-100
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
confluent.support.customer.id=anonymous
group.initial.rebalance.delay.ms=0
和 ksql-server.properties:
bootstrap.servers=localhost:19090
listeners=http://localhost:18088
ksql.server.ui.enabled=true
EDIT3:我怀疑这与不正确的引导服务器 url 有关,但我还没有找到。
EDIT4:KSQL 服务器日志,根据要求。
[2018-05-17 03:41:33,244] INFO KsqlRestConfig values:
metric.reporters = []
ssl.client.auth = false
ksql.server.install.dir = /home/<user name>/confluent/confluent-4.1.0
response.mediatype.default = application/json
authentication.realm =
ssl.keystore.type = JKS
ssl.trustmanager.algorithm =
authentication.method = NONE
metrics.jmx.prefix = rest-utils
request.logger.name = io.confluent.rest-utils.requests
ssl.key.password = [hidden]
ssl.truststore.password = [hidden]
authentication.roles = [*]
metrics.num.samples = 2
ssl.endpoint.identification.algorithm =
compression.enable = false
query.stream.disconnect.check = 1000
ssl.protocol = TLS
debug = false
listeners = [http://localhost:18088]
ssl.provider =
ssl.enabled.protocols = []
shutdown.graceful.ms = 1000
ssl.keystore.location =
response.mediatype.preferred = [application/json]
ssl.cipher.suites = []
authentication.skip.paths = []
ssl.truststore.type = JKS
access.control.allow.methods =
access.control.allow.origin =
ssl.truststore.location =
ksql.server.command.response.timeout.ms = 5000
ssl.keystore.password = [hidden]
ssl.keymanager.algorithm =
port = 8080
metrics.sample.window.ms = 30000
metrics.tag.map = {}
ksql.server.ui.enabled = true
(io.confluent.ksql.rest.server.KsqlRestConfig:179)
[2018-05-17 03:41:33,302] INFO KsqlConfig values:
ksql.persistent.prefix = query_
ksql.schema.registry.url = http://localhost:8081
ksql.service.id = default_
ksql.sink.partitions = 4
ksql.sink.replicas = 1
ksql.sink.window.change.log.additional.retention = 1000000
ksql.statestore.suffix = _ksql_statestore
ksql.transient.prefix = transient_
(io.confluent.ksql.util.KsqlConfig:279)
[2018-05-17 03:43:33,433] ERROR Failed to initialize TopicClient: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. (io.confluent.ksql.util.KafkaTopicClient:257)
Exception in thread "main" io.confluent.ksql.util.KsqlException: Could not fetch broker information. KSQL cannot initialize AdminClient.
at io.confluent.ksql.util.KafkaTopicClientImpl.init(KafkaTopicClientImpl.java:258)
at io.confluent.ksql.util.KafkaTopicClientImpl.<init>(KafkaTopicClientImpl.java:62)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:237)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:58)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:39)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
at io.confluent.ksql.util.KafkaTopicClientImpl.init(KafkaTopicClientImpl.java:230)
... 4 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.