Skip to content

Commit 4ff4dfb

Browse files
committed
NSFS | add dedicated servers for each forks for health checks
Signed-off-by: nadav mizrahi <nadav.mizrahi16@gmail.com>
1 parent de301b6 commit 4ff4dfb

File tree

8 files changed

+147
-60
lines changed

8 files changed

+147
-60
lines changed

config.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,8 @@ config.ENDPOINT_SSL_PORT = Number(process.env.ENDPOINT_SSL_PORT) || 6443;
979979
// Remove the NSFS condition when NSFS starts to support STS.
980980
config.ENDPOINT_SSL_STS_PORT = Number(process.env.ENDPOINT_SSL_STS_PORT) || (process.env.NC_NSFS_NO_DB_ENV === 'true' ? -1 : 7443);
981981
config.ENDPOINT_SSL_IAM_PORT = Number(process.env.ENDPOINT_SSL_IAM_PORT) || -1;
982+
// each fork will get port in range [ENDPOINT_FORK_PORT_BASE, ENDPOINT_FORK_PORT_BASE + number of forks - 1)]
983+
config.ENDPOINT_FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
982984
config.ALLOW_HTTP = false;
983985
config.ALLOW_HTTP_METRICS = true;
984986
config.ALLOW_HTTPS_METRICS = true;

src/endpoint/endpoint.js

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ const SERVICES_TYPES_ENUM = Object.freeze({
5757
S3: 'S3',
5858
STS: 'STS',
5959
IAM: 'IAM',
60-
METRICS: 'METRICS'
60+
METRICS: 'METRICS',
61+
FORK_HEALTH: 'FORK_HEALTH',
6162
});
6263

6364
const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
@@ -117,11 +118,11 @@ async function main(options = {}) {
117118
const https_metrics_port = options.https_metrics_port || config.EP_METRICS_SERVER_SSL_PORT;
118119
/**
119120
* Please notice that we can run the main in 2 states:
120-
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
121+
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
121122
* is implemented here would be run by this process.
122-
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
123-
* in only relevant to the primary process it should be implemented in
124-
* fork_utils.start_workers because the primary process returns after start_workers
123+
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
124+
* in only relevant to the primary process it should be implemented in
125+
* fork_utils.start_workers because the primary process returns after start_workers
125126
* and the forks will continue executing the code lines in this function
126127
* */
127128
const is_workers_started_from_primary = await fork_utils.start_workers(http_metrics_port, https_metrics_port,
@@ -202,14 +203,27 @@ async function main(options = {}) {
202203
{ ...options, https_port: https_port_s3, http_port: http_port_s3, virtual_hosts, bucket_logger, notification_logger });
203204
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.STS, init_request_sdk, { https_port: https_port_sts, virtual_hosts });
204205
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.IAM, init_request_sdk, { https_port: https_port_iam });
205-
206+
const is_nc = process.env.NC_NSFS_NO_DB_ENV && process.env.NC_NSFS_NO_DB_ENV === 'true';
207+
// fork health server currently runs only on non containerized enviorment
208+
if (is_nc) {
209+
if (cluster.isPrimary) {
210+
await fork_message_request_handler({
211+
nsfs_config_root: options.nsfs_config_root,
212+
health_port: config.ENDPOINT_FORK_PORT_BASE
213+
});
214+
} else {
215+
process.on('message', fork_message_request_handler);
216+
//send a message to the primary process that we are ready to receive messages
217+
process.send('ready');
218+
}
219+
}
206220

207221
// START METRICS SERVER
208222
if ((http_metrics_port > 0 || https_metrics_port > 0) && cluster.isPrimary) {
209223
await prom_reporting.start_server(http_metrics_port, https_metrics_port, false, options.nsfs_config_root);
210224
}
211225

