我有一种情况,我需要跟踪的一些事件在特定超时 ( expiryTimeout
) 后变得“陈旧”。过期后,我需要“删除”过期事件的引用,并通知外部服务该事件已过期。如果在到期之前接收到相同密钥的更新事件,则超时被“重置”(例如事件被“刷新”)。在这种情况下不需要发送通知。
我正在尝试使用番石榴缓存来实现上述目标并且不重新发明轮子,但我必须遗漏一些东西,因为似乎没有触发过期。
我有以下IO
代码来创建“缓存”:
abstract sealed case class StaleTasksCleanerWithGuava private(srvc: ExternalService,
expiryTimeout: FiniteDuration)
(implicit cs: ContextShift[IO], evictionEs: ExecutorService) {
implicit val logger = Slf4jLogger.getLogger[IO]
private val removalListener = new RemovalListener[Key, TaskEvent]() {
override def onRemoval(rn: RemovalNotification[Key, TaskEvent]): Unit = {
val f = if(rn.wasEvicted()){
val (appId, taskId) = rn.getKey
logger.debug(s"[$appId-$taskId] Expiring entry now...") *>
srvc.cleanUpStaleTask(appId, taskId) *>
logger.debug(s"[$appId-$taskId] Entry expired.")
} else
IO.unit
f.unsafeRunSync() // Necessary because of the way Guava's API is designed
}
}
private val asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)
private val c: Cache[Key, TaskEvent] = CacheBuilder.newBuilder()
.expireAfterWrite(expiryTimeout.length, expiryTimeout.unit)
.removalListener(asyncRemovalListener).build[Key, TaskEvent]()
def putTask(e: TaskEvent): IO[Unit] = {
val k = (e.appId, e.taskId)
logger.debug(s"[${k._1}-${k._2}] Entry will expire in $expiryTimeout if not overridden.") *>
IO(c.put(k, e))
}
}
object StaleTasksCleanerWithGuava {
type Key = (String, String)
def create(srvc: ExternalService, expiryTimeout: FiniteDuration = 1.second)
(implicit cs: ContextShift[IO], evictionEs: ExecutorService): IO[StaleTasksCleanerWithGuava] = {
IO(new StaleTasksCleanerWithGuava(srvc, expiryTimeout) {})
}
}
最值得注意的是,我使用 anval asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)
来确保过期和删除是异步完成的,而不是作为其他缓存操作(读/写)的一部分。
我创建了以下示例应用程序来简单地测试事物:
object TestAppWithGuava2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
implicit val logger = Slf4jLogger.getLogger[IO]
val dummy = new ExternalService {
override def cleanUpStaleTask(appId: String, taskId: String): IO[Unit] =
logger.debug(s"[Service - ${appId}-${taskId}] Notified! +1")
}
val taskEvent1 = TaskEvent("A", "task1")
val taskEvent2 = TaskEvent("B", "task2")
val taskEvent3 = TaskEvent("C", "task1")
val evictionEcResource = Resource.make(IO{Executors.newFixedThreadPool(8)})(tp => IO(tp.shutdownNow()).void)
val program = evictionEcResource.use{ evictionEc =>
for {
cache <- StaleTasksCleanerWithGuava.create(dummy, 3.second)(contextShift, evictionEc)
_ <- cache.putTask(taskEvent3)
_ <- cache.putTask(taskEvent1)
_ <- cache.putTask(taskEvent2)
_ <- IO.never /// This is here on purpose, as I want to see things being expired
} yield ()
}
IO.race(program, IO.sleep(6.seconds)).as(ExitCode.Success)
}
}
然而,这可悲地未能做到我期望它做的事情。:) 当我运行它时,我得到了这些日志行,并且在几秒钟之后,应用程序终止(作为IO.sleep(6.seconds)
终止并“赢得” race
)。
2020-03-01 18:48:41,862 [StaleTasksCleanerWithGuava] [C-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [A-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [B-task2] Entry will expire in 3 seconds if not overridden.
我错过了什么?谢谢