17

如何为风暴拓扑提供自定义配置?例如,如果我构建了一个连接到 MySQL 集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接的服务器,我该怎么做?我的偏好是使用配置文件,但我担心文件本身没有部署到集群,因此它不会运行(除非我对集群如何工作的理解有缺陷)。到目前为止,我所见过的在运行时将配置选项传递给风暴拓扑的唯一方法是通过命令行参数,但是当您获得大量参数时,这会很麻烦。

确实有一个想法是利用 shell 脚本将文件读入一个变量,并将该变量的内容作为字符串传递给拓扑,但如果可能的话,我想要一些更简洁的东西。

有人遇到过这种情况么?如果是这样,您是如何解决的?

编辑:

它似乎需要提供更多的澄清。我的场景是我有一个拓扑,我希望能够在不同的环境中部署而无需重新编译它。通常,我会创建一个配置文件,其中包含诸如数据库连接参数之类的内容并将其传入。我想知道如何在 Storm 中执行类似的操作。

4

7 回答 7

8

您可以指定与拓扑一起提交的配置(通常通过 yaml 文件)。我们如何在自己的项目中自己管理这个问题是我们有单独的配置文件用于开发和一个用于生产,在其中我们存储我们的服务器、redis 和数据库 IP 和端口等。然后当我们运行我们的命令来构建 jar 并提交风暴拓扑包括正确的配置文件,具体取决于您的部署环境。bolts 和 spout 只需从stormConf 映射中读取它们所需的配置,该映射在bolt 的prepare() 方法中传递给它们。

http://storm.apache.org/documentation/Configuration.html

每个配置都在 Storm 代码库的 defaults.yaml 中定义了一个默认值。您可以通过在 Nimbus 和主管的类路径中定义一个storm.yaml 来覆盖这些配置。最后,您可以在使用 StormSubmitter 时定义与拓扑一起提交的特定于拓扑的配置。但是,特定于拓扑的配置只能覆盖以“拓扑”为前缀的配置。

Storm 0.7.0 及更高版本允许您基于每个螺栓/每个喷口覆盖配置。

您还将在http://nathanmarz.github.io/storm/doc/backtype/storm/StormSubmitter.html上看到 submitJar 和 submitTopology 传递了一个名为 conf 的映射。

希望这能让你开始。

于 2014-02-23T10:24:06.070 回答
3

我通过在代码中提供配置解决了这个问题:

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, SOME_OPTS);

我试图提供特定于拓扑的,storm.yaml但它不起作用。如果您可以使用storm.yaml,请纠正我。

更新:
对于任何想知道 SOME_OPTS 是什么的人,这是来自 Storm 邮件列表上的 Satish Duggana:

Config.TOPOLOGY_WORKER_CHILDOPTS:可以覆盖拓扑的 WORKER_CHILDOPTS 的选项。您可以配置任何 java 选项,如内存、gc 等

在你的情况下,它可以是

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx1g");
于 2015-03-12T03:01:02.467 回答
1

实际上最适合您的是将配置存储在可变键值存储(s3、redis 等)中,然后将其拉入以配置您随后使用的数据库连接(我假设您已经计划限制如何使用您经常与数据库交谈,因此获取此配置的开销并不是什么大问题)。这种设计允许您即时更改数据库连接,甚至无需重新部署拓扑。

于 2013-08-06T02:17:36.333 回答
0

这个想法是,当您构建拓扑时,您会创建 spout 和 bolt 的实例(除其他外),这些实例会被序列化并分发到集群中的正确位置。如果你想配置一个 spout 或 bolt 的行为,你可以在提交之前创建拓扑时这样做,你可以通过在 bolt 或 spout 上设置实例变量来实现,反过来,驱动你想要的可配置行为。

于 2013-08-06T03:11:55.860 回答
0

我也遇到了同样的问题。我通过在集群中配置 NFS 解决了这个问题,并将该配置文件放在共享位置,以便所有集群机器都可以使用它。在 linux 系统链接中配置 NFS 非常容易。

于 2014-01-15T11:55:57.420 回答
0

我遇到了和你一样的问题,这是我棘手的解决方案:

使用一个简单的 java 文件作为配置文件,比如说topo_config.java,它看起来像:

package com.xxx
public class topo_config {
    public static String zk_host = "192.168.10.60:2181";
    public static String kafka_topic = "my_log_topic";
    public static int worker_num = 2;
    public static int log_spout_num = 4;
    // ...
}

这个文件放在我的配置文件夹中,然后编写一个脚本,说compile.sh它将它复制到正确的包并进行编译,如下所示:

cp config/topo_config.java src/main/java/com/xxx/
mvn package

直接实现配置:

Config conf = new Config();
conf.setNumWorkers(topo_config.worker_num);
于 2014-11-14T09:23:09.177 回答
0

我们已经看到了同样的问题并通过添加以下每个拓扑来解决它

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true");

还使用 Nimbus UI 进行了验证,如下所示。

topology.worker.childopts   -Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true
于 2016-07-24T20:56:45.153 回答