问题标签 [trident]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
639 浏览

apache-storm - 保留时间顺序的风暴三叉戟“合并”功能

假设我有两个流:

常规合并会产生 Stream 3,如下所示:

我想合并流,同时保留发出元组的顺序,因此如果[2,5]在时间 1[1,3]发出,在时间 2、[3,2]在时间 3 和[2,4]在时间 4 发出,则生成的流将是:

有没有办法做到这一点,如果有,怎么做?一些示例代码将不胜感激,因为我是一个完整的 Trident 新手,最近被推入一个基于 Trident 的项目中。

在此先感谢您的帮助,

伊莱

0 投票
1 回答
2606 浏览

stream - 如何在 Trident 中映射具有持久状态的元组?

我正在学习Trident框架。Trident 上有几种方法可以Stream批次中聚合元组,包括这个允许使用Aggregator接口执行元组的有状态映射的方法。但不幸的是,不存在用于额外持久化地图状态的内置对应项,就像其他 9 个重载一样persistentAggregate(),仅使用Aggregator作为参数。

因此,我如何通过结合较低级别的 Trident 和 Storm 抽象和工具来实现所需的功能?探索 API 非常困难,因为几乎没有 Javadoc 文档。

换句话说,persistentAggregate()方法允许通过更新一些持久状态来结束流处理:

我想更新持久状态并顺便发出不同的元组:

Stream.aggregate(Fields, Aggregator, Fields)不提供容错:

0 投票
2 回答
168 浏览

java - 物质外观小程序问题

每当我将鼠标悬停在菜单项上时,我都会在图像中看到一个奇怪的黑色空间,我不知道为什么。当我使用系统的外观和感觉时,我没有这个问题,所以我认为这是因为物质的外观和感觉。

0 投票
2 回答
658 浏览

apache-storm - 如何获取原始字段的完整列表以及已在 trident 中修改的新字段?

假设我有字段列表,即 {field1,field2,field3,field4} 我对 field2 执行了一些操作,说我想将每个元组值增加某个值,比如 5,

0 投票
1 回答
67 浏览

apache-storm - 包 MemoryMapState 不存在

使用 maven 编译 Storm Trident 拓扑时出现错误

包 MemoryMapState 不存在

0 投票
1 回答
247 浏览

apache-storm - 如何在计数器更新之前获取先前的状态

例如,我有这些批量大小为 5 的元组,其中包含来自用户的印象:

这是我保存计数状态的示例:

我有清晰的数据库并运行我的拓扑。按 clientId 对流进行分组后,我使用 persistentAggregate 函数和 Count 聚合器保存状态。第一批是 newValuesStream 方法后的结果:[clientId1, 4], [clientId2, 1]。对于第二批:[clientId1, 7][clientId2, 3]正如预期的那样。

ClientStream 在几个分支中使用,在其中一个分支中,我需要处理元组以便拥有大小为 1 的批次,因为我需要有关每个元组的计数的信息。大小为 1 的批次显然是废话,所以我必须在更新计数器之前以某种方式找出计数器的先前状态,并使用元组发出此信息,那里已经更新了计数器,例如第二批[clientId1, 7, 4]

有人知道怎么做吗?

0 投票
1 回答
491 浏览

java - 关于在 trident 中实现事务拓扑的问题

我的用例是调用查询以从具有不同输入参数的 db 中获取记录。获取记录后,进行一些处理,最后写入文件。我的输入参数值取决于前面查询的完整处理。我的问题是,我如何在 spout 中知道先前查询的处理已完成,即记录已成功写入文件。

我尝试实施ITridentSpout但仍然没有得到任何解决方案。以下是我的代码ITridentSpout

TridentCoordinator.java

TridentEmitterImpl.java

TridentSpoutImpl.java

也无法理解和会initializeTransaction出现什么值。请提供一些解决方案prevMetaDatacurMetada

0 投票
1 回答
248 浏览

java - 使用 Storm 的动态枢轴

我在 BigData DB(在我的情况下为 Cassandra)中有列名称为 col1、col2、col3、val1、val2 的行

在 SQL 方法中,我可以按 col1、col2 或 col2、col1 或任何其他可能的方式进行分组。这样我可以很容易地形成树层次结构。

但是现在我们使用 Cassandra 来存储不支持 group by 的数据。所以我们想使用 Storm 进行分组和聚合。我们编写了一些示例代码进行聚合和分组,但我们无法形成是否可以实现的意见。

数据看起来像这样

就像在 excel pivot 中一样,我想构建层次结构 root->child1->child2->child3-val1,val2 如果我的层次结构是 col1->col2->col3,它可能看起来像这样

我想为用户提供重新排列层次结构元素的功能,例如 col3->col1->col2 (或其他东西,它是动态的)在这种情况下,数据将如下所示

我的三叉戟代码的几行看起来像这样,它没有按预期工作。

为了进行上述转换,我想在有或没有 Trident API 支持的情况下使用 Storm。谁能指导我如何实现它?非常感谢任何程序创意。

0 投票
1 回答
866 浏览

java - Trident Storm-Cassandra,写入具有多个主键的表

我正在学习如何将 Storm 的 Trident 与 Cassandra 2.0.5、Storm 版本 0.9.0.1 一起使用。我也在使用 com.hmsonlinestorm-cassandra 0.4.0-rc4 contrib。

我的目标只是将一些文本行插入到具有 id (int)、name (text) 和 sentence (text) 列的表中。id 和 name 是主键。

partitionPersist需要一个,StateUpdater为此我正在使用com.hmsonline.storm.cassandra.trident.CassandraUpdater<K, C, V>. 但从看起来它只有一个键作为输入而不是两个(我需要 id 和 name)。元组映射器 ( TridentTupleMapper) 也使用一个键:

也许我遗漏了一些东西,但是如何将多个列定义为键?

0 投票
1 回答
652 浏览

java - 使用 Storm Trident 向 cassandra 插入行

我试图在 Cassandra 2.0.5,Storm 版本 0.9.0.1 中的表中插入一个简单的行。

我的测试如下:

我有一个由 id (int) 和 sentence (text) 列组成的表。id 是主键。

我的 spout 生成句子,我添加了一个 ID(代码中的静态增量)。

这是我的拓扑:

MyTridentTupleMapper 的代码:

https://github.com/guywald/trident-cassandra-read-write-examples/blob/master/src/test/java/com/guywald/storm/trident/cassandra/MyTridentTupleMapper.java

我得到以下异常:

我不确定它为什么会返回这个并希望得到帮助。