问题标签 [trident]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1107 浏览

hadoop - 风暴中的 DRPC 服务器错误

我正在尝试执行下面的代码并收到错误..不确定我是否在这里遗漏了一些东西..还有我在哪里可以看到输出?

错误

java.lang.RuntimeException:没有为 backtype.storm.drpc.DRPCSpout.open(DRPCSpout.java:79) atstorm.trident.spout.RichSpoutBatchTriggerer.open(RichSpoutBatchTriggerer.java:58) at backtype.storm 的拓扑配置 DRPC 服务器.daemon.executor$fn__5802$fn__5817.invoke(executor.clj:519) at backtype.storm.util$async_loop$fn__442.invoke(util.clj:434) at clojure.lang.AFn.run(AFn.java:24 ) 在 java.lang.Thread.run(Thread.java:744)

0 投票
1 回答
1081 浏览

java - Batch size in Storm Trident

I would like to know how to set BATCH SIZE (example I want a batch to have 10000 records) in TRIDENT. I have the below configuration in my code and this fetches approximately 250 records per batch from Kafka. Can I increase this to 10000*1024? or is there a way to set No of records per batch.

0 投票
1 回答
241 浏览

apache-storm - 从三叉戟风暴写入内存网格(Apache Ignite)

有人可以阐明如何将数据从三叉戟风暴写入/读取到内存网格(Apache ignite)。我用谷歌搜索并没有找到任何关于如何从三叉戟风暴连接到记忆网格的信息/文件。

0 投票
2 回答
495 浏览

apache-storm - Gridgain 异常 java.lang.IncompatibleClassChangeError:实现类

我正在使用 Trident Storm 并尝试在函数准备方法中启动网格实例。相同的配置文件适用于本地集群。当我在 remore 集群中提交它时,我得到了错误。

代码:

错误

0 投票
1 回答
515 浏览

database - 从 Apache Storm 中持久化数据,任何框架?

我们在项目中使用 Kafka-storm。在storm中,我们将使用多个螺栓进行转换。但在此之前,作为 POC 的一部分,我们希望将数据持久化到 DB 中。我们应该使用哪个框架?BigData场景可以使用哪些?三叉戟在这里适用吗?对于持久性,我正在寻找像 Hibernate/JPA 这样的东西。可以使用什么?如果可能,请为此提供示例代码。

0 投票
1 回答
489 浏览

apache-storm - 风暴三叉戟批次是否同时处理?

我想知道三叉戟批次是否并行执行,即多个批次可以一次运行?

除此之外,我有几个问题太小而无法单独发布。如果它们足够大,请随时发表评论以单独发布它们。

  1. 如果只处理批处理中的特定元组失败怎么办?

    那么batch会被replay,导致之前处理成功的tuples被重新处理?例如,字数统计,其中每个元组都包含一个单词,但只有几个元组被成功计数?例如,如果有三个单词叫man并且计数只显示 2 表示一个元组处理失败?

  2. 仅在本教程中,之前的txid被存储。之前的交易ID呢?

    例如,有 1、2、3、4 三个批次。现在,在第 1 批之后,第 2 批被执行,第 1 批被重播。那么 txid 将为 2,因为最近处理的批次是批次 #2,并且无法识别批次 #1 是否先前已处理。如果是这样,那么批次必须按顺序执行。这意味着在第 1 批成功完成之前,第 2 批无法执行。如果是这样,那么执行批处理的并行性在哪里?

  3. 如果拓扑中的批处理没有正确执行特定功能怎么办?

    例如,我有两个功能,一个是将消息持久化到数据库中,另一个是生成到 kafka 队列。在这里,在数据库中持久化是成功的,但是由于某些节点故障(例如),推送到 kafka 队列失败。然后,我只希望为该特定批次执行推送到 kafka 队列的函数。有没有办法在三叉戟中做?为此,我不仅需要存储 txid,还需要存储要为该 txid 处理的函数列表。怎么可能做到?

0 投票
1 回答
246 浏览

spark-streaming - Storm Trident 和 Spark Streaming:分布式批处理锁定

在大量阅读和构建 POC 之后,我们仍然不确定 Storm Trident 或 Spark Streaming 是否可以处理我们的用例:

  • 我们有数百万台设备(具有唯一标识符)的入站传感器数据流。
  • 我们需要在每个设备级别上执行此流的聚合。聚合将读取之前批次中已经处理(和持久化)的数据。
  • 关键点:当我们处理特定设备的数据时,我们需要确保没有其他进程正在处理该特定设备的数据。这是因为我们处理的结果会影响该设备的下游处理。实际上,我们需要一个分布式锁。
  • 此外,事件设备数据需要按照事件发生的顺序进行处理。

本质上,我们不能同时处理同一设备的两个批次。

trident/spark 流可以处理我们的用例吗?

任何建议表示赞赏。

0 投票
1 回答
114 浏览

apache-storm - 如何编写没有聚合的三叉戟拓扑?

我想批量处理我想使用 Trident API 的元组。但是,这里没有我批量执行的操作。每个元组都是单独处理的。我在这里需要的只是一次性语义,这样每个元组只处理一次,这是使用 Trident 的唯一原因。

我想存储处理哪个元组的信息,以便在重播批处理时,已经处理的元组不会被执行。

拓扑包含一个persistentAggregate()方法,但它需要一些聚合操作,但我没有对一组元组执行任何聚合操作,因为每个元组都是单独处理的。

在这里,元组所经历的功能太小而无法执行。因此,我希望批量处理它们以节省计算资源和时间。


现在,如何编写一个将元组作为批处理消耗但仍然不执行任何批处理操作(如字数统计)的拓扑?

0 投票
1 回答
379 浏览

transactions - 如果 kafka spout 被风暴三叉戟重新启动会怎样?

假设 kafka spout 获取了一些消息,然后 spout 任务重新启动。获取的消息会丢失吗?我是 trident 的初学者,我的问题是 trident kafka 事务/不透明喷口。

提前致谢!

0 投票
2 回答
741 浏览

xml - 三叉戟拓扑抛出内存不足异常

我正在从 Storm 的传统拓扑转移到 Trident 拓扑,它在将元组推送到数据库之前维护成批的元组。我们将 XML 处理为单个元组。在一次处理一个 xml 的传统拓扑中,这很好用。但是在 Trident 拓扑中,在提交到数据库之前,它会在内存中保留大量元组,这会导致内存不足异常。也不清楚风暴如何决定批量大小以及它在每次迭代中的变化。以下是我们收到的错误:

java.lang.OutOfMemoryError: GC 开销限制在 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) 在 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder) 的 java.util.Arrays.copyOf(Arrays.java:2367) .java:114) 在 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415) 在 java.lang.StringBuilder.append(StringBuilder.java:132) 在 clojure.core$str$fn__3896.invoke(core.clj: 517) 在 clojure.core$str.doInvoke(core.clj:519) 在 clojure.lang.RestFn.invoke(RestFn.java:423) 在 backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj :397) 在 backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) 在 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) 在 backtype.storm.utils.DisruptorQueue。consumeBatchWhenAvailable(DisruptorQueue.java:99) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) at backtype .storm.util$async_loop$fn__457.invoke(util.clj:431) 在 clojure.lang.AFn.run(AFn.java:24) 在 java.lang.Thread.run(Thread.java:745)

更多信息:

在处理螺栓时,我们使用 DOM 解析器来解析 XML。我们试图通过将 XML 的单个元素作为一个元组来减小单个元组的大小,但这也无济于事。

可能的解决方案可能包括限制存储在内存中的批次大小或使用快速垃圾收集。