问题标签 [trident]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-storm - Storm Trident - 即使 kafka 中没有数据,也会从聚合器中连续发出
我有一个从 kafka 获取元组的拓扑。
拓扑大致是这样的。
从文档中我了解到,在聚合器中,为每个元组调用聚合方法,在处理所有元组时调用完整方法。
我已将调试系统输出放入 init 和聚合方法中。当我启动拓扑时,我可以看到来自 init 和聚合方法的连续系统输出。这种行为是预期的吗?(注意Kafka中没有数据)
java - Storm-HBase Trident - 同时查询多个列
我正在构建一个查询 HBaseState 的 Trident 拓扑。我正在使用 org.apache.storm.hbase 包。
我的理解(如果我错了,请纠正我)是 HBaseQuery 读取给定 rowKey 的所有列值(或在 projectionCriteria 中指定的值),并使用 Fields("columnName","columnValue") 单独输出每一列。
例如,如果我有一张宠物表,其中 rowKey 是宠物名称,一列是“type”,一列是“age”,stateQuery 将接收带有 Values("Fido") 的输入元组,并输出两个单独的元组:
值(“Fido”,“类型”,“狗”)
价值观(“Fido”,“年龄”,11)
一些问题:
有没有办法在一个查询中从多个列中获取值?意思是,我可以使用 Fields("Name","column1Value","column2Value")获得单个输出吗?
如果有一种方法可以将多个列中的值获取到一个元组中,如果它们是不同类型的(例如,一个是字符串,一个是整数),是否仍然可以这样做?
最终,我的目标是能够使用 Fields("Name") 获取输入元组,并使用 Fields("Name","Type","Age") 获取单个输出元组,例如 Values("Fido","Dog ",11) 和价值观("Mr. Kibbles","Cat",4)。如果使用上述方法是不可能的,那怎么可能?
TIA 寻求帮助!
apache-storm - 在storm中,我可以指定一个bolt将运行的工人数量吗?
并且在使用 Trident 时,如果要在一个螺栓上处理一个批次,该批次可以在不同的工人上运行吗?
apache-storm - 关于 Apache Storm,Trident 拓扑中的一批可以在多个 worker 上并行处理吗?
我希望一批可以在不同的服务器上处理,我该怎么做?
apache-storm - 当我使用风暴三叉戟时,我怎样才能手动使螺栓失效并使喷口重新发出元组?
我尝试了很多方法,但我发现有些例外不能使喷口重新发出。但是在我的工作中,我非常需要它,我该怎么做?我试图抛出 FailedException,但它似乎没用。这是我的代码
apache-storm - 当我使用storm scheduler时,一个主机可以调度两次吗?
例如
如上面的代码,一台主机可以同时运行 StereoInit 和 Rendering 吗?
apache-storm - 当我使用storm trident时,如果我将并行度设置为不小于2,我怎样才能让所有的执行器运行在不同的服务器上,而不仅仅是一台服务器上?
即,如果并行度为2,则bolt 在2 个不同的服务器上运行,如果并行度为3,则bolt 在3 个不同的服务器上运行。这对我来说很重要,因为我不希望所有任务都只在一台服务器上运行,那太慢了。
java - Apache Trident 运算符并不总是被执行
首先,我对 Storm/Trident 有点陌生,而且我已经在一个问题上苦苦挣扎了好几个小时。
我所拥有的是一个带有一个分区的 Kafka 主题。生产者每 x 毫秒向该主题发送元组。TransactionalTridentKafkaSpout 从这个主题中读取,一些 Trident 操作员处理它们。整个拓扑在本地模式下运行(远程模式尚未测试)。
拓扑的主要代码是:
现在我遇到的问题是生产者的消息间隔越低,执行的一些操作符就越少。
例如,如果生产者以 100 毫秒的间隔发送 200 个元组,每个运算符正确处理所有 200 个元组,但如果间隔设置为 20 毫秒,则运算符处理 / 仅针对以下数量的元组执行:
CustomCombinerAgg1:200
CustomCombinerAgg2:50
CustomBaseFilter1:50
CustomCombinerAgg3:150
CustomCombinerAgg4:180
CustomBaseFilter2:60
据我了解(事务性)Trident 保证只处理一次,并且只有在前一个元组被完全处理后,才应该从 spout 中获取一批新的元组。这似乎不是这里的情况,而是第一个运算符 CustomCombinerAgg1 决定了速度,然后后面的运算符不能在给定时间内处理所有元组?
我期望的是,每个元组都正确执行每个运算符,并且一旦所有运算符都处理了元组/批处理,就会获取下一个运算符。使用 Trident 不应该是这种情况吗?难道我做错了什么?我怎样才能实现这种行为?
Trident 甚至如何知道元组何时已被完全处理?据我所知,您必须 ack() Storm 中的元组,但 Trident 运算符没有 OutputCollector,因此无法调用 ack()?我的问题是否与此有关?
谢谢。
redis - Storm + Redis 或 Storm Trident 或 Spark Streaming
我将构建流处理系统。使用 Kafka 进行消息传输。流处理可以通过风暴三叉戟、风暴或火花流来完成。但找不到最佳答案。
如果一开始的条件,流处理将很简单。字段可以在不同的元组内,这就是我需要存储前一个字段的原因。现在我通过使用storm + redis(不是三叉戟)实现了这个场景。在 bolt 内部,从 redis 中获取所有字段,然后通过 if 条件进行测量。如果字段不为空,则使用,如果为空,则继续工作。如果这是正确的,我对建筑有不好的感觉。
我应该使用 trident api 还是 spark 流式传输?
我不需要“恰好一次”的交货保证。只需要每个数据源的状态。此状态将按字段计算。
谢谢您的回复。
apache-storm - BaseStatefulBolt(风暴核心)与 StateFactory(风暴三叉戟)
我对使用风暴感到困惑。我将使用其流数据来测量数据源的状态。状态将通过组合一些字段来计算,这些字段可以实现不同的时间间隔。这就是为什么我需要保存字段来衡量数据源的状态。
我可以使用 BaseStatefulBolt 吗?或者这个场景的唯一解决方案是三叉戟?
他们有什么区别。因为三叉戟内部也有一个statefactory。
谢谢你。