1

我想写一个简单的批处理器类。它有一个请求队列并等待该队列变满或经过一段时间,然后才与数据库对话。

通过通道实现这个队列非常方便——这样我们的客户端就会在它满的时候被挂起。但是我怎样才能知道通道是否已满?

当然,我可以创建一个向通道发送一些东西然后执行一些检查的方法。下一步是将其封装在从 Channel 派生的类中。仍然很脏(目前还不清楚我该如何处理onSend/ onReceive)。还有更优雅的解决方案吗?也许是开箱即用的东西?

4

2 回答 2

6

这不是开箱即用的,但是可以使用actor轻松实现相应的批处理逻辑。您实际上并不需要一个类(但如果您愿意,可以将此代码包装在一个类中)。您可以使用以下实现作为模板:

const val MAX_SIZE = 100 // max number of data items in batch
const val MAX_TIME = 1000 // max time (in ms) to wait

val batchActor = actor<Data> {
    val batch = mutableListOf<Data>()
    var deadline = 0L // deadline for sending this batch to DB
    while (true) {
        // when deadline is reached or size is exceeded, then force batch to DB
        val remainingTime = deadline - System.currentTimeMillis()
        if (batch.isNotEmpty() && remainingTime <= 0 || batch.size >= MAX_SIZE) {
            saveToDB(batch)
            batch.clear()
            continue
        }
        // wait until items is received or timeout reached
        select<Unit> {
            // when received -> add to batch
            channel.onReceive {
                batch.add(it)
                // init deadline on first item added to batch
                if (batch.size == 1) deadline = System.currentTimeMillis() + MAX_TIME
            }
            // when timeout is reached just finish select, note: no timeout when batch is empty
            if (batch.isNotEmpty()) onTimeout(remainingTime) {}
        }
    }
}

现在,您只需batchActor.send(data)在需要向数据库发送任何内容时执行此操作,actor 内部的逻辑负责批处理并将生成的批处理保存到数据库。

于 2018-02-27T09:49:38.487 回答
0

Channel 接口声明了一个 isFull 属性,可以查询该属性以确定它是否已达到容量。

无论如何,我看不到有一个回调函数可以在达到容量时自动调用一个函数,但是您可以定期检查这个 isFull 属性以查看它是否达到容量。

于 2018-02-15T16:03:35.560 回答