0

我的任务是将一些基于 Reactor 的服务迁移到 webflux.fn + 协程。该服务会生成一个 png 罗盘图像。在从新的基于协程的服务返回给 Postman 之前,我看到了指南针的字节。在 Postman 中,我看到请求成功,但正文中没有内容;空的。我一直无法弄清楚为什么 Flow 返回成功但没有内容。我会很感激你的建议...

谢谢

>>>REQUEST
curl -X "POST" "http://localhost:8080/api/compass" \
-H 'Content-Type: application/json; charset=utf-8' \
-d $'{}'
>>>RESPONSE
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: image/png
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Pragma: no-cache
Expires: 0
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1 ; mode=block
Referrer-Policy: no-referrer
connection: close
>>>WEBFLUX.FN ROUTER
@Component
class MyRouter(private val myHandler: MyHandler) {
    @Bean
    fun routes(myHandler: MyHandler) =
        coRouter {
            accept(APPLICATION_JSON).nest {
                ("/api".nest {
                    POST("/compass", myHandler::generateCompass)
                })
            }
        }
}
>>>HANDLER
override suspend fun generateCompass(request: ServerRequest): ServerResponse {
    return myService
        .generateCompass(request.awaitBody())
        .fold({ throw it }, {
            ok()
                .contentType(MediaType.IMAGE_PNG)
                .bodyAndAwait(it)
        })
}
>>>SERVICE
   suspend fun generateCompass(request: CompassRequest
    ): Either<Throwable, Flow<ByteArray>> =

        Either.Right(
            flow<ByteArray> {
                MapCompass(request)
                    .exportToRaster()
                    .map { it.toByteArray() }
            })
4

1 回答 1

0

最后发现使用flow{}和flowOf()是有区别的。我需要使用 flowOf() 来流式传输我的 ByteArray。一切都好,现在...

>>>CORRECTED CODE
    private fun generateMapCompass(
        request: MapCompassRequest
    ): Either<Throwable, Flow<ByteArray>> =
        MapCompass()
            .exportToRaster(MediaType.Png, ImageDimension(1.0f, 1.0f))
            .map { baos ->
                baos.toByteArray()
            }
            .map { flowOf(it) }
/**
 * An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
 * Used in our own operators where we trust the context of invocations.
 */
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}
/**
 * Creates a flow that produces values from the specified `vararg`-arguments.
 *
 * Example of usage:
 * ```
 * flowOf(1, 2, 3)
 * ```
 */
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
于 2020-08-09T00:21:51.787 回答