1

我正在 Scala 中编写一个简单的应用程序,它在不同的主题上收听 Kafka,当写作事件发生时,我在 ElasticSearch 中写下数据。我使用elastic4s作为 ElasticSearch Java API 的包装器。因此,监听不同的主题被实现为在 Futures 中运行的并发进程。我的应用程序中的这段代码看起来像

Future { container.orangesHandler.runMessagesHandling() }
Future { container.lemonsHandler.runMessagesHandling() }
Future { container.limesHandler.runMessagesHandling() }
Future { container.mandarinsHandler.runMessagesHandling() }

为了写入 ElasticSearch,我有一个帮助对象,其中包含一个ElasticClient private val client = ElasticClient.transport(uri),它连接到 ElasticSearch 并通过它写入数据。为了实现写作,我在这个对象中有以下方法:

def update(indexName: String, objectType: String, data: String) =
        checkForIndexAndTypeExistence(indexName, objectType).flatMap { response =>
            client.execute {
                index into indexName -> objectType source data
            }
        }

private def checkForIndexAndTypeExistence(indexName: String, objectType: String) =
    client.execute(indexExists(indexName)).flatMap { response =>
        if (!response.isExists)
            client.execute(create index indexName mappings (mapping(objectType) templates(dynamicTemplate)))
        else
            checkForTypeExistence(indexName, objectType)
    }

private def checkForTypeExistence(indexName: String, objectType: String) =
    client.execute(typesExist(objectType) in indexName).flatMap { response =>
        if (!response.isExists())
            client.execute(putMapping(indexName / objectType) templates(dynamicTemplate))
        else Future(true)
    }

问题是当我运行 3 个 Futures 时一切正常。但是当我添加第四个 Future 时它不起作用。特别是客户端不起作用,它只是用永远不会完成的Promise响应。一个有趣的细节:当我在具有四核处理器的计算机上运行此应用程序时,它可以与四个 Futures 一起正常工作。

4

0 回答 0