1

我需要一些帮助来修改https://github.com/playframework/play-java-websocket-example。此示例基于Source给定 Stock ID 的 own。每Source在某个时间间隔内产生新的股票价值:

public Source<StockUpdate, NotUsed> update() {
    return source.throttle(1, duration, 1, ThrottleMode.shaping())
            .map(sq -> new StockUpdate(sq.symbol, sq.price));
}

这个例子大部分是我需要的(“观察/不观察”股票的想法),所以我想以非常相似的方式使用它,但我不想在我的应用程序中伪造股票报价时间间隔生成器,因为我已连接真正的,一个证券交易所的来源。我通过阅读消息(在我的应用程序的其他部分)java.io.InputStream,现在,我将它们放入java.util.Queuejava.util.concurrent.LinkedBlockingQueue)通过queue.offer(message),现在我想以某种方式从 Stock.update() 获取它(但我不知道这是最好的地方)。

到目前为止,我知道我可以使用例如Source.queue(BUFFER_LENGTH, OverflowStrategy.backpressure()),但它没有对(我的)队列的任何引用。我也知道我可以使用Source.queue(BUFFER_LENGTH, OverflowStrategy.backpressure()).to(...).run(...).offer(...),但不幸的是我不知道如何将它UserActor.addStock(Stock)Flow,UniqueKillSwitch等连接起来。

也许有人需要像我这样的修改并可以提供一些提示?

4

1 回答 1

0

到目前为止,我知道我可以使用例如 Source.queue(BUFFER_LENGTH, OverflowStrategy.backpressure()),但它没有对(我的)队列的任何引用

您可以更改您的代码,使其使用.Source.queue而不是java.util.Queue. 或者,您可以有一个线程从队列中读取并将其推送到Source.queue.

于 2017-11-05T23:20:36.710 回答