Browse Source

chore: cherry-pick 7abc7e45b2 from node (#29048)

Backports: https://github.com/nodejs/node/pull/38506

Co-authored-by: Fedor Indutny <[email protected]>
trop[bot] 4 years ago
parent
commit
ed0b654fee
2 changed files with 263 additions and 0 deletions
  1. 1 0
      patches/node/.patches
  2. 262 0
      patches/node/node-api_faster_threadsafe_function.patch

+ 1 - 0
patches/node/.patches

@@ -34,3 +34,4 @@ build_add_mjs_support_to_js2c.patch
 src_inline_asynccleanuphookhandle_in_headers.patch
 fix_handle_new_tostring_behavior_in_v8_serdes_test.patch
 fix_the_--harmony-weak-refs_has_been_removed_remove_from_specs.patch
+node-api_faster_threadsafe_function.patch

+ 262 - 0
patches/node/node-api_faster_threadsafe_function.patch

@@ -0,0 +1,262 @@
+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Fedor Indutny <[email protected]>
+Date: Sat, 1 May 2021 11:26:46 -0700
+Subject: node-api: faster threadsafe_function
+
+Invoke threadsafe_function during the same tick and avoid marshalling
+costs between threads and/or churning event loop if either:
+
+1. There's a queued call already
+2. `Push()` is called while the main thread was running
+   threadsafe_function
+
+PR-URL: https://github.com/nodejs/node/pull/38506
+Reviewed-By: Anna Henningsen <[email protected]>
+Reviewed-By: Rich Trott <[email protected]>
+Reviewed-By: James M Snell <[email protected]>
+
+diff --git a/src/node_api.cc b/src/node_api.cc
+index f1a5265b6a7234dc754aedc86ecd3132f3d90b09..d1076b29aeb5133a0325d3e7ebd097d207e4f4a6 100644
+--- a/src/node_api.cc
++++ b/src/node_api.cc
+@@ -12,6 +12,7 @@
+ #include "tracing/traced_value.h"
+ #include "util-inl.h"
+ 
++#include <atomic>
+ #include <memory>
+ 
+ struct node_napi_env__ : public napi_env__ {
+@@ -131,6 +132,7 @@ class ThreadSafeFunction : public node::AsyncResource {
+                                    *v8::String::Utf8Value(env_->isolate, name)),
+       thread_count(thread_count_),
+       is_closing(false),
++      dispatch_state(kDispatchIdle),
+       context(context_),
+       max_queue_size(max_queue_size_),
+       env(env_),
+@@ -170,10 +172,8 @@ class ThreadSafeFunction : public node::AsyncResource {
+         return napi_closing;
+       }
+     } else {
+-      if (uv_async_send(&async) != 0) {
+-        return napi_generic_failure;
+-      }
+       queue.push(data);
++      Send();
+       return napi_ok;
+     }
+   }
+@@ -205,9 +205,7 @@ class ThreadSafeFunction : public node::AsyncResource {
+         if (is_closing && max_queue_size > 0) {
+           cond->Signal(lock);
+         }
+-        if (uv_async_send(&async) != 0) {
+-          return napi_generic_failure;
+-        }
++        Send();
+       }
+     }
+ 
+@@ -232,7 +230,6 @@ class ThreadSafeFunction : public node::AsyncResource {
+         cond = std::make_unique<node::ConditionVariable>();
+       }
+       if (max_queue_size == 0 || cond) {
+-        CHECK_EQ(0, uv_idle_init(loop, &idle));
+         return napi_ok;
+       }
+ 
+@@ -257,21 +254,46 @@ class ThreadSafeFunction : public node::AsyncResource {
+ 
+   napi_status Unref() {
+     uv_unref(reinterpret_cast<uv_handle_t*>(&async));
+-    uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
+ 
+     return napi_ok;
+   }
+ 
+   napi_status Ref() {
+     uv_ref(reinterpret_cast<uv_handle_t*>(&async));
+-    uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
+ 
+     return napi_ok;
+   }
+ 
+-  void DispatchOne() {
++  inline void* Context() {
++    return context;
++  }
++
++ protected:
++  void Dispatch() {
++    bool has_more = true;
++
++    // Limit maximum synchronous iteration count to prevent event loop
++    // starvation. See `src/node_messaging.cc` for an inspiration.
++    unsigned int iterations_left = kMaxIterationCount;
++    while (has_more && --iterations_left != 0) {
++      dispatch_state = kDispatchRunning;
++      has_more = DispatchOne();
++
++      // Send() was called while we were executing the JS function
++      if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
++        has_more = true;
++      }
++    }
++
++    if (has_more) {
++      Send();
++    }
++  }
++
++  bool DispatchOne() {
+     void* data = nullptr;
+     bool popped_value = false;
++    bool has_more = false;
+ 
+     {
+       node::Mutex::ScopedLock lock(this->mutex);
+@@ -296,9 +318,9 @@ class ThreadSafeFunction : public node::AsyncResource {
+               cond->Signal(lock);
+             }
+             CloseHandlesAndMaybeDelete();
+-          } else {
+-            CHECK_EQ(0, uv_idle_stop(&idle));
+           }
++        } else {
++          has_more = true;
+         }
+       }
+     }
+@@ -316,6 +338,8 @@ class ThreadSafeFunction : public node::AsyncResource {
+         call_js_cb(env, js_callback, context, data);
+       });
+     }
++
++    return has_more;
+   }
+ 
+   void Finalize() {
+@@ -329,10 +353,6 @@ class ThreadSafeFunction : public node::AsyncResource {
+     EmptyQueueAndDelete();
+   }
+ 
+-  inline void* Context() {
+-    return context;
+-  }
+-
+   void CloseHandlesAndMaybeDelete(bool set_closing = false) {
+     v8::HandleScope scope(env->isolate);
+     if (set_closing) {
+@@ -352,18 +372,20 @@ class ThreadSafeFunction : public node::AsyncResource {
+           ThreadSafeFunction* ts_fn =
+               node::ContainerOf(&ThreadSafeFunction::async,
+                                 reinterpret_cast<uv_async_t*>(handle));
+-          v8::HandleScope scope(ts_fn->env->isolate);
+-          ts_fn->env->node_env()->CloseHandle(
+-              reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
+-              [](uv_handle_t* handle) -> void {
+-                ThreadSafeFunction* ts_fn =
+-                    node::ContainerOf(&ThreadSafeFunction::idle,
+-                                      reinterpret_cast<uv_idle_t*>(handle));
+-                ts_fn->Finalize();
+-              });
++          ts_fn->Finalize();
+         });
+   }
+ 
++  void Send() {
++    // Ask currently running Dispatch() to make one more iteration
++    unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
++    if ((current_state & kDispatchRunning) == kDispatchRunning) {
++      return;
++    }
++
++    CHECK_EQ(0, uv_async_send(&async));
++  }
++
+   // Default way of calling into JavaScript. Used when ThreadSafeFunction is
+   //  without a call_js_cb_.
+   static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
+@@ -387,16 +409,10 @@ class ThreadSafeFunction : public node::AsyncResource {
+     }
+   }
+ 
+-  static void IdleCb(uv_idle_t* idle) {
+-    ThreadSafeFunction* ts_fn =
+-        node::ContainerOf(&ThreadSafeFunction::idle, idle);
+-    ts_fn->DispatchOne();
+-  }
+-
+   static void AsyncCb(uv_async_t* async) {
+     ThreadSafeFunction* ts_fn =
+         node::ContainerOf(&ThreadSafeFunction::async, async);
+-    CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
++    ts_fn->Dispatch();
+   }
+ 
+   static void Cleanup(void* data) {
+@@ -405,14 +421,20 @@ class ThreadSafeFunction : public node::AsyncResource {
+   }
+ 
+  private:
++  static const unsigned char kDispatchIdle = 0;
++  static const unsigned char kDispatchRunning = 1 << 0;
++  static const unsigned char kDispatchPending = 1 << 1;
++
++  static const unsigned int kMaxIterationCount = 1000;
++
+   // These are variables protected by the mutex.
+   node::Mutex mutex;
+   std::unique_ptr<node::ConditionVariable> cond;
+   std::queue<void*> queue;
+   uv_async_t async;
+-  uv_idle_t idle;
+   size_t thread_count;
+   bool is_closing;
++  std::atomic_uchar dispatch_state;
+ 
+   // These are variables set once, upon creation, and then never again, which
+   // means we don't need the mutex to read them.
+diff --git a/test/node-api/test_threadsafe_function/binding.c b/test/node-api/test_threadsafe_function/binding.c
+index c9c526153804c60b5bd5844d2c2aacac7f0fb514..ae3ec67de43cfffdfb10772761dbd69f01c623bb 100644
+--- a/test/node-api/test_threadsafe_function/binding.c
++++ b/test/node-api/test_threadsafe_function/binding.c
+@@ -7,7 +7,7 @@
+ #include <node_api.h>
+ #include "../../js-native-api/common.h"
+ 
+-#define ARRAY_LENGTH 10
++#define ARRAY_LENGTH 10000
+ #define MAX_QUEUE_SIZE 2
+ 
+ static uv_thread_t uv_threads[2];
+@@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
+   for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
+     status = napi_call_threadsafe_function(ts_fn, &ints[index],
+         ts_fn_info->block_on_full);
+-    if (ts_fn_info->max_queue_size == 0) {
++    if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
+       // Let's make this thread really busy for 200 ms to give the main thread a
+       // chance to abort.
+       uint64_t start = uv_hrtime();
+diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js
+index 3603d79ee6b5d36590503989d8168368eaf12b03..ccd3f4228a793ae77eff760309e31191ba8de49a 100644
+--- a/test/node-api/test_threadsafe_function/test.js
++++ b/test/node-api/test_threadsafe_function/test.js
+@@ -210,6 +210,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
+ }))
+ .then((result) => assert.strictEqual(result.indexOf(0), -1))
+ 
++// Make sure that threadsafe function isn't stalled when we hit
++// `kMaxIterationCount` in `src/node_api.cc`
++.then(() => testWithJSMarshaller({
++  threadStarter: 'StartThreadNonblocking',
++  maxQueueSize: binding.ARRAY_LENGTH >>> 1,
++  quitAfter: binding.ARRAY_LENGTH
++}))
++.then((result) => assert.deepStrictEqual(result, expectedArray))
++
+ // Start a child process to test rapid teardown
+ .then(() => testUnref(binding.MAX_QUEUE_SIZE))
+