2

我检查了 Hazelcast Jet 以满足我的项目需求,但我发现文档在以下主题方面确实含糊不清:

1)当我对两个列表流执行数据连接时......例如:

BatchStage<Trade> trades = p.drawFrom(list("trades"));
BatchStage<Entry<Integer, Broker>> brokers =    
p.drawFrom(list("brokers"));
BatchStage<Tuple2<Trade, Broker>> joined = trades.hashJoin(brokers,
    joinMapEntries(Trade::brokerId),
    Tuple2::tuple2);
joined.drainTo(Sinks.logger());

那么我能以某种方式告诉 Jet 下面实际会发生什么连接吗?地图侧连接或减少侧连接......?我的意思是想象一下“经纪人”规模很小,而交易规模很大。执行这两组连接的最佳技术是地图侧连接,也就是广播连接......当 Jet 进行连接时,哪些数据将通过网络传输?是否有任何基于大小的优化?

2)我正在测试以下场景:

简单的管道:

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    BatchStage stage = p.drawFrom(Sources.<Date>list("master"));
    stage.drainTo(Sinks.logger());
    return p;
}

list("master")不断被集群中的另一个节点填充。现在,当我将此管道提交到集群时,只有列表的子集(“master”)被排空到记录器。我可以以某种方式将 Jet 作业设置为不断消耗list("master")标准输出吗?

提前致谢

4

1 回答 1

2
  1. 来自 HashJoin 的 Javadoc:

    在实现上,哈希连接转换针对吞吐量进行了优化,因此每个计算成员都拥有所有丰富数据的本地副本,存储在哈希表中(因此得名)。在从主流中摄取任何数据之前,丰富的流被完全消耗。

    对于您的示例,列表中的所有项目broker将首先从所有成员中消耗,然后trades列表将被消耗。

  2. IList是批处理源,您需要一个流式源来持续消费项目。您可以IQueue用作源,这是为队列创建源的简单方法:

    StreamSource<Trade> queueSource = SourceBuilder.<IQueue<Trade>>stream("queueStream", 
            c -> c.jetInstance().getHazelcastInstance().getQueue("trades"))
        .<Trade>fillBufferFn((queue, buf) -> buf.add(queue.poll()))
        .build();
    
于 2018-11-05T06:48:26.313 回答