问题标签 [embedded-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Spark - 没有停止使用 Kafka 主题的 Spark Stream
我正在尝试为使用来自 kafka 的数据的火花流示例编写测试。我为此使用EmbeddedKafka 。
当我运行它时,它会继续运行并且不会停止或向控制台打印任何内容。我不知道为什么会这样。我还尝试通过在调用流EmbeddedKafka.stop()
之前调用来终止 kafka。stop
scala - 使用 scalatest-embedded-kafka 集成测试 Flink 和 Kafka
我想用 Flink和 Kafka运行集成测试。过程是从 Kafka 读取,用 Flink 进行一些操作,然后将数据流放入 kafka。
我想从头到尾测试这个过程。现在我使用scalatest-embedded-kafka。
我在这里举了一个例子,我尽量简单:
我有一个错误,所以我添加了这一行implicit val typeInfo = TypeInformation.of(classOf[String])
,但我真的不明白为什么我必须这样做。
现在这段代码不起作用,它运行时不会中断,但不会停止,也不会给出任何结果。
如果有人有任何想法?测试这种管道的更好主意。
谢谢 !
编辑:添加env.execute()
和更改错误。
scala - 带有 Kafka Streams 的 Scala Embedded Kafka 中的生产者错误
我有一个测试,它在气质上留下了一个开放的生产者线程,并带有连续的错误日志记录。
测试有效,但有时会像上面那样失败。
一个特殊性是流应用程序在其消费的同一主题上产生删除事件。
该套件中有两个类似的测试。我在 sbt 下执行测试套件,如下所示:
五分之四的执行会留下一个悬空线程无限期地发布这些错误。他们以 3 人一组出现,但我也不知道为什么。
我尝试在调用 close() 后设置延迟,但似乎没有帮助。如何避免悬空的生产者线程?
apache-kafka - Spring Embedded Kafka + Mock Schema Registry:State Store ChangeLog Schema 未注册
我正在使用带有 MockSchemaRegistryClient的Spring Embedded Kafka Broker为我们的 kafka 系统构建集成测试。我正在为使用 Streams API (KStreamBuilder) 构建的一种 Stream 拓扑构建测试。这个特定的拓扑有一个 KStream (stream1) 馈送到 KTable (table1)。
当我将来自 table1 的 KTableProcessor 的输入输入到 stream1 时,我遇到了一个错误:
提前致谢!
scala - 即使设置了配置值,EmbeddedKafka 也会抛出 RecordTooLargeException
我正在尝试将 kafka 的默认消息大小从 1MB 增加到 10MB。我正在使用 EmbeddedKafka 和 ScalaTest 测试我的新配置,但它不起作用。
使用这个答案,我相应地增加了配置值:
经纪人:
message.max.bytes
replica.fetch.max.bytes
消费者:
max.partition.fetch.bytes
制片人:
max.request.size
我的代码:
当我使用仅 999999 字节(小于 1MB)的消息运行此代码时,我收到此错误:
这是 EmbeddedKafka 中的错误吗?还是我错误地配置了我的应用程序?
spring-boot - 嵌入式 kafka 无法启动 - 错误
我很难解决这个问题。这是我的 Junit,我正在使用弹簧嵌入式 kafka。当我运行我的测试用例时,我遇到了奇怪的问题/异常。
和
这是我的例外:
我正在使用 spring boot 1.5.4.RELEASE & kafka 0.11.0.0
请帮我解决这个问题,我花了很多时间但无法解决这个问题。提前致谢。
apache-kafka - 嵌入式 Kafka 测试(由 SBT 运行)间歇性失败并出现 ZooKeeperServer 错误
我正在使用spring KafkaEmbedded test util编写一组测试。每个测试都单独建立一个嵌入式 kafka 实例,产生事件并断言产生的下游事件。
在 IDE(例如 IntelliJ)中运行时,测试始终通过,但是在使用 SBT 运行时,测试会间歇性地失败(大约 50% 的时间,没有规律性)。如果测试失败,我会看到以下错误:
此外,我看到许多 INFO 日志报告缺少 ZooKeeper 节点,例如:
这些日志不会出现在成功的测试中。当我说“我看到很多 INFO 日志”时,我的意思是很多,大约 40 个这样的日志,其中一些节点路径嵌套在先前报告的节点路径中。
研究表明错误日志是无辜的,我想信息日志也是如此,但它们在测试失败时是独立的。
7/17 更新:
KStreams/Producers/Consumers 配置:
嵌入式 Kafka 启动:
测试前:
测试后:
7/17 更新:
更多上下文,这是一个 Spring 项目,每个测试都这样注释:
apache-kafka - 嵌入式 Kafka:KTable+KTable leftJoin 产生重复记录
我来寻求奥术知识。
首先,我有两对主题,每对中的一个主题馈入另一个主题。后面的主题正在形成两个KTable,用于KTable+KTable leftJoin。问题是,当我为任一 KTable 生成一条记录时,leftJoin 生成了三条记录。我希望表单中有两条记录(A-null,AB),但我得到的是(A-null,AB,A-null)。我已经确认 KTables 每个都收到一条记录。
我摆弄了 CACHE_MAX_BYTES_BUFFERING_CONFIG 来启用/禁用状态存储缓存。上述行为是将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 0。当我使用 CACHE_MAX_BYTES_BUFFERING_CONFIG 的默认值时,我看到连接输出以下记录:(AB, AB, A-null)
以下是流、消费者、生产者的配置:
遇到此行为的处理器 API 代码(已清理)如下,请注意配对的主题 [A1, A2] 和 [B1, B2]:
预先感谢您的任何帮助,哦,仁慈的人。
更新: 我正在使用一个 kafka 服务器和每个主题 1 个分区运行,并且遇到了这种行为。当我将服务器数量增加到 2 并将分区数量增加到 3 时,我的输出变为(A-null)。
在我看来,我需要花更多时间阅读 kafka 手册......
apache-kafka - Kafka Streams:混合搭配 PAPI 和 DSL KTable 不共同分区
我有一个混合匹配的 Scala 拓扑结构,其中主要工作人员是 PAPI 处理器,其他部分通过 DSL 连接。
整个主题的数据(包括 original eventsTopic
)通过 a 进行分区,我们称它为DoubleKey
具有两个字段。visitorsTopic
访问者通过 Sink发送到:
在 DSL 中,我在这个主题上创建了一个 KV KTable:
我后来连接到EventProcessor
:
一切都是共同分区的(通过 DoubleKey)。visitorSinkPartitioner
执行典型的模运算:
在 PAPI 处理器 EventsProcessor 中,我查询此表以查看是否已经存在访问者。
但是,在我的测试中(使用EmbeddedKafka,但这应该没有什么区别),如果我用一个分区运行它们,一切都很好(EventsProcessor 检查 KTable 上的两个事件DoubleKey
和第二个事件 - 有一些延迟 -它可以看到Visitor
商店中的存在),但如果我用更高的数字运行它,EventProcessor 永远不会看到商店中的值。
但是,如果我通过 API(迭代)检查商店store.all()
,记录就在那里。所以我知道它必须去不同的分区。
由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用显式分区程序调用相同的代码),因此 KTable 应该在同一个分区上获取该数据。
我的假设正确吗?会发生什么?
卡夫卡流 1.0.0,斯卡拉 2.12.4。
PS。当然,它可以在 PAPI 上执行put
s 通过 PAPI 而不是创建存储StreamsBuilder.table()
,因为这肯定会使用代码运行的相同分区,但这是不可能的。
apache-kafka - 具有不同@StreamListener 的两个实例之间的嵌入式 Kafka 迁移状态存储
我有一个 SpringBoot 应用程序,其中有两个通过 Spring Cloud 映射的流处理器。每个处理器都有自己的 @StreamListener 用于不同的主题。一个处理器将聚合数据写入 quarable 状态存储。通过我的@Service(服务从状态存储获取聚合数据)获取数据时,我在单元测试中遇到了这个问题。由于某些原因不时捕获异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
当我从另一个处理器中删除 StreamListener 时,一切正常且稳定。
如何将状态存储与正确的处理器绑定到确切的实例?