0

我知道,如果我这样做,mapState.clear()我将能够将所有值清除到特定键的状态中,但我的问题是:有没有办法做类似的事情mapState.clear()并将所有状态清除到 mapStates 中?就像mapState.isEmpty()它会说“真”一样,因为 mapState 中的所有键都被清理了,而不仅仅是当前键。

谢谢。亲切的问候!

4

1 回答 1

2

因为我们讨论的是嵌套地图的情况,所以很容易混淆我们的术语。所以让我们把这个问题放到一个例子的上下文中。

假设您有一个关于用户的事件流,并且在 aKeyedProcessFunction中您正在使用 aMapState<ATTR, VALUE>来维护每个用户的属性/值对的映射:

userEvents
    .keyBy(e -> e.userId)
    .process(new ManageUserData())

在 process 函数中,任何时候使用 MapState 都只能为用户操作与正在处理的事件相对应的一张地图,

public static class ManageUserData extends KeyedProcessFunction<...> {
    MapState<ATTR, VALUE> userMap;
}

所以userMap.clear()将清除一个用户的整个属性/值对映射,但不理会其他映射。

我相信您在问是否有某种方法可以一次清除所有用户的所有 MapStates。是的,有一种方法可以做到这一点,尽管它有点晦涩难懂,而且实现起来并不完全简单。

如果您将KeyedProcessFunction示例KeyedBroadcastProcessFunction中的用户,清除他们的 MapState。KeyedBroadcastProcessFunctionprocessBroadcastElement()

每当您希望发生这种情况时,您都必须安排在广播流上发送事件。

您应该注意文档中有关使用广播状态的警告。请记住,其中实现的逻辑processBroadcastElement()必须在所有并行实例中具有相同的确定性行为。

于 2020-09-08T17:25:02.557 回答