我有一个帐户列表并在刻度上执行哈希连接并返回带有刻度数据的帐户。但是在 hashjoin 之后我有drainTo lListJet
然后阅读它DistributedStream
并返回它。
public List<Account> populateTicksInAccounts(List<Account> accounts) {
...
...
Pipeline p = Pipeline.create();
BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));
p.drawFrom(ticksSource)
.hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
.drainTo(Sinks.list(TEMP_LIST));
jet.newJob(p).join();
IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}
是否可以drainTo
使用 javaList
而不是lListJet
在执行 hashjoin 之后?
像下面这样的东西是可能的吗?
IListJet<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
.hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
.drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;
CustomSinkProcessor 将在哪里获取空的 java 列表并与帐户一起返回?