我在 Kubernetes 上运行 3-zookeeper-cluster 和 3-kafka-cluster。
卡夫卡似乎正在运行。
但是,如果我向某个主题生成一些消息并检查该主题,则根本没有消息。
这是我的经纪人说的。那就是说一些无效的接收或其他东西,有趣的是试图使主题运作良好但产生。
我也可以观看我早期在 Topics-ui 上制作的主题或模式,Topics-ui 是代理的 GUI 工具。
Schema-registry、Connect、Rest 的日志很好,因此代理似乎运行良好。
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
这是我使用terraform Statefulset的代理配置
port {
container_port = 9092
}
env {
name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_DEFAULT_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"
value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
}
env {
name = "KAFKA_ZOOKEEPER_CONNECT"
value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
}
env {
name = "POD_IP"
value_from {
field_ref {
field_path = "status.podIP"
}
}
}
env {
name = "HOST_IP"
value_from {
field_ref {
field_path = "status.hostIP"
}
}
}
env {
name = "POD_NAME"
value_from {
field_ref {
field_path = "metadata.name"
}
}
}
env {
name = "POD_NAMESPACE"
value_from {
field_ref {
field_path = "metadata.namespace"
}
}
}
command = [
"sh",
"-exec",
"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$${POD_NAME}.kafka-service.$${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=$${HOSTNAME##*-} && exec /etc/confluent/docker/run"
]
服务
resource "kubernetes_service" "kafka-service" {
metadata {
name = "kafka-service"
labels = {
app = "broker"
}
}
spec {
selector = {
app = "broker"
}
port {
port = 9092
}
cluster_ip = "None"
}
尝试生成的代码
kafka-console-producer --broker-list kafka-service:9092 --topic test