问题标签 [apache-storm-topology]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
116 浏览

apache-storm - 是否有任何 Java API 可以知道拓扑何时准备好从 Spout 读取第一条消息

我们的 Apache Storm 拓扑使用 KafkaSpout 监听来自 Kafka 的消息,并在进行大量映射/减少/丰富/聚合等操作后,最终将数据插入 Cassandra。还有另一个 kafka 输入,如果拓扑找到响应,我们会在其中接收用户对数据的查询,然后将其发送到第三个 kafka 主题。现在我们想使用 Junit 编写 E2E 测试,其中我们可以直接以编程方式将数据插入拓扑,然后通过插入用户查询消息,我们可以在第三点断言我们的查询收到的响应是正确的。

为了实现这一点,我们考虑启动 EmbeddedKafka 和 CassandraUnit,然后用它们替换实际的 Kafka 和 Cassandra,然后我们可以在这个单一的 Junit 测试的上下文中启动拓扑。

在开始我们的实际测试之前,我们创建拓扑并将其提交到 LocalCluster。它在不同的线程上启动拓扑,然后从 Before 中出来并开始执行我们的测试。在那之前,拓扑还没有准备好,因为它需要一些时间来准备好处理。是否有任何 java API 可以告诉我们拓扑何时准备好进行处理(意味着准备好从 Spout 读取第一条消息)?

0 投票
1 回答
36 浏览

apache-storm - 什么可以用作 CassandraWriterBolt 的测试存根?

我从 Kafka 读取 json,FieldExtractionBolt 读取 json 将数据提取到元组值并将它们传递给 CassandraWriterBolt,CassandraWriterBolt 又在 Cassandra 中写入一条记录,将所有这些元组值写入单独的列。

Kafka 上的 JSON 消息 -

FieldExtractionBolt -

CassandraWriterBolt -

我尝试根据此处给出的答案编写测试 -如何通过以编程方式插入消息来 E2E 测试 Storm Topology 的功能

在我的项目中,我在 Spring 配置中定义了所有的螺栓、喷口和流。这使得编写/阅读我的拓扑非常容易。我通过从 ApplicationContext 获取 bolt、spouts 和 stream beans 来构建拓扑。在我的 Spring 配置中,KafkaSpout 和 CassandraWriterBolt 是在“prod”配置文件下定义的,因此它们只能在 prod 和“test”配置文件下使用,我为 KafkaSpout 和 CassandraWriterBolt 定义了存根。对于 KafkaSpout,我使用了 FixedToupleSpout,对于 CassandraWriterBolt,我使用了 TestWordCounter。

这是我的测试

我得到的结果不是我所期望的。我收到以下错误 -

看起来,TestWordCounter 只是将第一个值作为元组读取(仅限货币对并跳过出价和要价)。似乎 TestWordCounter 在这里不是一个正确的选择。CassandraWriterBolt 的正确存根是什么,以便我可以断言它将收到 2 条记录,一条为 GBPJPY,另一条为 GBPUSD,以及他们的买入价和卖出价?

0 投票
1 回答
274 浏览

java - 为 Apache Storm 编写集成测试

用 Java 为 Apache Storm 拓扑编写集成测试的推荐方法是什么?感谢任何建议/链接。

0 投票
1 回答
112 浏览

apache-storm - 在拓扑中可以从一个螺栓传输到另一个螺栓的最大消息大小是多少

我有一个使用streamparse构建的拓扑,它接收来自kafka的输入,然后spout将消息传输到主螺栓,主螺栓计算并生成10 MB的消息,该消息必须传递给其他3个螺栓。

数据在主螺栓中成功生成,但在传输到另一个螺栓时出现以下错误

ERROR Unable to write to stream UDP:localhost:514 for appender syslog: org.apache.logging.log4j.core.appender.AppenderLoggingException: Error flushing stream UDP:localhost:514

0 投票
0 回答
90 浏览

apache-storm - 远程提交的风暴拓扑未运行

我们创建了一个风暴拓扑并在本地模式下进行了测试,一切正常。然后我们做了一个构建并提交给 nimbus 和一个有 4 个插槽的主管。拓扑出现在 Storm UI 上并显示为活动的,集群上使用了 4 个插槽。但是当点击拓扑时,没有spouts,没有bolts,没有统计信息。我们的 Redis 数据库也没有写入任何内容。所以我们想知道是否有一些我们没有做的事情。

