0

我需要类似 monix.Observable.bufferTimedAndCounted 的功能,但要支持自定义的“权重”(weigher)函数。我找到了 bufferTimedWithPressure 操作符,它允许使用元素的权重:

// Subject used to push values into the stream by hand.
val subj = PublishSubject[String]()

// Buffer with a 1-second period and a limit of 5, using a weight function
// that returns 3 for every item; print each emitted bundle and keep going.
subj
  .bufferTimedWithPressure(1.seconds, 5, _ => 3)
  .subscribe { s =>
    println(s)
    Future.successful(Ack.Continue)
  }

// Push 60 values, pausing 100 ms before each one.
(1 to 60).foreach { i =>
  Thread.sleep(100)
  subj.onNext(i.toString)
}

但是这样只会在每个指定的时间间隔到期时才发射。我需要类似 bufferTimedAndCounted 的行为,即缓冲区满时也会立即发射。如何做到这一点?

4

1 回答 1

0

我从 monix 源码中复制了 BufferTimedObservable 并稍作修改,添加了权重函数(注意:我没有在所有情况下都对其进行测试):

import java.util.concurrent.TimeUnit

import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.{CompositeCancelable, MultiAssignCancelable}
import monix.execution.{Ack, Cancelable}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration, MILLISECONDS}

/**
  * Buffers the elements of `source` and emits them downstream as bundles,
  * either when `timespan` has elapsed or when the accumulated weight of the
  * buffered elements exceeds `maxSize`.
  *
  * Adapted from the monix `BufferTimedObservable`, using a per-element
  * weight (`sizeOf`) instead of a plain element count.
  *
  * Flush rules, as implemented below:
  *  - a timer tick emits the whole buffer (possibly empty) and resets it;
  *  - a flush triggered from `onNext` emits everything except the element
  *    that was just received; that element stays buffered as the start of
  *    the next bundle. If it is the only buffered element, the emitted
  *    bundle is empty.
  *
  * @param source   the upstream observable whose elements are buffered
  * @param timespan maximum time between two emissions; must be strictly positive
  * @param maxSize  weight threshold; a flush is triggered once the accumulated
  *                 weight is greater than this value. `0` disables the weight
  *                 check, negative values are rejected
  * @param sizeOf   computes the weight of a single element
  */
