问题标签 [trident]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1127 浏览

java - Apache Storm 一次性处理

我们目前在集群拓扑模式下使用 Apache Storm 0.9.5 来处理 Amazon Kinesis 记录 (spout) 并将它们存储到 Redshift 数据仓库 (bolt) 中。我们的 Storm 集群部署在 AWS 中,由 1 个 nimbus + UI 节点、1 个 zookeeper 节点和 3 个 supervisor + logviewer 节点组成。我们的拓扑配置支持处理多个 Kinesis 流,并且它包含的每个流:

  • 一个 Kinesis 流喷口用于侦听传入记录
  • 一个 Redshift 螺栓,用于将记录插入数据仓库

拓扑:

该系统的一个问题是它无法保证只处理一次输入消息,从而导致将具有相同业务密钥的多条记录插入到目标数据库中。为了了解问题的严重程度,我们进行了一项受控测试,发现大约三分之一的输入记录被多次提交处理。

根据这个线程(目前尚未得到答复),我们也考虑过使用 Trident 来保证一次性处理,但也得出结论,将幂等性内置到系统中更为重要(以及至少-once 语义)而不是像其他文章所建议的那样增加复杂性、降低性能和生成状态。

我们现在正在寻求以支持集群的方式在现有拓扑中实现幂等性的最佳方式的建议。到目前为止,我们倾向于引入一个 RedisBolt,它可以通过元组消息 id 键值。是否存在使用 Apache Storm 实现此目的的现有模式?

0 投票
0 回答
46 浏览

java - Trident 在 Kafka 流上进行不正确的聚合

我是风暴三叉戟的新手。这是我的问题:

我正在使用来自 Kafka 队列的消息并使用 的实现CombinerAggregator来进行聚合,以将数据保存在 Cassandra 中。

当我在本地运行这个东西时,我看到数字被正确填充。但是在生产环境中,我看到大多数时候这些聚合都是 0,这是错误的,我们也应该为这些情况设置值。在某些情况下,会出现正确的非零数字。

我已将numOfWorkers拓扑设置为 4 和parallelismHint2。您知道会出什么问题吗?或者建议一种方法来调试这个问题。

0 投票
0 回答
150 浏览

hadoop - Storm Trident HDFS Bolt 抛出错误

当我尝试将消息从 Trident JMS spout 发送到 Trident hdfs 螺栓时,Storm Trident 拓扑会引发如下错误,同时尝试在 HDFS 中写入消息。

有没有人遇到过这种错误?如果您遇到这种错误,您认为这是什么?

我们的 Storm 版本是 0.9.1.2.1.3.0-563

Storm-Hdfs 为 0.9.4

0 投票
0 回答
164 浏览

apache-storm - 在 Trident 中使用不透明状态时如何保留批次 ID?

使用本地集群重新运行 Trident 拓扑时出现此错误:

以前的事务 ID 和一个计数器值一起存储在 Cassandra 中,但似乎事务 ID 在开始时总是设置为 1,因此简单的验证步骤(当前事务 ID 大于以前的事务 ID)总是失败。

如何让 Trident 持久化当前的批次 ID?

谢谢!

0 投票
1 回答
133 浏览

hadoop - Storm Trident 拓扑在将元组放入 HDFS 时丢失了元组

我正在运行一个风暴三叉戟拓扑,在两个不同的流中有两个不同的喷口。我的 spout 是 JMS spout 并使用 HDFS State 来持久化元组。

如果我只运行一个 spout,它工作正常,我将所有记录发布到 HDFS 中的 JMS 队列。

在运行具有连接到两个不同队列的两个 spout 的拓扑时,与我在 QUEUE 中发布的记录相比,我得到的记录更少。我在这里做错了什么吗。如果我这样做的方式有任何问题,请告诉我。

0 投票
0 回答
241 浏览

java - 如何将倒计时锁存器传递给 Apache Storm/Trident 过滤器而不会引发不可序列化的异常

我正在尝试创建一些测试来验证通过 Apache Storm 拓扑的数据(使用 Trident API)

我创建了这个简单的过滤器来访问回调:

如果我尝试这个,我会得到一个运行时异常,说 CountdownLatch 不可序列化:

因此,Storm 似乎正在序列化拓扑的所有组件,然后以序列化的形式提交它们,可能是为了集群或诸如此类。

有什么方法可以从 Storm 回调到调用测试?也许某种不序列化拓扑的测试模式?从测试的角度来看,很难看到拓扑内部发生了什么,尤其是在拓扑的每个阶段。

更新:

即使做这样的事情也行不通!

我看到在调试器中添加了 tupleList,但在测试空间中,列表保持为零。就像拓扑在自己的 JVM 中运行一样。

0 投票
1 回答
637 浏览

apache-storm - 在 Storm Trident 中发射到多个流

如何从 Storm Trident 中的同一个螺栓向多个流发射?

我有一个螺栓可以进行一些计算,并根据结果将一些值传递给一个流,将一些其他值传递给另一个流。

在 Storm(不是 Trident)中,我们可以通过以下方式实现:

将流拆分为多个流:

然后根据发现发出,例如:

然后通过监听预期的流来完成剩下的工作:

那么如何使用 Storm Trident 实现相同的行为呢?

一种选择是为同一个流调用“每个”并运行相同的螺栓,并且仅根据我想要向该流发出的内容发出,或者另一种选择是发出键和值对并根据键过滤流(如 type1, type2,错误等)并再次创建多个流。但在我看来,它们都不是一个好的设计。实现它的最佳方法是什么?

0 投票
1 回答
786 浏览

apache-storm - Apache Storm Trident .each() function explanation

I want to use Apache Storm's TridentTopology in a project. I am finding it difficult to understand the .each() function from the storm.trident.Stream class. Below is the example code given in their tutorial for reference:

I didn't understand the signature of the method .each(). Below is what I understood. Please correct me if I am wrong and also give some more information for my knowledge.

.each()

  • The first parameter takes the fields which are correlated keys to the emitted values from spout and returned from the getOutputFields() method in the spout. I still don't know why is that parameter used for.
  • The second parameter is the class extending the BaseFunction. It processes the tuple.
  • The third parameter understanding is similar to the first parameter.
0 投票
0 回答
150 浏览

hbase - 在 HBase 中构建 Storm Trident 拓扑并持久化元组,但 hbase 中没有数据

我构建了一个风暴三叉戟拓扑,如下所示:

util execute each(new Fields("rowKey", "column", "count"), new SaveFunction(), new Fields(columns),我在SaveFunction中得到了打印日志,但是partitionPersist没有将日期写入hbase,并且所有拓扑都没有错误;

谁能给点建议?

0 投票
2 回答
6431 浏览

java - NoClassDefFoundError: kafka/api/OffsetRequest

I am trying to write application for real time processing with apache storm , kafka and trident but in initialization of TridentKafkaConfig i see this error

my spout class is

main class:

and my pom.xml:

http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0

I am trying different version of storm-kafka (0.9.3 and 0.9.4 and 0.9.5 and 0.9.6 and 0.10.0) and storm-core (9.3 and 9.4 and 9.6)

But I still see my previous error

by googling i found this link but ...

ClassNotFoundException: kafka.api.OffsetRequest