1

我一直在浏览 hazelcast-jet 文档,以查找对某些外部进程异步馈送的源的引用——在我的例子中是 http 帖子。

我确实查看了Kafka 代码,因为这似乎是最接近的,但无法弄清楚新到达的事件将如何触发任何事情。我假设,这里不会涉及阻塞线程。

我将不胜感激任何指针,以更好地了解如何在“流”元素被滴灌的环境中使用 hazelcast jet。

4

1 回答 1

1

Hazelcast Jet 即将发布的 0.7 版引入了 Source Builder 对象,这使得构建自定义源变得更加简单。你可以用它来写类似这样的代码:

public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    StreamSource<String> source = SourceBuilder
            .timestampedStream("http-trickle", x -> new HttpSource())
            .fillBufferFn(HttpSource::addToBuffer)
            .destroyFn(HttpSource::destroy)
            .build();
    StreamStage<String> srcStage = pipeline.drawFrom(source);
}

private static class HttpSource {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
    private final ArrayList<String> buffer = new ArrayList<>();
    private final AsyncClient<String> client = 
        new AsyncClient<String>().addReceiveListener(queue::add);

    void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
        queue.drainTo(buffer);
        for (String line : buffer) {
            sourceBuffer.add(line, extractTimestamp(line));
        }
        buffer.clear();
    }

    void destroy() {
        client.close();
    }
}

在这里,我使用了一个模拟AsyncClient,它应该代表您的实际异步 HTTP 客户端。它希望您提供一个回调,以便在传入数据到达时对其进行处理。Jet 的源构建器会要求您提供另一个回调fillBufferFn,它将数据发送到处理管道。

您对 的回调AsyncClient应该将数据推送到并发队列,并且您fillBufferFn应该将队列排到 Jet 的源缓冲区。

你可能会想简化我给这个的代码:

void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
    for (String line; (line = queue.poll()) != null;) {
        sourceBuffer.add(line, extractTimestamp(line));
    }
}

这避免了中间缓冲区站在并发队列和 Jet 的源缓冲区之间。它实际上大部分时间都可以工作,但是如果您遇到流量高峰,addToBufferDirect可能永远不会完成。这将违反与 Jet 的合同,该合同要求您fillBufferFn在一秒钟左右内返回。

我们已经认识到这种将源构建器与异步客户端 API 结合使用的模式非常普遍,我们计划提供更多便利来处理它。

于 2018-09-24T08:45:51.790 回答