我一直在浏览 hazelcast-jet 文档,以查找对某些外部进程异步馈送的源的引用——在我的例子中是 http 帖子。
我确实查看了Kafka 代码,因为这似乎是最接近的,但无法弄清楚新到达的事件将如何触发任何事情。我假设,这里不会涉及阻塞线程。
我将不胜感激任何指针,以更好地了解如何在“流”元素被滴灌的环境中使用 hazelcast jet。
我一直在浏览 hazelcast-jet 文档,以查找对某些外部进程异步馈送的源的引用——在我的例子中是 http 帖子。
我确实查看了Kafka 代码,因为这似乎是最接近的,但无法弄清楚新到达的事件将如何触发任何事情。我假设,这里不会涉及阻塞线程。
我将不胜感激任何指针,以更好地了解如何在“流”元素被滴灌的环境中使用 hazelcast jet。
Hazelcast Jet 即将发布的 0.7 版引入了 Source Builder 对象,这使得构建自定义源变得更加简单。你可以用它来写类似这样的代码:
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
StreamSource<String> source = SourceBuilder
.timestampedStream("http-trickle", x -> new HttpSource())
.fillBufferFn(HttpSource::addToBuffer)
.destroyFn(HttpSource::destroy)
.build();
StreamStage<String> srcStage = pipeline.drawFrom(source);
}
private static class HttpSource {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
private final ArrayList<String> buffer = new ArrayList<>();
private final AsyncClient<String> client =
new AsyncClient<String>().addReceiveListener(queue::add);
void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
queue.drainTo(buffer);
for (String line : buffer) {
sourceBuffer.add(line, extractTimestamp(line));
}
buffer.clear();
}
void destroy() {
client.close();
}
}
在这里,我使用了一个模拟AsyncClient
,它应该代表您的实际异步 HTTP 客户端。它希望您提供一个回调,以便在传入数据到达时对其进行处理。Jet 的源构建器会要求您提供另一个回调fillBufferFn
,它将数据发送到处理管道。
您对 的回调AsyncClient
应该将数据推送到并发队列,并且您fillBufferFn
应该将队列排到 Jet 的源缓冲区。
你可能会想简化我给这个的代码:
void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
for (String line; (line = queue.poll()) != null;) {
sourceBuffer.add(line, extractTimestamp(line));
}
}
这避免了中间缓冲区站在并发队列和 Jet 的源缓冲区之间。它实际上大部分时间都可以工作,但是如果您遇到流量高峰,addToBufferDirect
可能永远不会完成。这将违反与 Jet 的合同,该合同要求您fillBufferFn
在一秒钟左右内返回。
我们已经认识到这种将源构建器与异步客户端 API 结合使用的模式非常普遍,我们计划提供更多便利来处理它。