1

问题:
推理生成过程每秒将大约 300 个推理数据写入 MongoDB 集合。另一个进程利用 MongoDB 的更改流特性来回读这些推论并进行后处理。目前,调用变更流函数API(mongoc_change_stream_next())时只返回一个推理数据。因此,总共需要 300 次这样的调用才能在 1 秒内获取存储的所有推理数据。但是,每次读取后,需要大约 50ms 的时间来对单个/多个推理数据进行后处理。由于单一数据返回模型,引入了 15 倍的有效延迟。为了解决这个问题,我们正在尝试实施批量读取机制符合 MongoDB 的更改流功能。我们尝试了各种选项来实现相同的功能,但在每次更改流 API 调用后仍然只获得一个数据。有没有办法解决这个问题?

平台:
操作系统:Ubuntu 16.04
Mongo-c-driver:1.15.1
Mongo 服务器:4.0.12

尝试的选项:
将光标的批量大小设置为大于 1。

int main(void) {
    const char *uri_string = "mongodb://localhost:27017/replicaSet=set0";
    mongoc_change_stream_t *stream;
    mongoc_collection_t *coll;
    bson_error_t error;
        mongoc_uri_t *uri;
    mongoc_client_t *client;

    /*
    * Add the Mongo DB blocking read and scall the inference parse function with the Json
                 * */
    uri = mongoc_uri_new_with_error (uri_string, &error);
    if (!uri) {
        fprintf (stderr,
        "failed to parse URI: %s\n"
        "error message:       %s\n",
        uri_string,
        error.message);
        return -1;
    }

    client = mongoc_client_new_from_uri (uri);
    if (!client) {
        return -1;
    }

    coll = mongoc_client_get_collection (client,  <DB-NAME>, <collection-name>);
    stream = mongoc_collection_watch (coll, &empty, NULL);
    mongoc_cursor_set_batch_size(stream->cursor, 20);
    while (1){
        while (mongoc_change_stream_next (stream, &doc)) {
            char *as_json = bson_as_relaxed_extended_json (doc, NULL); 
            ............
            ............
            //post processing consuming 50 ms of time
            ............
            ............
        }
        if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
            if (!bson_empty (err_doc)) {
                fprintf (stderr,
                "Server Error: %s\n",
                bson_as_relaxed_extended_json (err_doc, NULL));
            } else {
                fprintf (stderr, "Client Error: %s\n", error.message);
            }
            break;
        }
    }
    return 0;
}

4

1 回答 1

1

目前调用变更流函数API(mongoc_change_stream_next())时只返回一个推理数据

从技术上讲,并不是返回单个文档。这是因为mongoc_change_stream_next()迭代底层游标,将每个游标设置bson为下一个文档。因此,即使返回的批量大小超过 1,它仍然需要对每个文档进行迭代。

你可以试试:

  • 创建单独的线程以并行处理文档,因此您不必等待每个文档 50 毫秒或累积等待 15 秒。

  • 循环遍历一批文档,即 50 缓存它们然后执行批处理

  • 在单独的线程上批量处理它们(以上两者的组合)

于 2020-03-15T22:35:26.223 回答