我刚接触 Vert.x 平台。我的项目中有一个标准 verticle 和一个 worker verticle,二者通过 eventBus 进行通信。worker verticle 会在循环中执行多次 REST API 调用,并访问数据库。
我的问题是:worker verticle 在某些运行中能顺利完成任务,但有时会抛出如下错误。
Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed
我正在使用 Kotlin 协程来实现 constructDevice(vertx: Vertx) 函数,大部分 REST API 调用和数据库访问都在该函数中执行。
谁能告诉我上述问题的原因是什么?另外,有什么方法可以改进 constructDevice(vertx: Vertx) 函数,使其能够高效地处理多个 REST API 调用和 MongoDB 访问?
/**
 * Worker verticle that performs device discovery (multiple REST API calls and MongoDB access)
 * whenever a message arrives on the local event-bus address "listDevice".
 *
 * The reply "discovered" is sent only after [constructDevice] has finished, so the requester
 * is not told that discovery is done while it is still running. If discovery throws, the
 * message is failed instead of letting the exception escape as an uncaught worker-thread error.
 *
 * NOTE(review): the requester now waits for the whole discovery; if that can exceed the event-bus
 * send timeout, raise the timeout on the requesting side.
 */
class DeviceDiscoverVerticle : CoroutineVerticle() {
    override suspend fun start() {
        val consumer = vertx.eventBus().localConsumer<String>("listDevice")
        consumer.handler { message ->
            // CoroutineVerticle is itself a CoroutineScope bound to this verticle's context,
            // so launch there instead of creating a detached ad-hoc scope per message.
            launch {
                try {
                    constructDevice(vertx)
                    message.reply("discovered")
                } catch (e: kotlinx.coroutines.CancellationException) {
                    // Never swallow cancellation.
                    throw e
                } catch (e: Exception) {
                    message.fail(500, e.message ?: "device discovery failed")
                }
            }
        }
    }
}
/**
 * Standard verticle that talks to the worker verticle: on start it sends "deviceIPs" to the
 * "listDevice" event-bus address, suspends until the reply arrives and prints the reply body.
 */
class ListDeviceVerticle : CoroutineVerticle() {
    override suspend fun start() {
        val bus = vertx.eventBus()
        val answer = awaitResult<Message<String>> { replyHandler ->
            bus.request("listDevice", "deviceIPs", replyHandler)
        }
        val body = answer.body()
        println("Reply received: $body")
    }
}
/**
 * Application entry point: deploys [DeviceDiscoverVerticle] as a worker verticle and, once that
 * deployment has completed, deploys [ListDeviceVerticle].
 *
 * Deployment is asynchronous. [ListDeviceVerticle] sends its request as soon as it starts, so it
 * is deployed from the completion handler to make sure the "listDevice" consumer is already
 * registered when the request is sent.
 */
fun main() {
    val vertx = Vertx.vertx()
    val workOption = DeploymentOptions().setWorker(true)
    vertx.deployVerticle(DeviceDiscoverVerticle(), workOption) { deployment ->
        if (deployment.succeeded()) {
            vertx.deployVerticle(ListDeviceVerticle())
        } else {
            // Without the worker there is nobody to answer the request; report and stop here.
            deployment.cause().printStackTrace()
        }
    }
}
/**
 * Queries every known device over REST, builds one JSON document per reachable device and
 * stores the results in the "devices" MongoDB collection.
 *
 * For each device IP two endpoints are called in sequence (ipconfig, then information). A device
 * whose information request does not return 200, or whose requests fail at the transport level
 * (for example "Connection was closed"), is skipped instead of aborting the whole discovery.
 *
 * The function suspends until both MongoDB saves have completed. The clients it creates are
 * closed before it returns, so repeated calls do not accumulate open connection pools.
 *
 * @param vertx the Vert.x instance used to create the web client and the MongoDB client
 */
