2

我正在从 kafka 主题中获取 json 数据。我如何应用 json 解析来获取使用反序列化方法的风暴方案类中所有对象的所有字段,之后我将值返回到新的返回值()。(backtype.storm.tuple.Values 类方法) ?即,如果我的主题中有 2 个 json 对象,我循环它们以获取所有字段,最后我必须将所有值返回到 return 方法。我的返回应该包含两个 json 对象的所有字段。

我的问题:return 方法中只返回了 2 个 obj json 数据。我认为第二个对象的所有字段都覆盖了第一个对象字段。最后返回第二个对象字段。

你们中的任何人都可以给我一个返回所有对象字段(1,2 个对象字段)的想法......

提前致谢

public class MainParserSpout implements Scheme{
  String tweet_created_at;
  String tweet_id;
  String tweet_id_str;
  String tweet_text;
  String tweet_source;`    
@Override

try{

public List<Object> deserialize(byte[] bytes){
  String twitterEvent = new String(bytes, "UTF-8");
   JSONArray JSON = new JSONArray(twitterEvent);
      for(int i=0;i<JSON.length();i++) {
        JSONObject object_tweet=JSON.getJSONObject(i);
//Tweet status                  
          try{
            this.tweet_created_at=object_tweet.getString("created_at");
            this.tweet_id=object_tweet.getString("id");
            this.tweet_id_str=object_tweet.getString("id_str");
            this.tweet_text=object_tweet.getString("text");
            this.tweet_source=object_tweet.getString("source");
          }catch(Exception e){}
    } //array for close
}catch(Exception e){}
} //JSON array close
  return new Values(tweet_created_at,tweet_id,tweet_id_str,tweet_text,tweet_source);
} //deserialize method close
public Fields getOutputFields() {
    return newFields("tweet_created_at","tweet_id","tweet_id_str","tweet_text","tweet_source");
} //getOutputFields method close
} //class close
4

2 回答 2

2

您不能在一次调用deserialize. 但是,您可以通过“加倍”您的元组来发出两条推文,即每个值/字段/属性两次。之后,您可以使用一个带有“双推文”的螺栓,拆分此元组并发出两个单推文元组。

类似的东西(我不熟悉 JSON Tweet 格式,所以这是关于问题代码示例的更多猜测):

@Override
public List<Object> deserialize(byte[] bytes){
  List<String> doubleTweet = new ArrayList<String>();

  try{
    String twitterEvent = new String(bytes, "UTF-8");
    JSONArray JSON = new JSONArray(twitterEvent);


    for(int i=0;i<JSON.length();i++) {
      JSONObject object_tweet=JSON.getJSONObject(i);
      for(int j=0;j<object_tweet.length();j++){
        //Tweet status                  
        try{
          doubleTweet.add(object_tweet.getString("created_at"));
          doubleTweet.add(object_tweet.getString("id"));
          doubleTweet.add(object_tweet.getString("id_str"));
          doubleTweet.add(object_tweet.getString("text"));
          doubleTweet.add(object_tweet.getString("source"));
        }catch(Exception e){}
      }
    }
  }catch(Exception e){}

  return doubleTweet;
}

doubleTweet每个字段包含两次(第一个推文的字段 0-4 和第二个推文的字段 5-9)。因此,一个连续的螺栓可以只提取这些字段并为每条推文发出一个 5 字段元组)。

作为替代方案,您还可以RawScheme在后续螺栓中使用并执行 JSON 解析。在这个螺栓中,您可以发出多个元组(即,每条推文一个):https ://github.com/apache/storm/tree/master/external/storm-kafka#multischeme

如果您使用RawScheme具有单个byte[]字段的元组,则由 spout 发出。因此,您可以在其中执行 JSON parsonBolt.execute()Collector.emit()为每个 Tweet 调用。

于 2016-01-06T11:02:24.343 回答
1

我错过了 kafka 是消息发布-订阅消息系统的观点。当我尝试将数据发送给生产者时,我将 Json 卡盘 20 个对象作为单个消息发送,但我的方案仅适用于单个 Json 卡盘。所以我将单个 20 个对象 Json 卡盘分成 20 个 json 卡盘并发送每个转给 Json 制片人。

于 2016-01-28T07:36:55.827 回答