Skip to content

Commit c8edfb7

Browse files
committed
test: add test for effect of UV_THREADPOOL_SIZE
This (not particularly elegant) native addon tests the effect of UV_THREADPOOL_SIZE on node-api. The test fails if Node.js allows more than UV_THREADPOOL_SIZE async tasks to run concurrently, or if it limits the number of concurrent async tasks to anything less than UV_THREADPOOL_SIZE.
1 parent 634eb50 commit c8edfb7

File tree

3 files changed

+204
-0
lines changed

3 files changed

+204
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"targets": [
3+
{
4+
"target_name": "test_uv_threadpool_size",
5+
"sources": [ "test_uv_threadpool_size.c" ]
6+
}
7+
]
8+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
'use strict';
2+
const common = require('../../common');
3+
const { test } = require(`./build/${common.buildType}/test_uv_threadpool_size`);
4+
5+
const uvThreadpoolSize = parseInt(process.env.EXPECTED_UV_THREADPOOL_SIZE ||
6+
process.env.UV_THREADPOOL_SIZE, 10) || 4;
7+
test(uvThreadpoolSize);
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
#undef NDEBUG
2+
#include <assert.h>
3+
#include <node_api.h>
4+
#include <stdlib.h>
5+
#include <uv.h>
6+
#include "../../js-native-api/common.h"
7+
8+
typedef struct {
9+
uv_mutex_t mutex;
10+
uint32_t threadpool_size;
11+
uint32_t n_tasks_started;
12+
uint32_t n_tasks_exited;
13+
uint32_t n_tasks_finalized;
14+
bool observed_saturation;
15+
} async_shared_data;
16+
17+
typedef struct {
18+
uint32_t task_id;
19+
async_shared_data* shared_data;
20+
napi_async_work request;
21+
} async_carrier;
22+
23+
static inline bool all_tasks_started(async_shared_data* d) {
24+
assert(d->n_tasks_started <= d->threadpool_size + 1);
25+
return d->n_tasks_started == d->threadpool_size + 1;
26+
}
27+
28+
static inline bool all_tasks_exited(async_shared_data* d) {
29+
assert(d->n_tasks_exited <= d->n_tasks_started);
30+
return all_tasks_started(d) && d->n_tasks_exited == d->n_tasks_started;
31+
}
32+
33+
static inline bool all_tasks_finalized(async_shared_data* d) {
34+
assert(d->n_tasks_finalized <= d->n_tasks_exited);
35+
return all_tasks_exited(d) && d->n_tasks_finalized == d->n_tasks_exited;
36+
}
37+
38+
static inline bool still_saturating(async_shared_data* d) {
39+
return d->n_tasks_started < d->threadpool_size;
40+
}
41+
42+
static inline bool threadpool_saturated(async_shared_data* d) {
43+
return d->n_tasks_started == d->threadpool_size && d->n_tasks_exited == 0;
44+
}
45+
46+
static inline bool threadpool_desaturating(async_shared_data* d) {
47+
return d->n_tasks_started >= d->threadpool_size && d->n_tasks_exited != 0;
48+
}
49+
50+
static inline void print_info(const char* label, async_carrier* c) {
51+
async_shared_data* d = c->shared_data;
52+
printf("%s task_id=%u n_tasks_started=%u n_tasks_exited=%u "
53+
"n_tasks_finalized=%u observed_saturation=%d\n",
54+
label,
55+
c->task_id,
56+
d->n_tasks_started,
57+
d->n_tasks_exited,
58+
d->n_tasks_finalized,
59+
d->observed_saturation);
60+
}
61+
62+
static void Execute(napi_env env, void* data) {
63+
async_carrier* c = (async_carrier*)data;
64+
async_shared_data* d = c->shared_data;
65+
66+
// As long as fewer than threadpool_size async tasks have been started, more
67+
// should be started (eventually). Only once that happens should scheduled
68+
// async tasks remain queued.
69+
uv_mutex_lock(&d->mutex);
70+
bool should_be_concurrent = still_saturating(d);
71+
d->n_tasks_started++;
72+
assert(d->n_tasks_started <= d->threadpool_size + 1);
73+
74+
print_info("start", c);
75+
76+
if (should_be_concurrent) {
77+
// Wait for the thread pool to be saturated. This is not an elegant way of
78+
// doing so, but it really does not matter much here.
79+
while (still_saturating(d)) {
80+
print_info("waiting", c);
81+
uv_mutex_unlock(&d->mutex);
82+
uv_sleep(100);
83+
uv_mutex_lock(&d->mutex);
84+
}
85+
86+
// One async task will observe that the threadpool is saturated, that is,
87+
// that threadpool_size tasks have been started and none have exited yet.
88+
// That task will be the first to exit.
89+
if (!d->observed_saturation) {
90+
assert(threadpool_saturated(d));
91+
d->observed_saturation = true;
92+
} else {
93+
assert(threadpool_saturated(d) || threadpool_desaturating(d));
94+
}
95+
} else {
96+
// If this task is not among the first threadpool_size tasks, it should not
97+
// have been started unless other tasks have already finished.
98+
assert(threadpool_desaturating(d));
99+
}
100+
101+
print_info("exit", c);
102+
103+
// Allow other tasks to access the shared data. If the thread pool is actually
104+
// larger than threadpool_size, this allows an extraneous task to start, which
105+
// will lead to an assertion error.
106+
uv_mutex_unlock(&d->mutex);
107+
uv_sleep(1000);
108+
uv_mutex_lock(&d->mutex);
109+
110+
d->n_tasks_exited++;
111+
uv_mutex_unlock(&d->mutex);
112+
}
113+
114+
static void Complete(napi_env env, napi_status status, void* data) {
115+
async_carrier* c = (async_carrier*)data;
116+
async_shared_data* d = c->shared_data;
117+
118+
if (status != napi_ok) {
119+
napi_throw_type_error(env, NULL, "Execute callback failed.");
120+
return;
121+
}
122+
123+
uv_mutex_lock(&d->mutex);
124+
assert(threadpool_desaturating(d));
125+
d->n_tasks_finalized++;
126+
print_info("finalize", c);
127+
if (all_tasks_finalized(d)) {
128+
uv_mutex_unlock(&d->mutex);
129+
uv_mutex_destroy(&d->mutex);
130+
free(d);
131+
} else {
132+
uv_mutex_unlock(&d->mutex);
133+
}
134+
135+
NODE_API_CALL_RETURN_VOID(env, napi_delete_async_work(env, c->request));
136+
free(c);
137+
}
138+
139+
static napi_value Test(napi_env env, napi_callback_info info) {
140+
size_t argc = 1;
141+
napi_value argv[1];
142+
napi_value this;
143+
void* data;
144+
NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, &this, &data));
145+
NODE_API_ASSERT(env, argc >= 1, "Not enough arguments, expected 1.");
146+
147+
async_shared_data* shared_data = calloc(1, sizeof(async_shared_data));
148+
assert(shared_data != NULL);
149+
int ret = uv_mutex_init(&shared_data->mutex);
150+
assert(ret == 0);
151+
152+
napi_valuetype t;
153+
NODE_API_CALL(env, napi_typeof(env, argv[0], &t));
154+
NODE_API_ASSERT(
155+
env, t == napi_number, "Wrong first argument, integer expected.");
156+
NODE_API_CALL(
157+
env, napi_get_value_uint32(env, argv[0], &shared_data->threadpool_size));
158+
159+
napi_value resource_name;
160+
NODE_API_CALL(env,
161+
napi_create_string_utf8(
162+
env, "TestResource", NAPI_AUTO_LENGTH, &resource_name));
163+
164+
for (uint32_t i = 0; i <= shared_data->threadpool_size; i++) {
165+
async_carrier* carrier = malloc(sizeof(async_carrier));
166+
assert(carrier != NULL);
167+
carrier->task_id = i;
168+
carrier->shared_data = shared_data;
169+
NODE_API_CALL(env,
170+
napi_create_async_work(env,
171+
NULL,
172+
resource_name,
173+
Execute,
174+
Complete,
175+
carrier,
176+
&carrier->request));
177+
NODE_API_CALL(env, napi_queue_async_work(env, carrier->request));
178+
}
179+
180+
return NULL;
181+
}
182+
183+
static napi_value Init(napi_env env, napi_value exports) {
184+
napi_property_descriptor desc = DECLARE_NODE_API_PROPERTY("test", Test);
185+
NODE_API_CALL(env, napi_define_properties(env, exports, 1, &desc));
186+
return exports;
187+
}
188+
189+
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)

0 commit comments

Comments
 (0)