1

我有以下案例场景

在此处输入图像描述

有 2 台虚拟机正在向 Kafka 发送流,CEP 引擎正在接收这些流,当单个流满足特定条件时会生成警告。

目前,CEP 正在检查两个患者的两条数据流(当心率 > 65 和呼吸率 > 68 时)是否相同,并同时发出警报,如下所示

 // detecting pattern
        Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
                .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getHeartRate() > 65 ;
                    }
                })
                .subtype(joinEvent.class)
                .where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getRespirationRate() > 68;
                    }
                }).within(Time.milliseconds(100));

但我想对两个流使用不同的条件。例如,我想发出警报,如果

For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78

我该如何做到这一点?我是否需要在同一环境中创建多个流环境或多个模式。

4

1 回答 1

2

根据您的要求,您可以根据需要创建 2 种不同的模式以进行清晰的分离。

如果您想使用相同的模式执行此操作,那么它也是可能的。为此,请在一个 kafka 源中阅读所有 kafka 主题:

    FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topic1", "topic2"),
        new StringSerializerToEvent(),
        props);

在这里,我假设您的两个主题的事件结构相同,并且您拥有患者姓名以及被传输的事件的一部分。

一旦你这样做了,它就会变得很容易,因为你只需要使用“或”创建一个模式,如下所示:

    Pattern.<JoinEvent>begin("first")
        .where(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68;
          }
        })
        .or(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78;
          }
        });

只要您的条件匹配,这就会产生匹配。虽然,我不确定“.within(Time.milliseconds(100))”在您的示例中实现了什么。

于 2017-09-01T12:01:06.867 回答