I've got a bizarre file that is always the same size and always has the same modified date (some driver exposing its state as a file's contents through a fake filesystem). It contains a short string that changes when the device's state changes. I don't think I can watch the file, because it lives on a fake filesystem. Cool so far!

I was hoping to modernize it! A changing file that may or may not have anyone listening feels like a flow.

I'm not entirely sure about distinctUntilChanged and conflate (do they play nicely together?), or whether I'm just reinventing a StateFlow.

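// charset was undefined in the original snippet; here it is a parameter defaulting to UTF-8
fun fileChanges(file: File, charset: Charset = Charsets.UTF_8): Flow<String> = flow {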
    while (true) {
        emit(file.readText(charset))
        delay(1)
    }
}
    .flowOn(Dispatchers.IO) // Run in background
    .distinctUntilChanged() // no duplicate status
    .conflate() // only most recent
    .map { it.trim() } // just the one line without newlines

This seems to work (with the expected system overhead) and I'm only trimming the deduped values, but... it smells funny. Like I should be able to reuse file handles, or memory mapped stuff, or a mark()/reset(), or better still, wrap it all up in a StateFlow object.

1 Answer

This was an interesting exercise, as I'm still learning how to make the best use of flows. Thanks for that.
Here is what I came up with:

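import java.io.File
import java.nio.charset.Charset
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

class PollingFileWatcher(private val file: File, private val pollingInterval: Long) {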
    private val scope = CoroutineScope(Dispatchers.IO)
    private val state = MutableStateFlow("") //initial state doesn't matter, we will update it directly when starting to poll anyway
    private var pollingJob: Job? = null
    private var subscriptions = 0 // keep track of how many observers there are, start polling if this changes 0 -> 1, stop polling if it changes 1 -> 0

    private fun readContent(): String {
        return file.readText(Charset.defaultCharset()) //Or whatever Charset fits
    }

    private fun createPollingJob() = scope.launch {
        while (isActive) {
            state.value = readContent()
            delay(pollingInterval)
        }
    }

    private fun startPolling() {
        state.value = readContent() // update to latest value before subscribers start collecting, otherwise they might get the useless initial value
        pollingJob = createPollingJob()
    }

    private fun stopPolling() {
        pollingJob?.cancel()
        pollingJob = null
    }

    private fun onSubscribe() = synchronized(this) {
        if (subscriptions++ == 0) { // changed from 0 to 1
            startPolling()
        }
    }

    private fun onUnsubscribe() = synchronized(this) {
        if (--subscriptions == 0) { // changed from 1 to 0
            stopPolling()
        }
    }

    fun changes() = flow {
        onSubscribe()
        try {
            state.collect {
                emit(it)
            }
        } finally { // e.g. the collecting context has been cancelled
            onUnsubscribe()
        }
    }
}

This could probably be adapted to any kind of long-running job, but that might be a bit too much.

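In general, polling starts when the first collector starts collecting and is cancelled when the last collector stops collecting. In short: if no one is watching, no work is done.
MutableStateFlow automatically caches the latest value and immediately emits it to new collectors, and it drops duplicate values (the comparison is done with equals).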

Example usage:

val watcher = PollingFileWatcher(File("/path/to/the/file"), 100)
GlobalScope.launch { // please never actually use GlobalScope
    watcher.changes().collect {
        // TODO
    }
}
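
For comparison, the "poll only while someone is collecting" behaviour described above can probably also be had from the built-in sharing operators, without manual subscriber counting. The following is only a sketch, not a drop-in replacement: it assumes a kotlinx.coroutines version that ships stateIn and SharingStarted.WhileSubscribed (1.4.0 or later, as far as I know), and pollFile and watchFile are hypothetical names I made up for illustration.

```kotlin
import java.io.File
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn

// Hypothetical helper (not part of the class above): re-reads the file forever.
fun pollFile(file: File, pollingIntervalMillis: Long): Flow<String> = flow {
    while (true) {
        emit(file.readText())          // UTF-8 by default; pass a Charset if the driver uses another
        delay(pollingIntervalMillis)   // suspension point, so cancellation stops the loop
    }
}.flowOn(Dispatchers.IO)               // keep the blocking read off the collector's thread

// Hypothetical wrapper: one shared polling loop for all collectors.
fun watchFile(
    file: File,
    pollingIntervalMillis: Long,
    scope: CoroutineScope
): StateFlow<String> =
    pollFile(file, pollingIntervalMillis)
        .map { it.trim() }
        .stateIn(
            scope = scope,
            // Start the upstream flow on the first subscriber, stop it after the last one leaves.
            started = SharingStarted.WhileSubscribed(),
            // Assumption: an empty string is an acceptable "not read yet" placeholder.
            initialValue = ""
        )
```

One difference from the class above: a new collector may briefly see the placeholder (or the last cached value after a pause) before the first fresh read arrives, so filter that out if an empty string is not a valid state. Since a StateFlow already skips values that are equal to the current one, an extra distinctUntilChanged should not be needed here.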
Answered 2020-07-22T23:15:16.050