我有一个需要在触发器上执行的任务(比如在需要执行时为该函数传递一个布尔值 true)。
为此,可以使用任何线程机制,如 Thread、TimerTask 或 ScheduledThreadPool 等。其中请建议最好的方法,并且使用的线程或机制必须在其任务完成后释放资源,因为触发间隔是波动的。
我有一个需要在触发器上执行的任务(比如在需要执行时为该函数传递一个布尔值 true)。
为此,可以使用任何线程机制,如 Thread、TimerTask 或 ScheduledThreadPool 等。其中请建议最好的方法,并且使用的线程或机制必须在其任务完成后释放资源,因为触发间隔是波动的。
您可以使用RxJava,它提供了用于处理可观察事件流的反应式编程 API。它基于.NET 的响应式扩展。事件可以是基于时间的(例如,Observable.interval()
);它们可能来自一个集合 ( Observable.from(Iterable)
);或者您可以使用Subject
.
//
// NOTE: For brevity, I'm using Java 8 syntax (lambdas and method references).
//
public static void main(String[] args) throws Throwable {
System.out.println("Main Thread: " + Thread.currentThread().getId());
final CountDownLatch done = new CountDownLatch(1);
final Observable<Trade> trades = simulateTrades();
trades.where(t -> Math.abs(t.quantity) * t.price >= 250000)
.observeOn(Schedulers.threadPoolForIO())
.subscribe(
Main::logLargeTrade,
e -> { e.printStackTrace(); done.countDown(); },
done::countDown
);
done.await();
System.out.println("Done!");
}
private static Observable<Trade> simulateTrades() {
final Random r = new Random();
return Observable.interval(50L, TimeUnit.MILLISECONDS)
.take(100)
.map(
t -> new Trade(
Instant.now(),
"AAPL",
(r.nextInt(9) + 1) * 100,
500d + r.nextDouble() * 5d
)
);
}
private static void logLargeTrade(Trade t) {
System.out.printf(
"[%d: %s] Large Trade: %d %s @ %f%n",
Thread.currentThread().getId(),
t.timestamp.atOffset(ZoneOffset.UTC).toLocalDateTime(),
t.quantity,
t.symbol,
t.price
);
}
final static class Trade {
final Instant timestamp;
final String symbol;
final double price;
final int quantity;
Trade(Instant time, String symbol, int quantity, double price) {
this.timestamp = time;
this.symbol = symbol;
this.quantity = quantity;
this.price = price;
}
}
这里trades
是贸易事件流。我们的触发条件匹配价值至少为 250,000 美元的交易,因此我们使用where()
仅包含匹配该条件的交易。我们希望触发动作在线程池上执行,所以我们使用observeOn()
指定一个使用线程池的调度器。 subscribe()
创建——你猜对了——对过滤后的事件流的订阅。这种特殊的重载让我们可以将onNext
、onError
和onCompleted
回调作为 lambdas 传递。
该simulateTrades()
方法创建我们订阅的事件流。通常,这些事件将通过消息传递系统进入,或者它们将由流程中其他地方的另一个组件发布。为了这个例子,我只是使用一个计时器间隔来每秒发布 10 笔交易,并在 100 笔交易后终止。