问题:
推理生成过程每秒将大约 300 个推理数据写入 MongoDB 集合。另一个进程利用 MongoDB 的更改流特性来回读这些推论并进行后处理。目前,调用变更流函数API(mongoc_change_stream_next())时只返回一个推理数据。因此,总共需要 300 次这样的调用才能在 1 秒内获取存储的所有推理数据。但是,每次读取后,需要大约 50ms 的时间来对单个/多个推理数据进行后处理。由于单一数据返回模型,引入了 15 倍的有效延迟。为了解决这个问题,我们正在尝试实施批量读取机制符合 MongoDB 的更改流功能。我们尝试了各种选项来实现相同的功能,但在每次更改流 API 调用后仍然只获得一个数据。有没有办法解决这个问题?
平台:
操作系统:Ubuntu 16.04
Mongo-c-driver:1.15.1
Mongo 服务器:4.0.12
尝试的选项:
将光标的批量大小设置为大于 1。
int main(void) {
const char *uri_string = "mongodb://localhost:27017/replicaSet=set0";
mongoc_change_stream_t *stream;
mongoc_collection_t *coll;
bson_error_t error;
mongoc_uri_t *uri;
mongoc_client_t *client;
/*
* Add the Mongo DB blocking read and scall the inference parse function with the Json
* */
uri = mongoc_uri_new_with_error (uri_string, &error);
if (!uri) {
fprintf (stderr,
"failed to parse URI: %s\n"
"error message: %s\n",
uri_string,
error.message);
return -1;
}
client = mongoc_client_new_from_uri (uri);
if (!client) {
return -1;
}
coll = mongoc_client_get_collection (client, <DB-NAME>, <collection-name>);
stream = mongoc_collection_watch (coll, &empty, NULL);
mongoc_cursor_set_batch_size(stream->cursor, 20);
while (1){
while (mongoc_change_stream_next (stream, &doc)) {
char *as_json = bson_as_relaxed_extended_json (doc, NULL);
............
............
//post processing consuming 50 ms of time
............
............
}
if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
if (!bson_empty (err_doc)) {
fprintf (stderr,
"Server Error: %s\n",
bson_as_relaxed_extended_json (err_doc, NULL));
} else {
fprintf (stderr, "Client Error: %s\n", error.message);
}
break;
}
}
return 0;
}