我有一个 kafka 流应用程序,它获取仓库库存流和最新价格流,并根据这些价格计算每个库存的价值。
我收到一个库存更新,作为带有关键warehouseId 的流。
我总共有 ~100 个定价商品和 ~10 个仓库
我对 kafka-state 中 RocksDB 文件的大小不断增加有疑问。
代码如下
KTable<String, Price> throttledPriceTable =
pricesStream //Price stream of key:value itemId:price
.toTable()
.suppress(
Suppressed.untilTimeLimit(Duration.ofMinutes(5),
Suppressed.BufferConfig.unbounded()));
KTable<InventoryKey, Inventory> inventoryTable =
inventoryStream //inventory stream key:value warehouseId:Map<itemId, volume>
.flatMap(
(warehouseId, inventoryUpdate) ->
inventoryUpdate.entrySet() //entry set of item id
.stream()
.map(
entry -> {
//new key warhouseId+itemId
InventoryKey key = new InventoryKey(warehouseId, entry.getKey());
Inventory inventory = new Inventory(key, entry.getVolume());
return KeyValue.pair(key, inventory);
})
.collect(Collectors.toList())
)
.toTable(Materialized.with(InventoryKey.serde, Inventory.serde))
;
//ktable-ktable foreign key join
inventoryTable.leftJoin(throttledPriceTable,
Inventory -> Inventory.getInventoryKey().getItemId(),
((inventory, price) -> {
String itemId = inventory.getInventoryKey().getItemId());
if(price == null) {
logger.error("Missing price infor for item {}", itemId);
return null;
}
return (inventory.getVolume * price.getValue())
}))
.toStream()
;
RocksDB 文件的大小已经稳定增长了一周左右。我没有看到空间被回收。kstream toTable 操作似乎占用了大部分空间。我无法弄清楚为什么磁盘空间没有被释放,因为新条目相互替换。
我确定 object 的equals
和hashCode
函数InventoryKey
写得正确。
4.0K ./3_10
4.0K ./7_1
4.0K ./8_1
4.0K ./3_6
4.0K ./4_1
4.0K ./5_8
4.0K ./1_9
4.0K ./5_2
4.0K ./7_7
4.0K ./8_9
4.0K ./3_7
4.0K ./5_7
4.0K ./1_5
4.0K ./6_1
4.0K ./3_0
25M ./8_2/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000041
25M ./8_2/rocksdb
25M ./8_2
4.0K ./7_11
4.0K ./7_10/KSTREAM-JOINOTHER-0000000030-store
4.0K ./7_10/KSTREAM-JOINTHIS-0000000029-store
16K ./7_10
4.0K ./4_9
4.0K ./2_8
4.0K ./0_0
4.0K ./2_4
4.0K ./4_10
4.0K ./7_0/KSTREAM-JOINOTHER-0000000030-store
4.0K ./7_0/KSTREAM-JOINTHIS-0000000029-store
16K ./7_0
4.0K ./5_9
4.0K ./5_6
4.0K ./1_2
25M ./8_6/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000041
25M ./8_6/rocksdb
25M ./8_6
4.0K ./8_3
4.0K ./3_3
4.0K ./0_1
25M ./8_0/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000041
25M ./8_0/rocksdb
25M ./8_0
4.0K ./5_1
4.0K ./4_2
4.0K ./7_8/KSTREAM-JOINOTHER-0000000030-store
4.2M ./7_8/KSTREAM-JOINTHIS-0000000029-store/KSTREAM-JOINTHIS-0000000029-store.1628640000000
4.2M ./7_8/KSTREAM-JOINTHIS-0000000029-store
4.2M ./7_8
4.0K ./5_10
4.0K ./0_10
4.0K ./5_0
4.0K ./1_10
4.0K ./5_11
4.0K ./4_4
4.0K ./4_3
4.3M ./8_8/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000041
4.3M ./8_8/rocksdb
4.3M ./8_8
4.0K ./1_11
4.0K ./4_8
4.0K ./0_4
4.0K ./2_11
4.0K ./7_2/KSTREAM-JOINOTHER-0000000030-store
4.0K ./7_2/KSTREAM-JOINTHIS-0000000029-store
16K ./7_2
4.0K ./1_0
4.0K ./7_6/KSTREAM-JOINOTHER-0000000030-store
4.0K ./7_6/KSTREAM-JOINTHIS-0000000029-store
16K ./7_6
4.0K ./1_4
4.0K ./0_2
4.0K ./5_3
4.0K ./1_6
4.0K ./6_7
4.0K ./0_9
97M ./6_4/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000034
4.3M ./6_4/rocksdb/KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000049
101M ./6_4/rocksdb
101M ./6_4
4.0K ./2_3
4.0K ./1_1
4.0K ./6_5
4.0K ./1_8
4.0K ./6_11
4.0K ./3_1
4.0K ./1_7
4.0K ./7_9
4.0K ./2_1
4.0K ./6_3
4.0K ./3_4
4.0K ./7_4/KSTREAM-JOINOTHER-0000000030-store
4.0K ./7_4/KSTREAM-JOINTHIS-0000000029-store
16K ./7_4
25M ./8_10/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000041
25M ./8_10/rocksdb
25M ./8_10
4.0K ./0_5
4.0K ./5_4
4.0K ./0_8
4.0K ./4_6
101M ./6_0/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000034
29M ./6_0/rocksdb/KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000049
129M ./6_0/rocksdb
129M ./6_0
4.0K ./2_9
4.0K ./7_5
4.0K ./2_6
4.0K ./0_6
4.0K ./2_10
4.0K ./5_5
4.0K ./3_2
4.0K ./3_5
4.3M ./8_4/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000041
4.3M ./8_4/rocksdb
4.3M ./8_4
4.0K ./4_0
4.0K ./8_11
4.0K ./3_9
4.0K ./2_7
4.0K ./8_5
4.0K ./7_3
4.0K ./0_7
4.0K ./8_7
97M ./6_10/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000034
4.3M ./6_10/rocksdb/KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000049
101M ./6_10/rocksdb
101M ./6_10
97M ./6_6/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000034
29M ./6_6/rocksdb/KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000049
125M ./6_6/rocksdb
125M ./6_6
97M ./6_2/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000034
4.3M ./6_2/rocksdb/KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000049
101M ./6_2/rocksdb
101M ./6_2
4.0K ./2_5
4.0K ./3_11
4.0K ./6_9
4.0K ./4_5
89M ./6_8/rocksdb/KSTREAM-TOTABLE-STATE-STORE-0000000034
4.3M ./6_8/rocksdb/KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000049
93M ./6_8/rocksdb
93M ./6_8
4.0K ./3_8
4.0K ./1_3
4.0K ./2_2
4.0K ./4_7
4.0K ./2_0
4.0K ./0_3
4.0K ./4_11
4.0K ./0_11