我需要一些帮助来修改https://github.com/playframework/play-java-websocket-example。此示例基于Source
给定 Stock ID 的 own。每Source
在某个时间间隔内产生新的股票价值:
public Source<StockUpdate, NotUsed> update() {
return source.throttle(1, duration, 1, ThrottleMode.shaping())
.map(sq -> new StockUpdate(sq.symbol, sq.price));
}
这个例子大部分是我需要的(“观察/不观察”股票的想法),所以我想以非常相似的方式使用它,但我不想在我的应用程序中伪造股票报价时间间隔生成器,因为我已连接真正的,一个证券交易所的来源。我通过阅读消息(在我的应用程序的其他部分)java.io.InputStream
,现在,我将它们放入java.util.Queue
(java.util.concurrent.LinkedBlockingQueue
)通过queue.offer(message)
,现在我想以某种方式从 Stock.update() 获取它(但我不知道这是最好的地方)。
到目前为止,我知道我可以使用例如Source.queue(BUFFER_LENGTH, OverflowStrategy.backpressure())
,但它没有对(我的)队列的任何引用。我也知道我可以使用Source.queue(BUFFER_LENGTH, OverflowStrategy.backpressure()).to(...).run(...).offer(...)
,但不幸的是我不知道如何将它UserActor.addStock(Stock)
与Flow
,UniqueKillSwitch
等连接起来。
也许有人需要像我这样的修改并可以提供一些提示?