212-
// TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
226+
// TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
213227
// there for namespace monitor won't be registered
214228
if (internal_rpc_client && config.NAMESPACE_MONITOR_ENABLED) {
215229
endpoint_stats_collector.instance().set_rpc_client(internal_rpc_client);
@@ -289,8 +303,6 @@ function create_endpoint_handler(server_type, init_request_sdk, { virtual_hosts,
289303
return blob_rest_handler(req, res);
290304
} else if (req.url.startsWith('/total_fork_count')) {
291305
return fork_count_handler(req, res);
292-
} else if (req.url.startsWith('/endpoint_fork_id')) {
293-
return endpoint_fork_id_handler(req, res);
294306
} else if (req.url.startsWith('/_/')) {
295307
// internals non S3 requests
296308
const api = req.url.slice('/_/'.length);
@@ -531,8 +543,25 @@ function unavailable_handler(req, res) {
531543
res.end(reply);
532544
}
533545

546+
function fork_main_handler(req, res) {
547+
endpoint_utils.set_noobaa_server_header(res);
548+
endpoint_utils.prepare_rest_request(req);
549+
if (req.url.startsWith('/endpoint_fork_id')) {
550+
return endpoint_fork_id_handler(req, res);
551+
}
552+
}
553+
554+
async function fork_message_request_handler(msg) {
555+
await http_utils.start_https_server(msg.health_port,
556+
SERVICES_TYPES_ENUM.FORK_HEALTH,
557+
fork_main_handler,
558+
msg.nsfs_config_root
559+
);
560+
}
561+
534562
exports.main = main;
535563
exports.create_endpoint_handler = create_endpoint_handler;
536564
exports.create_init_request_sdk = create_init_request_sdk;
565+
exports.endpoint_fork_id_handler = endpoint_fork_id_handler;
537566

538567
if (require.main === module) main();

src/manage_nsfs/health.js

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,14 @@ process.env.AWS_SDK_JS_SUPPRESS_MAINTENANCE_MODE_MESSAGE = '1';
101101
class NSFSHealth {
102102
constructor(options) {
103103
this.https_port = options.https_port;
104+
this.fork_base_port = options.fork_base_port;
104105
this.all_account_details = options.all_account_details;
105106
this.all_bucket_details = options.all_bucket_details;
106107
this.all_connection_details = options.all_connection_details;
107108
this.notif_storage_threshold = options.notif_storage_threshold;
108109
this.lifecycle = options.lifecycle;
109110
this.config_fs = options.config_fs;
111+
this.disable_service_validation = options.disable_service_validation;
110112
}
111113

112114
/**
@@ -121,7 +123,13 @@ class NSFSHealth {
121123
async nc_nsfs_health() {
122124
let endpoint_state;
123125
let memory;
124-
const noobaa_service_state = await this.get_service_state(NOOBAA_SERVICE_NAME);
126+
let noobaa_service_state;
127+
if (this.disable_service_validation) {
128+
// if service validation is disabled, we assume the service is running, set dummy pid. should be used only for testing
129+
noobaa_service_state = { name: NOOBAA_SERVICE_NAME, service_status: 'active', pid: '100' };
130+
} else {
131+
noobaa_service_state = await this.get_service_state(NOOBAA_SERVICE_NAME);
132+
}
125133
const { service_status, pid } = noobaa_service_state;
126134
if (pid !== '0') {
127135
endpoint_state = await this.get_endpoint_response();
@@ -241,10 +249,10 @@ class NSFSHealth {
241249
return service_health;
242250
}
243251

244-
async make_endpoint_health_request(url_path) {
252+
async make_endpoint_health_request(url_path, port = this.https_port) {
245253
const response = await make_https_request({
246-
HOSTNAME,
247-
port: this.https_port,
254+
hostname: HOSTNAME,
255+
port,
248256
path: url_path,
249257
method: 'GET',
250258
rejectUnauthorized: false,
@@ -260,43 +268,37 @@ class NSFSHealth {
260268
let url_path = '/total_fork_count';
261269
const worker_ids = [];
262270
let total_fork_count = 0;
271+
let fork_count_response;
263272
let response;
264273
try {
265-
const fork_count_response = await this.make_endpoint_health_request(url_path);
266-
if (!fork_count_response) {
267-
return {
268-
response: fork_response_code.NOT_RUNNING,
269-
total_fork_count: total_fork_count,
270-
running_workers: worker_ids,
271-
};
272-
}
273-
total_fork_count = fork_count_response.fork_count;
274-
if (total_fork_count > 0) {
275-
url_path = '/endpoint_fork_id';
276-
await P.retry({
277-
attempts: total_fork_count * 2,
278-
delay_ms: 1,
279-
func: async () => {
280-
const fork_id_response = await this.make_endpoint_health_request(url_path);
281-
if (fork_id_response.worker_id && !worker_ids.includes(fork_id_response.worker_id)) {
282-
worker_ids.push(fork_id_response.worker_id);
283-
}
284-
if (worker_ids.length < total_fork_count) {
285-
throw new Error('Number of running forks is less than the expected fork count.');
286-
}
287-
}
288-
});
289-
if (worker_ids.length === total_fork_count) {
290-
response = fork_response_code.RUNNING;
291-
} else {
292-
response = fork_response_code.MISSING_FORKS;
293-
}
294-
} else {
295-
response = fork_response_code.RUNNING;
296-
}
274+
fork_count_response = await this.make_endpoint_health_request(url_path);
297275
} catch (err) {
298-
dbg.log1('Error while pinging endpoint host :' + HOSTNAME + ', port ' + this.https_port, err);
299-
response = fork_response_code.NOT_RUNNING;
276+
dbg.log0('Error while pinging endpoint host :' + HOSTNAME, err);
277+
}
278+
if (!fork_count_response) {
279+
return {
280+
response: fork_response_code.NOT_RUNNING,
281+
total_fork_count: total_fork_count,
282+
running_workers: worker_ids,
283+
};
284+
}
285+
286+
total_fork_count = fork_count_response.fork_count;
287+
url_path = '/endpoint_fork_id';
288+
for (let i = 0; i < total_fork_count; i++) {
289+
const port = this.fork_base_port + i;
290+
try {
291+
const fork_id_response = await this.make_endpoint_health_request(url_path, port);
292+
worker_ids.push(fork_id_response.worker_id);
293+
} catch (err) {
294+
dbg.log0('Error while pinging fork :' + HOSTNAME + ', port ' + port, err);
295+
}
296+
}
297+
if (worker_ids.length < total_fork_count) {
298+
dbg.log0('Number of running forks is less than the expected fork count.');
299+
response = fork_response_code.MISSING_FORKS;
300+
} else {
301+
response = fork_response_code.RUNNING;
300302
}
301303
return {
302304
response: response,
@@ -635,16 +637,19 @@ class NSFSHealth {
635637
async function get_health_status(argv, config_fs) {
636638
try {
637639
const https_port = Number(argv.https_port) || config.ENDPOINT_SSL_PORT;
640+
const fork_base_port = Number(argv.fork_base_port) || config.ENDPOINT_FORK_PORT_BASE;
638641
const deployment_type = argv.deployment_type || 'nc';
639642
const all_account_details = get_boolean_or_string_value(argv.all_account_details);
640643
const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
641644
const all_connection_details = get_boolean_or_string_value(argv.all_connection_details);
642645
const notif_storage_threshold = get_boolean_or_string_value(argv.notif_storage_threshold);
643646
const lifecycle = get_boolean_or_string_value(argv.lifecycle);
647+
const disable_service_validation = get_boolean_or_string_value(argv.disable_service_validation);
644648

645649
if (deployment_type === 'nc') {
646-
const health = new NSFSHealth({ https_port,
647-
all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, lifecycle, config_fs });
650+
const health = new NSFSHealth({ https_port, fork_base_port,
651+
all_account_details, all_bucket_details, all_connection_details,
652+
notif_storage_threshold, lifecycle, config_fs, disable_service_validation });
648653
const health_status = await health.nc_nsfs_health();
649654
write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
650655
} else {

src/manage_nsfs/manage_nsfs_constants.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ const VALID_OPTIONS_GLACIER = {
7575
};
7676

7777
const VALID_OPTIONS_DIAGNOSE = {
78-
'health': new Set([ 'https_port', 'deployment_type', 'all_account_details', 'all_bucket_details', 'all_connection_details', 'notif_storage_threshold', 'lifecycle', ...CLI_MUTUAL_OPTIONS]),
78+
'health': new Set([ 'https_port', 'deployment_type', 'all_account_details', 'all_bucket_details', 'all_connection_details', 'notif_storage_threshold', 'lifecycle', 'disable_service_validation', ...CLI_MUTUAL_OPTIONS]),
7979
'gather-logs': new Set([ CONFIG_ROOT_FLAG]),
8080
'metrics': new Set([CONFIG_ROOT_FLAG])
8181
};

src/test/unit_tests/test_nc_with_a_couple_of_forks.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,27 @@ const s3_uid6001 = generate_s3_client(access_details.access_key,
223223
await s3_uid6001.deleteBucket({ Bucket: bucket_name4 });
224224
await fs.promises.rm(new_bucket_path_param2, { recursive: true });
225225
});
226+
227+
mocha.it('health concurrency with load - ping forks', async function() {
228+
this.timeout(100000); // eslint-disable-line no-invalid-this
229+
230+
for (let i = 0; i < 10; i++) {
231+
// a couple of requests for health check
232+
const failed_operations = [];
233+
const successful_operations = [];
234+
const num_of_concurrency = 10;
235+
for (let j = 0; j < num_of_concurrency; j++) {
236+
exec_manage_cli(TYPES.DIAGNOSE, 'health', {disable_service_validation: true})
237+
.catch(err => failed_operations.push(err))
238+
.then(res => successful_operations.push(res));
239+
}
240+
await P.delay(7000);
241+
assert.equal(successful_operations.length, num_of_concurrency);
242+
assert.equal(failed_operations.length, 0);
243+
for (const res of successful_operations) {
244+
const parsed_res = JSON.parse(res);
245+
assert.strictEqual(parsed_res.response.reply.status, 'OK');
246+
}
247+
}
248+
});
226249
});

src/util/fork_utils.js

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const prom_reporting = require('../server/analytic_services/prometheus_reporting
1010
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
1111
const config = require('../../config');
1212
const stats_collector_utils = require('./stats_collector_utils');
13+
const { once } = require('node:events');
1314

1415

1516
const io_stats = {
@@ -27,7 +28,7 @@ const fs_workers_stats = {};
2728
* When count > 0 the primary process will fork worker processes to process incoming http requests.
2829
* In case of any worker exit, also the entire process group will exit.
2930
* @see https://nodejs.org/api/cluster.html
30-
*
31+
*
3132
* @param {number} [metrics_port]
3233
* @param {number} [https_metrics_port]
3334
* @param {string} [nsfs_config_root] nsfs configuration path
@@ -36,15 +37,19 @@ const fs_workers_stats = {};
3637
*/
3738
async function start_workers(metrics_port, https_metrics_port, nsfs_config_root, count = 0) {
3839
const exit_events = [];
40+
const fork_port_offsets = [];
3941
if (cluster.isPrimary && count > 0) {
4042
for (let i = 0; i < count; ++i) {
4143
const worker = cluster.fork();
4244
console.warn('WORKER started', { id: worker.id, pid: worker.process.pid });
45+
// no need to await. will run once the worker is ready to receive messages
46+
_send_fork_server_message(worker, i, nsfs_config_root);
47+
fork_port_offsets.push(worker.id);
4348
}
4449

4550
// We don't want to leave the process with a partial set of workers,
4651
// so if any worker exits, we will print an error message in the logs and start a new one.
47-
cluster.on('exit', (worker, code, signal) => {
52+
cluster.on('exit', async (worker, code, signal) => {
4853
console.warn('WORKER exit', { id: worker.id, pid: worker.process.pid, code, signal });
4954
new NoobaaEvent(NoobaaEvent.FORK_EXIT).create_event(undefined, { id: worker.id, pid: worker.process.pid,
5055
code: code, signal: signal}, undefined);
@@ -66,6 +71,9 @@ async function start_workers(metrics_port, https_metrics_port, nsfs_config_root,
6671
console.warn(`${exit_events.length} exit events in the last ${config.NSFS_EXIT_EVENTS_TIME_FRAME_MIN} minutes,` +
6772
` max allowed are: ${config.NSFS_MAX_EXIT_EVENTS_PER_TIME_FRAME}`);
6873
const new_worker = cluster.fork();
74+
const offset = fork_port_offsets.findIndex(id => id === worker.id);
75+
await _send_fork_server_message(new_worker, offset, nsfs_config_root);
76+
fork_port_offsets[offset] = new_worker.id;
6977
console.warn('WORKER re-started', { id: new_worker.id, pid: new_worker.process.pid });
7078
});
7179
for (const id in cluster.workers) {
@@ -101,6 +109,25 @@ function nsfs_io_stats_handler(msg) {
101109
}
102110
}
103111

112+
/**
113+
* Sends a message to the worker to start the fork server with the giver port
114+
* NOTE - currently runs only on non containerized enviorment
115+
* @param {*} worker
116+
* @param {*} offset
117+
* @param {*} nsfs_config_root
118+
*/
119+
async function _send_fork_server_message(worker, offset, nsfs_config_root) {
120+
const is_nc = process.env.NC_NSFS_NO_DB_ENV && process.env.NC_NSFS_NO_DB_ENV === 'true';
121+
if (is_nc && offset >= 0) {
122+
//wait for the worker to be ready to receive messages
123+
await once(worker, 'message');
124+
worker.send({
125+
nsfs_config_root: nsfs_config_root,
126+
health_port: config.ENDPOINT_FORK_PORT_BASE + offset
127+
});
128+
}
129+
}
130+
104131
function _update_ops_stats(stats) {
105132
//Go over the op_stats
106133
for (const op_name of stats_collector_utils.op_names) {

src/util/http_utils.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -799,8 +799,8 @@ function http_get(uri, options) {
799799
/**
800800
* start_https_server starts the secure https server by type and options and creates a certificate if required
801801
* @param {number} https_port
802-
* @param {('S3'|'IAM'|'STS'|'METRICS')} server_type
803-
* @param {Object} request_handler
802+
* @param {('S3'|'IAM'|'STS'|'METRICS'|'FORK_HEALTH')} server_type
803+
* @param {Object} request_handler
804804
*/
805805
async function start_https_server(https_port, server_type, request_handler, nsfs_config_root) {
806806
const ssl_cert_info = await ssl_utils.get_ssl_cert_info(server_type, nsfs_config_root);
@@ -818,8 +818,8 @@ async function start_https_server(https_port, server_type, request_handler, nsfs
818818
/**
819819
* start_http_server starts the non-secure http server by type
820820
* @param {number} http_port
821-
* @param {('S3'|'IAM'|'STS'|'METRICS')} server_type
822-
* @param {Object} request_handler
821+
* @param {('S3'|'IAM'|'STS'|'METRICS'|'FORK_HEALTH')} server_type
822+
* @param {Object} request_handler
823823
*/
824824
async function start_http_server(http_port, server_type, request_handler) {
825825
const http_server = http.createServer(request_handler);
@@ -834,11 +834,11 @@ async function start_http_server(http_port, server_type, request_handler) {
834834
* Listen server for http/https ports
835835
* @param {number} port
836836
* @param {http.Server} server
837-
* @param {('S3'|'IAM'|'STS'|'METRICS')} server_type
837+
* @param {('S3'|'IAM'|'STS'|'METRICS'|'FORK_HEALTH')} server_type
838838
*/
839839
function listen_port(port, server, server_type) {
840840
return new Promise((resolve, reject) => {
841-
if (server_type !== 'METRICS') {
841+
if (server_type !== 'METRICS' && server_type !== 'FORK_HEALTH') {
842842
setup_endpoint_server(server);
843843
}
844844
server.listen(port, err => {

0 commit comments

Comments
 (0)