问题标签 [apache-samza]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
122 浏览

c# - 如何使用 Samza 和 Kafka 实现消息队列系统?

我必须构建一个接收事件的排队系统,并将它们重定向到多个消费者。我还没有理解如何实现这样一个使用 Samza 和 Kafka 将事件分发给消费者的排队系统。请提供资源

0 投票
1 回答
44 浏览

apache - 我们是否需要至少在一次交付情况下自行删除重复项?

Apache Storm 和 Samza 保证至少交付一次。这意味着计算过程中可能存在一些重复。我们是否需要自己移动重复项(包括删除代码中的重复部分)?例如,字数问题。如果单词“男孩”只出现一次,但由于某些故障或延迟而出现了 2 个“男孩”。风暴重播了“男孩”。那么'boy's count 2的结果是什么?或者Storm为我们删除重复,结果是一个?

0 投票
1 回答
266 浏览

scala - Samza 教程 compileScala FAILED

不知道如何解决这个问题,因为我是 Samza 和 Scala 的新手。

我正在关注本教程,目前停留在本节:https ://github.com/apache/samza-hello-samza#2-start-a-grid

这是我得到的错误信息

0 投票
1 回答
39 浏览

scala - samza KafkaSystemFactory.getAdmin 上的异常

我正在运行 Samza 以使用 Scala 中给定 Kafka 主题的消息。为了运行,我创建了一个 samza-read.properties 文件,其中包含:

然而,当我运行我的程序时,我不断收到异常: java.lang.NoClassDefFoundError: kafka/common/ReplicaNotAvailableException at org.apache.samza.system.kafka.KafkaSystemFactory.getAdmin(KafkaSystemFactory.scala:106)

我相信这必须处理,systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory但也许有人以前曾遇到过这个例外。任何帮助是极大的赞赏!

0 投票
1 回答
128 浏览

apache-samza - 强制 RocksDB 支持的 Samza 键/值存储从 kafka 更改日志重新加载?

为了调试生产问题,我使用 ProcessJobFactory 在本地运行 Samza 代码。一切似乎运行良好。

该代码使用由 RocksDB 和 Kafka 支持的 Samza 键/值存储作为更改日志(Kafka 在不同的机器上运行以防万一)。

为了用真实数据填充环境以进行调试,我将实时数据重播到 Kafka 更改日志中,用于 RocksDB 数据库的键/值存储,而 Samza 作业已停止。

启动 Samza 时,它不会RocksDB 数据库与 Kafka 更改日志重新同步。我使用 Keylord(工具)验证了这一点,并直接查看了 RocksDB 数据库的内容。

如何强制 Samza 将 RocksDB 数据库(键/值存储)与更改日志重新同步?是否可以进行配置设置或代码级调用?

相关 - 我假设代码执行 key-value-store.all(); 即使代码中的缓存为空,它也会转到 RocksDB 并从那里提取“所有条目”?

谢谢,

0 投票
1 回答
629 浏览

apache-kafka - 重置为 Kafka 分区中的自定义偏移量

我正在为我正在研究的特定用例研究 Kafka。我有一个正在流动的数据流,我想对其进行处理并将其发布到中间阶段。

在每个阶段(初始和中间), Samza 任务都会进行处理和重新发布。我的要求之一是能够在我想要的任何时候从特定阶段重新触发整个处理管道。

我知道 kafka 为其每个日志(传入数据)维护一个偏移量。但是,Kafka 是否提供任何功能,可以分区偏移量映射到某个自定义标识符(例如时间戳)并使用它从该点重新触发整个管道?

我在多个地方读过,我可以通过重新设置开头并返回 N 次来重放 kafka 提交日志。但是有没有办法让我将这些偏移映射到我自己的标识符(如时间戳),并将其用作一种机制来判断从哪个偏移重放。

最好
的沙比尔

0 投票
1 回答
251 浏览

java - 与 Beam 中的运行器依赖项冲突

我想使用 Beam 测试不同的流处理引擎,但是在包含 Flink和Samza 依赖项时无法运行程序。如果只包括其中一个,它适用于所有其他跑步者。

我的pom.xml包含以下内容:

尝试执行时抛出的错误信息PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();是:

任何人都可以帮助我吗?

0 投票
1 回答
240 浏览

java - org.apache.beam.sdk.util.UserCodeException 使用 Samza Runner 执行 Beam Pipeline

我正在尝试使用 Samza Runner 从此处运行 Wordcount 演示。这是我的 build.gradle

我的 wordcount.java 如下。

我正在使用 Beam 版本 2.22.0。我尝试了以下组合。带有 Beam 2.22 的 Samza 1.4、带有 Beam 2.11 和 Beam 2.22 的 Samza 1.0 以及带有 Beam 2.11.0 的 Samza 0.14.1。但是,在执行时我收到以下错误:

我正在使用 Java 1.8。有没有人知道是什么导致了这个问题?

0 投票
1 回答
38 浏览

apache-kafka - Apache Samza 刷新表立即更新到更改日志

如果我为 Samza 中的 RocksDB 表指定更改日志支持。是否有配置将异步写入时间更新到更改日志?我想把它缩短到更短的时间。我在Config 参考中看不到任何内容。

我想要的场景是在桥接旧 JMS 连接后从流中写入更改日志。这个遗留连接提供了部分更新,我想将部分更新合并到更完整的消息中,在 samza 流应用程序中构建这些消息的缓存,并将它们写到更改日志中。

如果我使用配置的变更日志,stores.store-name.changelog那么它将写入变更日志,最终我对 Samze API 表所做的更改。但对我的需求不够快,所以想配置最大等待时间以传播到变更日志。

或者,似乎withSideInputs每次都使用引导我的表,然后使用sendTo会更快地更新,我也可以保持LocalStore读写缓存,并且始终将更改日志作为黄金来源。

我希望更改日志也能快速写入的原因是因为其他应用程序正在从该更改日志中读取。

0 投票
1 回答
28 浏览

apache-samza - 应用部署在yarn中时samza如何生成container.id?

当应用程序部署在纱线中时,有人可以告诉我 samza 如何生成samza.container.id / SAMZA_CONTAINER_ID吗?我在 samza 代码库中环顾四周,但无法找到生成的逻辑samza.container.id