我有一个带有任意键的字符串的 kafka 主题。我想创建一个characters in string : value
配对主题,例如:
input("key","value") -> outputs (["v","value"],["a","value"],...)
为简单起见,我的输入主题有一个分区,因此 KTable 代码应该将所有消息接收到单个实例。
我创建了以下沙箱代码,它可以很好地构建新表,但在将新项目放入原始主题时不会更新:
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
public class Sandbox
{
private final static String kafkaBootstrapServers = "192.168.1.254:9092";
private final static String kafkaGlobalTablesDirectory = "C:\\Kafka\\tmp\\kafka-streams-global-tables\\";
private final static String topic = "sandbox";
private static KafkaStreams streams;
public static void main(String[] args)
{
// 1. set up the test data
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, Sandbox.class.getName() + "_testProducer");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> sandboxProducer = new KafkaProducer<>(producerProperties);
sandboxProducer.send(new ProducerRecord<String, String>(topic,"uvw","uvw"));
// 2. read the test data and check it's working
ReadOnlyKeyValueStore<String, String> store = getStore();
printStore(store.all());
System.out.println("-------------ADDING NEW VALUE----------------");
sandboxProducer.send(new ProducerRecord<String, String>(topic,"xyz","xyz"));
System.out.println("-------------ADDED NEW VALUE----------------");
printStore(store.all());
sandboxProducer.close();
streams.close();
}
private static void printStore(KeyValueIterator<String, String> i)
{
System.out.println("-------------PRINT START----------------");
while (i.hasNext())
{
KeyValue<String, String> n = i.next();
System.out.println(n.key + ":" + String.join(",", n.value));
}
System.out.println("-------------PRINT END----------------");
}
private static ReadOnlyKeyValueStore<String, String> getStore()
{
ReadOnlyKeyValueStore<String, String> store = null;
String storeString = "sandbox_store";
StreamsBuilder builder = new StreamsBuilder();
builder.stream(topic
, Consumed.with(Serdes.String(),Serdes.String()))
.filter((k,v)->v!=null)
.flatMap((k,v)->{
Set<KeyValue<String, String>> results = new LinkedHashSet<>();
if (v != null)
{
for (char subChar : v.toCharArray())
{
results.add(KeyValue.<String, String>pair(new String(new char[] {subChar}), v));
}
}
return results;
})
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(()->new String()
, (key, value, agg) -> {
agg = agg + value;
return agg;
}
,Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeString)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "sandbox");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, Sandbox.class.getName());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, kafkaGlobalTablesDirectory + "Sandbox");
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
System.out.println("Exception on thread " + thread.getName() + ":" + throwable.getLocalizedMessage());
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.cleanUp(); // clear any old streams data - forces a rebuild of the local caches.
streams.start(); // hangs until the global table is built
StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeSqp
= StoreQueryParameters.fromNameAndType(storeString
,QueryableStoreTypes.<String, String>keyValueStore());
// this while loop gives time for Kafka Streams to start up properly before creating the store
while (store == null)
{
try {
TimeUnit.SECONDS.sleep(1);
store = streams.store(storeSqp);
System.out.println("Store " + storeString + " Created successfully.");
} catch (InterruptedException e) {
}
catch (Exception e) {
System.out.println("Exception creating store " + storeString + ". Will try again in 1 second. Message: " + e.getLocalizedMessage());
}
}
return store;
}
}
我得到的输出如下:
存储 sandbox_store 创建成功。
-------------打印开始----
u:uvw
v:uvw
w:uvw
---------- ---打印结束----
-------------添加新值------------- ---
-------------添加新值----
-------------打印开始----------------
u:uvw
v:uvw
w:uvw
-------------打印结束---------- ------
请注意,我添加的 xyz 不见了!
(ps我知道我可以使用reduce
代替aggregate
,但实际上新值将是不同的类型,而不是字符串,因此它不适用于我的实际用例)
现在,如果我在第二次打印之前添加 10 秒的暂停;或者,如果我在不清除主题的情况下重新启动 Sandbox 类,则会出现第一个 xyz。所以很明显,系统中的某个地方存在时间延迟。在实践中,我正在处理 300mb+ 的消息,所有消息都进入输入主题,每小时一次;所以延迟甚至比几秒钟还要长。
我怎样才能帮助加快速度?