5

使用 Twitter4j 提供的代码示例,我希望在收集到 1,000 个状态列表后停止流,并返回此列表。我怎样才能做到这一点?

public class Stream {
    public List<Status> execute throws TwitterException {

    List<Status> statuses = new ArrayList();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

    StatusListener listener = new StatusListener() {

        public void onStatus(Status status) {
            statuses.add(status);
            if (statuses.size>1000){
              //return statuses. Obviously that's not the correct place for a return statement...
            }
        }

        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
        }

        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
        }

        public void onScrubGeo(long userId, long upToStatusId) {
            System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = {"Keyword 1", "Keyword 2"};

    fq.track(keywords);

    twitterStream.addListener(listener);
    twitterStream.filter(fq);

}
4

3 回答 3

6

考虑使用 aBlockingQueue作为中介,使用监听器将Status对象添加到其中。

流开始后,您可以开始Status从队列中取出 es,直到获得所需的一千个。

作为一个起点,它看起来像下面这样:

public class Stream {
    private static final int TOTAL_TWEETS = 1000;

    public List<Status> execute() throws TwitterException {
        // skipped for brevity...

        // TODO: You may have to tweak the capacity of the queue, depends on the filter query
        final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000); 
        final StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                statuses.offer(status); // Add received status to the queue
            }

            // etc...
        };

        final FilterQuery fq = new FilterQuery();
        final String keywords[] = {"Keyword 1", "Keyword 2"};
        fq.track(keywords);

        twitterStream.addListener(listener);
        twitterStream.filter(fq);

        // Collect the 1000 statues
        final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
        while (collected.size() < TOTAL_TWEETS) {
            // TODO: Handle InterruptedException
            final Status status = statuses.poll(10, TimeUnit.SECONDS); 

            if (status == null) {
                // TODO: Consider hitting this too often could indicate no further Tweets
                continue;
            }
            collected.add(status);
        }
        twitterStream.shutdown();

        return collected;
    }
} 
于 2013-08-30T09:02:15.383 回答
4

强制一段异步代码进入同步模式并不是一个好主意。请参阅https://stackoverflow.com/a/5934656/276263 看看您是否可以修改您的逻辑。

但是,下面的代码可以根据您的要求工作。

public class Stream {

  public static void main(String[] args) throws TwitterException {
    Stream stream = new Stream();
    stream.execute();
  }

  private final Object lock = new Object();
  public List<Status> execute() throws TwitterException {

    final List<Status> statuses = new ArrayList();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
        .getInstance();

    StatusListener listener = new StatusListener() {

      public void onStatus(Status status) {
        statuses.add(status);
        System.out.println(statuses.size() + ":" + status.getText());
        if (statuses.size() > 100) {
          synchronized (lock) {
            lock.notify();
          }
          System.out.println("unlocked");
        }
      }

      public void onDeletionNotice(
          StatusDeletionNotice statusDeletionNotice) {
        System.out.println("Got a status deletion notice id:"
            + statusDeletionNotice.getStatusId());
      }

      public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        System.out.println("Got track limitation notice:"
            + numberOfLimitedStatuses);
      }

      public void onScrubGeo(long userId, long upToStatusId) {
        System.out.println("Got scrub_geo event userId:" + userId
            + " upToStatusId:" + upToStatusId);
      }

      public void onException(Exception ex) {
        ex.printStackTrace();
      }

      @Override
      public void onStallWarning(StallWarning sw) {
        System.out.println(sw.getMessage());

      }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = { "federer", "nadal", "#Salute" };

    fq.track(keywords);


    twitterStream.addListener(listener);
    twitterStream.filter(fq);

    try {
      synchronized (lock) {
        lock.wait();
      }
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("returning statuses");
    twitterStream.shutdown();
    return statuses;
  }
}
于 2013-08-30T06:33:23.510 回答
0

有两种方法:

如果您只需要完成使用流的线程(这是调用您的侦听器的线程),您可以使用twitterStream.cleanUp();. 这将优雅地停止线程。您可能希望boolean stopped在状态侦听器中使用一个变量,以便忽略此事件之后您将收到的任何调用。

您还可以通过调用关闭流消费线程及其调度程序线程(即守护线程),twitterStream.shutdown();但是这是结束与 twitter API 通信的更残酷的方法。尽管如果您从侦听器内部调用它会起作用,但我更喜欢@krishnakumarp建议的方法

于 2013-08-30T07:31:46.347 回答