我按照Quarkus - Using Apache Kafka with Reactive Messaging创建了一个示例来品尝它,我改变了这样的消息流:
- 保存帖子后,通过 CDI 触发事件。
- 收到 CDI 并发送到 Kafka 主题。
- 从 Kafka 主题中读取数据,并将其作为 SSE 公开给客户端。
Kafka 消息传递的配置,是application.properties
.
# Consume data from Kafka
mp.messaging.incoming.activities.connector=smallrye-kafka
mp.messaging.incoming.activities.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
# Produce data to Kafka
mp.messaging.outgoing.activitiesOut.connector=smallrye-kafka
mp.messaging.outgoing.activitiesOut.topic=activities
mp.messaging.outgoing.activitiesOut.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
CDI 事件和反应消息的事件处理类。
@ApplicationScoped
public class ActivityStreams {
ReplaySubject<JsonObject> replaySubject;
Flowable<JsonObject> flowable;
@PostConstruct public void init() {
replaySubject = ReplaySubject.create();
flowable = replaySubject.share().toFlowable(BackpressureStrategy.BUFFER);
}
public void onActivityCreated(@ObservesAsync Activity activity) {
replaySubject.onNext(JsonObject.mapFrom(activity));
}
@Outgoing("activitiesOut")
public Publisher<JsonObject> onReceivedActivityCreated() {
return flowable;
}
@Incoming("activities")
@Outgoing("my-data-stream")
@Broadcast
public Activity onActivityReceived(JsonObject data) {
Activity activity = data.mapTo(Activity.class);
activity.setOccurred(LocalDateTime.now());
return activity;
}
}
当我尝试将其公开为 SSE 时,它无法按预期工作。
@Path("/activities")
@ApplicationScoped
public class ActivityResource {
@Inject
@Channel("my-data-stream")
public Publisher<Activity> stream;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
Publisher<Activity> eventStream(){
return stream;
}
}
在控制台日志中,我看到了发送到活动队列的消息,但没有进一步的 SSE 步骤。当我通过 访问 sse 端点时curl
,它总是返回Not found状态。
curl -v -N -H "Accept:text/event-stream" http://localhost:8080/activities --connect-timeout 60
...
HTTP/1.1 404 Not Found
完整的示例代码在这里。