2

我正在尝试将 Storm 设置为聚合流,但在同一流上使用各种(DRPC 可用)指标。

例如,流由消息组成,这些消息具有发送者、接收者、消息到达的通道和传递消息的网关。我在决定如何组织一个或多个拓扑结构时遇到了麻烦,这些拓扑结构可以为我提供例如网关和/或通道的消息总数。除了总数之外,每分钟的计数也会很好。

基本思想是有一个可以接受消息事件的 spout,并从那里根据需要聚合数据。目前我正在使用 Trident 和 DRPC,并且我想出了两种可能的拓扑来解决这个阶段的问题。无法决定哪种方法更好,如果有的话?!

整个源代码都可以在这个gist上找到。它分为三个类:

  • 随机消息喷口
    • 用于发出消息数据
    • 模拟真实数据源
  • 分离拓扑
    • 为所需的每个指标创建单独的 DRPC 流
    • 还会为每个指标创建一个单独的查询状态
    • 他们都使用相同的 spout 实例
  • 组合拓扑
    • 创建具有所有所需指标的单个 DRPC 流
    • 为每个指标创建单独的查询状态
    • 每个查询状态都提取所需的指标并为其分组结果

现在,对于问题和疑问:

  • 分离拓扑
    • 是否有必要使用相同的 spout 实例,或者我可以每次都说 new RandomMessageSpout() 吗?
    • 我喜欢这样的想法,即我不需要按所有指标保存分组数据,而只是我们需要稍后提取的分组
    • spout 发出的数据是否实际上由所有状态/查询组合处理,例如不是第一个出现的数据?
    • 这以后还会在运行时启用新状态/查询组合的动态添加吗?
  • 组合拓扑
    • 我真的不喜欢我需要保留按所有指标分组的数据的想法,因为我不需要所有组合
    • 令人惊讶的是,所有指标总是返回相同的数据
  • stateQuery 中的 SnapshotGet 与 TupleCollectionGet
    • 使用 SnapshotGet 事情往往会奏效,但并非总是如此,只有 TupleCollectionGet 解决了这个问题
    • 关于什么是正确的做法的任何指示?

我想这是一个冗长的问题/主题,但非常感谢任何帮助!此外,如果我完全错过了架构,那么我将非常欢迎有关如何实现这一点的建议。提前致谢 :-)

4

1 回答 1

0

您实际上不能通过使用相同的 spout 实例SeparateTopology调用来拆分流newStream(),因为这会创建同一个RandomMessageSpoutspout 的新实例,这将导致多个单独的 spout 实例向您的拓扑发出重复的值。(Spout 并行化只能在具有分区 spout 的 Storm 中实现,其中每个 spout 实例处理整个数据集的一个分区——例如 Kafka 分区)。

此处正确的方法是修改CombinedTopology以根据您需要的每个指标将流拆分为多个流(见下文),然后groupBy()按该指标的字段和persistentAggregate()每个新分支的流执行。

从三叉戟常见问题解答中,

"each" 返回一个 Stream 对象,您可以将其存储在变量中。然后,您可以在同一个 Stream 上运行多个 eaches 来拆分它,例如:

Stream s = topology.each(...).groupBy(...).aggregate(...)
Stream branch1 = s.each(...)
Stream branch2 = s.each(...)

请参阅Storm 邮件列表上的此线程,以及线程以获取更多信息。

于 2013-10-09T16:17:53.027 回答