问题标签 [apache-storm-topology]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1946 浏览

netty - Apache Storm 的工作节点之间的 Netty 客户端连接错误

我正在运行 Apache Storm,在一个节点上设置了 Nimbus,在另外两个节点上设置了两个主管。在我启动拓扑(worker=2)后,它无法正常运行。在其中一个主管节点的工作日志中,我看到以下错误:

2018-07-04 17:36:02.650 o.a.s.m.n.Client client-boss-1 [ERROR] connection attempt 1 to Netty-Client-hostname/X.X.X.X:6700 failed: org.apache.storm.shade.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: hostname/X.X.X.X:6700

在另一个工作节点上,我看到以下错误: 2018-07-04 17:34:11.344 o.a.s.m.n.Client client-boss-1 [ERROR] connection attempt 3 to Netty-Client-hostname1/X.X.X.X:6700 failed: java.net.ConnectException: Connection refused: hostname1/X.X.X.X:6700

工作人员日志中没有其他错误。如果我用另一个工人(在同一个子网上)替换这些工人之一,拓扑运行完美。因此,问题似乎与两个工作节点之间的连接有关。但是,/etc/hosts 文件设置正确(与与其中一个正确工作的工作人员相同)并且两个工作人员可以相互访问(ping/ssh)。nimbus 和这些工人之间的连接很好(worker=1 的拓扑在这些工人中的每一个上都正确运行)。

我现在不确定这里可能是什么问题。任何帮助表示赞赏。

编辑:

在花了很多时间弄清楚这一点之后,我才知道在工作节点上不允许端口 6700 上的连接。我编辑了 iptables 以允许端口上的传入 tcp 连接。工作日志仍然显示一些网络连接错误,但现在,至少,拓扑运行良好。

sudo iptables -A INPUT -p tcp --dport 6700 -j ACCEPT

0 投票
1 回答
117 浏览

python - 如何在 Apache Storm Shell Bolt 中使用 Anaconda python

我正在尝试使用 shell bolt 使用 Python bolt 运行 Apache Storm 拓扑。在我的 Bolt 中,我使用的是安装在 /home/labuser/anaconda3/bin/python 中的 spacy 库。但是,当我运行拓扑风暴时显示错误消息模块未找到 spacy。调试后我发现storm正在使用位于 /usr/bin/python 的python 2.7。

我的问题是以下问题的扩展, 在风暴中,如何指定特定版本的 python

根据上述问题的答案,我尝试使用 super("home/labuser/anaconda3/bin/python", "splitsentence.py"); 创建 shell bolt 构造函数,但风暴继续在 /usr/bin/python 目录中获取 2.7。

我需要知道如何告诉 Storm 在 home/labuser/anaconda3/bin/python 目录中为我的 shell 螺栓使用 python3。

0 投票
1 回答
210 浏览

java - 在 StormCrawler 上获取拓扑以正确写入 warc 文件

在我的项目中,stormcrawler maven 原型似乎与 warc 模块不兼容。目前它只创建名称为“crawl-20180802121925-00000.warc.gz”的空 0 字节文件。我在这里错过了什么吗?

我尝试通过创建一个像这样的默认项目来启用warc写作:

然后像这样将依赖项添加到 pom.xml 中的 warc 模块

然后我将 WARCHdfsBolt 添加到 fetch 分组中,同时尝试写入本地文件系统目录。

无论我使用或不使用助焊剂在本地运行它,似乎都没有区别。您可以在这里查看演示存储库:https ://github.com/keyboardsamurai/storm-test-warc

0 投票
1 回答
266 浏览

maven - Storm 导致 Ignite log4j 上的依赖冲突

我尝试在 Storm 集群上运行 Storm 拓扑拓扑 jar 是由提供的。在拓扑螺栓内部,我想从 MyIgniteCache 模块读取数据,但出现以下错误。我认为依赖项(:/usr/hdp/2.6.0.3-8/storm/lib/log4j-slf4j-impl-2.8.jar:/usr/hdp/2.6.0.3-8/storm/lib/log4j-core- Storm-core 的 2.8.jar:log4j) 导致与 ignite-log4j 发生冲突。

