I have what I think is the most straightforward Kafka Streams DSL use case: read CSV sensor data, group it by timestamp, and write it out. The following code does not compile:
public static void main(String[] args) {
    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    Serde<String> stringSerde = Serdes.String();

    CSVDeserializer<SensorData> sensorDataDeserializer = new CSVDeserializer<>(SensorData.class);
    JsonSerializer<SensorData> sensorDataSerializer = new JsonSerializer<>();
    Serde sensorDataSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataDeserializer);

    JsonDeserializer<SensorData> sensorDataJsonDeserializer = new JsonDeserializer<>(SensorData.class);
    Serde sensorDataJSONSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    JsonSerializer<SensorDataAccumulator> accSerializer = new JsonSerializer<>();
    JsonDeserializer accDeserializer = new JsonDeserializer<>(SensorDataAccumulator.class);
    Serde<SensorDataAccumulator> accSerde = Serdes.serdeFrom(accSerializer, accDeserializer);

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, SensorData> initialStream = kStreamBuilder.stream(stringSerde, sensorDataSerde, "e40_orig");

    final KStream<String, SensorData> sensorDataKStream = initialStream
            .filter((k, v) -> (v != null))
            .map((k, v) -> new KeyValue<>(v.getMeasurementDateTime().toString(), v));

    sensorDataKStream
            .filter((k, v) -> (v != null))
            .groupBy((k, v) -> k, stringSerde, sensorDataJSONSerde)
            .aggregate(SensorDataAccumulator::new,
                    (k, v, list) -> list.add(v), // ==> compile error here
                    // CHANGED THIS --> ((SensorDataAccumulator) list).add((SensorData) v),
                    TimeWindows.of(10000),
                    accSerde, "acc")
            .to(windowedSerde, accSerde, "out");

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
    kafkaStreams.start();
}
because of

Error:(90, 45) java: cannot find symbol
  symbol:   method add(java.lang.Object)
  location: variable list of type java.lang.Object

Weird.
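My best guess, and it is only a guess: the raw Serde declarations above might be what erases the generics. If groupBy is handed a raw Serde, the call becomes an unchecked invocation, the KGroupedStream it returns is raw, and the type parameters in aggregate collapse to Object, which would explain the compiler seeing list as java.lang.Object. A minimal sketch of the two declarations side by side:

// What I have now: a raw Serde. Passing this to groupBy(...) makes the
// invocation unchecked, so the returned KGroupedStream is raw and the
// aggregator lambda parameters erase to Object.
Serde sensorDataJSONSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);

// With the type parameter spelled out, the aggregator lambda should be
// checked as (String k, SensorData v, SensorDataAccumulator list).
Serde<SensorData> typedSensorDataJSONSerde =
        Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);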
public class SensorDataAccumulator {

    ArrayList list = new ArrayList<SensorData>();

    public SensorDataAccumulator add(SensorData s) {
        list.add(s);
        return this;
    }
}
Casting as in the comment gets it to compile, but leads to the following runtime exception (before any accumulated window is output).
[2017-01-02 13:00:45,614] INFO task [1_0] Initializing processor nodes of the topology (org.apache.kafka.streams.processor.internals.StreamTask:123)
[2017-01-02 13:01:04,173] WARN Error while fetching metadata with correlation id 779 : {out=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
[2017-01-02 13:01:04,662] INFO stream-thread [StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:268)
[2017-01-02 13:01:04,663] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,666] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Flushing state stores of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
[2017-01-02 13:01:04,669] INFO stream-thread [StreamThread-1] Flushing state stores of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: org.rocksdb.RocksIterator.close()V
at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.close(RocksDBStore.java:468)
at org.apache.kafka.streams.state.internals.RocksDBStore.closeOpenIterators(RocksDBStore.java:411)
at org.apache.kafka.streams.state.internals.RocksDBStore.close(RocksDBStore.java:397)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.close(RocksDBWindowStore.java:276)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.close(MeteredWindowStore.java:109)
at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:125)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:347)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:347)
Debugging the add method of SensorDataAccumulator should give a clue.

So, if I understand correctly, I keep an ArrayList list = new ArrayList<SensorData>(); but somewhere along the way its members get changed into LinkedTreeMaps. The typing is losing me here...
OK, so LinkedTreeMap is the underlying data structure GSON uses in my JsonDeserializer and JsonSerializer classes, so for completeness I'll add those below.
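To illustrate, here is a minimal standalone sketch of what I believe is happening (the JSON payload is made up; the raw ArrayList mirrors the field in SensorDataAccumulator):

import com.google.gson.Gson;

import java.util.ArrayList;

public class GsonRawListRepro {

    public static void main(String[] args) {
        Gson gson = new Gson();

        // Deserializing into a raw ArrayList, like the field in
        // SensorDataAccumulator: Gson has no element type to target,
        // so each JSON object comes back as Gson's internal LinkedTreeMap.
        ArrayList raw = gson.fromJson("[{\"value\": 42.0}]", ArrayList.class);

        System.out.println(raw.get(0).getClass());
        // prints: class com.google.gson.internal.LinkedTreeMap
    }
}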
At the moment I'm not sure what I'm doing wrong or where to fix it. Should I use a different serializer, a different data structure? A different language ;)?

Any input is welcome.
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {
    }
}
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {
    }
}
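One thing I plan to try next, though I have not verified it yet: declaring the accumulator's list with its element type, so that Gson's reflective deserialization can read SensorData from the field's generic signature instead of falling back to LinkedTreeMap:

import java.util.ArrayList;
import java.util.List;

public class SensorDataAccumulator {

    // Declared with its generic type: Gson reads element types from the
    // field's generic signature, so elements should come back as SensorData
    // instances rather than LinkedTreeMaps. (Untested.)
    private final List<SensorData> list = new ArrayList<>();

    public SensorDataAccumulator add(SensorData s) {
        list.add(s);
        return this;
    }
}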