我正在关注Doc以尝试如何通过直接从 IMap 中查找来丰富无界流。我有两个地图:
- 产品:
Map<String, Product>
(ProductId 作为键) - 卖家:(
Map<String, Seller>
以SellerId为key)
Product
和都是Seller
非常简单的类:
public class Product implements DataSerializable {
String productId;
String sellerId;
int price;
...
public class Seller implements DataSerializable {
String sellerId;
int revenue;
...
我有两个数据生成器不断将数据推送到两个地图。两个地图都启用了事件日志。我已经验证事件日志工作正常。
我想Product
用Seller
地图丰富地图的流事件。这是我的代码片段:
IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
.withoutTimestamps()
.groupingKey(Product::getSellerId)
.mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
.drainTo(getSink());
try {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
jobConfig.setName(Constants.BASIC_TASK);
Job job = jetClient.newJob(p, jobConfig);
} finally {
jetClient.shutdown();
}
提交作业时,出现以下错误:
com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] 异步执行回调失败:com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{ op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70,replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01 :11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0,连接=空}
我试图在我的集群中放置一个和两个实例并得到相同的错误消息。我无法弄清楚根本原因是什么。