0

我对 vert.x 平台很陌生。我的项目中有一个标准和一个工人 verticle,它通过 eventBus 进行通信。worker verticle 在循环和数据库访问中执行多个 REST API 调用。

我的问题是工人verticle在某些运行中没有问题地完成任务,但有时它会抛出错误。

Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed

我正在使用 kotlin 协程来处理constructDevice(vertx: Vertx)执行大多数 REST API 调用和数据库访问的函数。

谁能告诉我上述问题的原因是什么,还有什么方法可以改进constructDevice(vertx: Vertx)功能以有效地处理多个 REST API 调用和 MongoDB 访问。

    // worker verticle to handle multiple REST API calls and MongoDB database access
    
    class DeviceDiscoverVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val consumer = vertx.eventBus().localConsumer<String>("listDevice")
            consumer.handler { message ->
                CoroutineScope(vertx.dispatcher()).launch {
                    constructDevice(vertx)
                }
                message.reply("discovered")
            }
        }
    }
    
    // standard verticle to communicate with worker verticle 
    
    class ListDeviceVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val reply = awaitResult<Message<String>> { h ->
                vertx.eventBus().request("listDevice", "deviceIPs", h)
            }
            println("Reply received: ${reply.body()}")
        }
    }
    
    fun main() {
        val vertx = Vertx.vertx()
        val workOption = DeploymentOptions().setWorker(true)
        vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
        vertx.deployVerticle(ListDeviceVerticle())
    }


    suspend fun constructDevice(vertx: Vertx) {
        val deviceRepository = listOf(
            "10.22.0.106",
            "10.22.0.120",
            "10.22.0.115",
            "10.22.0.112"
        )
    
        val webClient = WebClient.create(vertx)
        val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
        val mongoClient: MongoClient = MongoClient.create(vertx, config)
        val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
        
        // loop through the IP list and calls REST endpoints
        
        val deviceList = deviceRepository.map { deviceIP ->
            val deviceIPconfig: DeviceIPconfig
            val deviceType: DeviceType
            val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
            val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
    
            val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceIP.send(handler)
            }
            deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
            } else {
                println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                DeviceIPconfig()
            }
            
            val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceType.send(handler)
            }
            if (responseDeviceType.statusCode() == 200) {
                deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                val device = DeviceModel(deviceIPconfig, deviceType)
                json {
                    obj(
                        "_id" to deviceIPconfig.localMac,
                        "device" to json.stringify(DeviceModel.serializer(), device)
                    )
                }
            } else {
                println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                jsonObjectOf()
            }
    
        }.filterNot { it.isEmpty }
        
        // construct data to upload in mongoDB
        
        val activeDeviceIDs = json {
            obj("_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
        }
        val activeDevices = json {
            obj("_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }
        
        // save the data in MongoDB
        
        mongoClient.save("devices", activeDeviceIDs) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
        mongoClient.save("devices", activeDevices) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
    }

更新的问题:1

@Damian 我已根据您的输入更新了我的问题。为了便于理解,我已经简化了上面的问题,但是当我尝试使用 promise/future 来实现这些事情时,我在某些时候遇到了困难。

我的任务是从不同的 REST 端点获取数据并从中获取 kotlin 类,我想并行处理。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            // this will return Json array and each element of that array needs to be called again in a loop.
            val result = asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            deviceDevicesPromise.complete(result)
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {

    val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
    // I need to call other rest points similar to this and I need map the result to kotlin class.

   // how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>. 

}

class DeviceDiscoverVerticle : AbstractVerticle() {
        override fun start() {
            val deviceRepository = // list of IP strings
    
            val webClient = WebClient.create(vertx)
            vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
                deviceRepository.forEach { deviceIP ->
                    val futureList = constructDevice(vertx, webClient, deviceIP)
                    CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                            if (allFuturesResult.succeeded()) {
                                // how to handle individual future result here to construct data
                            } else {
                                println("failed")
                            }
                        }
                }
            }
        }

更新的问题:2

@Damian 正如你所建议的,我已经更新了我的代码。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed")
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        }
        else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}


fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if(ar.succeeded()) {
            val futureList = ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            CompositeFuture.all(futureList).onComplete { asyncResult ->
                if (asyncResult.succeeded()) {
                    asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                        //not all future in futureList are completed here some of them shows Future{unresolved}
                    }
                    constructDevicePromise.complete(DeviceFlow(label = "xyz"))
                }
                else {
                    constructDevicePromise.fail("failed")
                }
            }

        }
    }
    return constructDevicePromise.future()
}


