问题标签 [apache-storm-topology]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-storm - 使用 Trident 拓扑查找具有最大计数的单词
如何使用 Trident 拓扑找到字数拓扑中计数最多的单词?这是 Trident 字数统计拓扑的链接。 https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/trident/TridentWordCount.java
java - ClassNotFoundException: clojure.lang.PersistentList, compile:(clojure/core.clj:20:8) 同时将拓扑提交到 Storm 中的本地集群
我是 Apache Storm 的新手,我正在尝试在 Storm 中创建可用于实时处理流的拓扑。但是当我尝试在本地集群上提交我的拓扑时,我遇到了异常
一些背景:
我正在尝试开发 Kafka-CEP 拓扑,其中我使用 Kafka 作为喷口,我的 CEP 实例作为风暴拓扑中的螺栓。下面是我的主要 KafkaCEPTopology 代码和 pom.xml 供参考。我正在使用 IntelliJ 来运行我的拓扑。
Pom.xml
KafkaCEPTopology.java:
但我不明白为什么我会收到与 Clojure 相关的此类异常。我错过了一些依赖项吗?请帮忙
apache-storm - 杀死风暴拓扑后的资源清理
我们有一个与 MariaDB 数据库交互的风暴拓扑。我们的 Bolt 实现了IRichBolt
接口并覆盖了生命周期方法。我们在我们的方法中打开一个数据库连接并在prepare
方法中关闭它cleanup
。cleanup
方法文档说:
当 IBolt 将要关闭时调用。没有保证会调用清理,因为主管杀死了集群上 -9 的工作进程。保证调用清理的一种情况是在本地模式下运行 Storm 时终止拓扑
该kill -9
命令会在不清理任何资源的情况下终止该进程。所以我们得出这样的结论,即在终止拓扑时,没有必要cleanup
调用该方法并关闭数据库连接。
所以继续我的问题,我们有一个用于拓扑部署的 shell 脚本,它在执行时会以 0 的超时时间终止当前拓扑并部署一个新的拓扑。我们在数据库级别面临一个问题,即打开了许多连接,这提示我们以前的连接没有关闭。(在上一个拓扑中打开的那个)。
我们的假设正确吗?增加超时会清理所有资源吗?
apache-storm - Spout 完成后终止 Storm 拓扑
我创建了一个带有 Spout 的 Storm 拓扑,它发出许多用于基准测试的元组。一旦从 spout 发出所有元组或拓扑中不再有任何元组流动,我想停止/终止我的拓扑。
这是我的拓扑结构。
问题是我在这个范围内引用的 loadGenerator 实例与在 spout 线程中运行的实例不同。因此,isRuning() 总是返回 true,即使在 spout 线程中,当没有更多的元组要发出时,它的值为 false。
这是 LoadGeneratorSource 类的一部分。
一旦不再有从 spout 发出或在拓扑中流动的元组,有人能告诉我一种停止我的拓扑的方法吗?提前感谢您的帮助。
java - Storm UI 中未创建 Storm 拓扑
当 Storm 作业提交到 Hadoop 集群以使用 hdfsbolt 写入 hdfs 时,Storm UI 中不会创建任何拓扑。由于代码中使用了某些包(org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]),显示错误。
错误:
以下是使用的 Java 代码。这是主要的拓扑文件。数据从 Kafka 收集并通过 hdfsbolt 发送到 hdfs。部分数据存储在 hdfs 中,但所有工作节点均未工作,并且 Storm UI 中未创建拓扑。
爪哇:
预期的结果是在 Storm UI 中创建拓扑,并在运行 Storm jar 时确保所有工作节点的参与。
apache - 如何从Combiner/Reducer/Aggregator 函数返回具有多个字段的元组?
此处Storm 文档指出: CombinerAggregator 返回单个元组,其中包含单个字段作为输出。
我应该怎么做才能从Combiner函数返回一个包含多个字段的元组?
我正在创建一个聚合函数,并希望从输入元组聚合两个或多个值并将这两个或多个字段作为输出发送。
我还想在输出中有一些输入元组的字段。如何使用组合器功能来获得所需的输出?
输入元组到组合器聚合器函数:
("a", "b", "c" , "d")
所需的输出元组:
("a", "b", "newValue1", "newValue2", "newValue3")
过去,我尝试在init()
CombinerAggregator 方法中从元组的字段中创建一个模型,并将其从CombinerAggregator 作为输出返回。但我觉得这不是正确的解决方案。功能是否chainedAgg()
适用于这种情况?
任何帮助将不胜感激。
apache-zookeeper - 在storm.yaml中更改storm zookeeper服务器
我想将我的风暴集群指向 3 个 Zookeeper 节点。据我了解,我应该只更改 nimbus 和主管节点中的storm.yaml 配置而不停止任何节点。这是对的吗?
我应该如何验证我的更改?
java - Apache Storm 1.1.0 未运行并给出无法从客户端 sessionid 读取附加数据
我在 Apache Storm 1.1.0 中运行一个简单的 Hello World 类型的应用程序。应用程序有一个随机整数 spout 和一个打印元组输出的 bolt。但不知何故,我无法让它在我的 Windows 系统上运行。
我是 Apache Storm 的新手,并且正在学习教程。我已经在堆栈溢出中寻找答案,但我找不到任何已解决的问题。
以下是我运行的拓扑代码:
螺栓代码
喷口代码
我也可以提供其余代码,但我认为这不是必需的。如果需要,请在评论中提及,我也会提供。
每当我尝试运行应用程序时,都会出现以下错误。
10620 [main] INFO oassoazZooKeeper - 启动客户端连接,connectString=localhost:2000/storm sessionTimeout=20000 > watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@31b0f02 10625 [main-SendThread(0:0 :0:0:0:0:0:1:2000)] 信息 oassoazClientCnxn - 打开与服务器的套接字连接 > 0:0:0:0:0:0:0:1/0:0:0:0:0 :0:0:1:2000。不会尝试使用 SASL 进行身份验证(未知错误)10627 [main-SendThread(0:0:0:0:0:0:0:1:2000)] 信息 oassoazClientCnxn - 套接字连接建立到 > 0:0:0: 0:0:0:0:1/0:0:0:0:0:0:0:1:2000,启动会话 10627 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] 信息 oassoazsNIOServerCnxnFactory - 接受来自 /> 0:0:0:0:0:0:0:1:56905 10628 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] 信息 oassoazs 的套接字连接
oazsNIOServerCnxn - 客户端的关闭套接字连接 /> 0:0:0:0:0:0:0:1:56905 其 sessionid 为 0x16a8e5abd97000d 10746 [main] INFO oassoacfiCuratorFrameworkImpl - 开始 10747 [main] INFO oassoazZooKeeper - 启动客户端连接,connectString =localhost:2000/storm sessionTimeout=20000 > watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@73893ec1 10755 [main-SendThread(127.0.0.1:2000)] INFO oassoazClientCnxn - 打开到服务器的套接字连接127.0.0.1/127.0.0.1:2000。不会 > 尝试使用 SASL 进行身份验证(未知错误)10756 [main-SendThread(127.0.0.1:2000)] 信息 oassoazClientCnxn - 与 127.0.0.1/127.0.0.1:2000 建立的套接字连接,正在启动 > 会话 10758 [NIOServerCxn.Factory :0.0.0.0/0.0.0.0:2000] 信息 oassoazs 0/0.0.0.0:2000] INFO oassoazsNIOServerCnxn - 客户端的关闭套接字连接 /> 0:0:0:0:0:0:0:1:56878 其 sessionid 为 0x16a8e5abd970004 11074 [main] INFO oassoazZooKeeper - 会话:0x16a8e5abd970004 已关闭11077 [Curator-Framework-0] 信息 oassoacfiCuratorFrameworkImpl - backgroundOperationsLoop 退出 11079 [ProcessThread(sid:0 cport:-1):] 信息 oassoazsPrepRequestProcessor - sessionid 的已处理会话终止:> 0x16a8e5abd970000 11081 [main] 信息 oassoaza6 1108d1 会话 97001:0x [main] INFO oaszookeeper - 关闭领导选举人的 zookeeper 连接。11082 [main-EventThread] INFO oassoazClientCnxn - EventThread 关闭 11082 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] 信息 oassoazs SessionTrackerImpl - 关闭 11116 [main] 信息 oassoazsPrepRequestProcessor - 关闭 11117 [main] 信息 oassoazsSyncRequestProcessor - 关闭 11117 [SyncThread:0] 信息 oassoazsSyncRequestProcessor - SyncRequestProcessor 已退出!11117 [ProcessThread(sid:0 cport:-1):] 信息 oassoazsPrepRequestProcessor - PrepRequestProcessor 退出循环!11117 [main] INFO oassoazsFinalRequestProcessor - 请求处理器的关闭完成 11118 [main] INFO oastesting - 在进程 Zookeeper 中完成关闭 11118 [main] INFO oastesting - 删除临时路径 C:\Users\AKHAND~1\AppData\Local\Temp\ ae4119b4-70b3-4d04-9aee-5bfae4c4775b 11203 [main] INFO oastesting - 删除临时路径 C:
我无法理解为什么关闭客户端套接字以及为什么关闭会话?我无法让它工作。请帮忙。
java - Storm 中使用 2 个 RabbitMQ 队列的经典字数统计拓扑
我必须用 Java 和 Storm 编写一个简单的“字数统计”拓扑。特别是,我有一个外部数据源生成 CSV(逗号分隔)字符串,例如
丹尼尔, 0.5654, 144543, 用户, 899898, 评论,,,
这些字符串被插入到名为“input”的 RabbitMQ 队列中。这个数据源运行良好,我可以看到队列中的字符串。
现在,我修改了经典拓扑,添加了 RabbitMQSpout。目标是对每个 CSV 行的第一个字段进行字数统计,并将结果发布到一个名为“输出”的新队列中。问题是我在新队列中看不到任何元组,但拓扑已提交并RUNNING。
所以,总结一下:
- 外部数据源将项目放入输入队列
- RabbitMQSpout从输入队列中获取项目并将它们插入到拓扑中
- 执行经典的字数拓扑
- 最后一个螺栓将结果放入输出队列
问题:我可以看到输入队列中的项目,但没有任何内容进入输出,即使我使用相同的方法将项目发送到外部数据源中的队列(并且它有效)和RabbitMQExporter(不起作用......)
下面的一些代码
RabbitMQSpout
分裂螺栓
字数螺栓
RabbitMQExporterBolt
拓扑
RabbitMQManager
apache-storm - 如何通过以编程方式插入消息来端到端测试 Storm Topology 的功能
我们的 Apache Storm 拓扑使用 KafkaSpout 监听来自 Kafka 的消息,并在进行大量映射/减少/丰富/聚合等操作后,最终将数据插入 Cassandra。还有另一个 kafka 输入,如果拓扑找到响应,我们会在其中接收用户对数据的查询,然后将其发送到第三个 kafka 主题。现在我们想使用 Junit 编写 E2E 测试,其中我们可以直接以编程方式将数据插入拓扑,然后通过插入用户查询消息,我们可以在第三点断言我们的查询收到的响应是正确的。
为了实现这一点,我们考虑启动 EmbeddedKafka 和 CassandraUnit,然后用它们替换实际的 Kafka 和 Cassandra,然后我们可以在这个单一的 Junit 测试的上下文中启动拓扑。
但是我们的方法不太适合 JUnit,因为它使这些测试变得过于庞大。启动 kafka、cassandra 和拓扑都需要时间并消耗大量资源。Apache Storm 中有什么东西可以支持我们计划编写的测试吗?