
I have a large Confluent Kafka cluster made up of several sub-clusters: one for Zookeeper, another for the Kafka brokers with Schema Registry and KSQL streams, and one for Connect.

My Connect cluster is having problems after I configured rest.advertised.host.name on all worker instances to the FQDN, following the article here -

Below are the errors I keep seeing in the Connect distributed log file on all nodes -

connectDistributed.out

Error 1 -

[2021-08-12 14:07:48,932] INFO [Consumer clientId=connector-consumer-XYZ-0, groupId=connect-XYZ] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1054)

Error 2 -

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-coordinator-heartbeat-thread | connect-XYZ" 

Below are the Connect worker properties -

bootstrap.servers=production-kafka-elb.int.supportabc.platform.co.uk:9092
group.id=connect-cluster-cc
connect.protocol=compatible
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-cc
offset.storage.replication.factor=5
config.storage.topic=connect-configs-cc
config.storage.replication.factor=5
status.storage.topic=connect-status-cc
status.storage.replication.factor=5
offset.flush.interval.ms=10000
rest.port=8085
rest.advertised.host.name=bblpkaa011.int.supportabc.platform.co.uk
rest.advertised.port=8085
plugin.path=/usr/share/java,/apps/confluent-5.5.1/share/java/
key.converter.schema.registry.url=abc-production-kafka-elb.int.supportabc.platform.co.uk:8081
value.converter.schema.registry.url=abc-production-kafka-elb.int.supportabc.platform.co.uk:8081

I have made sure that each worker has 6GB allocated to it -

Looking at the process trace -

java -Xmx6G -Xms6G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:MetaspaceSize=96m -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/apps/confluent-5.5.1/bin/../logs -Dlog4j.configuration=file:/apps/confluent-5.5.1/bin/../etc/kafka/connect-log4j.properties -cp /apps/confluent-5.5.1/share/java/confluent-security/connect/*:/apps/confluent-5.5.1/share/java/kafka/*:/apps/confluent-5.5.1/share/java/confluent-common/*:/apps/confluent-5.5.1/share/java/kafka-serde-tools/*:/apps/confluent-5.5.1/share/java/monitoring-interceptors/*:/apps/confluent-5.5.1/bin/../ce-broker-plugins/build/libs/*:/apps/confluent-5.5.1/bin/../ce-broker-plugins/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-auth-providers/build/libs/*:/apps/confluent-5.5.1/bin/../ce-auth-providers/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-rest-server/build/libs/*:/apps/confluent-5.5.1/bin/../ce-rest-server/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-audit/build/libs/*:/apps/confluent-5.5.1/bin/../ce-audit/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../share/java/kafka/*:/apps/confluent-5.5.1/bin/../share/java/confluent-metadata-service/*:/apps/confluent-5.5.1/bin/../share/java/rest-utils/*:/apps/confluent-5.5.1/bin/../share/java/confluent-common/*:/apps/confluent-5.5.1/bin/../share/java/confluent-security/schema-validator/*:/apps/confluent-5.5.1/bin/../support-metrics-client/build/dependant-libs-2.12.10/*:/apps/confluent-5.5.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/*:/apps/confluent-5.5.1/bin/../support-metrics-fullcollector/build/dependant-libs-2.12.10/*:/apps/confluent-5.5.1/bin/../support-metrics-fullcollector/build/libs/*:/usr/share/java/support-metrics-fullcollector/* -javaagent:/apps/ad/java-agent-20.9.0.30985-latest/javaagent.jar org.apache.kafka.connect.cli.ConnectDistributed /apps/confluent-5.5.1/etc/kafka/connect-distributed-worker-cc.properties

Please help - how can I resolve this?


2 Answers


What exactly is happening!

On a Connect cluster, this situation arises when all the nodes in the cluster go into what Confluent calls a "STOP-THE-WORLD" rebalance event.

This essentially means that no matter how many connector workers/tasks were running on the cluster before, they all stop processing whatever they were doing and jump into rebalance mode to contend for the leader.
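You can see this directly through the Connect REST API: polling connector status on any worker (the question's cluster listens on rest.port=8085) shows tasks flipping to UNASSIGNED across the cluster while the group renegotiates. A minimal sketch, using a canned response for illustration - the connector name and worker address are placeholders:

```shell
# Live check would be (placeholder worker address):
#   curl -s http://<worker-host>:8085/connectors?expand=status
# Canned response standing in for what a worker returns mid-rebalance:
status='{"XYZ":{"status":{"connector":{"state":"RUNNING"},"tasks":[{"id":0,"state":"UNASSIGNED"}]}}}'
# Pull out just the state fields -- UNASSIGNED tasks across the whole
# cluster are the signature of a stop-the-world rebalance.
echo "$status" | grep -o '"state":"[A-Z]*"'
```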

Why does this happen!

One of your Connect worker properties files is set to this -> connect.protocol=compatible
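For context (my understanding of Connect's rebalance protocols, not something stated in the question): compatible allows the group to fall back to the old eager protocol, where any membership change stops every task, whereas sessioned keeps incremental cooperative rebalancing so only affected tasks move - it is the default since Kafka 2.5, which is what Confluent 5.5.1 bundles:

```properties
# connect-distributed-worker-cc.properties (fragment, sketch)
# eager      - classic stop-the-world rebalancing
# compatible - cooperative, but downgrades to eager if any member requires it
# sessioned  - cooperative with session support; default since AK 2.5
connect.protocol=sessioned
```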

OR

Some other significant change to the Connect worker properties, or a worker restart, made without first pausing the running tasks

Solution

rest.advertised.host.name=<FULLY QUALIFIED HOST NAME> OR <IP.ADDRESS>
rest.advertised.port=8083

I was able to resolve this by performing the steps below, in exactly the order mentioned -

  1. Stopped the Connect worker running connect.protocol=compatible

  2. Stopped the other Connect workers

  3. Added the two properties to all worker properties files - rest.advertised.host.name= and rest.advertised.port=

  4. Restarted the Connect workers one by one, and saw the properties below being picked up

[kafka@abchostnamekk01 logs]$ grep -ri 'info advertised' connectDistributed.out
    [2021-08-12 14:06:50,809] INFO Advertised URI: http://abchostnamekk01.domain.com:8083
answered 2021-08-13T11:36:09.627

The right answer for getting rid of the OutOfMemoryError is to increase the Xms and Xmx memory allocation variables in the process configuration and restart it gracefully.

Check whether the existing process has the variables set like below

./connect-distributed:  export KAFKA_HEAP_OPTS="-Xms6G -Xmx6G"
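If the launch script doesn't export KAFKA_HEAP_OPTS, the effective flags can also be read straight off the running JVM's command line. A small sketch using an abbreviated copy of the java invocation from the process trace above; on a live host you would feed it `ps -o args= -p <PID>` instead:

```shell
# Abbreviated worker command line (taken from the process trace above):
cmdline='java -Xmx6G -Xms6G -server -XX:+UseG1GC org.apache.kafka.connect.cli.ConnectDistributed'
# Split on spaces and keep only the heap flags:
echo "$cmdline" | tr ' ' '\n' | grep -E '^-Xm[sx]'
```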

Check the output of free -m or top on the target server

KiB Mem : 32304516 total,   288648 free, 17298612 used, 

Change the memory allocation limits according to the memory available on the system

./connect-distributed:export KAFKA_HEAP_OPTS="-Xmx28G -Xms24G"
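The jump from 6G to the 24-28G above can be sanity-checked against the free output. A sketch of one rule of thumb (my assumption, not an official sizing guide): keep a few GiB back for the OS and page cache, give the rest to the heap:

```shell
# "KiB Mem : 32304516 total" from the free/top output above
total_kib=32304516
total_gib=$(( total_kib / 1024 / 1024 ))   # ~30 GiB of RAM on the host
os_reserve_gib=4                           # assumption: headroom for OS + page cache
heap_gib=$(( total_gib - os_reserve_gib ))
echo "KAFKA_HEAP_OPTS=\"-Xms${heap_gib}G -Xmx${heap_gib}G\""
```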

Gracefully stop the process on the console using the flag below. If -SIGTERM doesn't work, use -SIGKILL

kill -SIGTERM <PID> 
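The SIGTERM-then-SIGKILL escalation can be scripted; a sketch, demonstrated here on a throwaway sleep process standing in for the worker (for the real thing, find the PID with `pgrep -f ConnectDistributed`):

```shell
sleep 300 &                          # stand-in for the Connect worker process
pid=$!
kill -SIGTERM "$pid"                 # ask it to shut down gracefully
for _ in 1 2 3 4 5; do               # give it up to ~5s to exit
  kill -0 "$pid" 2>/dev/null || break
  sleep 1
done
if kill -0 "$pid" 2>/dev/null; then
  kill -SIGKILL "$pid"               # last resort, as noted above
fi
wait "$pid" 2>/dev/null || true      # reap; exit status reflects the signal
echo "stopped $pid"
```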

Restart the service with the usual restart command

/apps/confluent-5.5.1/bin/connect-distributed -daemon /apps/confluent-5.5.1/etc/kafka/connect-distributed-worker1.properties

After the restart, everything should settle down

answered 2022-01-12T11:49:25.737