0

我有一个需要在触发器上执行的任务(比如在需要执行时为该函数传递一个布尔值 true)。

为此,可以使用任何线程机制,如 Thread、TimerTask 或 ScheduledThreadPool 等。其中请建议最好的方法,并且使用的线程或机制必须在其任务完成后释放资源,因为触发间隔是波动的。

4

1 回答 1

0

您可以使用RxJava,它提供了用于处理可观察事件流的反应式编程 API。它基于.NET 的响应式扩展。事件可以是基于时间的(例如,Observable.interval());它们可能来自一个集合 ( Observable.from(Iterable));或者您可以使用Subject.

例子

//
// NOTE: For brevity, I'm using Java 8 syntax (lambdas and method references).
//

public static void main(String[] args) throws Throwable {
    System.out.println("Main Thread: " + Thread.currentThread().getId());

    final CountDownLatch done = new CountDownLatch(1);
    final Observable<Trade> trades = simulateTrades();

    trades.where(t -> Math.abs(t.quantity) * t.price >= 250000)
          .observeOn(Schedulers.threadPoolForIO())
          .subscribe(
              Main::logLargeTrade,
              e -> { e.printStackTrace(); done.countDown(); },
              done::countDown
          );

    done.await();
    System.out.println("Done!");
}

private static Observable<Trade> simulateTrades() {
    final Random r = new Random();

    return Observable.interval(50L, TimeUnit.MILLISECONDS)
                     .take(100)
                     .map(
                         t -> new Trade(
                             Instant.now(),
                             "AAPL",
                             (r.nextInt(9) + 1) * 100,
                             500d + r.nextDouble() * 5d
                         )
                     );
}

private static void logLargeTrade(Trade t) {
    System.out.printf(
        "[%d: %s] Large Trade: %d %s @ %f%n",
        Thread.currentThread().getId(),
        t.timestamp.atOffset(ZoneOffset.UTC).toLocalDateTime(),
        t.quantity,
        t.symbol,
        t.price
    );
}

final static class Trade {
    final Instant timestamp;
    final String symbol;
    final double price;
    final int quantity;

    Trade(Instant time, String symbol, int quantity, double price) {
        this.timestamp = time;
        this.symbol = symbol;
        this.quantity = quantity;
        this.price = price;
    }
}

这里trades是贸易事件流。我们的触发条件匹配价值至少为 250,000 美元的交易,因此我们使用where()仅包含匹配该条件的交易。我们希望触发动作在线程池上执行,所以我们使用observeOn()指定一个使用线程池的调度器。 subscribe()创建——你猜对了——对过滤后的事件流的订阅。这种特殊的重载让我们可以将onNextonErroronCompleted回调作为 lambdas 传递。

simulateTrades()方法创建我们订阅的事件流。通常,这些事件将通过消息传递系统进入,或者它们将由流程中其他地方的另一个组件发布。为了这个例子,我只是使用一个计时器间隔来每秒发布 10 笔交易,并在 100 笔交易后终止。

于 2013-11-08T14:46:43.310 回答