
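I am trying to store tweets from the Sample Stream in a database and store the raw JSON at the same time. I am using the Twitter4jStatusClient example from the hbc GitHub repository. Since I only store a subset of the information in the database in real time, I would also like to store the raw JSON of each tweet so that I can retrieve additional information later if needed. However, using Twitter4jStatusClient means the listener is executed on a different thread, and here it says that in order to get the JSON object, the call must be made from the same thread that retrieved the JSON object. Is there a way to save the JSON string when using Twitter4jStatusClient? I chose not to use this example because I only want to perform certain operations and save the JSON string if the message is a status. Thanks!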

    // Create an appropriately sized blocking queue
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

    // Define our endpoint: By default, delimited=length is set (we need this for our processor)
    // and stall warnings are on.
    StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
    // Specify the language filter for the endpoint
    endpoint.addQueryParameter(Constants.LANGUAGE_PARAM, Joiner.on(',').join(Lists.newArrayList("en")));
    endpoint.stallWarnings(false);

    Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);

    // Create a new BasicClient. By default gzip is enabled.
    BasicClient client = new ClientBuilder()
            .name("sampleStreamClient")
            .hosts(Constants.STREAM_HOST)
            .endpoint(endpoint)
            .authentication(auth)
            .processor(new StringDelimitedProcessor(queue))
            .build();

    // Create an executor service which will spawn threads to do the actual work of parsing the incoming messages and
    // calling the listeners on each message
    int numProcessingThreads = 4;
    ExecutorService service = Executors.newFixedThreadPool(numProcessingThreads);


    StatusListener listener = new SampleStreamStatusListener(jsonInserter);

    // Wrap our BasicClient with the twitter4j client
    t4jClient = new Twitter4jStatusClient(
            client, queue, Lists.newArrayList(listener), service);

1 Answer


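I had a similar problem with Twitter4jStatusClient. Here are a few ideas.

Intermediate queue

You can have a separate thread pool that reads the raw messages from your queue variable, stores them somewhere, and then puts them onto a new queue, which we'll call hbcQueue, that you pass to the Twitter4jStatusClient constructor instead of queue.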

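    // Second queue: this is the one the Twitter4jStatusClient will consume from
    BlockingQueue<String> hbcQueue = new LinkedBlockingQueue<>(10000);
    ExecutorService rawJsonSaver = Executors.newFixedThreadPool(numProcessingThreads);
    for (int i = 0; i < numProcessingThreads; i++) {
      rawJsonSaver.execute(() -> {
        for (;;) {
          try {
            // Take the raw message that the hbc client put on the original queue
            String msg = queue.take();
            JSONObject jobj = new JSONObject(msg);
            // Only statuses are saved and forwarded; other message types are dropped
            if (JSONObjectType.determine(jobj) == JSONObjectType.Type.STATUS) {
              System.out.println(msg);  // Save it
              // put() blocks while hbcQueue is full; add() would throw
              // IllegalStateException and kill this worker
              hbcQueue.put(msg);
            }
          } catch (InterruptedException e) {
            // Restore the interrupt flag and stop this worker
            Thread.currentThread().interrupt();
            break;
          } catch (JSONException e) {
            // Skip messages that are not valid JSON
            continue;
          }
        }
      });
    }
    Twitter4jStatusClient t4jClient = new Twitter4jStatusClient(
        client, hbcQueue, Lists.newArrayList(listener), service);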

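This of course has the performance downside of parsing the JSON a second time and of adding another blocking operation on a second concurrent queue.

Reserialization

If you are going to process the raw JSON later in Java, you can use plain Java serialization, since the Status object passed to your StatusListener implements Serializable. This isn't far off from reserializing it back to JSON, but at least you don't need to serialize each field by hand.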

    @Override
    public void onStatus(final Status status) {
      byte[] serializedStatus;
      try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
           ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) {
        objStream.writeObject(status);
        // Flush so no buffered bytes are missing from the array
        objStream.flush();
        serializedStatus = byteStream.toByteArray();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      // store serializedStatus
      // . . .
    }
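To read a stored status back, the reverse of the snippet above is an ObjectInputStream over the saved bytes. Here is a minimal sketch using only java.io. The class name StatusBytes and its two helper methods are hypothetical names for illustration; they are not part of hbc or Twitter4J. It also assumes the Twitter4J classes (ideally the same version) are on the classpath when the bytes are read back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, not part of hbc or Twitter4J.
public class StatusBytes {

  // Serialize any Serializable (for example a Status) to a byte array.
  public static byte[] serialize(Serializable obj) throws IOException {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    // Closing the ObjectOutputStream flushes everything into byteStream
    try (ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) {
      objStream.writeObject(obj);
    }
    return byteStream.toByteArray();
  }

  // Rebuild the object from bytes produced by serialize().
  public static Object deserialize(byte[] bytes)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    }
  }
}
```

With that in place, reading back would look something like Status restored = (Status) StatusBytes.deserialize(serializedStatus); and you could then call the usual getters on restored.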
answered 2016-06-26T05:13:18.467