4

我一直在查看NAPI 文档以尝试了解它是如何处理multithreading. 根据文档napi_create_threadsafe_function()napi_call_threadsafe_function()用于js functions从多个线程创建和调用。问题是文档不是那么直截了当,没有示例,我在其他任何地方都找不到。

如果有人有任何使用经验napi_create_threadsafe_function()napi_call_threadsafe_function()或者知道在哪里可以找到使用它们的示例。请您提供一个基本示例,以便我了解如何正确使用它们。

我正在编写一个C插件C++,不需要使用这些功能。我没有使用包装器node-addon-api,而是napi直接使用

4

3 回答 3

7

作为一个概括性的标签,我们可以说,N-API ThreadSafeFunctions 充当了在工作线程上执行的异步 C/C++ 代码和用于信息交换的 JavaScript 层之间的安全隧道

在进行技术讨论之前,让我们考虑一个场景,即我们需要完成一个运行时间很长的繁重任务。我们都知道把这个任务放在 node.js 主线程上不是一个好的选择,它会阻塞事件循环并阻塞队列中的所有其他任务。所以一个不错的选择可能是在一个单独的线程中考虑这个任务(让我们将此线程称为工作线程)。JavaScript 异步回调和 Promise 正是采用这种方法。

假设我们已经在工作线程上部署了任务,并且我们已经准备好部分结果,我们希望将其发送到 JavaScript 层。然后过程涉及的是,将结果转换为 napi_value 然后从 C/C++ 调用回调 JavaScript 函数。不幸的是,这两个操作都不能从工作线程中执行。这些操作应该只在主线程中完成。JavaScript Promise 和 Callback,等到任务完成,然后切换到主线程,任务结果在一个普通的 C/C++ 存储设施中,如结构等。然后进行 napi_value 转换并从 main 调用 JavaScript 回调函数线。

由于我们的任务运行时间非常长,我们可能不想等到任务结束后再与 JavaScript 层交换结果。让我们考虑一个场景,我们正在一个非常大的视频中搜索对象,我们希望在找到对象时将检测到的对象发送到 JavaScript 层。在这种情况下,我们必须在任务仍在进行时开始发送任务结果这是异步线程安全函数调用来帮助我们的场景。它充当工作线程和 JavaScript 层之间用于信息交换的安全隧道。让我们考虑以下函数片段

napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
    // The native addon function exposed to JavaScript
    // This will be the funciton a node.js application calling.
}

void ExecuteWork(napi_env env, void* data)
{
    // We will use this function to get the task done.
    // This code will be executed on a worker thread.
}

void OnWorkComplete(napi_env env, napi_status status, void* data)
{
    // after the `ExecuteWork` function exits, this
    // callback function will be called on the main thread
}

void ThreadSafeCFunction4CallingJS(napi_env env, napi_value js_cb,
                 void* context, void* data)
{
   // This funcion acts as a safe tunnel between the asynchronous C/C++ code 
   // executing the worker thread and the JavaScript layer for information exchange.
}

前三个函数与我们熟悉的 JavaScript Promise 和 Callback 几乎相同。第四个专门用于异步线程安全函数调用。在这种情况下,我们的长时间运行的任务正在由工作线程上的 ExecuteWork() 函数执行。假设它指示我们不要从 ExecuteWork() 调用 JavaScript(以及任何结果的 napi_value 转换),但允许从 ThreadSafeCFunction4CallingJS 这样做只要我们使用与 C/C++ 函数指针等效的 napi 调用 ThreadSafeCFunction4CallingJS。然后我们可以将 JavaScript 调用打包到这个 ThreadSafeCFunction4CallingJS() 函数中。然后当 ExecuteWork() 函数可以将结果传递给 ThreadSafeCFunction4CallingJS() 时,它在结构等普通 C/C++ 存储单元中被调用。 ThreadSafeCFunction4CallingJS() 将此结果转换为 napi_value 并调用 JavaScript 函数。 在封面下,ThreadSafeCFunction4CallingJS() 函数正在排队到事件循环,并最终由主线程执行。

以下代码片段包含在 CAsyncStreamSearch() 中,负责通过使用 napi_create_threadsafe_function() 创建一个等效于 N-API 的 C/C++ 函数指针,并且它是从本机插件的主线程本身完成的。类似地,通过使用napi_create_async_work()函数创建工作线程的请求,然后通过使用napi_queue_async_work()将工作放入事件队列 中,以便工作线程将来会拾取该项目。

