问题标签 [embedded-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
118 浏览

scala - Spark - 没有停止使用 Kafka 主题的 Spark Stream

我正在尝试为使用来自 kafka 的数据的火花流示例编写测试。我为此使用EmbeddedKafka 。

当我运行它时,它会继续运行并且不会停止或向控制台打印任何内容。我不知道为什么会这样。我还尝试通过在调用流EmbeddedKafka.stop()之前调用来终止 kafka。stop

0 投票
1 回答
1507 浏览

scala - 使用 scalatest-embedded-kafka 集成测试 Flink 和 Kafka

我想用 Flink和 Kafka运行集成测试。过程是从 Kafka 读取,用 Flink 进行一些操作,然后将数据流放入 kafka。

我想从头到尾测试这个过程。现在我使用scalatest-embedded-kafka

我在这里举了一个例子,我尽量简单:

我有一个错误,所以我添加了这一行implicit val typeInfo = TypeInformation.of(classOf[String]),但我真的不明白为什么我必须这样做。

现在这段代码不起作用,它运行时不会中断,但不会停止,也不会给出任何结果。

如果有人有任何想法?测试这种管道的更好主意。

谢谢 !

编辑:添加env.execute()和更改错误。

0 投票
1 回答
438 浏览

scala - 带有 Kafka Streams 的 Scala Embedded Kafka 中的生产者错误

我有一个测试,它在气质上留下了一个开放的生产者线程,并带有连续的错误日志记录。

测试有效,但有时会像上面那样失败。

一个特殊性是流应用程序在其消费的同一主题上产生删除事件。

该套件中有两个类似的测试。我在 sbt 下执行测试套件,如下所示:

五分之四的执行会留下一个悬空线程无限期地发布这些错误。他们以 3 人一组出现,但我也不知道为什么。

我尝试在调用 close() 后设置延迟,但似乎没有帮助。如何避免悬空的生产者线程?

0 投票
1 回答
3293 浏览

apache-kafka - Spring Embedded Kafka + Mock Schema Registry:State Store ChangeLog Schema 未注册

我正在使用带有 MockSchemaRegistryClient的Spring Embedded Kafka Broker为我们的 kafka 系统构建集成测试。我正在为使用 Streams API (KStreamBuilder) 构建的一种 Stream 拓扑构建测试。这个特定的拓扑有一个 KStream (stream1) 馈送到 KTable (table1)。

当我将来自 table1 的 KTableProcessor 的输入输入到 stream1 时,我遇到了一个错误:

提前致谢!

0 投票
1 回答
459 浏览

scala - 即使设置了配置值,EmbeddedKafka 也会抛出 RecordTooLargeException

我正在尝试将 kafka 的默认消息大小从 1MB 增加到 10MB。我正在使用 EmbeddedKafka 和 ScalaTest 测试我的新配置,但它不起作用。

使用这个答案,我相应地增加了配置值:

经纪人:

  • message.max.bytes
  • replica.fetch.max.bytes

消费者:

  • max.partition.fetch.bytes

制片人:

  • max.request.size

我的代码:

当我使用仅 999999 字节(小于 1MB)的消息运行此代码时,我收到此错误:

这是 EmbeddedKafka 中的错误吗?还是我错误地配置了我的应用程序?

0 投票
2 回答
8726 浏览

spring-boot - 嵌入式 kafka 无法启动 - 错误

我很难解决这个问题。这是我的 Junit,我正在使用弹簧嵌入式 kafka。当我运行我的测试用例时,我遇到了奇怪的问题/异常。

这是我的例外:

我正在使用 spring boot 1.5.4.RELEASE & kafka 0.11.0.0

请帮我解决这个问题,我花了很多时间但无法解决这个问题。提前致谢。

0 投票
0 回答
1398 浏览

apache-kafka - 嵌入式 Kafka 测试(由 SBT 运行)间歇性失败并出现 ZooKeeperServer 错误

我正在使用spring KafkaEmbedded test util编写一组测试。每个测试都单独建立一个嵌入式 kafka 实例,产生事件并断言产生的下游事件。

在 IDE(例如 IntelliJ)中运行时,测试始终通过,但是在使用 SBT 运行时,测试会间歇性地失败(大约 50% 的时间,没有规律性)。如果测试失败,我会看到以下错误:

此外,我看到许多 INFO 日志报告缺少 ZooKeeper 节点,例如:

这些日志不会出现在成功的测试中。当我说“我看到很多 INFO 日志”时,我的意思是很多,大约 40 个这样的日志,其中一些节点路径嵌套在先前报告的节点路径中。

研究表明错误日志是无辜的,我想信息日志也是如此,但它们在测试失败时是独立的。

7/17 更新:

KStreams/Producers/Consumers 配置:

嵌入式 Kafka 启动:

测试前:

测试后:

7/17 更新:

更多上下文,这是一个 Spring 项目,每个测试都这样注释:

0 投票
0 回答
927 浏览

apache-kafka - 嵌入式 Kafka:KTable+KTable leftJoin 产生重复记录

我来寻求奥术知识。

首先,我有两对主题,每对中的一个主题馈入另一个主题。后面的主题正在形成两个KTable,用于KTable+KTable leftJoin。问题是,当我为任一 KTable 生成一条记录时,leftJoin 生成了三条记录。我希望表单中有两条记录(A-null,AB),但我得到的是(A-null,AB,A-null)。我已经确认 KTables 每个都收到一条记录。

我摆弄了 CACHE_MAX_BYTES_BUFFERING_CONFIG 来启用/禁用状态存储缓存。上述行为是将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 0。当我使用 CACHE_MAX_BYTES_BUFFERING_CONFIG 的默认值时,我看到连接输出以下记录:(AB, AB, A-null)

以下是流、消费者、生产者的配置:

遇到此行为的处理器 API 代码(已清理)如下,请注意配对的主题 [A1, A2] 和 [B1, B2]:

预先感谢您的任何帮助,哦,仁慈的人。

更新: 我正在使用一个 kafka 服务器和每个主题 1 个分区运行,并且遇到了这种行为。当我将服务器数量增加到 2 并将分区数量增加到 3 时,我的输出变为(A-null)。

在我看来,我需要花更多时间阅读 kafka 手册......

0 投票
1 回答
160 浏览

apache-kafka - Kafka Streams:混合搭配 PAPI 和 DSL KTable 不共同分区

我有一个混合匹配的 Scala 拓扑结构,其中主要工作人员是 PAPI 处理器,其他部分通过 DSL 连接。

整个主题的数据(包括 original eventsTopic)通过 a 进行分区,我们称它为DoubleKey具有两个字段。visitorsTopic访问者通过 Sink发送到:

在 DSL 中,我在这个主题上创建了一个 KV KTable:

我后来连接到EventProcessor

一切都是共同分区的(通过 DoubleKey)。visitorSinkPartitioner执行典型的模运算:

在 PAPI 处理器 EventsProcessor 中,我查询此表以查看是否已经存在访问者。

但是,在我的测试中(使用EmbeddedKafka,但这应该没有什么区别),如果我用一个分区运行它们,一切都很好(EventsProcessor 检查 KTable 上的两个事件DoubleKey和第二个事件 - 有一些延迟 -它可以看到Visitor商店中的存在),但如果我用更高的数字运行它,EventProcessor 永远不会看到商店中的值。

但是,如果我通过 API(迭代)检查商店store.all(),记录就在那里。所以我知道它必须去不同的分区。

由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用显式分区程序调用相同的代码),因此 KTable 应该在同一个分区上获取该数据。

我的假设正确吗?会发生什么?

卡夫卡流 1.0.0,斯卡拉 2.12.4。

PS。当然,它可以在 PAPI 上执行puts 通过 PAPI 而不是创建存储StreamsBuilder.table(),因为这肯定会使用代码运行的相同分区,但这是不可能的。

0 投票
1 回答
360 浏览

apache-kafka - 具有不同@StreamListener 的两个实例之间的嵌入式 Kafka 迁移状态存储

我有一个 SpringBoot 应用程序,其中有两个通过 Spring Cloud 映射的流处理器。每个处理器都有自己的 @StreamListener 用于不同的主题。一个处理器将聚合数据写入 quarable 状态存储。通过我的@Service(服务从状态存储获取聚合数据)获取数据时,我在单元测试中遇到了这个问题。由于某些原因不时捕获异常:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)

当我从另一个处理器中删除 StreamListener 时,一切正常且稳定。

如何将状态存储与正确的处理器绑定到确切的实例?