class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = //list of IPs

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                val constructDeviceFuture = constructDevice(webClient, deviceIP)
                constructDeviceFuture.onComplete {ar ->
                    //println(ar.result().toString())
                }
            }
        }
    }
}

我的问题在里面

CompositeFuture.all(futureList).onComplete { asyncResult ->
                        if (asyncResult.succeeded()) {
                            asyncResult.result().list<HttpResponse<Buffer>>().forEach {

在这里,大多数期货都没有解决,执行在这里被绞死。

[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

所以我已更改CompositeFuture.all(futureList).onCompleteCompositeFuture.join(futureList).onComplete根据 vert.x 文档加入将等待所有未来完成

连接组合一直等到所有未来都完成,无论是成功还是失败。CompositeFuture.join 接受多个期货参数(最多 6 个)并返回一个期货,当所有期货都成功时成功,当所有期货都完成并且至少其中一个失败时返回失败

但现在很少有期货会失败。这是更改为之后的未来列表的输出CompositeFuture.join

CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
                            if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
                                asyncResult.result().list<HttpResponse<Buffer>>().forEach {



[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

由于我的设备无法处理并发请求,因此很少有期货失败?还有为什么程序执行卡在里面

asyncResult.result().list<HttpResponse<Buffer>>().forEach { 

如果设备并发请求处理存在问题,那么解决此问题的其他解决方案是什么。是否可以在 vertx 环境之外运行整个其余调用并通过事件总线与其通信?

此外,如果我部署DeviceDiscoverVerticle为标准 Verticle 而不是工作 Verticle,则应用程序完全卡在 CompositeFuture.all(futureList).onComplete.

4

2 回答 2

1

我不熟悉 kotlin 和协程,但我可能对 vert.x 本身有一些建议。首先根据文档

在大多数情况下,应在应用程序启动时创建一次 Web 客户端,然后重用。否则你会失去很多好处,比如连接池,如果实例没有正确关闭,可能会泄漏资源。

我看到您在constructDevice 方法中调用了Webclient.create(vertx),因此每次发送“listDevice”事件时都会创建新的WebClient,因此您可以考虑更改它。

我最近有类似的事情要做,最后使用了Futures。请注意,当您调用 awaitResult 时,您正在阻塞线程以等待异步执行,如果这是标准 Verticle,您确实会收到阻塞线程警告的垃圾邮件。你可以做的是创建一个promise,在你的http处理程序中完成/失败,在处理程序之外你只需返回promise.future()对象。在循环之外,您可以处理所有期货,不同之处在于期货处理也将是异步的,因此您不会阻塞线程。

此外,为了使代码更简洁并利用 vert.x 异步特性,最好将 http 和 mongo 处理拆分为单独的 verticles,即

  1. HttpVerticle 获取 listDevice 事件
  2. HttpVerticle 为 5 个不同的请求创建 5 个 future
  3. 当所有期货完成时,future.onComplete()/compositeFuture.all() 被触发并发送“updateDB”事件
  4. MongoVerticle 接收并处理 'updateDB' 事件

您的具体问题可能未在此处解决,但我希望它至少能让您更进一步

下面的评论是java中期货的一个例子

public class HttpVerticle extends AbstractVerticle {

WebClient webClient;

@Override
public void start() throws Exception {

    webClient = WebClient.create(vertx);

    vertx.eventBus().consumer("run_multiple_requests", event -> {
        //When event is received this block is handled by some thread from worker pool, let's call it 'main thread'
        Promise<HttpResponse<Buffer>> request1Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request2Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request3Promise = Promise.promise();

        //Since webclient is async, all calls will be asynchronous
        webClient.get("ip1", "/endpoint")
                .send(asyncResult -> {
                    //async block #1 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request1Promise.complete(asyncResult.result());
                    } else {
                        request1Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 is probably still processing
        webClient.get("ip2", "/endpoint")
                .send(asyncResult -> {
                    //async block #2 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request2Promise.complete(asyncResult.result());
                    } else {
                        request2Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 and #2 are probably still processing
        webClient.get("ip3", "/endpoint")
                .send(asyncResult -> {
                    //async block #3 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request3Promise.complete(asyncResult.result());
                    } else {
                        request3Promise.fail("Http request failed");
                    }
                });

        //retrieving futures from promises
        Future<HttpResponse<Buffer>> future1 = request1Promise.future();
        Future<HttpResponse<Buffer>> future2 = request2Promise.future();
        Future<HttpResponse<Buffer>> future3 = request3Promise.future();

       
        CompositeFuture.all(future1, future2, future3).onComplete(allFuturesResult -> {
            //async block #4 this will be executed only when all futures complete, but since it's async it does
            // not block our 'main thread'
            if (allFuturesResult.succeeded()) {
                //all requests succeeded
                vertx.eventBus().send("update_mongo", someMessage);
            } else {
                //some of the requests failed, handle it here
            }
        });
        
        //at this point async block #1 #2 #3 are probably still processing and #4 is waiting for callback
        //but we leave our event handler and free 'main thread' without waiting for anything
    });
}

当然,这段代码可以(并且应该)更短,所有这些都是硬编码的,没有任何数组和循环,只是为了清楚起见

如果您使用 logback 或 log4j(其他可能也是如此),您可以将 [%t] 放在日志模式中,它会在日志消息中显示线程名称,对我个人而言,了解所有这些异步块的流程真的很有帮助

还有一件事,通过这种设置,所有三个请求实际上将同时发送,因此请确保 http 服务器能够同时处理多个请求。

于 2020-06-24T13:44:41.853 回答
1

了解更多您要实现的目标,首先在方法中,constructDeviceDevices()我会将返回类型更改为 just Future<HttpResponse<Buffer>>,如果成功,只需调用deviceDevicesPromise.complete(asyncResult.result())

然后,在constructDevice()方法中,我将调用我们修改后的constructDeviceDevices()方法并从中获取一个未来对象,我们称之为它Future<HttpResponse<Buffer>> httpDevicesFuture。下一步是调用httpDevicesFuture.onComplete(ar -> {<handler code>})您有权访问的处理程序,该处理程序ar.result()是来自“.../devices/”端点的响应,所以现在在同一个块中,我将遍历该响应并获取List<Future<HttpResponse<Buffer>>>。仍然留在同一个块中,我会写CompositeFuture.all(futuresList).onComplete(ar -> handler)ar将是它的类型CompositeFuture,它有一个list()实际返回已完成期货列表的方法(并且在此处理程序中它们都已完成)所以现在使用该列表您可以检索HttpResponse<Buffer>对于每个未来,每一个都将是您的“.../devices/$device”响应,您可以将它们映射到您想要的任何对象。现在,在同一个处理程序中,我将决定下一步我想去哪里,我可能会通过在 eventBus 上发送一条消息来做到这一点,例如eventBus.send("HTTP_PROCESSING_DONE", serializedDevices)或以防出现问题eventBus.send("HTTP_FAILURE", someMessage)。但是在您的情况下,如果您想为某个列表中的每个 IP 执行所有这些操作,而不是强制它同步,那么仍然在同一个块中,您可以进行任何对象映射并调用constructDeviceFuture.complete(mappedObject/List<MappedObject>),这意味着您必须再创建一个未来,你将从constructDevice()方法返回

基本上你被卡住了,因为你试图在异步世界中重现顺序执行,特别是在你尝试从constructDevice()方法返回值的那一刻,这意味着我们实际上想要等待所有执行在这一行的时间完成代码已被处理,而在 vert.x 中并非如此。

它看起来像那样(语法可能已关闭,因此将其视为伪代码)

    fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Future<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): Future<SomeDomainObject> {

    //Type of below promise depends on what you are mapping responses to. It may also be a list of mapped objects
    val constructDevicePromise: Promise<SomeDomainObject> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if (ar.succeeded()) {
            val futureList: List<Future<HttpResponse<Buffer>>>
            //loop through ar.result() and populate deviceDevicesFuture list

            CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                if (allFuturesResult.succeeded()) {
                    // here you have access to allFuturesResult.list() method
                    // at this point you know all futures have finished, you can retrieve result from them (you may need to cast them from Object)
                    // when you have List<HttpResponse> you map it to whatever you want
                    val myMappedObject: SomeDomainObject = mappingResult()
                    constructDevicePromise.complete(myMappedObject)
                } else {
                    constructDevicePromise.fail("failed")
                }
            }
        }
    }

    return constructDevicePromise.future()
}

class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = // list of IP strings

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                //here dependent on your logic, you handle each future alone or create a list and handle them together
                val constructDeviceFuture: Future<SomeDomainObject> = constructDevice(vertx, webClient, deviceIP)
                constructDeviceFuture.onComplete(ar -> {
                    ar.result() // <- this is your mapped object
                    eventBus.send("SOME_NEXT_LOGIC", serializedDomainObject)
                })
            }
            
            //if you need to handle all devices at once, once again you need to make CompositeFuture from all responses of constructDevice
        }
    }
}

更新 2 响应

关于CompositeFuture.all():您缺少一件事,CompositeFuture.all() 等到所有期货成功至少一个失败。即使一个失败了,它也不会等待其他人(这就像逻辑与,不需要等待其余的,因为我们已经知道结果了)。CompositeFuture.join()另一方面,只是等待所有期货完成,但如果其中任何一个失败,则由此产生的未来也将失败(但您至少应该得到所有期货的结果)。

这实际上就是您在输出中看到的内容,CompositeFuture.all()您得到一堆已完成的 Futures,一个失败了,其余的未解决。

这部分还缺少一件事:

vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
        deviceRepository.forEach { deviceIP ->
            val constructDeviceFuture = constructDevice(webClient, deviceIP)
            constructDeviceFuture.onComplete {ar ->
                //println(ar.result().toString())
            }
        }
    }

你没有检查是否ar.succeeded(),如果你会看到最终的未来实际上是失败的,这就是为什么最终结果不是你所期望的。

现在只是纯粹猜测发生了什么。您可能会(在某种程度上)杀死(在某种程度上)这个其余的 API(我假设每个 vertx 事件都是相同的 API)有这么多并发请求,如果您在单个请求处理程序中放置一些毫秒精度的日志消息,您可能应该看到请求是相隔几毫秒。我想 API 能够处理很少的请求,然后下一个请求由于某些异常/阻塞/超时或其他原因而失败,而所有其他请求可能根本没有得到响应,或者正在等待它们达到某个超时。如果您将 Verticle 定义为标准,则当任何内容持续超过两秒时,您将收到警告,此外,有一个线程处理所有这些内容,因此如果一个请求挂起很长时间,那时标准的verticle将完全没有响应。这可能是你被困的原因CompositeFuture.join()方法。

所以现在你可以做几件事:

  1. 您可以将并发执行更改为顺序执行。基本上,不是n事先创建期货,而是为单个元素创建一个未来,然后调用future.compose(ar -> {})这个处理程序将仅在未来完成时被调用。然后在同一个处理程序中创建并返回下一个元素的未来。实现 imo 有点棘手,但可行(我已经使用 java stream reduce 将x未来减少为单个)。当您以这种方式实现时,您一次将有一个请求,因此 API 应该没有问题。请注意,仍然会同时处理不同的 IP,但每个 IP 的请求将是连续的,因此它可能工作得很好。

  2. 您可以创建另一个标准 Verticle,它只响应单个事件,该事件将调用“/devices/$device”端点。现在在您现在拥有的代码中,当您循环通过初始 http 响应时,您无需再产生 20 个 HTTP 请求,而是将 20 个事件发送到 eventBus。当您只有一个处理该特定消息的 Verticle 实例,并且它是一个只有一个线程的标准 Verticle 时,实际上此时应该只处理一条消息并且应该排队。这也非常容易调整,因为您可以增加 Verticle 实例的数量,并且您将拥有与 Verticle 实例数量一样多的并发请求。

  3. 您提到完全在 vertx 之外处理它,我认为这根本没有必要,但如果您认为它最适合您,那么它非常简单。如果您已经有Vertx来自某个地方的对象,那么将该对象传递给其他类的构造函数是没有问题的。在那里,您可以拥有自己的 http 客户端、您自己的方法,基本上是您想要的任何东西,并且在某些时候,当您决定要使用 vert.x 时,您可以调用vertx.eventBus().send()并触发一些将由 vert.x 处理的逻辑。要记住的最重要的事情是不要创建多个Vertx对象实例,因为它们将具有单独的事件总线。实际上正如文档所述

Verticles ... 这个模型是完全可选的,如果你不想,Vert.x 不会强迫你以这种方式创建应用程序。

所以你可以让你的常规应用程序在任何框架中编写,并且在某些时候仍然只是实例化 Vertx 对象,执行单个任务,然后回到你的基本框架,但老实说,我认为你非常接近解决这个问题 :)

于 2020-06-25T16:06:24.263 回答