suspend fun constructDevice(vertx: Vertx) {
    val deviceRepository = listOf(
        "10.22.0.106",
        "10.22.0.120",
        "10.22.0.115",
        "10.22.0.112"
    )
    val webClient = WebClient.create(vertx)
    val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
    val mongoClient: MongoClient = MongoClient.create(vertx, config)
    // Named `parser` so it does not shadow the `json { }` builder used below.
    val parser = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
    try {
        // Loop through the IP list and call the REST endpoints, one device at a time.
        val deviceList = deviceRepository.map { deviceIP ->
            try {
                val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                    webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/").send(handler)
                }
                val deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                    parser.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
                } else {
                    println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                    DeviceIPconfig()
                }
                val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                    webClient.get(deviceIP, "emsfp/node/v1/self/information/").send(handler)
                }
                if (responseDeviceType.statusCode() == 200) {
                    val deviceType = parser.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                    val device = DeviceModel(deviceIPconfig, deviceType)
                    json {
                        obj(
                            "_id" to deviceIPconfig.localMac,
                            "device" to parser.stringify(DeviceModel.serializer(), device)
                        )
                    }
                } else {
                    println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                    jsonObjectOf()
                }
            } catch (e: kotlinx.coroutines.CancellationException) {
                // Never swallow cancellation.
                throw e
            } catch (e: Exception) {
                // Transport or decoding failure for this device only: report it and treat the
                // device like one that answered with a non-200 status (filtered out below).
                println("request to device $deviceIP failed: ${e.message}")
                jsonObjectOf()
            }
        }.filterNot { it.isEmpty }

        // Construct the data to upload to MongoDB.
        val activeDeviceIDs = json {
            obj(
                "_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") }
            )
        }
        val activeDevices = json {
            obj(
                "_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }

        // Save the data in MongoDB and wait for each save, so the clients are not closed (and
        // the caller is not resumed) while a write is still in flight. A failed save is
        // reported but does not abort the remaining one.
        for (document in listOf(activeDeviceIDs, activeDevices)) {
            try {
                awaitResult<String?> { handler -> mongoClient.save("devices", document, handler) }
                println("saved successfully")
            } catch (e: kotlinx.coroutines.CancellationException) {
                throw e
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
    } finally {
        webClient.close()
        mongoClient.close()
    }
}
更新的问题:1
@Damian 我已根据您的建议更新了我的问题。为了便于理解,我简化了上面的问题,但在尝试使用 promise/future 来实现时,我在某些地方遇到了困难。
我的任务是从不同的 REST 端点获取数据,并据此构造 Kotlin 类,而且我希望这些请求能够并行处理。
/**
 * Requests the diagnostic information of a single sub-device from the given host.
 *
 * @param deviceIP host to send the request to
 * @param device identifier appended to the "devices/" path
 * @param webClient client used to send the request
 * @return a future completed with the HTTP response, or failed with the original cause of the
 *         request failure (so callers can see why it failed, e.g. a closed connection)
 */
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicePromise.complete(asyncResult.result())
        } else {
            // Propagate the real cause instead of replacing it with a fixed message.
            deviceDevicePromise.fail(asyncResult.cause())
        }
    }
    return deviceDevicePromise.future()
}
/**
 * Requests the list of sub-devices from the given host and starts one follow-up request per
 * list element via [constructDeviceDevice].
 *
 * @param deviceIP host to send the requests to
 * @param webClient client used to send the requests
 * @return a future completed with the list of per-device response futures; failed with the
 *         original cause if the list request fails or its body cannot be read as a JSON array
 */
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()
    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.failed()) {
            deviceDevicesPromise.fail(asyncResult.cause())
            return@send
        }
        // The response is a JSON array; each element needs its own follow-up request.
        // Reading the body can throw (missing or non-array body); without this guard the
        // exception would be lost in the handler and the promise would never complete.
        val result = try {
            asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
        } catch (e: Exception) {
            deviceDevicesPromise.fail(e)
            return@send
        }
        deviceDevicesPromise.complete(result)
    }
    return deviceDevicesPromise.future()
}
// Unfinished sketch: starts the device-list request for one device IP via constructDeviceDevices.
// NOTE(review): the declared return type is List<Future<HttpResponse<Buffer>>>, but the body has no
// return statement, and the only value available here is a Future wrapping that list, which is not
// yet resolved when this function returns.
fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {
val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
// TODO: other REST endpoints need to be called in the same way, and the results mapped to Kotlin classes.
// Open question: how to get the HTTP response out of each inner future of
// deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>.
}
// Verticle sketch: on every "listDevice" message it starts discovery for each configured device IP
// and combines the futures returned by constructDevice with CompositeFuture.all.
// NOTE(review): this snippet is incomplete as shown - deviceRepository has no initializer and the
// closing brace of the class is missing. The handler also never replies to `message`.
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = // list of IP strings
// One web client is created for the verticle and shared by all requests.
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val futureList = constructDevice(vertx, webClient, deviceIP)
// React once the combined future for this device IP completes.
CompositeFuture.all(futureList).onComplete { allFuturesResult ->
if (allFuturesResult.succeeded()) {
// how to handle individual future result here to construct data
} else {
println("failed")
}
}
}
}
}
更新的问题:2
@Damian 正如你所建议的,我已经更新了我的代码。
/**
 * Requests a single flow from the given host.
 *
 * @param deviceIP host to send the request to
 * @param device identifier appended to the "flows/" path
 * @param webClient client used to send the request
 * @return a future completed with the HTTP response, or failed with the original cause of the
 *         request failure (so callers can see why it failed, e.g. a closed connection)
 */
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicePromise.complete(asyncResult.result())
        } else {
            // Propagate the real cause instead of replacing it with a fixed message.
            deviceDevicePromise.fail(asyncResult.cause())
        }
    }
    return deviceDevicePromise.future()
}
/**
 * Requests the list of flows from the given host.
 *
 * @param deviceIP host to send the request to
 * @param webClient client used to send the request
 * @return a future completed with the raw HTTP response, or failed with the original cause of
 *         the request failure
 */
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()
    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else {
            // Keep the underlying cause so the caller can tell why the request failed.
            deviceDevicesPromise.fail(asyncResult.cause())
        }
    }
    return deviceDevicesPromise.future()
}
/**
 * Fetches the flow list of one device, then fetches every listed flow and completes with a
 * [DeviceFlow] once all of those requests have succeeded.
 *
 * The returned future is always completed: it fails with the underlying cause when the list
 * request fails, when the list body cannot be read as a JSON array, or when the combined
 * per-flow future fails. Previously the first two cases left it unresolved forever.
 *
 * @param webClient client used to send the requests
 * @param deviceIP host to query
 * @return a future holding the constructed [DeviceFlow], or the failure cause
 */
fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    // NOTE(review): not used yet; presumably intended for decoding the responses in the loop below.
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)
    httpDevicesFuture.onComplete { ar ->
        if (ar.failed()) {
            // Without this branch a failed list request would never resolve the promise.
            constructDevicePromise.fail(ar.cause())
            return@onComplete
        }
        val futureList = try {
            ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
        } catch (e: Exception) {
            // Missing or non-array body: report it instead of losing the exception in the handler.
            constructDevicePromise.fail(e)
            return@onComplete
        }
        CompositeFuture.all(futureList).onComplete { asyncResult ->
            if (asyncResult.succeeded()) {
                asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                    // TODO: build the device data from each response.
                }
                constructDevicePromise.complete(DeviceFlow(label = "xyz"))
            } else {
                constructDevicePromise.fail(asyncResult.cause())
            }
        }
    }
    return constructDevicePromise.future()
}
// Verticle sketch: on every "listDevice" message it calls constructDevice for each configured
// device IP and attaches a completion handler to the returned future.
// NOTE(review): deviceRepository has no initializer in this snippet (placeholder comment only),
// and the handler never replies to `message`.
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = //list of IPs
// One web client is created for the verticle and shared by all requests.
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val constructDeviceFuture = constructDevice(webClient, deviceIP)
// The completion handler is currently empty; the result is not used.
constructDeviceFuture.onComplete {ar ->
//println(ar.result().toString())
}
}
}
}
}
我的问题出在下面这段代码中:
CompositeFuture.all(futureList).onComplete { asyncResult ->
if (asyncResult.succeeded()) {
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
在这里,大多数 Future 都处于未完成(unresolved)状态,程序执行卡在了这里。
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
所以我把 CompositeFuture.all(futureList).onComplete
改成了 CompositeFuture.join(futureList).onComplete
根据 Vert.x 文档,join 会等待所有 Future 完成:
join 组合会一直等待,直到所有 Future 都已完成(无论成功还是失败)。CompositeFuture.join 接受多个 Future 参数(最多 6 个)并返回一个 Future:当所有 Future 都成功时,该 Future 成功;当所有 Future 都已完成且其中至少一个失败时,该 Future 失败。
但现在仍有少数 Future 失败。下面是改为 CompositeFuture.join 之后 Future 列表的输出:
CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
是否因为我的设备无法处理并发请求,才导致少数 Future 失败?另外,为什么程序执行会卡在下面这段代码里:
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
如果问题确实出在设备对并发请求的处理上,还有什么其他办法可以解决?是否可以在 Vert.x 环境之外执行全部 REST 调用,并通过事件总线与之通信?
此外,如果我把 DeviceDiscoverVerticle 部署为标准 verticle 而不是 worker verticle,应用程序就会完全卡在 CompositeFuture.all(futureList).onComplete 处。