问题标签 [trident]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-storm - 保留时间顺序的风暴三叉戟“合并”功能
假设我有两个流:
常规合并会产生 Stream 3,如下所示:
我想合并流,同时保留发出元组的顺序,因此如果[2,5]
在时间 1[1,3]
发出,在时间 2、[3,2]
在时间 3 和[2,4]
在时间 4 发出,则生成的流将是:
有没有办法做到这一点,如果有,怎么做?一些示例代码将不胜感激,因为我是一个完整的 Trident 新手,最近被推入一个基于 Trident 的项目中。
在此先感谢您的帮助,
伊莱
stream - 如何在 Trident 中映射具有持久状态的元组?
我正在学习Trident框架。Trident 上有几种方法可以Stream
在批次中聚合元组,包括这个允许使用Aggregator
接口执行元组的有状态映射的方法。但不幸的是,不存在用于额外持久化地图状态的内置对应项,就像其他 9 个重载一样persistentAggregate()
,仅使用Aggregator
作为参数。
因此,我如何通过结合较低级别的 Trident 和 Storm 抽象和工具来实现所需的功能?探索 API 非常困难,因为几乎没有 Javadoc 文档。
换句话说,persistentAggregate()
方法允许通过更新一些持久状态来结束流处理:
我想更新持久状态并顺便发出不同的元组:
Stream.aggregate(Fields, Aggregator, Fields)
不提供容错:
java - 物质外观小程序问题
每当我将鼠标悬停在菜单项上时,我都会在图像中看到一个奇怪的黑色空间,我不知道为什么。当我使用系统的外观和感觉时,我没有这个问题,所以我认为这是因为物质的外观和感觉。
apache-storm - 如何获取原始字段的完整列表以及已在 trident 中修改的新字段?
假设我有字段列表,即 {field1,field2,field3,field4} 我对 field2 执行了一些操作,说我想将每个元组值增加某个值,比如 5,
apache-storm - 包 MemoryMapState 不存在
使用 maven 编译 Storm Trident 拓扑时出现错误
包 MemoryMapState 不存在
apache-storm - 如何在计数器更新之前获取先前的状态
例如,我有这些批量大小为 5 的元组,其中包含来自用户的印象:
这是我保存计数状态的示例:
我有清晰的数据库并运行我的拓扑。按 clientId 对流进行分组后,我使用 persistentAggregate 函数和 Count 聚合器保存状态。第一批是 newValuesStream 方法后的结果:[clientId1, 4]
, [clientId2, 1]
。对于第二批:[clientId1, 7]
,[clientId2, 3]
正如预期的那样。
ClientStream 在几个分支中使用,在其中一个分支中,我需要处理元组以便拥有大小为 1 的批次,因为我需要有关每个元组的计数的信息。大小为 1 的批次显然是废话,所以我必须在更新计数器之前以某种方式找出计数器的先前状态,并使用元组发出此信息,那里已经更新了计数器,例如第二批[clientId1, 7, 4]
。
有人知道怎么做吗?
java - 关于在 trident 中实现事务拓扑的问题
我的用例是调用查询以从具有不同输入参数的 db 中获取记录。获取记录后,进行一些处理,最后写入文件。我的输入参数值取决于前面查询的完整处理。我的问题是,我如何在 spout 中知道先前查询的处理已完成,即记录已成功写入文件。
我尝试实施ITridentSpout
但仍然没有得到任何解决方案。以下是我的代码ITridentSpout
:
TridentCoordinator.java
TridentEmitterImpl.java
TridentSpoutImpl.java
也无法理解和会initializeTransaction
出现什么值。请提供一些解决方案prevMetaData
curMetada
java - 使用 Storm 的动态枢轴
我在 BigData DB(在我的情况下为 Cassandra)中有列名称为 col1、col2、col3、val1、val2 的行
在 SQL 方法中,我可以按 col1、col2 或 col2、col1 或任何其他可能的方式进行分组。这样我可以很容易地形成树层次结构。
但是现在我们使用 Cassandra 来存储不支持 group by 的数据。所以我们想使用 Storm 进行分组和聚合。我们编写了一些示例代码进行聚合和分组,但我们无法形成是否可以实现的意见。
数据看起来像这样
就像在 excel pivot 中一样,我想构建层次结构 root->child1->child2->child3-val1,val2 如果我的层次结构是 col1->col2->col3,它可能看起来像这样
我想为用户提供重新排列层次结构元素的功能,例如 col3->col1->col2 (或其他东西,它是动态的)在这种情况下,数据将如下所示
我的三叉戟代码的几行看起来像这样,它没有按预期工作。
为了进行上述转换,我想在有或没有 Trident API 支持的情况下使用 Storm。谁能指导我如何实现它?非常感谢任何程序创意。
java - Trident Storm-Cassandra,写入具有多个主键的表
我正在学习如何将 Storm 的 Trident 与 Cassandra 2.0.5、Storm 版本 0.9.0.1 一起使用。我也在使用 com.hmsonlinestorm-cassandra 0.4.0-rc4 contrib。
我的目标只是将一些文本行插入到具有 id (int)、name (text) 和 sentence (text) 列的表中。id 和 name 是主键。
partitionPersist
需要一个,StateUpdater
为此我正在使用com.hmsonline.storm.cassandra.trident.CassandraUpdater<K, C, V>
. 但从看起来它只有一个键作为输入而不是两个(我需要 id 和 name)。元组映射器 ( TridentTupleMapper
) 也使用一个键:
也许我遗漏了一些东西,但是如何将多个列定义为键?
java - 使用 Storm Trident 向 cassandra 插入行
我试图在 Cassandra 2.0.5,Storm 版本 0.9.0.1 中的表中插入一个简单的行。
我的测试如下:
我有一个由 id (int) 和 sentence (text) 列组成的表。id 是主键。
我的 spout 生成句子,我添加了一个 ID(代码中的静态增量)。
这是我的拓扑:
MyTridentTupleMapper 的代码:
我得到以下异常:
我不确定它为什么会返回这个并希望得到帮助。