ava.lang.IncompatibleClassChangeError: 在 java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_112] 在 java.lang.ClassLoader.defineClass(ClassLoader.java:763) 实现类 ~[?:1.8.0_112 ] 在 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_112] 在 java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_112] 在 java. net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_112] at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_112] at java.net。 URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_112] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_112] at java.net.URLClassLoader.findClass(URLClassLoader. java:361) ~[?:1.8.0_112] 在 java.lang。ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_112] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_112] at java.lang.ClassLoader。 loadClass(ClassLoader.java:357) ~[?:1.8.0_112] at java.lang.Class.forName0(Native Method) ~[?:1.8.0_112] at java.lang.Class.forName(Class.java:264 ) ~[?:1.8.0_112] at org.apache.ignite.internal.util.IgniteUtils.addLog4jNoOpLogger(IgniteUtils.java:8366) ~[stormjar.jar:?] at org.apache.ignite.internal.IgnitionEx.start (IgnitionEx.java:930) ~[stormjar.jar:?] at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854) ~[stormjar.jar:?] at org.apache.ignite.internal .IgnitionEx.start(IgnitionEx.java:724) ~[stormjar.jar:?] at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)~[stormjar.jar:?] at org.apache.ignite.Ignition.start(Ignition.java:352)~[stormjar.jar:?]

我的风暴拓扑模块的依赖树如下

这是我的配置缓存的依赖树

尽管存在这些依赖关系,但在 IDE 上运行拓扑和 myconfigcache 模块时,我收到了另一个严重警告

那么我该如何处理不包括 Storm log4j 依赖项的运行拓扑呢?

0 投票
1 回答
314 浏览

apache-storm - KafkaSpout 中的异常

我正在关注异常风暴拓扑。

POM 配置:

我正在使用已弃用的storm-kafka 库。如果这是上述异常的原因,那么让我知道如何使用storm-kafka-client 库创建kafka spout 并将自定义方案传递给它

谢谢。

0 投票
1 回答
194 浏览

java - Storm Supervisor 找不到 KafkaSpout 类

我正在使用 Apache Storm 1.1.2、Apache Kafka 0.10、Zookeeper 和 Docker Compose 编写一个 dockerized Java 9 Spring 应用程序。

我的拓扑完全在我的 dockerized 服务内的本地集群上运行,但现在我将它移动到生产集群,出现了一个问题。

我为 Storm 集群创建提交拓扑的服务似乎工作正常,并且代码在 PostConstruct 内部看起来大多像这样

我的 docker compose 文件看起来像这样。

所有的容器都起来了。在 UI 中,我可以看到在 post 构造中创建的拓扑,但没有正在处理 Kafka 消息,并且应该使用本地 Kafka 生产者来生成聚合的 Bolt 没有发布。

在主管容器中,/logs/worker-artifact/topology-id****/6700/worker.log我可以看到重复的两个异常。

第一个(我认为更重要)是ClassNotFoundException: org.apache.storm.kafka.spout.KafkaSpout

第二个例外是org.apache.storm.shade.org.jboss.netty.channel.ChannelException: Failed to bind to: 0.0.0.0/0.0.0.0:6700

更新

不幸的是,我不能发布我的整个 pom,但这是我的 Storm 依赖项

这是我的spring-boot-maven-plugin。我虽然添加配置以使复制到我的容器的 jar 不可执行,但可以解决问题。当我检查容器中的 jar 时,它看起来包含依赖项的行 jar 但也有大量乱码

这是我的大部分 dockerfile

0 投票
1 回答
194 浏览

java - Storm bolt 无法从 spout 反序列化对象

我正在使用 Storm 1.1.2、JDK 8(Storm 不喜欢 JDK 9 编译代码)、Kafka 0.11 和 Docker Compose 制作一个 Spring 应用程序。

这个想法是有一个容器化的服务,它可以接收 REST 调用来创建风暴拓扑,然后将它们提交到风暴集群。一切都在本地工作,但是将拓扑提交从提交到本地集群移动到StormSubmitter导致问题。我已经解决了大多数问题,但是有一个奇怪的序列化问题。

我有一个从 Kafka 成功读取的喷嘴。它读取 Protobuf 对象的字节数组,并使用自定义反序列化器从中创建消息。我从这个 spout 读取了两个不同的螺栓,一个打印传入消息(螺栓 A),一个根据字段过滤消息并将它们发送到另一个要聚合的螺栓(螺栓 B)。

我注意到两个螺栓之间的唯一区别是螺栓 B 有一个构造函数,而螺栓 A 没有。

