问题标签 [trident]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
113 浏览

clojure - 从 Trident 中的 DRPC Spout 持久化状态

我正在为这个项目试验 Storm 和 Trident,我正在使用 Clojure 和 Marceline 来做这件事。我正在尝试扩展Marceline 页面上给出的 wordcount 示例,以便句子 spout 来自 DRPC 调用,而不是来自本地 spout。我遇到的问题是因为 DRPC 流需要有结果才能返回到客户端,但我希望 DRPC 调用有效地返回 null,并简单地更新持久数据。

代码中有两种选择 - 一种使用固定的批处理 spout 加载没有问题,但是当我尝试使用 DRPC 流加载代码时,我收到此错误:

我相信这个错误来自这样一个事实,即 DRPC 流必须尝试订阅输出才能有一些东西返回给客户端 - 但persistent-aggregate不提供任何此类输出来订阅。

那么如何设置我的拓扑结构,以便 DRPC 流导致我的持久数据被更新?

小更新:看起来这可能是不可能的:( https://issues.apache.org/jira/browse/STORM-38

0 投票
1 回答
271 浏览

java - 如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)

我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否在做他们应该做的事情。

我希望能够检索运行拓扑后产生的所有值并将它们放入列表中,以便我可以“看到”它们并检查它们的条件。

我正在运行拓扑TrackedTopology(从另一个 SO 问题中获取代码,感谢@brianghig的提问和@Thomas Kielbus的回复)

这就是我“启动”拓扑以及将样本值输入其中的方式:

当我这样做时,我可以在日志消息中看到拓扑运行正确,并且值计算正确,但我想将结果“钓鱼”到一个List(或任何结构,此时)所以我实际上可以Asserts在我的测试中放一些。

我一直在尝试 [as**ton] 不同的方法,但它们都不起作用。

最新的想法是在聚合之后添加一个螺栓,以便它将我的值“持久化”到一个列表中:

下面你会看到这个类试图遍历由 发出的所有元组,aggregate并将它们放在我之前初始化的列表中:

所以现在代码看起来像:

但什么都没有......日志显示Done. Checkpoint results=[](空列表)

有没有办法得到它?我想它一定是可行的,但我一直无法找到办法......

任何提示或链接到页面或任何类似的东西都将不胜感激。先感谢您。

0 投票
1 回答
70 浏览

state - Storm Trident 状态实现 Txid 比较

我正在构建一个风暴三叉戟拓扑,并且在某些时候我正在使用 partitionPersist 阶段将数据存储到 solr 中。我已经实现了 State 类并编写了一些逻辑来使用 StateUpdater 更新 solr。我是否在任何阶段都必须跟踪事务 id (txid) 以确保我不会将重复项存储到 solr 中,或者 trident 会为我处理这个问题吗?

0 投票
1 回答
78 浏览

join - 风暴三叉戟窗口连接

我已经使用 partitionPersist 阶段和状态查询在我的风暴三叉戟拓扑中实现了窗口连接。我想在查询时从我的内存状态中删除项目。这可能吗?

0 投票
1 回答
147 浏览

state - 更新时风暴三叉戟状态查询

我有一个风暴三叉戟拓扑,其中我有一个简单的哈希图状态和一个状态查询。在更新状态时是否会调用状态查询?

0 投票
2 回答
149 浏览

java - 每条推文中不重复的单词总数

我是 java 和 Trident 的新手,我导入了获取推文的项目,但我想得到一些东西,当我从tuple.getValue(0); 只意味着第一条推文的代码中获得时,这段代码如何获得不止一条推文?!

我的问题是在 hashset 或 hashmap 中获取所有推文以获取每条推文中独特词的总数

此方法用于在推文上执行方程式

这段代码得到了第一条推文,然后得到了它的主体,将它转换为字符串数组,我知道我需要解决什么但写得不好

我的想法:像循环一样

0 投票
1 回答
1557 浏览

apache-kafka - 从 Kafka 获取消息时缓冲区下溢

我正在为流处理设置风暴,我的 trident spout 正在从 kafka 主题中获取数据,但是在从 kafka 获取消息时我不断收到 BufferUnderFlowException:

java.lang.RuntimeException: java.lang.RuntimeException: java.nio.BufferUnderflowException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.5.jar:0.9.5]在 backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.5.jar:0.9.5] 引起:java.lang.RuntimeException:暴风雨中的 java.nio.BufferUnderflowException。 kafka.KafkaUtils.fetchMessages(KafkaUtils.java:177) ~[stormjar.jar:na] atstorm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~[stormjar.jar:na] atstorm.kafka。 trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~[stormjar.jar:na]

引起:java.nio.BufferUnderflowException: null at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151) ~[na:1.8.0_05] at java.nio.ByteBuffer.get(ByteBuffer.java:715) ~[ na:1.8.0_05] 在 kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[stormjar.jar:na]

我用的storm版本是0.9.5,kafka有2.11版本。我无法理解是什么在这里造成了问题。

0 投票
1 回答
289 浏览

apache-storm - Storm UI 显示不同数量的 Executors 和 Tasks

我正在使用带有三叉戟拓扑的风暴,但我无法理解并行度是如何获得的,它根据我的计算和我在风暴 UI 上看到的内容而有所不同,

这是分配工人数量的代码:

这是流处理代码:

我在上面的代码中使用的并行属性在这里:

现在根据我的计算,执行者的数量应该是 3*6 + 6 = 24

但是在 Storm UI 中它显示 23,如何?

在此处输入图像描述

已编辑

添加新的屏幕截图,其中包含有关各个组件的信息

在此处输入图像描述

这里我可以看到 Executors 和任务的数量是 50,但是我没有为此设置任何配置,storm 本身是否提供了这个?

其次,发出的元组数量巨大,我没有产生这么多数据,这是100多倍的元组,为什么会在UI中显示这么多的元组?

0 投票
1 回答
245 浏览

apache-storm - Apache Storm Trident - 动态创建拓扑

有没有办法在 trident 中动态创建拓扑?任何人都可以提供例子吗?

0 投票
1 回答
929 浏览

tuples - 如何使用storm Trident进行元组批处理?

我以前使用过storm,我需要更多的批处理功能,所以我在storm中搜索了批处理。我发现了实时进行微批处理的 Trident。

但不知何故,我无法弄清楚 Trident 如何处理微批处理(流量、批处理大小、批处理间隔)以知道它确实有我需要的东西。

我想做的是收集/保存喷口在一个间隔内发出的元组,并在另一个时间间隔内将它们重新发送到下游组件/螺栓/功能。(例如,spout 每秒发出一个元组,下一个 trident 函数将收集/保存元组并每分钟发出 50 个元组到下一个函数。)

有人可以指导我在这种情况下如何应用三叉戟吗?或者使用风暴功能的任何其他适用方式?