
Commit e1340e5

NSFS | add dedicated servers for each fork for health checks

Signed-off-by: nadav mizrahi <nadav.mizrahi16@gmail.com>
1 parent de301b6 commit e1340e5

File tree: 9 files changed, +177 -61 lines changed


config.js

Lines changed: 2 additions & 0 deletions

```diff
@@ -979,6 +979,8 @@ config.ENDPOINT_SSL_PORT = Number(process.env.ENDPOINT_SSL_PORT) || 6443;
 // Remove the NSFS condition when NSFS starts to support STS.
 config.ENDPOINT_SSL_STS_PORT = Number(process.env.ENDPOINT_SSL_STS_PORT) || (process.env.NC_NSFS_NO_DB_ENV === 'true' ? -1 : 7443);
 config.ENDPOINT_SSL_IAM_PORT = Number(process.env.ENDPOINT_SSL_IAM_PORT) || -1;
+// each fork will get a port in the range [ENDPOINT_FORK_PORT_BASE, ENDPOINT_FORK_PORT_BASE + number of forks - 1]
+config.ENDPOINT_FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
 config.ALLOW_HTTP = false;
 config.ALLOW_HTTP_METRICS = true;
 config.ALLOW_HTTPS_METRICS = true;
```
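To make the port scheme concrete: with the default base of 6002 and, say, 4 forks, the dedicated health ports are 6002 through 6005. A minimal sketch (the fork count here is hypothetical, not part of the commit):

```js
// Sketch of the per-fork health port layout, assuming the default base (6002)
// and a hypothetical fork count of 4.
const FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
const fork_count = 4; // hypothetical, for illustration only

// fork i listens on FORK_PORT_BASE + i
const fork_health_ports = Array.from({ length: fork_count }, (_, i) => FORK_PORT_BASE + i);
console.log(fork_health_ports); // [6002, 6003, 6004, 6005]
```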

src/endpoint/endpoint.js

Lines changed: 53 additions & 9 deletions

```diff
@@ -43,6 +43,7 @@ const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
 const prom_reporting = require('../server/analytic_services/prometheus_reporting');
 const { PersistentLogger } = require('../util/persistent_logger');
 const { get_notification_logger } = require('../util/notifications_util');
+const { is_nc_enviorment } = require('../nc/nc_utils');
 const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
 const cluster = /** @type {import('node:cluster').Cluster} */ (
     /** @type {unknown} */ (require('node:cluster'))
@@ -57,7 +58,8 @@ const SERVICES_TYPES_ENUM = Object.freeze({
     S3: 'S3',
     STS: 'STS',
     IAM: 'IAM',
-    METRICS: 'METRICS'
+    METRICS: 'METRICS',
+    FORK_HEALTH: 'FORK_HEALTH',
 });
 
 const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
@@ -117,11 +119,11 @@ async function main(options = {}) {
     const https_metrics_port = options.https_metrics_port || config.EP_METRICS_SERVER_SSL_PORT;
     /**
      * Please notice that we can run the main in 2 states:
-     * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
+     * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
      *    is implemented here would be run by this process.
-     * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
-     *    in only relevant to the primary process it should be implemented in
-     *    fork_utils.start_workers because the primary process returns after start_workers
+     * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
+     *    is only relevant to the primary process it should be implemented in
+     *    fork_utils.start_workers because the primary process returns after start_workers
      * and the forks will continue executing the code lines in this function
      * */
     const is_workers_started_from_primary = await fork_utils.start_workers(http_metrics_port, https_metrics_port,
@@ -202,14 +204,29 @@ async function main(options = {}) {
         { ...options, https_port: https_port_s3, http_port: http_port_s3, virtual_hosts, bucket_logger, notification_logger });
     await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.STS, init_request_sdk, { https_port: https_port_sts, virtual_hosts });
     await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.IAM, init_request_sdk, { https_port: https_port_iam });
-
+    const is_nc = is_nc_enviorment();
+    // the fork health server currently runs only in a non containerized environment
+    if (is_nc) {
+        // the current process is the primary and the only fork - start the fork server directly with the base port
+        if (cluster.isPrimary) {
+            await fork_message_request_handler({
+                nsfs_config_root: options.nsfs_config_root,
+                health_port: config.ENDPOINT_FORK_PORT_BASE
+            });
+        // the current process is a worker, so we listen for the port from the primary process
+        } else {
+            process.on('message', fork_message_request_handler);
+            // send a message to the primary process that we are ready to receive messages
+            process.send('ready');
+        }
+    }
 
     // START METRICS SERVER
     if ((http_metrics_port > 0 || https_metrics_port > 0) && cluster.isPrimary) {
         await prom_reporting.start_server(http_metrics_port, https_metrics_port, false, options.nsfs_config_root);
     }
 
-    // TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
+    // TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
     // therefore namespace monitor won't be registered
     if (internal_rpc_client && config.NAMESPACE_MONITOR_ENABLED) {
         endpoint_stats_collector.instance().set_rpc_client(internal_rpc_client);
@@ -289,8 +306,6 @@ function create_endpoint_handler(server_type, init_request_sdk, { virtual_hosts,
         return blob_rest_handler(req, res);
     } else if (req.url.startsWith('/total_fork_count')) {
         return fork_count_handler(req, res);
-    } else if (req.url.startsWith('/endpoint_fork_id')) {
-        return endpoint_fork_id_handler(req, res);
     } else if (req.url.startsWith('/_/')) {
         // internals non S3 requests
         const api = req.url.slice('/_/'.length);
@@ -531,8 +546,37 @@ function unavailable_handler(req, res) {
     res.end(reply);
 }
 
+/**
+ * handler for the individual fork server, used to handle requests that get the worker id
+ * currently used by the health script to check that each fork is alive
+ * @type {EndpointHandler}
+ */
+function fork_main_handler(req, res) {
+    endpoint_utils.set_noobaa_server_header(res);
+    endpoint_utils.prepare_rest_request(req);
+    if (req.url.startsWith('/endpoint_fork_id')) {
+        return endpoint_fork_id_handler(req, res);
+    } else {
+        return internal_api_error(req, res, `Unknown API call ${req.url}`);
+    }
+}
+
+/**
+ * fork_message_request_handler is used to handle messages from the primary process.
+ * the primary process sends a message with the designated port on which to start the fork server.
+ * @param {Object} msg
+ */
+async function fork_message_request_handler(msg) {
+    await http_utils.start_https_server(msg.health_port,
+        SERVICES_TYPES_ENUM.FORK_HEALTH,
+        fork_main_handler,
+        msg.nsfs_config_root
+    );
+}
+
 exports.main = main;
 exports.create_endpoint_handler = create_endpoint_handler;
 exports.create_init_request_sdk = create_init_request_sdk;
+exports.endpoint_fork_id_handler = endpoint_fork_id_handler;
 
 if (require.main === module) main();
```
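To see what one of these dedicated servers answers, a fork can be probed directly on its health port. A minimal sketch, assuming a fork listening on port 6002 with the usual self-signed endpoint certificate (hence `rejectUnauthorized: false`, mirroring the health script); the response body shape is whatever `endpoint_fork_id_handler` writes, assumed here to be JSON carrying the worker id:

```js
// Probe a single fork's dedicated health server (hypothetical host/port).
const https = require('node:https');

const req = https.request({
    hostname: 'localhost',
    port: 6002, // ENDPOINT_FORK_PORT_BASE + the fork's offset
    path: '/endpoint_fork_id',
    method: 'GET',
    rejectUnauthorized: false, // self-signed cert, same as the health script
}, res => {
    let body = '';
    res.on('data', chunk => { body += chunk; });
    res.on('end', () => console.log('fork reply:', body));
});
req.on('error', err => console.error('fork not reachable:', err.message));
req.end();
```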

src/manage_nsfs/health.js

Lines changed: 45 additions & 40 deletions

```diff
@@ -101,12 +101,14 @@ process.env.AWS_SDK_JS_SUPPRESS_MAINTENANCE_MODE_MESSAGE = '1';
 class NSFSHealth {
     constructor(options) {
         this.https_port = options.https_port;
+        this.fork_base_port = options.fork_base_port;
         this.all_account_details = options.all_account_details;
         this.all_bucket_details = options.all_bucket_details;
         this.all_connection_details = options.all_connection_details;
         this.notif_storage_threshold = options.notif_storage_threshold;
         this.lifecycle = options.lifecycle;
         this.config_fs = options.config_fs;
+        this.disable_service_validation = options.disable_service_validation;
     }
 
     /**
@@ -121,7 +123,13 @@ class NSFSHealth {
     async nc_nsfs_health() {
         let endpoint_state;
         let memory;
-        const noobaa_service_state = await this.get_service_state(NOOBAA_SERVICE_NAME);
+        let noobaa_service_state;
+        if (this.disable_service_validation) {
+            // if service validation is disabled, assume the service is running and set a dummy pid. should be used only for testing
+            noobaa_service_state = { name: NOOBAA_SERVICE_NAME, service_status: 'active', pid: '100' };
+        } else {
+            noobaa_service_state = await this.get_service_state(NOOBAA_SERVICE_NAME);
+        }
         const { service_status, pid } = noobaa_service_state;
         if (pid !== '0') {
             endpoint_state = await this.get_endpoint_response();
@@ -241,10 +249,10 @@
         return service_health;
     }
 
-    async make_endpoint_health_request(url_path) {
+    async make_endpoint_health_request(url_path, port = this.https_port) {
         const response = await make_https_request({
-            HOSTNAME,
-            port: this.https_port,
+            hostname: HOSTNAME,
+            port,
             path: url_path,
             method: 'GET',
             rejectUnauthorized: false,
@@ -260,43 +268,37 @@
         let url_path = '/total_fork_count';
         const worker_ids = [];
         let total_fork_count = 0;
+        let fork_count_response;
         let response;
         try {
-            const fork_count_response = await this.make_endpoint_health_request(url_path);
-            if (!fork_count_response) {
-                return {
-                    response: fork_response_code.NOT_RUNNING,
-                    total_fork_count: total_fork_count,
-                    running_workers: worker_ids,
-                };
-            }
-            total_fork_count = fork_count_response.fork_count;
-            if (total_fork_count > 0) {
-                url_path = '/endpoint_fork_id';
-                await P.retry({
-                    attempts: total_fork_count * 2,
-                    delay_ms: 1,
-                    func: async () => {
-                        const fork_id_response = await this.make_endpoint_health_request(url_path);
-                        if (fork_id_response.worker_id && !worker_ids.includes(fork_id_response.worker_id)) {
-                            worker_ids.push(fork_id_response.worker_id);
-                        }
-                        if (worker_ids.length < total_fork_count) {
-                            throw new Error('Number of running forks is less than the expected fork count.');
-                        }
-                    }
-                });
-                if (worker_ids.length === total_fork_count) {
-                    response = fork_response_code.RUNNING;
-                } else {
-                    response = fork_response_code.MISSING_FORKS;
-                }
-            } else {
-                response = fork_response_code.RUNNING;
-            }
+            fork_count_response = await this.make_endpoint_health_request(url_path);
         } catch (err) {
-            dbg.log1('Error while pinging endpoint host :' + HOSTNAME + ', port ' + this.https_port, err);
-            response = fork_response_code.NOT_RUNNING;
+            dbg.log0('Error while pinging endpoint host :' + HOSTNAME, err);
+        }
+        if (!fork_count_response) {
+            return {
+                response: fork_response_code.NOT_RUNNING,
+                total_fork_count: total_fork_count,
+                running_workers: worker_ids,
+            };
+        }
+
+        total_fork_count = fork_count_response.fork_count;
+        url_path = '/endpoint_fork_id';
+        for (let i = 0; i < total_fork_count; i++) {
+            const port = this.fork_base_port + i;
+            try {
+                const fork_id_response = await this.make_endpoint_health_request(url_path, port);
+                worker_ids.push(fork_id_response.worker_id);
+            } catch (err) {
+                dbg.log0('Error while pinging fork :' + HOSTNAME + ', port ' + port, err);
+            }
+        }
+        if (worker_ids.length < total_fork_count) {
+            dbg.log0('Number of running forks is less than the expected fork count.');
+            response = fork_response_code.MISSING_FORKS;
+        } else {
+            response = fork_response_code.RUNNING;
         }
         return {
             response: response,
@@ -635,16 +637,19 @@
 async function get_health_status(argv, config_fs) {
     try {
         const https_port = Number(argv.https_port) || config.ENDPOINT_SSL_PORT;
+        const fork_base_port = Number(argv.fork_base_port) || config.ENDPOINT_FORK_PORT_BASE;
         const deployment_type = argv.deployment_type || 'nc';
         const all_account_details = get_boolean_or_string_value(argv.all_account_details);
         const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
         const all_connection_details = get_boolean_or_string_value(argv.all_connection_details);
         const notif_storage_threshold = get_boolean_or_string_value(argv.notif_storage_threshold);
         const lifecycle = get_boolean_or_string_value(argv.lifecycle);
+        const disable_service_validation = get_boolean_or_string_value(argv.disable_service_validation);
 
         if (deployment_type === 'nc') {
-            const health = new NSFSHealth({ https_port,
-                all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, lifecycle, config_fs });
+            const health = new NSFSHealth({ https_port, fork_base_port,
+                all_account_details, all_bucket_details, all_connection_details,
+                notif_storage_threshold, lifecycle, config_fs, disable_service_validation });
             const health_status = await health.nc_nsfs_health();
             write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
         } else {
```
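The core change above: instead of hammering the shared HTTPS port with `P.retry` and hoping load balancing eventually routes a request to every fork, the health check now probes each fork's dedicated port exactly once, deterministically. A condensed sketch of that idea (the names here are illustrative, not the class's API):

```js
// Deterministic per-fork probe: one request per dedicated port; a dead fork
// simply leaves a gap instead of forcing retries against a shared port.
async function ping_all_forks(fork_base_port, total_fork_count, ping_one) {
    const running_workers = [];
    for (let i = 0; i < total_fork_count; i++) {
        const port = fork_base_port + i;
        try {
            const { worker_id } = await ping_one('/endpoint_fork_id', port);
            running_workers.push(worker_id);
        } catch (err) {
            // unreachable fork: recorded implicitly as a missing worker
        }
    }
    return { running_workers, missing_forks: total_fork_count - running_workers.length };
}
```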

src/manage_nsfs/manage_nsfs_constants.js

Lines changed: 1 addition & 1 deletion

```diff
@@ -75,7 +75,7 @@ const VALID_OPTIONS_GLACIER = {
 };
 
 const VALID_OPTIONS_DIAGNOSE = {
-    'health': new Set([ 'https_port', 'deployment_type', 'all_account_details', 'all_bucket_details', 'all_connection_details', 'notif_storage_threshold', 'lifecycle', ...CLI_MUTUAL_OPTIONS]),
+    'health': new Set([ 'https_port', 'deployment_type', 'all_account_details', 'all_bucket_details', 'all_connection_details', 'notif_storage_threshold', 'lifecycle', 'disable_service_validation', ...CLI_MUTUAL_OPTIONS]),
     'gather-logs': new Set([ CONFIG_ROOT_FLAG]),
     'metrics': new Set([CONFIG_ROOT_FLAG])
 };
```

src/nc/nc_utils.js

Lines changed: 8 additions & 0 deletions

```diff
@@ -24,7 +24,15 @@ function check_root_account_owns_user(root_account, account) {
     return root_account._id === account.owner;
 }
 
+/**
+ * @returns {boolean} true if the current environment is a NooBaa non containerized environment
+ */
+function is_nc_enviorment() {
+    return process.env.NC_NSFS_NO_DB_ENV && process.env.NC_NSFS_NO_DB_ENV === 'true';
+}
+
 // EXPORTS
 exports.generate_id = generate_id;
 exports.check_root_account_owns_user = check_root_account_owns_user;
+exports.is_nc_enviorment = is_nc_enviorment;
```
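One behavioral detail worth noting: despite the `{boolean}` annotation, the `&&` expression returns its raw left operand when the variable is unset, so callers get `undefined` (falsy) rather than `false`. A quick sketch (the require path assumes the repo layout above):

```js
const { is_nc_enviorment } = require('./src/nc/nc_utils'); // path per the repo layout above

process.env.NC_NSFS_NO_DB_ENV = 'true';
console.log(is_nc_enviorment()); // true

delete process.env.NC_NSFS_NO_DB_ENV;
console.log(is_nc_enviorment()); // undefined - falsy, but not a strict boolean
```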

src/test/unit_tests/test_nc_with_a_couple_of_forks.js

Lines changed: 23 additions & 0 deletions

```diff
@@ -223,4 +223,27 @@ const s3_uid6001 = generate_s3_client(access_details.access_key,
         await s3_uid6001.deleteBucket({ Bucket: bucket_name4 });
         await fs.promises.rm(new_bucket_path_param2, { recursive: true });
     });
+
+    mocha.it('health concurrency with load - ping forks', async function() {
+        this.timeout(100000); // eslint-disable-line no-invalid-this
+
+        for (let i = 0; i < 10; i++) {
+            // a couple of requests for health check
+            const failed_operations = [];
+            const successful_operations = [];
+            const num_of_concurrency = 10;
+            for (let j = 0; j < num_of_concurrency; j++) {
+                exec_manage_cli(TYPES.DIAGNOSE, 'health', { disable_service_validation: true })
+                    .catch(err => failed_operations.push(err))
+                    .then(res => successful_operations.push(res));
+            }
+            await P.delay(7000);
+            assert.equal(successful_operations.length, num_of_concurrency);
+            assert.equal(failed_operations.length, 0);
+            for (const res of successful_operations) {
+                const parsed_res = JSON.parse(res);
+                assert.strictEqual(parsed_res.response.reply.status, 'OK');
+            }
+        }
+    });
 });
```

src/util/fork_utils.js

Lines changed: 36 additions & 3 deletions

```diff
@@ -10,6 +10,8 @@ const prom_reporting = require('../server/analytic_services/prometheus_reporting
 const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
 const config = require('../../config');
 const stats_collector_utils = require('./stats_collector_utils');
+const { once } = require('node:events');
+const { is_nc_enviorment } = require('../nc/nc_utils');
 
 
 const io_stats = {
@@ -27,7 +29,7 @@ const fs_workers_stats = {};
 * When count > 0 the primary process will fork worker processes to process incoming http requests.
 * In case of any worker exit, also the entire process group will exit.
 * @see https://nodejs.org/api/cluster.html
- *
+ *
 * @param {number} [metrics_port]
 * @param {number} [https_metrics_port]
 * @param {string} [nsfs_config_root] nsfs configuration path
@@ -36,15 +38,19 @@ const fs_workers_stats = {};
 */
 async function start_workers(metrics_port, https_metrics_port, nsfs_config_root, count = 0) {
     const exit_events = [];
+    const fork_port_offsets = [];
     if (cluster.isPrimary && count > 0) {
         for (let i = 0; i < count; ++i) {
             const worker = cluster.fork();
             console.warn('WORKER started', { id: worker.id, pid: worker.process.pid });
+            // no need to await - will run once the worker is ready to receive messages
+            _send_fork_server_message(worker, i, nsfs_config_root);
+            fork_port_offsets.push(worker.id);
         }
 
         // We don't want to leave the process with a partial set of workers,
         // so if any worker exits, we will print an error message in the logs and start a new one.
-        cluster.on('exit', (worker, code, signal) => {
+        cluster.on('exit', async (worker, code, signal) => {
             console.warn('WORKER exit', { id: worker.id, pid: worker.process.pid, code, signal });
             new NoobaaEvent(NoobaaEvent.FORK_EXIT).create_event(undefined, { id: worker.id, pid: worker.process.pid,
                 code: code, signal: signal}, undefined);
@@ -66,7 +72,11 @@
             console.warn(`${exit_events.length} exit events in the last ${config.NSFS_EXIT_EVENTS_TIME_FRAME_MIN} minutes,` +
                 ` max allowed are: ${config.NSFS_MAX_EXIT_EVENTS_PER_TIME_FRAME}`);
             const new_worker = cluster.fork();
-            console.warn('WORKER re-started', { id: new_worker.id, pid: new_worker.process.pid });
+            const offset = fork_port_offsets.findIndex(id => id === worker.id);
+            _send_fork_server_message(new_worker, offset, nsfs_config_root);
+            fork_port_offsets[offset] = new_worker.id;
+            const port = is_nc_enviorment() ? { port: config.ENDPOINT_FORK_PORT_BASE + offset } : {};
+            console.warn('WORKER re-started', { id: new_worker.id, pid: new_worker.process.pid, ...port });
         });
         for (const id in cluster.workers) {
             if (id) {
@@ -101,6 +111,29 @@
     }
 }
 
+/**
+ * Sends a message to the worker to start the fork server with the given port.
+ * NOTE - currently runs only in a non containerized environment
+ * @param {*} worker
+ * @param {*} offset
+ * @param {*} nsfs_config_root
+ */
+async function _send_fork_server_message(worker, offset, nsfs_config_root) {
+    const is_nc = is_nc_enviorment();
+    if (is_nc && offset >= 0) {
+        // wait for the worker to be ready to receive messages
+        try {
+            await once(worker, 'message', { signal: AbortSignal.timeout(5 * 60000) });
+            worker.send({
+                nsfs_config_root: nsfs_config_root,
+                health_port: config.ENDPOINT_FORK_PORT_BASE + offset
+            });
+        } catch (err) {
+            dbg.warn(`Timeout: it took more than 5 minutes to get a message from worker ${worker.id}, not sending the start server message`);
+        }
+    }
+}
+
 function _update_ops_stats(stats) {
     // Go over the op_stats
     for (const op_name of stats_collector_utils.op_names) {
```
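The handshake above boils down to: the worker announces readiness over the IPC channel, then the primary replies with the designated health port. A condensed, runnable sketch of that exchange (the port and timeout values are illustrative):

```js
// Condensed primary/worker handshake, simplified from the diff above.
const cluster = require('node:cluster');
const { once } = require('node:events');

if (cluster.isPrimary) {
    const worker = cluster.fork();
    (async () => {
        // wait until the worker signals it is listening for IPC messages
        await once(worker, 'message', { signal: AbortSignal.timeout(60000) });
        worker.send({ health_port: 6002 }); // ENDPOINT_FORK_PORT_BASE + offset
    })();
} else {
    process.on('message', msg => {
        // in the real code this is where the fork's HTTPS health server starts
        console.log('worker would start its health server on port', msg.health_port);
        process.exit(0);
    });
    process.send('ready');
}
```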