napi_value CAsyncStreamSearch(napi_env env, napi_callback_info info)
{
-- -- -- --
-- -- -- --
  // Create a thread-safe N-API callback function correspond to the C/C++ callback function
  napi_create_threadsafe_function(env,
      js_cb, NULL, work_name, 0, 1, NULL, NULL, NULL,
      ThreadSafeCFunction4CallingJS, // the C/C++ callback function
      // out: the asynchronous thread-safe JavaScript function
      &(async_stream_data_ex->tsfn_StreamSearch));

  // Create an async work item, that can be deployed in the node.js event queue
  napi_create_async_work( env, NULL,
       work_name,
       ExecuteWork,
       OnWorkComplete,
       async_stream_data_ex,
       // OUT: THE handle to the async work item
       &(async_stream_data_ex->work_StreamSearch);)

  // Queue the work item for execution.
  napi_queue_async_work(env, async_stream_data_ex->work_StreamSearch);

  return NULL;
}

然后在异步执行任务期间(ExecuteWork() 函数)通过​​调用 napi_call_threadsafe_function() 函数调用 ThreadSafeCFunction4CallingJS(),如下所示。

static void ExecuteWork(napi_env env, void *data)
{
  // tsfn is napi equivalent of point to ThreadSafeCFunction4CallingJS
  // function that we created at CAsyncStreamSearch function
  napi_acquire_threadsafe_function( tsfn )
  Loop
  {
    // this will eventually invoke ThreadSafeCFunction4CallingJS()
   // we may call any number of time (in fact it can be called from any thread)
    napi_call_threadsafe_function( tsfn, WorkResult, );
  }
  napi_release_threadsafe_function( tsfn,);
}

您指出的示例是最好的信息来源之一,它直接来自 node.js 团队本身。当我学习这个概念时,我也参考了同一个例子,在我的研究过程中,通过从中提取原始想法重新创建了这个例子,希望你会发现这个简化了很多。它可以在

https://github.com/msatyan/MyNodeC/blob/master/src/mync1/ThreadSafeAsyncStream.cpp https://github.com/msatyan/MyNodeC/blob/master/test/ThreadSafeAsyncStream.js

于 2019-12-01T23:34:00.403 回答
2

如果其他人遇到这个问题。我终于设法在这里找到了一个例子。

一旦我更好地理解它并获得了工作样本,我将在这里更新。希望将来需要这个的人会比我更轻松。

见萨蒂安的回答

于 2019-11-20T18:11:35.357 回答
0

该站点的解决方案在这里对我有用

struct ThreadCtx {
  ThreadCtx(Napi::Env env) {};

  std::thread nativeThread;
  Napi::ThreadSafeFunction tsfn;
};


void Target::Connect(const Napi::CallbackInfo& info) {
  Napi::Env env = info.Env();


  threadCtx = new ThreadCtx(env);

  // Create a ThreadSafeFunction
  threadCtx->tsfn = Napi::ThreadSafeFunction::New(env, info[0].As<Napi::Function>(), "Resource Name", 0 /* Unlimited queue */, 1 /* Only 1 thread */, threadCtx,
        [&]( Napi::Env, void *finalizeData, ThreadCtx *context ) {
            printf("Thread cleanup\n");
            threadCtx->nativeThread.join();
        },
        (void*)nullptr
        );

  // Create a native thread
  threadCtx->nativeThread = std::thread([&] {
    auto callback = [](Napi::Env env, Napi::Function cb, char* buffer) {
        cb.Call({Napi::String::New(env, buffer)});
    };



    char reply[1024];
    memset(reply, 0, sizeof(reply));
    while(true)
    {
        size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, sizeof(reply)));

        if(reply_length <= 0) {
            printf("Bad read from boost asio\n");
            break;
        }


        // Callback (blocking) to JS
        napi_status status = threadCtx->tsfn.BlockingCall(reply, callback);
        if (status != napi_ok)
        {
            // Handle error
            break;
        }
    }

    // Release the thread-safe function
    threadCtx->tsfn.Release();
  });
}
于 2020-05-21T23:04:03.707 回答