2

我正在使用 0.11.0.2 版的 Kafka Streams。

为了利用API,我使用builder 方法transform创建了自己的 API 。问题是我对某些字段/方法的 javadoc 不够清楚。StateStoreSupplierStores.create

val storeSupplier = Stores.create(STORE_NAME)
            .withStringKeys()
            .withStringValues()
            .persistent()
            .disableLogging()
            .windowed(WINDOW_SIZE, RETENTION, 3, false)
            .enableCaching()
            .build()

提到的变更日志将如何表示?

/**
* Indicates that a changelog should not be created for the key-value store
*/
PersistentKeyValueFactory<K, V> disableLogging();

这 4 个值如何相互影响?每个窗口都有最大数量的元素 - windowSize?一旦到达新窗口就开始了?每个窗口都可以划分numSegments为 RocksDB 磁盘上的文件吗?重复意味着键和值都相同,并且仅在同一窗口中检测到?

 /**
 * Set the persistent store as a windowed key-value store
 * @param windowSize size of the windows
 * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
 * @param numSegments the maximum number of segments for rolling the windowed store
 * @param retainDuplicates whether or not to retain duplicate data within the window
 */
PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);

这里暗示了什么样的缓存?

/**
* Caching should be enabled on the created store.
*/
PersistentKeyValueFactory<K, V> enableCaching();
4

1 回答 1

2

我可以自信地回答这些问题中的 2/3:

提到的变更日志将如何表示?

更改日志是一个名为 的主题$applicationId-$storename-changelog。这是一个普通的键值主题,其中键是表键,值是表值。本主题由 Kafka Streams 创建和管理。如果你这样做disableLogging了,据我所知,如果它在没有重放整个拓扑的情况下以某种方式丢失(如果它是可重放的!)

这里暗示了什么样的缓存?

在访问底层 RocksDB 实例之前进行 LRU 内存缓存。例如CachedStateStoreCachedKeyValueStore具体 参见。CachedKeyValueStore#getInternal()

关于:

这 4 个值如何相互影响?每个窗口都有最大数量的元素-windowSize?一旦到达新窗口就开始了?并且每个窗口可以在 RocksDB 的磁盘上划分为 numSegments 文件?重复意味着键和值都相同,并且仅在同一窗口中检测到?

我最近还没有看这些内部结构,以至于无法准确记住。我可以说以下内容:

  • 除非您使用内存中的 LRU 存储,否则每个窗口都没有最大元素数。窗口是按时间存在的,因此您的条目会根据时间落入一个窗口或多个窗口,而不是窗口容量(通常没有固定容量)。更新:需要注意的重要一点是,如果您使用缓存存储,它只会以偏移提交间隔指定的间隔定期刷新到磁盘。如果这样的缓存存储支持 a KTable,则只有在拓扑提交并且存储刷新KTable时才将消息转发给其子级。
  • 是的,我相信每个窗口都在磁盘上划分为多个段。我最近看代码还不够准确,我可能是错的。请参阅RocksDBSegmentedBytesStore及其依赖项Segments
  • 不确定在这种情况下是否存在重复。
于 2018-01-24T15:08:18.427 回答