2

当我想将 Kotlin Flows 与普通回调一起使用时,我一直在寻找合适的解决方案或最佳实践。我的用例是我编写了一个在内部使用 Kotlin Flow 的 kotlin 库,并且我必须假设用户将使用 Java。所以我认为最好的解决方案是将基本回调接口重载到我的流方法并以collect如下方式调用它:

class KotlinClass {

    interface Callback {
        fun onResult(result: Int)
    }

    private fun foo() = flow {
        for (i in 1..3) {
            emit(i)
        }
    }

    fun bar(callback: Callback) {
        runBlocking {
            foo().collect { callback.onResult(it) }
        }
    }

  private fun main() {
    bar(object : Callback {
        override fun onResult(result: Int) {
            TODO("Not yet implemented")
        }
    })
}

在我的 Java 应用程序中,我可以像这样简单地使用它:

public class JavaClass {

    public void main() {
        KotlinClass libraryClass = new KotlinClass();
        libraryClass.bar(new KotlinClass.Callback() {
            @Override
            public void onResult(int result) {
                // TODO("Not yet implemented")
            }
        });
    } 
}

我不确定要走的路是什么,因为我想让我的 Kotlin 库使用 Flows 以良好的方式用于 Java 和 Kotlin。

我遇到过,callbackFlow但这似乎只有当我想称之为 flow-ify 一个基于回调的 API 时?因为我对 Kotlin 和 Flows 很陌生,如果我的问题因缺少 kotlin 的一些基本概念而存在缺陷,请道歉。

4

3 回答 3

6

我会给 Java 客户端更多的流控制权。我会在你的回调接口中添加一个onStartandonCompletion方法。除此之外,我会使用自己的CoroutineScope——也许可以从 Java 客户端定制。而且我不会阻止 Kotlin 函数中的调用线程 - 不runBlocking

@InternalCoroutinesApi
class KotlinClass {
    val coroutineScope = CoroutineScope(Dispatchers.Default)

    interface FlowCallback {
        @JvmDefault
        fun onStart() = Unit

        @JvmDefault
        fun onCompletion(thr: Throwable?) = Unit
        fun onResult(result: Int)
    }

    private fun foo() = flow {
        for (i in 1..3) {
            emit(i)
        }
    }

    fun bar(flowCallback: FlowCallback) {
        coroutineScope.launch {
            foo().onStart { flowCallback.onStart() }
                .onCompletion { flowCallback.onCompletion(it) }
                .collect { flowCallback.onResult(it) }
        }
    }

    fun close() {
        coroutineScope.cancel()
    }    
}

现在,Java 客户端可以完全控制如何启动、收集和取消流程。例如,您可以使用闩锁等待完成、设置超时并取消协同程序范围。这首先看起来像很多代码,但通常您需要这种灵活性。

public class JavaClass {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        KotlinClass libraryClass = new KotlinClass();
        libraryClass.bar(new KotlinClass.FlowCallback() {
            @Override
            public void onCompletion(@Nullable Throwable thr) {
                latch.countDown();
            }


            @Override
            public void onResult(int result) {
                System.out.println(result);
            }
        });

        try {
            latch.await(5, TimeUnit.SECONDS);
        } finally {
            libraryClass.close();
        }
    }
}
于 2020-03-09T20:58:23.670 回答
1

万一有人想知道一个通​​用的解决方案。这是来自@rene answer here的增强版本。

  1. 接受泛型类型
  2. 一个可配置的coroutineScope
// JavaFlow.kt
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch

@InternalCoroutinesApi
class JavaFlow<T>(
    private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
) {

    interface OperatorCallback <T> {
        @JvmDefault
        fun onStart() = Unit

        @JvmDefault
        fun onCompletion(thr: Throwable?) = Unit
        fun onResult(result: T)
    }

    fun collect(
        flow: Flow<T>,
        operatorCallback: OperatorCallback<T>,
    ) {
        coroutineScope.launch {
            flow
                .onStart { operatorCallback.onStart() }
                .onCompletion { operatorCallback.onCompletion(it) }
                .collect { operatorCallback.onResult(it) }
        }
    }

    fun close() {
        coroutineScope.cancel()
    }
}

Java 调用方:

// code omitted...
new JavaFlow<File>().collect(
        // compressImageAsFlow is our actual kotlin flow extension
        FileUtils.compressImageAsFlow(file, activity),
        new JavaFlow.OperatorCallback<File>() {
            @Override
            public void onResult(File result) {
                // do something with the result here
                SafeSingleton.setFile(result);
            }
        }
);


// or using lambda with method references
// new JavaFlow<File>().collect(
//        FileUtils.compressImageAsFlow(file, activity),
//        SafeSingleton::setFile
// );

// Change coroutineScope to Main
// new JavaFlow<File>(CoroutineScopeKt.MainScope()).collect(
//        FileUtils.compressImageAsFlow(file, activity),
//        SafeSingleton::setFile
// );

OperatorCallback.onStart并且OperatorCallback.onCompletion是可选的,根据需要覆盖它。

于 2021-11-21T19:14:12.423 回答
1

您无需在 Kotlin 代码中创建接口。您可以这样定义 bar :

 fun bar(callback: (Int) -> Unit) {
     runBlocking {
         foo().collect { callback(it) }
     }
 }

从 Java 代码中,您可以像这样调用函数:

public class JavaClass {

    public static void main(String[] args) {
        KotlinClass libraryClass = new KotlinClass();
        libraryClass.bar(v -> { System.out.println(v); return Unit.INSTANCE; });
    } 
}
于 2020-03-09T20:18:13.457 回答