1

当我尝试使用 window 和 fold 函数聚合元素时,一些元素会因为聚合而丢失。使用来自 Kafka 的元素(value:0, value:1, value:2, value:3)并将它们聚合为奇数和偶数值。

输出是:

{even=[0, 2, 4], odd=[1, 3]}
{even=[6, 8], odd=[5, 7, 9]}
{even=[14, 16, 18], odd=[15, 17]}
{even=[20, 22], odd=[19, 21, 23]}
{even=[24, 26, 28], odd=[25, 27]}

缺少 10-13 之间的数字,这发生在一组随机数字上。有人可以建议下面的代码中遗漏了什么吗?我怎样才能确保处理所有元素?

public static class Splitter implements FlatMapFunction<String, 
    Tuple3<String, String, List<String>>{
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple3<String, String, 
        List<String>>out) throws Exception {
        String[] vals = value.split(":");

        if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){
            out.collect(new Tuple3<String, String, List<String>>
             ("test","even", Arrays.asList(vals[1])));
        }else{
            out.collect(new Tuple3<String, String, List<String>>
            ("test","odd", Arrays.asList(vals[1])));
        }
    }
}


    DataStream<Map<String, List<String>>streamValue = 
    kafkaStream.flatMap(new Splitter()).keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
    trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))
    .fold(new HashMap<String, List<String>>(), new 
    FoldFunction<Tuple3<String, String, List<String>>, Map<String, 
    List<String>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Map<String, List<String>fold(Map<String, 
        List<String>accumulator,
        Tuple3<String, String, List<String>value) throws 
        Exception {
            if(accumulator.get(value.f1) != null){
                List<Stringlist = new ArrayList<>();
                list.addAll(accumulator.get(value.f1));
                list.addAll(value.f2);
                accumulator.put(value.f1, list);
            }else{
                accumulator.put(value.f1, value.f2);
            }
            return accumulator;
        }
    });

    streamValue.print();
    env.execute("window test");
}


public class CustomizedCountTrigger<W extends Windowextends 
Trigger<Object, W{

    private static final long serialVersionUID = 1L;
    private final long maxCount;

    private final ReducingStateDescriptor<LongstateDesc =
    new ReducingStateDescriptor<>("count", new Sum(), 
    LongSerializer.INSTANCE);

    private CustomizedCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window,
    TriggerContext ctx) throws Exception {
        ReducingState<Longcount = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window,

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext

    ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window,

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext

    ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, 
    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext

    ctx)
    throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public String toString() {
        return "CountTrigger(" +  maxCount + ")";
    }

    public static <W extends WindowCustomizedCountTrigger<Wof(long 
    maxCount) {
        return new CustomizedCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long{
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
4

1 回答 1

1

因此,我开始编写本文的第一部分,然后才注意到您的自定义触发器使您使用 TumblingEventTime 窗口的事实有点无关紧要,但我还是想包括我的原始想法,因为我不完全确定您为什么要使用 EventTime 窗口当你不使用它时。意识到这一点后,我的回答低于原作。

您是在单个并行还是多个上运行它?我问的原因是因为如果是多并行(并且kafka主题也是由多个分区组成),那么消息的接收和处理可能是非顺序的。这可能会导致带有时间戳的消息导致水印前进,从而导致窗口处理消息。然后下一条消息的事件时间在当前水印时间之前(也就是“迟到”),这将导致消息被丢弃。

例如:如果你有 20 个元素并且每个元素的事件时间是这样的:

消息1:事件时间:1000 消息1:事件时间:2000 等等...

你的事件时间窗口是 5001 毫秒。

现在消息 message1 到 message9 依次通过。第一个窗口将被处理并包含消息 1-5(消息 6 将导致窗口被处理)。现在如果message11 在message10 之前出现,它将导致处理包含消息6-9 的窗口。而当 message10 下一个出现时,水印已经超过了 message10 的事件时间,导致它作为“迟到的事件”被丢弃。

正确答案

尝试使用 countWindow,而不是使用 eventTime 窗口和自定义触发器。

所以替换这个:

.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))

有了这个:

.countWindow(5L)
于 2017-10-18T19:08:02.213 回答