我正在 Scala 中编写一个简单的应用程序,它在不同的主题上收听 Kafka,当写作事件发生时,我在 ElasticSearch 中写下数据。我使用elastic4s作为 ElasticSearch Java API 的包装器。因此,监听不同的主题被实现为在 Futures 中运行的并发进程。我的应用程序中的这段代码看起来像
Future { container.orangesHandler.runMessagesHandling() }
Future { container.lemonsHandler.runMessagesHandling() }
Future { container.limesHandler.runMessagesHandling() }
Future { container.mandarinsHandler.runMessagesHandling() }
为了写入 ElasticSearch,我有一个帮助对象,其中包含一个ElasticClient private val client = ElasticClient.transport(uri)
,它连接到 ElasticSearch 并通过它写入数据。为了实现写作,我在这个对象中有以下方法:
def update(indexName: String, objectType: String, data: String) =
checkForIndexAndTypeExistence(indexName, objectType).flatMap { response =>
client.execute {
index into indexName -> objectType source data
}
}
private def checkForIndexAndTypeExistence(indexName: String, objectType: String) =
client.execute(indexExists(indexName)).flatMap { response =>
if (!response.isExists)
client.execute(create index indexName mappings (mapping(objectType) templates(dynamicTemplate)))
else
checkForTypeExistence(indexName, objectType)
}
private def checkForTypeExistence(indexName: String, objectType: String) =
client.execute(typesExist(objectType) in indexName).flatMap { response =>
if (!response.isExists())
client.execute(putMapping(indexName / objectType) templates(dynamicTemplate))
else Future(true)
}
问题是当我运行 3 个 Futures 时一切正常。但是当我添加第四个 Future 时它不起作用。特别是客户端不起作用,它只是用永远不会完成的Promise响应。一个有趣的细节:当我在具有四核处理器的计算机上运行此应用程序时,它可以与四个 Futures 一起正常工作。