1

我正在关注Doc以尝试如何通过直接从 IMap 中查找来丰富无界流。我有两个地图:

  1. 产品:Map<String, Product>(ProductId 作为键)
  2. 卖家:(Map<String, Seller>以SellerId为key)

Product和都是Seller非常简单的类:

public class Product implements DataSerializable {
    String productId;
    String sellerId;
    int price;
...
public class Seller implements DataSerializable {
    String sellerId;
    int revenue;
...

我有两个数据生成器不断将数据推送到两个地图。两个地图都启用了事件日志。我已经验证事件日志工作正常。

我想ProductSeller地图丰富地图的流事件。这是我的代码片段:

IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
            .withoutTimestamps()
            .groupingKey(Product::getSellerId)
            .mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
            .drainTo(getSink());
try {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
        jobConfig.setName(Constants.BASIC_TASK);
        Job job = jetClient.newJob(p, jobConfig);
    } finally {
        jetClient.shutdown();
    }

提交作业时,出现以下错误:

com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] 异步执行回调失败:com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{ op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70,replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01 :11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0,连接=空}

我试图在我的集群中放置一个和两个实例并得到相同的错误消息。我无法弄清楚根本原因是什么。

4

1 回答 1

1

ClassNotFoundException即使您在作业中添加了适当的类,您的问题似乎是 a 。您存储的对象IMap独立于您的 Jet 作业而存在,并且当事件日志源要求它们时,Jet 的 IMap 代码尝试反序列化它们并失败,因为 Jet 在其类路径中没有您的域模型类。

要继续,请将您在 IMap 中使用的类的 JAR 添加到 Jet 的类路径中。我们正在寻找能够消除此要求的解决方案。

日志输出中没有异常堆栈跟踪的原因是由于java.util.logging您没有明确添加更灵活的日志记录模块(例如 Log4j)时最终使用的默认设置。

Jet 的下一版包装将改进这方面。在此之前,您可以按照以下步骤操作:

  1. 进入libJet 的分发包目录,将 Log4j 下载到其中:

    $ cd lib
    $ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
    
  2. 编辑bin/common.sh以将模块添加到类路径。在文件末尾有一行

    CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
    

    您可以复制此行并替换hazelcast-jet-3.1log4j-1.2.17.

  3. 最后有一个构造变量commons.sh的多行命令。将和JAVA_OPTS添加到列表中。"-Dhazelcast.logging.type=log4j""-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties"

  4. log4j.properties在目录中创建一个文件config

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n

# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info

log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdout
于 2019-10-14T12:47:55.080 回答