问题标签 [trident]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
clojure - 从 Trident 中的 DRPC Spout 持久化状态
我正在为这个项目试验 Storm 和 Trident,我正在使用 Clojure 和 Marceline 来做这件事。我正在尝试扩展Marceline 页面上给出的 wordcount 示例,以便句子 spout 来自 DRPC 调用,而不是来自本地 spout。我遇到的问题是因为 DRPC 流需要有结果才能返回到客户端,但我希望 DRPC 调用有效地返回 null,并简单地更新持久数据。
代码中有两种选择 - 一种使用固定的批处理 spout 加载没有问题,但是当我尝试使用 DRPC 流加载代码时,我收到此错误:
我相信这个错误来自这样一个事实,即 DRPC 流必须尝试订阅输出才能有一些东西返回给客户端 - 但persistent-aggregate
不提供任何此类输出来订阅。
那么如何设置我的拓扑结构,以便 DRPC 流导致我的持久数据被更新?
小更新:看起来这可能是不可能的:( https://issues.apache.org/jira/browse/STORM-38
java - 如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)
我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否在做他们应该做的事情。
我希望能够检索运行拓扑后产生的所有值并将它们放入列表中,以便我可以“看到”它们并检查它们的条件。
我正在运行拓扑TrackedTopology
(从另一个 SO 问题中获取代码,感谢@brianghig的提问和@Thomas Kielbus的回复)
这就是我“启动”拓扑以及将样本值输入其中的方式:
当我这样做时,我可以在日志消息中看到拓扑运行正确,并且值计算正确,但我想将结果“钓鱼”到一个List
(或任何结构,此时)所以我实际上可以Asserts
在我的测试中放一些。
我一直在尝试 [as**ton] 不同的方法,但它们都不起作用。
最新的想法是在聚合之后添加一个螺栓,以便它将我的值“持久化”到一个列表中:
下面你会看到这个类试图遍历由 发出的所有元组,aggregate
并将它们放在我之前初始化的列表中:
所以现在代码看起来像:
但什么都没有......日志显示Done. Checkpoint results=[]
(空列表)
有没有办法得到它?我想它一定是可行的,但我一直无法找到办法......
任何提示或链接到页面或任何类似的东西都将不胜感激。先感谢您。
state - Storm Trident 状态实现 Txid 比较
我正在构建一个风暴三叉戟拓扑,并且在某些时候我正在使用 partitionPersist 阶段将数据存储到 solr 中。我已经实现了 State 类并编写了一些逻辑来使用 StateUpdater 更新 solr。我是否在任何阶段都必须跟踪事务 id (txid) 以确保我不会将重复项存储到 solr 中,或者 trident 会为我处理这个问题吗?
join - 风暴三叉戟窗口连接
我已经使用 partitionPersist 阶段和状态查询在我的风暴三叉戟拓扑中实现了窗口连接。我想在查询时从我的内存状态中删除项目。这可能吗?
state - 更新时风暴三叉戟状态查询
我有一个风暴三叉戟拓扑,其中我有一个简单的哈希图状态和一个状态查询。在更新状态时是否会调用状态查询?
java - 每条推文中不重复的单词总数
我是 java 和 Trident 的新手,我导入了获取推文的项目,但我想得到一些东西,当我从tuple.getValue(0);
只意味着第一条推文的代码中获得时,这段代码如何获得不止一条推文?!
我的问题是在 hashset 或 hashmap 中获取所有推文以获取每条推文中独特词的总数
此方法用于在推文上执行方程式
这段代码得到了第一条推文,然后得到了它的主体,将它转换为字符串数组,我知道我需要解决什么但写得不好
我的想法:像循环一样
apache-kafka - 从 Kafka 获取消息时缓冲区下溢
我正在为流处理设置风暴,我的 trident spout 正在从 kafka 主题中获取数据,但是在从 kafka 获取消息时我不断收到 BufferUnderFlowException:
java.lang.RuntimeException: java.lang.RuntimeException: java.nio.BufferUnderflowException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.5.jar:0.9.5]在 backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.5.jar:0.9.5] 引起:java.lang.RuntimeException:暴风雨中的 java.nio.BufferUnderflowException。 kafka.KafkaUtils.fetchMessages(KafkaUtils.java:177) ~[stormjar.jar:na] atstorm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~[stormjar.jar:na] atstorm.kafka。 trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~[stormjar.jar:na]
引起:java.nio.BufferUnderflowException: null at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151) ~[na:1.8.0_05] at java.nio.ByteBuffer.get(ByteBuffer.java:715) ~[ na:1.8.0_05] 在 kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[stormjar.jar:na]
我用的storm版本是0.9.5,kafka有2.11版本。我无法理解是什么在这里造成了问题。
apache-storm - Apache Storm Trident - 动态创建拓扑
有没有办法在 trident 中动态创建拓扑?任何人都可以提供例子吗?
tuples - 如何使用storm Trident进行元组批处理?
我以前使用过storm,我需要更多的批处理功能,所以我在storm中搜索了批处理。我发现了实时进行微批处理的 Trident。
但不知何故,我无法弄清楚 Trident 如何处理微批处理(流量、批处理大小、批处理间隔)以知道它确实有我需要的东西。
我想做的是收集/保存喷口在一个间隔内发出的元组,并在另一个时间间隔内将它们重新发送到下游组件/螺栓/功能。(例如,spout 每秒发出一个元组,下一个 trident 函数将收集/保存元组并每分钟发出 50 个元组到下一个函数。)
有人可以指导我在这种情况下如何应用三叉戟吗?或者使用风暴功能的任何其他适用方式?