0

我有一种情况,我需要跟踪的一些事件在特定超时 ( expiryTimeout) 后变得“陈旧”。过期后,我需要“删除”过期事件的引用,并通知外部服务该事件已过期。如果在到期之前接收到相同密钥的更新事件,则超时被“重置”(例如事件被“刷新”)。在这种情况下不需要发送通知。

我正在尝试使用番石榴缓存来实现上述目标并且不重新发明轮子,但我必须遗漏一些东西,因为似乎没有触发过期。

我有以下IO代码来创建“缓存”:

abstract sealed case class StaleTasksCleanerWithGuava private(srvc: ExternalService,
                                                              expiryTimeout: FiniteDuration)
                                                             (implicit cs: ContextShift[IO], evictionEs: ExecutorService) {
  implicit val logger = Slf4jLogger.getLogger[IO]

  private val removalListener = new RemovalListener[Key, TaskEvent]() {
    override def onRemoval(rn: RemovalNotification[Key, TaskEvent]): Unit = {
      val f = if(rn.wasEvicted()){
        val (appId, taskId) = rn.getKey
        logger.debug(s"[$appId-$taskId] Expiring entry now...") *>
          srvc.cleanUpStaleTask(appId, taskId) *>
          logger.debug(s"[$appId-$taskId] Entry expired.")
      } else
        IO.unit

      f.unsafeRunSync() // Necessary because of the way Guava's API is designed
    }
  }

  private val asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)

  private val c: Cache[Key, TaskEvent] = CacheBuilder.newBuilder()
    .expireAfterWrite(expiryTimeout.length, expiryTimeout.unit)
    .removalListener(asyncRemovalListener).build[Key, TaskEvent]()

  def putTask(e: TaskEvent): IO[Unit] = {
    val k = (e.appId, e.taskId)
    logger.debug(s"[${k._1}-${k._2}] Entry will expire in $expiryTimeout if not overridden.") *>
      IO(c.put(k, e))
  }
}

object StaleTasksCleanerWithGuava {
  type Key = (String, String)

  def create(srvc: ExternalService, expiryTimeout: FiniteDuration = 1.second)
            (implicit cs: ContextShift[IO], evictionEs: ExecutorService): IO[StaleTasksCleanerWithGuava] = {
    IO(new StaleTasksCleanerWithGuava(srvc, expiryTimeout) {})
  }
}

最值得注意的是,我使用 anval asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)来确保过期和删除是异步完成的,而不是作为其他缓存操作(读/写)的一部分。

我创建了以下示例应用程序来简单地测试事物:

object TestAppWithGuava2 extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    implicit val logger = Slf4jLogger.getLogger[IO]

    val dummy = new ExternalService {
      override def cleanUpStaleTask(appId: String, taskId: String): IO[Unit] =
        logger.debug(s"[Service - ${appId}-${taskId}] Notified! +1")
    }
    val taskEvent1 = TaskEvent("A", "task1")
    val taskEvent2 = TaskEvent("B", "task2")
    val taskEvent3 = TaskEvent("C", "task1")

    val evictionEcResource = Resource.make(IO{Executors.newFixedThreadPool(8)})(tp => IO(tp.shutdownNow()).void)

    val program = evictionEcResource.use{ evictionEc =>
      for {
        cache <- StaleTasksCleanerWithGuava.create(dummy, 3.second)(contextShift, evictionEc)
        _ <- cache.putTask(taskEvent3)
        _ <- cache.putTask(taskEvent1)
        _ <- cache.putTask(taskEvent2)
        _ <- IO.never  /// This is here on purpose, as I want to see things being expired
      } yield ()
    }
    IO.race(program, IO.sleep(6.seconds)).as(ExitCode.Success)
  }
}

然而,这可悲地未能做到我期望它做的事情。:) 当我运行它时,我得到了这些日志行,并且在几秒钟之后,应用程序终止(作为IO.sleep(6.seconds)终止并“赢得” race)。

2020-03-01 18:48:41,862 [StaleTasksCleanerWithGuava] [C-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [A-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [B-task2] Entry will expire in 3 seconds if not overridden.

我错过了什么?谢谢

4

0 回答 0