
I have a Kafka topic of strings with arbitrary keys. I want to create a topic of characters in string : value pairs, for example:

input("key","value") -> outputs (["v","value"],["a","value"],...)

For simplicity, my input topic has a single partition, so the KTable code should receive all messages on a single instance.
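
(For reference, the single-partition topic can be created with the AdminClient; this is just a sketch of my test setup, and the broker address and replication factor are assumptions:)

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateSandboxTopic
{
    public static void main(String[] args) throws Exception
    {
        Properties adminProperties = new Properties();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.254:9092");
        try (AdminClient admin = AdminClient.create(adminProperties))
        {
            // one partition, replication factor 1: every record goes to the same Streams task
            admin.createTopics(Collections.singletonList(new NewTopic("sandbox", 1, (short) 1))).all().get();
        }
    }
}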

I created the following sandbox code, which builds the new table fine, but does not update it when new items are put onto the original topic:

import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class Sandbox 
{
    private final static String kafkaBootstrapServers = "192.168.1.254:9092";
    private final static String kafkaGlobalTablesDirectory = "C:\\Kafka\\tmp\\kafka-streams-global-tables\\";
    private final static String topic = "sandbox";
    private static KafkaStreams streams;
    public static void main(String[] args) 
    {
        // 1. set up the test data
        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, Sandbox.class.getName() + "_testProducer");
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> sandboxProducer = new KafkaProducer<>(producerProperties);

        sandboxProducer.send(new ProducerRecord<String, String>(topic,"uvw","uvw"));
        
        // 2. read the test data and check it's working
        ReadOnlyKeyValueStore<String, String> store = getStore();
        
        printStore(store.all());
        System.out.println("-------------ADDING NEW VALUE----------------");
        sandboxProducer.send(new ProducerRecord<String, String>(topic,"xyz","xyz"));
        System.out.println("-------------ADDED NEW VALUE----------------");
        printStore(store.all());
        
        sandboxProducer.close();
        streams.close();
    }
    
    private static void printStore(KeyValueIterator<String, String> i)
    {
        System.out.println("-------------PRINT START----------------");
        while (i.hasNext())
        {
            KeyValue<String, String> n = i.next();
            System.out.println(n.key + ":" +  String.join(",", n.value));
        }
        System.out.println("-------------PRINT END----------------");
    }
    
    private static ReadOnlyKeyValueStore<String, String> getStore()
    {
        ReadOnlyKeyValueStore<String, String> store = null;
        String storeString = "sandbox_store";
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(topic
                , Consumed.with(Serdes.String(),Serdes.String()))
            .filter((k,v)->v!=null)
            .flatMap((k,v)->{
                Set<KeyValue<String, String>> results = new LinkedHashSet<>();
                if (v != null)
                {
                    for (char subChar : v.toCharArray())
                    {
                        results.add(KeyValue.<String, String>pair(new String(new char[] {subChar}), v));
                    }
                }
                return results;
            })
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) 
            .aggregate(()->new String()
                    , (key, value, agg) -> {
                        agg = agg + value;
                        return agg;
                    }
                    ,Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeString)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "sandbox");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, Sandbox.class.getName());
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, kafkaGlobalTablesDirectory + "Sandbox");
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streams = new KafkaStreams(builder.build(), streamsConfiguration);
        
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
              System.out.println("Exception on thread " + thread.getName() + ":" + throwable.getLocalizedMessage());
            });
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.cleanUp(); // clear any old streams data - forces a rebuild of the local caches.
        streams.start(); // hangs until the global table is built
        
        StoreQueryParameters<ReadOnlyKeyValueStore<String,  String>> storeSqp 
        = StoreQueryParameters.fromNameAndType(storeString
        ,QueryableStoreTypes.<String, String>keyValueStore());
        
        // this while loop gives time for Kafka Streams to start up properly before creating the store
        while (store == null)
        {
            try {
                TimeUnit.SECONDS.sleep(1);
                store = streams.store(storeSqp);
                System.out.println("Store " + storeString + " Created successfully.");
            } catch (InterruptedException e) {
            }
            catch (Exception e) {
                System.out.println("Exception creating store " + storeString + ". Will try again in 1 second. Message: " + e.getLocalizedMessage());
            }
        }
        return store;
    }
}

The output I get is as follows:

Store sandbox_store Created successfully.
-------------PRINT START----------------
u:uvw
v:uvw
w:uvw
-------------PRINT END----------------
-------------ADDING NEW VALUE----------------
-------------ADDED NEW VALUE----------------
-------------PRINT START----------------
u:uvw
v:uvw
w:uvw
-------------PRINT END----------------

Note that the xyz I added is nowhere to be seen!

(P.S. I know I could use reduce instead of aggregate here, but in reality the new value will be a different type, not a String, so reduce won't work for my actual use case.)
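
(To illustrate why that matters: aggregate lets the aggregate type differ from the record value type, which reduce cannot do. A minimal sketch with a Long count standing in for my real type; the store name and counting logic are placeholders, not my actual use case:)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregateTypeSketch
{
    public static void main(String[] args)
    {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("sandbox", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(() -> 0L,                               // initializer returns a Long, not a String
                (key, value, agg) -> agg + 1L,                 // aggregator produces the new Long aggregate
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("sketch_store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));           // value serde matches the aggregate type
        // builder.build() would then be passed to KafkaStreams exactly as in the main example above
    }
}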

Now, if I add a 10-second pause before the second print, or if I restart the Sandbox class without clearing the topic, then the xyz from the first run does show up. So clearly there is a time lag somewhere in the system. In practice I'm dealing with 300 MB+ of messages all landing on the input topic once an hour, so the lag is even longer than a few seconds.

How can I help speed this up?
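
(In case it's relevant: my assumption, which I have not verified, is that the delay is tied to how often Kafka Streams commits and flushes its record cache. These are the settings I would expect to influence that, added to the streamsConfiguration Properties in getStore() above:)

// Assumption only - settings I believe control how quickly aggregation results
// reach the state store; I have not confirmed that tuning them solves this.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);     // default is 30000 ms
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // 0 disables the record cache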
