我正在使用带有三叉戟拓扑的风暴,但我无法理解并行度是如何获得的,它根据我的计算和我在风暴 UI 上看到的内容而有所不同,
这是分配工人数量的代码:
public Config getTopologyConfiguration() {
Config conf = new Config();
//conf.setDebug(true);
conf.setNumWorkers(6);
conf.setMessageTimeoutSecs(100);
return conf;
}
这是流处理代码:
s.name("aggregation_stream")
.parallelismHint(invoiceAggregationConfig.getSpoutParallelism())
.partitionBy(groupedFields)
.partitionAggregate(aggregateInputFields,
new GenericAggregator(groupedFields, aggregatedFieldsList, aggregateFieldsOperationList),
aggregatorOutputFields)
.parallelismHint(invoiceAggregationConfig.getAggregationParallelism())
.shuffle()
.each(aggregatorOutputFields,
new CreatePaymentFromInvoices(paymentType, groupMap, aggMap, paymentExtraParams),
Const.PAYMENT_FIELD)
.each(TridentUtils.fieldsConcat(aggregatorOutputFields, Const.PAYMENT_FIELD),
new CreateApplicationFromPaymentAndInvoices(invoiceType),
Const.APPLICATIONS_FIELD)
.each(TridentUtils.fieldsConcat(aggregatorOutputFields, Const.PAYMENT_FIELD, Const.APPLICATIONS_FIELD),
new RestbusFilterForPaymentAndApplications(environment, bu, serviceConfiguration))
.parallelismHint(invoiceAggregationConfig.getPersistenceParallelism());
我在上面的代码中使用的并行属性在这里:
spoutParallelism: 3
aggregationParallelism: 6
persistenceParallelism: 6
现在根据我的计算,执行者的数量应该是 3*6 + 6 = 24
但是在 Storm UI 中它显示 23,如何?
已编辑
添加新的屏幕截图,其中包含有关各个组件的信息
这里我可以看到 Executors 和任务的数量是 50,但是我没有为此设置任何配置,storm 本身是否提供了这个?
其次,发出的元组数量巨大,我没有产生这么多数据,这是100多倍的元组,为什么会在UI中显示这么多的元组?