1

我按照Quarkus - Using Apache Kafka with Reactive Messaging创建了一个示例来品尝它,我改变了这样的消息流:

  1. 保存帖子后,通过 CDI 触发事件。
  2. 收到 CDI 并发送到 Kafka 主题。
  3. 从 Kafka 主题中读取数据,并将其作为 SSE 公开给客户端。

Kafka 消息传递的配置,是application.properties.

# Consume data from Kafka
mp.messaging.incoming.activities.connector=smallrye-kafka
mp.messaging.incoming.activities.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer

# Produce data to Kafka
mp.messaging.outgoing.activitiesOut.connector=smallrye-kafka
mp.messaging.outgoing.activitiesOut.topic=activities
mp.messaging.outgoing.activitiesOut.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer

CDI 事件和反应消息的事件处理类。

@ApplicationScoped
public class ActivityStreams {

    ReplaySubject<JsonObject> replaySubject;
    Flowable<JsonObject> flowable;

    @PostConstruct public void init() {
        replaySubject = ReplaySubject.create();
        flowable = replaySubject.share().toFlowable(BackpressureStrategy.BUFFER);
    }

    public void onActivityCreated(@ObservesAsync Activity activity) {
        replaySubject.onNext(JsonObject.mapFrom(activity));
    }

    @Outgoing("activitiesOut")
    public Publisher<JsonObject> onReceivedActivityCreated() {
        return flowable;
    }

    @Incoming("activities")
    @Outgoing("my-data-stream")
    @Broadcast
    public Activity onActivityReceived(JsonObject data) {
        Activity activity = data.mapTo(Activity.class);
        activity.setOccurred(LocalDateTime.now());
        return activity;
    }

}

当我尝试将其公开为 SSE 时,它无法按预期工作。

@Path("/activities")
@ApplicationScoped
public class ActivityResource {

    @Inject
    @Channel("my-data-stream")
    public Publisher<Activity> stream;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    Publisher<Activity> eventStream(){
        return stream;
    }
}

在控制台日志中,我看到了发送到活动队列的消息,但没有进一步的 SSE 步骤。当我通过 访问 sse 端点时curl,它总是返回Not found状态。

curl -v -N  -H "Accept:text/event-stream" http://localhost:8080/activities  --connect-timeout 60

...
HTTP/1.1 404 Not Found

完整的示例代码在这里

4

0 回答 0