0

我正在使用 Kafka spout 构建 Storm 拓扑。我正在以 JSON 格式从 Kafka(没有 Zookeeper)消费,Storm 应该输出它。
如何为 JSON 数据类型定义正确的架构?目前,我有这样的代码库和基本的 spout 实现:

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

我是 Apache Storm 的新手,所以很高兴有任何建议。

4

1 回答 1

1

你可以做几件事:

您可以定义一个RecordTranslator. ConsumerRecord该接口允许您根据从 Kafka 读取的数据来定义 spout 将如何构造元组。

默认实现如下所示:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

如您所见,您将获得一个ConsumerRecord,这是一个内置在底层 Kafka 客户端库中的类型,然后必须将其转换为List<Object>元组值。如果您想在发出数据之前对记录做一些复杂的事情,这就是您的做法。例如,如果您想将键、值和偏移量填充到随后发出的数据结构中,您可以在此处执行此操作。你像使用翻译器一样KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()

如果您只想将键/值反序列化为您自己的数据类之一,一个更好的选择是实现Deserializer. 这将让您定义如何反序列化从 Kafka 获得的键/值。如果你想反序列化例如你自己的数据类的值,你可以使用这个接口来完成。

默认StringDeserializer是这样的:

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

一旦你创建了你自己的Deserializer,你就可以通过执行类似KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build(). 有一个类似的消费者属性用于设置值反序列化器。

于 2020-01-16T17:41:46.230 回答