1

我正在寻找在我的代码中使用 play 枚举器 (play.api.libs.iteratee.Enumerator[A]) 的正确方法,我有一个“InfoBlock”类型的对象流,我想将它重定向到 websocket .我实际上做的是:

保存块的数据结构

私有惰性 val buf:mutable.Queue[InfoBlock] = new mutable.SynchronizedQueue[InfoBlock]

枚举器中使用的回调

def getCallback: Future[Option[InfoBlock]] = Future{

if (!buf.isEmpty)
  Some(buf.dequeue)
else
  None}

块由另一个线程生成并使用以下命令添加到队列中:

buf += new InfoBlock(...) 

然后在控制器中,我想设置一个 websocket 来流式传输该数据,执行以下操作:

def stream = WebSocket.using[String]{ request =>

  val in = Iteratee.consume[String]()

  val enu:Enumerator[InfoBlock] = Enumerator.fromCallback1(
   isFirst => extractor.getCallback
  )

  val out:Enumerator[String] = enu &> Enumeratee.map(blk => blk.author+"  ->  "+blk.msg)

(in,out)}

它可以工作但有一个大问题,当连接打开时它会发送一堆块(=〜50)并停止,如果我打开一个新的websocket然后我得到另一堆块但没有更多。我试图设置一些属性到 js 对象 WebSocket 特别是我尝试设置

 websocket.binaryType = "arraybuffer" 

因为我认为使用“blob”可能是原因,但我错了,问题一定是服务器端的,我不知道..

4

1 回答 1

0

来自 Enumerator.fromCallback 上的 Enumerator ScalaDocs,描述了检索器函数:

输入函数。如果有要传递的输入,则返回最终以 Some 值赎回的未来,如果已到达流的末尾,则返回最终以 None 赎回的未来。

这意味着枚举器将从队列中拉出所有内容开始。当它为空时,回调将返回一个 None。枚举器将此视为流的结尾,并关闭向下游发送完成状态。它不会再寻找任何数据

与其使用可变队列进行消息传递,不如尝试将 Enumerator/Iteratee 范例推送到您的工作人员中。创建一个输出您正在创建的实例的枚举器,并让迭代器从中提取。如果需要,您可以在中间粘贴一些枚举以进行一些转换。

于 2013-11-14T19:24:43.700 回答