Storm 版本:2.0.0
操作系统:Linux Mint 19.1 Cinnamon

0 投票
1 回答
778 浏览

apache-storm - java.lang.RuntimeException: org.apache.storm.thrift.TApplicationException: 内部错误处理 beginFileUpload

我正在尝试将 Storm-start 中的ExclamationTopology示例提交到单节点集群并收到以下错误

我的环境:

  • Linuxmint-19.1(64 位)
  • Apache-storm-2.1.0
  • Apache-zookeeper-3.5.5
0 投票
2 回答
279 浏览

apache-storm - 如何在storm crawler中使用python bolt?

我有一些用 python 编写的图像分类器。网上有很多例子描述了在标准输入/标准输出中使用的风暴螺栓中使用 python 的方式。我想将我的 python 图像分类器与风暴爬虫拓扑集成。有没有可能?

谢谢

0 投票
1 回答
200 浏览

apache-storm - InvalidTopologyException(msg:Component: [x] 从不存在的流 [y] 订阅

我正在尝试从kafka读取数据并使用storm插入cassandra。我也配置了拓扑,但是我遇到了一些问题,我不知道为什么会这样。

这是我的提交者作品。

在这里,如果我评论最后一行,我看不到任何异常。最后一行,我收到以下错误:

InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])

有人可以帮我吗,这里有什么问题?

这是 CheckingBolt 中的 outputFieldDeclarer

我在 CassandraInsertBolt 的 declareOutputFields 方法中没有任何内容,因为该螺栓不会发出任何值。

TIA

0 投票
1 回答
156 浏览

apache-storm - Apache Storm 集群未将拓扑的组件分配给所有可用的工作人员

我的拓扑配置为使用 14 个工作人员,我目前在集群中有 16 个工作人员可用。但是当我提交拓扑时,Storm UI 上的“工作人员资源”部分显示所有 14 个工作人员都已启动,但组件(拓扑组件)仅分配给 7 个。其余显示 N/A(见下面的快照)。

拓扑快照

工人/主管日志文件没有显示任何有意义的错误,因为某些工人没有被分配组件。过去 6 小时以上我一直在谷歌搜索,但无济于事

我的环境:

  • 阿帕奇风暴 2.1.0
  • 动物园管理员 3.4.9
  • 一个主节点(Ubuntu 18.0.4 LTS)
  • 三个主管节点(Ubuntu 18.0.4 LTS)
0 投票
0 回答
155 浏览

logging - 在 Apache Storm Workers 中设置与 logstash 兼容的日志记录模式

我在使用 Storm 2.1.0 将 Storm 工作人员/拓扑日志转换为与 ELK 兼容的格式 (JSON) 时遇到了一些困难。

这些是我当前的工作人员日志配置:

cluster.xml

worker.xml

根据我的这种配置,我希望日志消息的格式类似于以下内容(为清楚起见,添加了换行符:

但是,似乎消息正在被主管进程甚至工作人员本身“包装”。我得到的日志消息如下所示:

(为可读性添加了换行符/空格)

这里似乎发生了几件事(向内):

  1. 主管似乎包装了所有工作人员消息并在它们前面加上Worker Process <worker-id>:. 可以通过+包装整个消息的 来注意到这一点。
  2. 工人似乎以某种方式包装了自己的日志消息。该message部分日志还包含另一条日志消息。

至于我的问题:

  1. 我可以禁用此主管日志前缀吗?查看 中的源代码org.apache.storm.daemon.supervisor.BasicContainer#launch,这似乎是硬编码的。我无法想象正确的拓扑部署会导致添加到每个日志消息中的硬编码前缀。

  2. 这个工人消息包装怎么会发生?正如我所看到的,“wrappee”是我的实际拓扑消息(我希望解析),而“wrapper”则完全是另外一回事(使用记录器STDERR?为什么要记录到STDERR?使用级别INFO???)

基本上,我想在拓扑执行期间简单地记录一些消息并控制这些消息的格式。我怎样才能用 Storm 可靠地做到这一点?