0

我想将 Flink 用于远程患者监测案例场景,其中包括陀螺仪、加速度计、ECG 流、HR 速率流、RR 速率等各种传感器。因此在这种情况下,我们不可能拥有相同的数据类型或输入率等,但我仍然想检测心律失常或其他涉及在这些多个传感器上进行 CEP 的医疗状况

我所知道的是,如果我想对这些传感器执行一些复杂的事件处理,那么我有 2 个选项需要在 CEP 之前完成

  1. 加入差异流
  2. 合并差异流

之前我是根据传感器的时间戳执行连接,但它不会导致连接所有事件,因为差异流可以在微秒内具有差异速率和不同的时间戳,因此时间戳完全相等的情况很少见。

所以我想使用选项#2,即在执行 CEP 之前执行合并。为此,我在 Flink 文档中发现,我可以合并两个流,但它们应该具有相同的数据类型,我尝试做同样的事情,但我没有成功,因为我得到了以下错误

  Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<org.carleton.cep.monitoring.latest.Events.RRIntervalStreamEvent> and GenericType<org.carleton.cep.monitoring.latest.Events.qrsIntervalStreamEvent>
    at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)

现在让我们看看我是如何尝试执行合并的。所以基本上我有两个流类,它们的属性如下

RRIntervalStream 事件流

public Integer Sensor_id;
public Long time;
public Integer RRInterval;

qrsIntervalStream 事件流

public Integer Sensor_id;
public Long time;
public Integer qrsInterval;

这两个流都有生成器类,它们也以指定的速率以相同的数据类型发送事件。下面是我尝试合并它们的代码。

// getting qrs interval stream
   DataStream<qrsIntervalStreamEvent> qrs_stream_raw = envrionment.
                    addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");


// getting RR interval stream
 DataStream<RRIntervalStreamEvent> rr_stream_raw = envrionment.
                         addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");


//merging both streams
 DataStream<Tuple3<Integer,Long,Integer>> mergedStream;
            mergedStream = rr_stream_raw.union(new DataStream[]{qrs_stream_raw});

我必须使用new DataStream[],因为刚刚使用qrs_stream_raw导致错误,如下所示。

错误快照

有人可以给我一个想法吗

  1. 我应该如何合并这两个流?
  2. 我应该如何合并两个以上的流?
  3. 是否有一些引擎可以合并两个以上具有不同结构的流,如果是,我应该使用哪个引擎
4

1 回答 1

3

正如 Alex 所指出的,我们可以使用两个流的相同数据类型并可以在 Flink 中加入它们,另一种选择是使用 Siddhi 或 Flink-Siddhi 扩展。但我只想在 Flink 中做所有事情

因此,我在程序中进行了一些更改以使其正常工作

第 1 步:让我的两个生成器类都返回通用类型

public class RR_interval_Gen extends RichParallelSourceFunction<Tuple3<Integer,Long, Integer>>

step# 2:使两个流生成器都具有元组类型,然后合并 2 个流。

 // getting qrs interval stream
    DataStream<Tuple3<Integer,Long,Integer>> qrs_stream_raw = envrionment.
            addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");


    // getting RR interval stream
         DataStream<Tuple3<Integer,Long,Integer>> rr_stream_raw = envrionment.
                 addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");


         //merging both streams
    DataStream<Tuple3<Integer,Long,Integer>> mergedStream = rr_stream_raw.union(qrs_stream_raw);
于 2017-11-15T20:41:47.100 回答