0

我想在 Flink 中实现马尔科夫模型。首先,我从 Kafka 读取数据。如何使用 flink 实现三元马尔可夫模型?

4

1 回答 1

0

我最终实现了马尔可夫模型。此代码仅计算转移矩阵。

    private static class MarkovModel implements AllWindowFunction<Tuple2<String,String>, Tuple3<Long, Long,     HashMap<String,Integer>>, TimeWindow>{
    @Override
    public void apply(TimeWindow window, Iterable<Tuple2<String, String>> requests, Collector<Tuple3<Long, Long, HashMap<String, Integer>>> out) throws Exception {

        HashMap<String,Integer> map = new HashMap<>();

        String first = "";
        String second = "";
        String third = "";

        for (Tuple2<String, String> request : requests) {
          if(first == ""){
              third = second;
              second = first;
              first = request.f1;
          }else if(second == ""){
              third = second;
              second = request.f1;
          }else if(third == ""){
              third = request.f1;
          }else{
              third = second;
              second = first;
              first = request.f1;
          }

          if(third != ""){
              int count = map.getOrDefault(first + second + third,0);
              map.put(first + second + third,count + 1);
          }
        }


        System.out.println(map);
        System.out.println(map.values().stream().mapToDouble(x->x).sum());
        out.collect(new Tuple3(window.getStart(), window.getEnd(), map));
    }
}
于 2017-10-17T12:48:03.247 回答