我正在尝试创建一个自定义包装器来记录处理每个顶点处理器执行的每条消息所花费的执行时间。
使用 DiagnosticProcessors.peekInputP/peekOutputP 和 processor.PeekWrappedP 作为灵感,我最终得到了以下代码:
public final class LogEngine {
private LogEngine() {
}
@Nonnull
public static <T> ProcessorMetaSupplier logP(@Nonnull ProcessorMetaSupplier wrapped) {
return new WrappingProcessorMetaSupplier(wrapped, p ->
new LogWrappedP<>(p));
}
@Nonnull
public static <T> ProcessorSupplier logP(@Nonnull ProcessorSupplier wrapped) {
return new WrappingProcessorSupplier(wrapped, p ->
new LogWrappedP<>(p));
}
@Nonnull
public static <T> DistributedSupplier<Processor> logP(@Nonnull DistributedSupplier<Processor> wrapped) {
return () -> new LogWrappedP<>(wrapped.get());
}
}
和
public final class LogWrappedP<T> implements Processor {
private final Processor wrappedProcessor;
private static Logger logger;
private ProcCtx ctx;
public LogWrappedP(@Nonnull Processor wrappedProcessor) {
checkNotNull(wrappedProcessor, "wrappedProcessor");
this.wrappedProcessor = wrappedProcessor;
}
@Override
public void init(@Nonnull Outbox outbox, @Nonnull Context context) {
// Fix issue #595: pass a logger with real class name to processor
// We do this only if context is ProcCtx (that is, not for tests where TestProcessorContext can be used
// and also other objects could be mocked or null, such as jetInstance())
if (context instanceof ProcCtx) {
ProcCtx c = (ProcCtx) context;
NodeEngine nodeEngine = ((HazelcastInstanceImpl) c.jetInstance().getHazelcastInstance()).node.nodeEngine;
ILogger newLogger = nodeEngine.getLogger(
createLoggerName(wrappedProcessor.getClass().getName(), c.vertexName(), c.globalProcessorIndex()));
ctx = new ProcCtx(c.jetInstance(), c.getSerializationService(), newLogger, c.vertexName(),
c.globalProcessorIndex(), c.processingGuarantee());
}
logger = LogManager.getLogger(wrappedProcessor.getClass().getName());
wrappedProcessor.init(outbox, ctx);
}
@Override
public boolean isCooperative() {
return wrappedProcessor.isCooperative();
}
@Override
public void process(int ordinal, @Nonnull Inbox inbox) {
KafkaMessage msg = (KafkaMessage) inbox.peek();
logger.info("START {} {} {} {} {}", ctx.vertexName(), getProcessorId(), getInstanceHost(), msg.getUuid(), Instant.now().toEpochMilli());
wrappedProcessor.process(ordinal, inbox);
logger.info("END {} {} {} {} {}", ctx.vertexName(), getProcessorId(), getInstanceHost(), msg.getUuid(), Instant.now().toEpochMilli());
}
@Override
public boolean tryProcess() {
return wrappedProcessor.tryProcess();
}
@Override
public boolean complete() {
return wrappedProcessor.complete();
}
@Override
public boolean completeEdge(int ordinal) {
return wrappedProcessor.completeEdge(ordinal);
}
@Override
public boolean saveToSnapshot() {
return wrappedProcessor.saveToSnapshot();
}
@Override
public void restoreFromSnapshot(@Nonnull Inbox inbox) {
wrappedProcessor.restoreFromSnapshot(inbox);
}
@Override
public boolean finishSnapshotRestore() {
return wrappedProcessor.finishSnapshotRestore();
}
protected int getProcessorId() {
return ctx.globalProcessorIndex();
}
protected String getInstanceUUID() {
return ctx.jetInstance().getCluster().getLocalMember().getUuid();
}
protected String getInstanceHost() {
return ctx.jetInstance().getCluster().getLocalMember().getAddress().getHost();
}
}
现在我可以在我的应用程序顶点中使用我的包装器:
Vertex kafkaSource = dag.newVertex("kafkaSource", streamKafkaP(properties, decodeKafkaMessage, topic))
.localParallelism(2);
Vertex app = dag.newVertex("app", LogEngine.logP(ProcessFrameP::new))
.localParallelism(2);
并得到预期的结果,
2018-02-18 08:47:04,024 INFO START app 1 172.21.0.1 bc407e15-e78e-4734-822d-1172485e6632 1518954424024
2018-02-18 08:47:04,108 INFO END app 1 172.21.0.1 bc407e15-e78e-4734-822d-1172485e6632 1518954424108
2018-02-18 08:47:04,681 INFO START app 1 172.21.0.1 82e38e7e-73b7-4729-8d28-4f7fc87700ad 1518954424681
2018-02-18 08:47:04,710 INFO END app 1 172.21.0.1 82e38e7e-73b7-4729-8d28-4f7fc87700ad 1518954424710
2018-02-18 08:47:05,524 INFO START app 1 172.21.0.1 16633f77-8af5-4ab1-b94a-6192022f904f 1518954425524
2018-02-18 08:47:05,551 INFO END app 1 172.21.0.1 16633f77-8af5-4ab1-b94a-6192022f904f 1518954425551
2018-02-18 08:47:06,518 INFO START app 1 172.21.0.1 29622922-4987-44d4-8def-186b415c8fa9 1518954426518
2018-02-18 08:47:06,533 INFO END app 1 172.21.0.1 29622922-4987-44d4-8def-186b415c8fa9 1518954426533
2018-02-18 08:47:07,457 INFO START app 1 172.21.0.1 ce016601-d7be-4382-bc81-1d6a75e8748b 1518954427457
2018-02-18 08:47:07,475 INFO END app 1 172.21.0.1 ce016601-d7be-4382-bc81-1d6a75e8748b 1518954427475
2018-02-18 08:47:08,358 INFO START app 1 172.21.0.1 6a0be934-3eb6-4e46-9f08-76c072304de6 1518954428358
2018-02-18 08:47:08,379 INFO END app 1 172.21.0.1 6a0be934-3eb6-4e46-9f08-76c072304de6 1518954428379
问题是包装器不适用于kafkaSource顶点。我试图了解 peekOutputP 背后的逻辑,但我无法获得源顶点的工作版本。
我应该怎么做才能让包装器也在源顶点中工作?