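I am trying to send Kafka messages to an Android application.
I realized late in the game that the only way to do this is to use the Kafka REST Proxy.
I have it set up, but my keys are not quoted, which causes deserialization errors.
With /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --property print.key=true --from-beginning
I receive truck/6084/position {"timestamp":1636987919544,"truckId":6084,"driverId":36,"routeId":1177752383,"eventType":"Normal","correlationId":"597220341814695046","latitude":24.7700961485576,"longitude":46.6207663588065}
I have run these same messages into a PC application with a typical consumer, taken almost line for line from Confluent, without any problems.
My exact error is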
{"error_code":40801,"message":"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'truck': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (byte[])\"truck/7103/position\"; line: 1, column: 7]"}
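The parse failure above can be reproduced outside Kafka. The following is a minimal sketch, not the REST Proxy's own code: it uses Python's standard json module in place of Jackson, and is_valid_json is a hypothetical helper name. It shows that the bare key is not a JSON document, while the same key wrapped in double quotes is.

```python
import json


def is_valid_json(text: str) -> bool:
    """Return True if text parses as a JSON document (hypothetical helper)."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# The key exactly as it appears in the error message: a bare string.
raw_key = "truck/7103/position"

# The same key serialized as a JSON string, i.e. wrapped in double quotes.
quoted_key = json.dumps(raw_key)

print(is_valid_json(raw_key))     # the bare key is rejected
print(is_valid_json(quoted_key))  # the quoted key is accepted
```

So a consumer that expects the "json" embedded format can only read keys that the producer wrote as JSON strings, quotes included.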
When trying the "jsonschema" format instead, I get
{"error_code":40801,"message":"Error deserializing key/value for partition vehicle_tracking_sysA-5 at offset 0. If needed, please seek past the record to continue consumption."}
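One possible workaround, offered as a sketch and not tested against this setup: the REST Proxy v2 API also has a "binary" embedded format (consumer instance created with "format": "binary", records requested with Accept: application/vnd.kafka.binary.v2+json). In that format the key and value come back base64-encoded, so the proxy does not try to parse the key as JSON and the client decodes it. The sample response below is hand-built from the message shown earlier, with only a few of its fields, and decode_record is a hypothetical helper name.

```python
import base64
import json
from typing import Tuple


def decode_record(record: dict) -> Tuple[str, dict]:
    """Decode one binary-format record: plain-string key, JSON value."""
    key = base64.b64decode(record["key"]).decode("utf-8")
    value = json.loads(base64.b64decode(record["value"]))
    return key, value


# Hand-built sample (an assumption), shaped like a binary-format records response.
sample_value = {"timestamp": 1636987919544, "truckId": 6084, "eventType": "Normal"}
sample_response = [
    {
        "topic": "my_topic",
        "key": base64.b64encode(b"truck/6084/position").decode("ascii"),
        "value": base64.b64encode(json.dumps(sample_value).encode("utf-8")).decode("ascii"),
        "partition": 0,
        "offset": 0,
    }
]

for rec in sample_response:
    key, value = decode_record(rec)
    print(key, value["truckId"])
```

The same two steps (base64-decode, then parse only the value as JSON) would apply in whatever HTTP client the Android side uses.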
Any insight would be greatly appreciated.
Chris
Edit:
Autoconnected Player Kafka - Received Expection: Sequence contains no matching element trace: at System.Linq.Enumerable.Single[TSource] (System.Collections.Generic.IEnumerable`1[T] source, System.Func`2[T,TResult] predicate) [0x00000] in <00000000000000000000000000000000>:0
at Confluent.Kafka.Impl.Librdkafka.SetDelegates (System.Type nativeMethodsClass) [0x00000] in <00000000000000000000000000000000>:0
at Confluent.Kafka.Impl.Librdkafka.TrySetDelegates (System.Collections.Generic.List`1[T] nativeMethodCandidateTypes) [0x00000] in <00000000000000000000000000000000>:0
at Confluent.Kafka.Impl.Librdkafka.LoadLinuxDelegates (System.String userSpecifiedPath) [0x00000] in <00000000000000000000000000000000>:0
at Confluent.Kafka.Impl.Librdkafka.Initialize (System.String userSpecifiedPath) [0x00000] in <00000000000000000000000000000000>:0
at Confluent.Kafka.Consumer`2[TKey,TValue]..ctor (Confluent.Kafka.ConsumerBuilder`2[TKey,TValue] builder) [0x00000] in <00000000000000000000000000000000>:0
at Confluent.Kafka.ConsumerBuilder`2[TKey,TValue].Build () [0x00000] in <00000000000000000000000000000000>:0
at KafkaConsumer+threadHandle.StartKafkaListener () [0x00000] in <00000000000000000000000000000000>:0
at System.Threading.ThreadHelper.ThreadStart_Context (System.Object state) [0x00000] in <00000000000000000000000000000000>:0
at System.Threading.ExecutionContext.RunInternal (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) [0x00000] in <00000000000000000000000000000000>:0
at System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) [0x00000] in <00000000000000000000000000000000>:0
at System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state) [0x00000] in <00000000000000000000000000000000>:0
at System.Threading.ThreadHelper.ThreadStart () [0x00000] in <00000000000000000000000000000000>:0