问题标签 [apache-storm]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2340 浏览

hadoop - 如何在 java 中的 Storm Bolt 中使用 Hadoop FS API

我想将数据存储在 Storm Spout 发出的 hdfs 中。我在 Bolt 类中添加了 hadoop FS API 代码,但它会引发编译错误。

以下是Storm 螺栓类:

}

我还在 CLASSPATH 中添加了 hadoop jars。以下是 classpath 的值:

还复制了 hadoop 库:在Storm/lib目录中的 hadoop-cor-1.0.4.jar、commons-collection-3.2.1.jarcommons-cli-1.2.jar 。

当我构建这个项目时,它会抛出以下错误:

0 投票
1 回答
2469 浏览

java - 为测试目的创建一个 backtype.storm.tuple.Tuple?

我是Storm的新手,正在尝试研究如何编写一个螺栓测试来测试execute(Tuple tuple)子类中的方法BaseRichBolt

问题是它似乎Tuple是不可变的,我没有看到任何方法或构建器来创建新的元组。如何创建自己的Tuple,或者如何使用测试输入测试螺栓?

我实际上使用的是 Scala,而不是 Java,但答案应该很容易翻译。

0 投票
1 回答
1073 浏览

java - 在风暴拓扑中创建新连接时 Rabbitmq 阻塞

我是新来的风暴我在我的 spout 中使用 rabbitmq 从某个队列接收元组,并且有一个客户端正在运行另一台将元组插入该队列的机器我运行了一个简单的 rabbitmq 示例程序,它工作正常但是当我使用它时在风暴喷口内,它被阻挡在

即使我的rabbitmq服务器也在运行并且在我运行示例代码时在同一台机器上运行成功。打印语句 打印到语句

下面是我完整的 spout 类。

}

0 投票
1 回答
1001 浏览

java - Cassandra 上的计数器查询期间的字符串验证错误

我的代码上有以下计数器查询:

在我的单元测试中,代码运行良好。但是当运行正确的应用程序(我为此使用storm)时,我收到以下错误:

我也在使用 Hector 访问 Cassandra。

0 投票
2 回答
1016 浏览

real-time - 几个 Storm 拓扑之间的通信

我正在尝试在生产中部署几个 Storm 拓扑。我检查了文档,但找不到任何关于拓扑是否可以通过本机方法进行通信的参考资料。有人对如何实施有任何建议吗?

简而言之,我很想看看是否可以跨拓扑发送元组。

谢谢你的帮助!

0 投票
1 回答
514 浏览

apache-storm - Storm中一批多少元组是合理的?

我是Storm的新手。当我使用教程示例尝试 trident 时,它们通常是一批中的极少量元组(通常不超过 10 个)。Trident 旨在提供高吞吐量,每秒数百万条消息。所以我想问一下,一批中有多少个元组在现实世界中是合理的?

0 投票
1 回答
1197 浏览

cluster-computing - Storm Trident 只能分批做 JOIN 吗?

我想实现一个 JOIN 语义,我尝试了 Trident 拓扑中的 join 方法。我发现加入是在批次之间进行的。如果两个流之间的连接有数百万个元组,它必须在一个批次内吗?

在genderSpout中,每个batch有3个tuples,所以ageSpout会发出2个batch,每个batch有5个tuples,所以Spout只会发出1个batch

我用 JoinType 做一个 LEFT OUTER JOIN

测试代码的输出是:

从输出中,我发现前四个结果连接了来自genderSpout 的第一批和来自ageSpout 的第一批。最后两个结果是来自 genderSpout 的第二批与来自 ageSpout 的空批之间的连接。所以结果对于 JOIN 语义是不正确的,因为我想要的 genderSpout LEFT JOIN ageSpout 的结果是:

所以我的问题是:如果JOIN的两边(Spout)有数百万个元组,我应该把它们放在一批中以获得正确的结果吗?

或者我走的路是错误的,你能告诉我我应该怎么做才能获得 OUTER JOIN 语义的正确结果?

测试代码如下:

0 投票
2 回答
1405 浏览

clojure - Storm > 如何将 Java 回调集成到 Spout 中

我正在尝试将 Storm(请参见此处)集成到我的项目中。我理解了拓扑、spout 和 bolts 的概念。但现在,我试图弄清楚一些事情的实际实现。

A)我有一个使用 Java 和 Clojure 的多语言环境。我的 Java 代码是一个回调类,其中包含触发流数据的方法。推送到这些方法的事件数据是我想用作 spout 的。

所以第一个问题是如何将进入这些方法的数据连接到一个 spout ?我正在尝试i)传递一个backtype.storm.topology.IRichSpout,然后ii)将一个backtype.storm.spout.SpoutOutputCollector参见此处)传递给该 spout 的打开函数(参见此处)。但我看不到实际传递任何类型的地图或列表的方法。

B)我项目的其余部分都是 Clojure。通过这些方法将有大量数据。每个事件的 ID 介于 1 和 100 之间。在 Clojure 中,我希望将来自 spout 的数据拆分到不同的执行线程中。我认为,这些将是螺栓。

如何设置 Clojure bolt 从 spout 获取事件数据,然后根据传入事件的 ID 中断线程?

提前感谢蒂姆

[编辑 1]

我实际上已经解决了这个问题。我最终1)实现了我自己的 IRichSpout。然后我2)将该 spout 的内部元组连接到我的 java 回调类中的传入流数据。我不确定这是否是惯用的。但它编译并运行没有错误。但是,3)我没有看到通过printstuff螺栓传入的流数据(肯定在那里)。

为了确保事件数据得到传播,在 spout 或 bolt 实现或拓扑定义中是否需要做一些特定的事情?谢谢。

[编辑 2]

根据 SO 成员Ankur的建议,我正在调整我的拓扑结构。创建 Java 回调后,我将它的元组传递给下面的 IBSpout,使用(.setTuple ibspout (.getTuple java-callback)). 我没有传递整个 Java 回调对象,因为我得到了 NotSerializable 错误。一切都编译并运行没有错误。但同样,我的printstuff螺栓没有数据。嗯。

0 投票
4 回答
15198 浏览

java - 如何使用 IDE 在 Storm 生产集群中提交拓扑

我在Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload使用 IDE 向生产集群提交拓扑时遇到了一个问题,而如果我在命令行中使用storm jar命令执行同样的事情,它的运行就像天堂一样。我从githublink看到了同样的例子。

对于提交拓扑,我正在使用这些行

如果这是正确的运行方法,请建议我?

0 投票
0 回答
293 浏览

scala - Scala 2.10 风暴项目 ClassNotFoundException

我怀疑这可能是某种版本控制问题,但仍然无法深究。我正在尝试使用基于https://github.com/velvia/ScalaStorm的 Scala DSL 在 Scala 2.10 中设置 play2.1/storm 项目。我将 DSL 文件复制并编辑到我的项目中。我的问题是在尝试从同一个项目中实现 WordCountTopology 示例时出现的。示例的直接复制/粘贴可以正常编译,并且 eclipse 不会抱怨任何事情,但是在运行 main 函数时,我得到了 classNotFoundExceptions

这些类被明确定义,并注释掉使用它们的行(builder.setSpout 和 builder.setBolt 行)并创建 ClassNotFound 类的新对象(RichStormSpout、WordCount、SplitSentence),尽管它没有做任何事情,编译并运行得很好。

任何帮助深表感谢

编辑:这是一个堆栈跟踪: