2

我有这个非常简单的 Beam 管道,它从 Kafka 主题读取记录并将它们写入 Pulsar 主题:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

p.apply(
  KafkaIO.<Long, String>read()
    .withTopic("<topic>")
    .withBootstrapServers("<url>")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(getConsumerProps())
    .withoutMetadata() // PCollection<KV<Long, String>>
)
  .apply(Values.<String>create())
  .apply(ParDo.of(new PulsarSink()));

p.run();

据我了解,这应该恰好创建一个 Kafka 消费者,将其价值推向管道。现在由于某种原因,管道似乎一遍又一遍地重新启动,创建了多个 Kafka 消费者和多个 Pulsar 生产者。

以下是显示正在创建多个 Kafka 消费者的日志的摘录:

2019-06-07 16:08:30,010 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:30,097 INFO  o.a.k.c.s.a.AbstractLogin - Successfully logged in.
2019-06-07 16:08:30,204 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,205 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:30,684 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:30,693 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 0 (total 1): 292330999892453.events.all.v1.json-0
2019-06-07 16:08:30,693 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 1 (total 1): 292330999892453.events.all.v1.json-1
2019-06-07 16:08:30,693 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Partitions assigned to split 2 (total 1): 292330999892453.events.all.v1.json-2
2019-06-07 16:08:30,720 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:30,720 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:30,720 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:30,721 INFO  o.a.k.c.s.a.AbstractLogin - Successfully logged in.
2019-06-07 16:08:30,734 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,734 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:30,742 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,742 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:30,743 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:30,743 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,116 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,117 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
2019-06-07 16:08:31,145 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,145 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
2019-06-07 16:08:31,147 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,148 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=292330999892453] Discovered group coordinator <url>:39703 (id: 2147483644 rack: null)
2019-06-07 16:08:31,351 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-0: reading from 292330999892453.events.all.v1.json-0 starting at offset 318186186
2019-06-07 16:08:31,352 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = Reader-0_offset_consumer_1189437256_292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:31,359 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:31,359 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,389 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-1: reading from 292330999892453.events.all.v1.json-1 starting at offset 318738731
2019-06-07 16:08:31,389 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = Reader-1_offset_consumer_1231768376_292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:31,394 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-2: reading from 292330999892453.events.all.v1.json-2 starting at offset 318129714
2019-06-07 16:08:31,394 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<url>:39701, <url>:39702, <url>:39703]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = Reader-2_offset_consumer_64443017_292330999892453
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-256
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-06-07 16:08:31,395 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:31,395 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,397 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
2019-06-07 16:08:31,398 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
2019-06-07 16:08:31,613 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-3, groupId=292330999892453] Fetch offset 318129714 is out of range for partition 292330999892453.events.all.v1.json-2, resetting offset
2019-06-07 16:08:31,613 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-2, groupId=292330999892453] Fetch offset 318186186 is out of range for partition 292330999892453.events.all.v1.json-0, resetting offset
2019-06-07 16:08:31,641 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-3, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 320367573.
2019-06-07 16:08:31,641 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-2, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 321301099.
2019-06-07 16:08:31,648 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,649 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-4, groupId=292330999892453] Fetch offset 318738731 is out of range for partition 292330999892453.events.all.v1.json-1, resetting offset
2019-06-07 16:08:31,667 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,672 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-4, groupId=292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 320867070.
2019-06-07 16:08:31,714 INFO  org.apache.kafka.clients.Metadata - Cluster ID: AKrCWqWfQKOfb9OSgwFyIQ
2019-06-07 16:08:31,860 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=Reader-0_offset_consumer_1189437256_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 336281187.
2019-06-07 16:08:31,885 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=Reader-0_offset_consumer_1189437256_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-0 to offset 336281187.
2019-06-07 16:08:31,905 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=Reader-1_offset_consumer_1231768376_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 336474159.
2019-06-07 16:08:31,938 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=Reader-1_offset_consumer_1231768376_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-1 to offset 336474159.
2019-06-07 16:08:31,957 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=Reader-2_offset_consumer_64443017_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 336295646.
2019-06-07 16:08:31,981 INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-7, groupId=Reader-2_offset_consumer_64443017_292330999892453] Resetting offset for partition 292330999892453.events.all.v1.json-2 to offset 336295646.
2019-06-07 16:08:32,142 INFO  o.a.b.s.i.kafka.KafkaUnboundedSource - Reader-0: first record offset 321301099

为什么 Kafka Consumers 会一遍遍重启?这是预期的行为吗?

4

1 回答 1

2

DirectRunner 的主要目的是对 Pipelines 进行本地测试。因此,它表现出的行为可能在性能方面并不理想。一个例子可能是它有目的地序列化和反序列化操作符之间的数据,即使它不是必需的。原因是为了验证应用程序代码没有修改输入对象,这在 Beam 中是被禁止的。创建许多消费者的原因是另一个例子 - DirectRunner 经常执行检查点(包括许多可能不必要的步骤,如重新创建消费者) - 请参阅此处

因此,DirectRunner 真的应该只在测试和/或中等条件下使用,在这些条件下性能不是问题。当性能是一个问题时,应该使用不同的运行器——要么是一些分布式的,要么是这种运行器的本地版本——例如本地 FlinkRunner。

于 2021-05-24T07:32:59.523 回答