1

我正在使用以下配置:

  • spring-integration-kafka 2.1.0.RELEASE
  • 卡夫卡客户端 0.10.0.1
  • 卡夫卡 0.10.xx
  • spring-xd-1.3.1.RELEASE

我为 SpringXD 创建了我的自定义 Kafka 源模块。我设置了我的消费者逻辑和我的message-driven-channel-adapter(我与 a 一起使用control-bus来停止我的通道适配器)。到目前为止,一切都很好。此外,我还使用 kafka 属性max.poll.record=10来获取每次轮询的 10 条记录。

我想确保在成功获取所有记录(在本例中为 10 条记录)后立即停止我的频道。

所以举个例子:我想避免在不是所有的记录都被成功获取和处理的时候停止读取(即当记录没有被发送到输出通道时)。

有没有办法说出来?

这是我的 xml 配置,以防万一:

xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
                               http://www.springframework.org/schema/context
                       http://www.springframework.org/schema/context/spring-context.xsd">


<int:channel id="input_to_control_bus" />
<int:channel id="output" />

<context:component-scan base-package="com.kafka.source.logic" />


<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />

<int-kafka:message-driven-channel-adapter
    id="kafkaInboundChannelAdapterTesting" listener-container="container1"
    auto-startup="false" phase="100" send-timeout="5000" channel="output"
    mode="record" message-converter="messageConverter" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

<!--Consumer -->
<bean id="container1"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="max.poll.records" value="3" />
                    <entry key="group.id" value="bridge-stream-testing" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                    <entry key="value.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>

    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="testing-topic" />
        </bean>
    </constructor-arg>
</bean>

[更新 N°1] 我为什么要这样做?这些是详细信息:

  • 我想每Y分钟从 Kafka 主题中读取最多X条消息。
  • max.poll.records用来确保每次投票最多获取X条消息。
  • 我想处理的一种情况是:如果在一次特定的消息轮询中,我轮询的消息少于X会发生什么。这意味着我应该停止通道而不等待X消息,否则我将不得不等到未来的消息轮询才能到达那些X消息。

这些是关于这个场景的一些细节。还有更多场景,但我不想使用相同的 SO 问题来混合它。

[更新 N°2]

阿尔乔姆回答后的一些想法。

  • 如果我没有定义 amax.poll.records并且只是等到达到Y分钟并计算了X条消息,然后stop是通道,会发生什么?
  • 是不是有些消息会因为无法阅读而丢失,或者那些无法阅读的消息在我start重新频道时会被阅读?

我想避免将消息保存在内存中,这就是我使用message-driven-channel-adapter+的原因max.poll.records

4

1 回答 1

1

我可以建议的是像AtomicIntegerbean,它在每个处理的记录上都会增加,当你达到阈值时,你会stop()为你的kafkaInboundChannelAdapterTesting.

于 2017-04-12T13:03:22.967 回答