由于某种原因,bolt A 可以毫无问题地从 spout 接收消息并打印它们,但是每次消息到达 bolt B 时,它都会引发异常com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): my.package.MyProtobufMessage。我看到您可以为类注册序列化程序,但是为什么当螺栓 B 不能时螺栓 A 能够处理消息?

也是单独的问题,但是当我添加第三个拓扑时,nimbus 不会为其分配主管。一个拓扑将包含 2 个作品和 9 个执行人员,第二个拓扑将包含 2 个工作人员和 6 个执行人员,然后我将添加第三个拓扑,该拓扑将显示在 UI 和 Nimbus 日志中,但不会显示在主管日志中。在 UI 中,第三个拓扑将有 0 个工作人员、执行人员和 0 个分配的内存

0 投票
1 回答
1305 浏览

java - Apache Storm Kafka Spout 延迟问题

我正在使用 Storm 1.1.2 和 Kafka 0.11 构建一个 Java Spring 应用程序,以在 Docker 容器中启动。

我的拓扑结构中的所有内容都按计划工作,但在 Kafka 的高负载下,Kafka 延迟会随着时间的推移越来越多。

我的 KafkaSpoutConfig:

那么我的拓扑如下

RouterBolt(扩展 BaseRichBolt)执行一个非常简单的 switch 语句,然后使用本地 KafkaProducer 对象将新消息发送到另一个主题。就像我说的,一切都可以编译并且拓扑按预期运行,但是在高负载(3000 条消息/秒)下,Kafka 延迟只是堆积起来,等同于拓扑的低吞吐量。

我试过禁用acking

但我想这不是一个问题。

我在 Storm UI 上看到 RouterBolt 在高负载下的执行延迟为 1.2 毫秒,处理延迟为 0.03 毫秒,这让我相信 Spout 是瓶颈。并行提示是 25,因为有 25 个分区“我的话题”。谢谢!

0 投票
1 回答
114 浏览

apache-zookeeper - 风暴大窗口大小导致执行程序被 Nimbus 杀死

我有一个 java spring 应用程序,它基于创建拓扑结构的 DTO 将拓扑提交给风暴(1.1.2)nimbus。

除了非常大的窗户外,这很好用。我正在用几个不同的滑动和翻滚窗口对其进行测试。除了每 15 分钟推进一次的 24 小时滑动窗口之外,没有人给我任何问题。该拓扑将从 Kafka 接收约 250 条消息/秒,并使用一个简单的时间戳提取器将它们窗口化,延迟为 3 秒(就像我正在测试的所有其他拓扑一样)。

我已经与工人和内存限额一起玩过,试图解决这个问题,但我的默认配置是 1 个工人,堆大小为 2048mb。我也尝试过减少影响最小的滞后。

我认为窗口大小可能变得太大并且工作人员内存不足,这会延迟心跳或 Zookeeper 连接签入,这反过来又会导致 Nimbus 杀死工作人员。

发生的情况是每隔一段时间(大约 11 个窗口前进),Nimbus 日志报告该拓扑的执行器“不活动”,并且该拓扑的工作日志显示KeeperException拓扑无法与 Zookeeper 通信或java.lang.ExceptionInInitializerError:null与巢PrivelegedActionException

当拓扑被分配一个新的工作人员时,我正在做的聚合丢失了。我认为这是因为窗口至少保存 250*60*15*11 (messagesPerSecond*secondsPerMinute*15mins*windowAdvancesBeforeCrash) 消息,每个消息大约 84 个字节。要完成整个窗口,它将最终成为 250*60*15*97 条消息(messagesPerSecond*secondsPerMinute*15mins*15minIncrementsIn24HoursPlusAnExpiredWindow)。如果我的数学是正确的,这大约是 1.8gbs,所以我觉得工人内存应该覆盖窗口,或者至少超过 11 个窗口推进值。

我可以稍微增加内存,但不会太多。我还可以减少内存/工作人员的数量并增加工作人员/拓扑的数量,但我想知道我是否缺少一些东西?我是否可以增加工作人员的心跳时间,以便执行者在被杀之前有更多时间签到,或者由于某种原因这会很糟糕?如果我更改了心跳 if 将在拓扑的配置图中。谢谢!

0 投票
1 回答
27 浏览

apache-storm - [Storm]如果其中一个失败,锚定到相同消息 id 的其他元组会发生什么?

如果锚定到消息 id 的元组之一失败,其他元组会完全处理还是被风暴停止?