final class BufferTimedWithWeigthObservable[+A](source: Observable[A], timespan: FiniteDuration, maxSize: Int, sizeOf: A => Int)
  extends Observable[Seq[A]] {

  require(timespan > Duration.Zero, "timespan must be strictly positive")
  // Zero is accepted on purpose: it switches the weight-based flush off.
  require(maxSize >= 0, "maxSize must be non-negative")

  def unsafeSubscribeFn(out: Subscriber[Seq[A]]): Cancelable = {
    val periodicTask = MultiAssignCancelable()

    val connection = source.unsafeSubscribeFn(new Subscriber[A] with Runnable {
      self =>
      implicit val scheduler: monix.execution.Scheduler = out.scheduler

      private[this] val timespanMillis = timespan.toMillis
      // MUST BE synchronized by `self`
      private[this] var ack: Future[Ack] = Continue
      // MUST BE synchronized by `self`
      private[this] var buffer = ListBuffer.empty[A]
      // MUST BE synchronized by `self`
      private[this] var currentSize = 0
      // Weight of the most recently received element.
      // MUST BE synchronized by `self`
      private[this] var sizeOfLast = 0
      // Monotonic timestamp (millis) at which the current bundle is due.
      // MUST BE synchronized by `self`
      private[this] var expiresAt = scheduler.clockMonotonic(MILLISECONDS) + timespanMillis

      locally {
        // Scheduling the first tick, in the constructor
        periodicTask := out.scheduler.scheduleOnce(timespanMillis, TimeUnit.MILLISECONDS, self)
      }

      // Runs periodically, every `timespan`
      def run(): Unit = self.synchronized {
        val now = scheduler.clockMonotonic(MILLISECONDS)
        // Do we still have time remaining?
        if (now < expiresAt) {
          // If we still have time remaining, it's either a scheduler
          // problem, or we rushed to signaling the bundle upon reaching
          // the maximum size in onNext. So we sleep some more.
          val remaining = expiresAt - now
          periodicTask := scheduler.scheduleOnce(remaining, TimeUnit.MILLISECONDS, self)
        } else if (buffer != null) {
          // The timespan has passed since the last signal so we need
          // to send the current bundle
          sendNextAndReset(now, byPeriod = true).syncOnContinue(
            // Schedule the next tick, but only after we are done
            // sending the bundle
            run())
        }
      }

      /**
        * Emits the pending bundle and resets the buffering state.
        * Must be called while holding the lock on `self`.
        *
        * @param now      current monotonic time in millis, used to compute the next deadline
        * @param byPeriod `true` when triggered by the timer: the whole buffer is
        *                 emitted. `false` when triggered from `onNext`: the newest
        *                 element is held back and becomes the start of the next bundle
        * @return the acknowledgement of the downstream `onNext`, chained after
        *         the previous one so that back-pressure is respected
        */
      private def sendNextAndReset(now: Long, byPeriod: Boolean = false): Future[Ack] = {
        // Immutable snapshot, so downstream never receives the mutable
        // buffer itself (and matches the `toList` used in onComplete).
        val bundle: List[A] =
          if (byPeriod) buffer.toList else buffer.dropRight(1).toList
        // Reset
        if (byPeriod) {
          buffer = ListBuffer.empty[A]
          currentSize = 0
          sizeOfLast = 0
        } else {
          // Keep only the element that was just appended by onNext
          buffer = buffer.takeRight(1)
          currentSize = sizeOfLast
        }
        // Setting the time of the next scheduled tick
        expiresAt = now + timespanMillis
        ack = ack.syncTryFlatten.syncFlatMap {
          case Continue => out.onNext(bundle)
          case Stop => Stop
        }
        ack
      }

      def onNext(elem: A): Future[Ack] = self.synchronized {
        val now = scheduler.clockMonotonic(MILLISECONDS)

        buffer.append(elem)
        sizeOfLast = sizeOf(elem)
        currentSize = currentSize + sizeOfLast
        // Flush when the deadline has passed, or when the accumulated weight
        // is strictly greater than maxSize (a total equal to maxSize still
        // fits). maxSize == 0 means "no weight limit".
        if (expiresAt <= now || (maxSize > 0 && maxSize < currentSize)) {
          sendNextAndReset(now)
        }
        else {
          Continue
        }
      }

      def onError(ex: Throwable): Unit = self.synchronized {
        periodicTask.cancel()
        ack = Stop
        // Buffered elements are dropped; `null` also tells run() to do nothing
        buffer = null

        out.onError(ex)
      }

      def onComplete(): Unit = self.synchronized {
        periodicTask.cancel()

        if (buffer.nonEmpty) {
          val bundleToSend = buffer.toList
          // In case the last onNext isn't finished, then
          // we need to apply back-pressure, otherwise this
          // onNext will break the contract.
          ack.syncOnContinue {
            out.onNext(bundleToSend)
            out.onComplete()
          }
        } else {
          // We can just stream directly
          out.onComplete()
        }

        // GC relief
        buffer = null
        // Ensuring that nothing else happens
        ack = Stop
      }
    })

    CompositeCancelable(connection, periodicTask)
  }
}

如何使用它:

/** Extension syntax for monix `Observable`. */
object MonixImplicits {

  /** Adds `bufferTimedAndSized` to any `Observable[A]`. */
  implicit class RichObservable[+A](source: Observable[A]) {

    /**
      * Wraps `source` in a new `BufferTimedWithWeigthObservable`, passing
      * `timespan`, `maxSize` and `sizeOf` through unchanged.
      */
    def bufferTimedAndSized(timespan: FiniteDuration, maxSize: Int, sizeOf: A => Int): Observable[Seq[A]] =
      new BufferTimedWithWeigthObservable[A](source, timespan, maxSize, sizeOf)
  }

}

import MonixImplicits._

// Bundle with a 1-second timespan and a maxSize of 5, weighing each item by its own `size`.
someObservable.bufferTimedAndSized(1.seconds, 5, _.size)
于 2019-08-02T17:13:18.520 回答