0

这里我遇到一个问题,我从Kafka源接收消息,并编写一个拦截器从kafka消息(json格式)中提取两个字段(dataSoure和businessType)。这里我使用 gson.fromJson()。但问题是我得到了以下错误。

这里我想知道Flume是否会在超过限制时截断Flume事件?如果是,如何将其设置为更大的值。因为我的 kafka 消息总是很长,大约 60K 字节。

期待回复。提前致谢!

2015-12-09 11:48:05,665 (PollableSourceRunner-KafkaSource-apply) [错误 - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource 异常,{} com.google。 gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: com.google.gson.Gson.fromJson(Gson.java:809) com.google.gson.Gson.fromJson(Gson) 的第 1 行第 4096 列未终止的字符串.java:761) 在 com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:43) 在 com.xxx.flume.interceptor 的 com.google.gson.Gson.fromJson(Gson.java:710)。 JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:61) 在 org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62) 在 org.apache.flume.channel.ChannelProcessor。org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130) 上的 processEventBatch(ChannelProcessor.java:146)

4

1 回答 1

0

最后,我通过调试源代码找到了根本原因。这是因为我尝试使用 Gson 将 event.getBody() 转换为地图,这是不正确的,因为 event.getBody() 是一个字节 [],而不是一个无法转换的字符串。正确的代码应该如下:

String body = new String(event.getBody(), "UTF-8");   
Map<String, Object> map = gson.fromJson(body, new TypeToken<Map<String, Object>>() {}.getType());
于 2015-12-13T08:56:51.183 回答