Skip to content

Commit 328e021

Browse files
committed
NSFS | add dedicated servers for each forks for health checks
Signed-off-by: nadav mizrahi <nadav.mizrahi16@gmail.com>
1 parent de301b6 commit 328e021

File tree

10 files changed

+226
-106
lines changed

10 files changed

+226
-106
lines changed

config.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,8 @@ config.ENDPOINT_SSL_PORT = Number(process.env.ENDPOINT_SSL_PORT) || 6443;
979979
// Remove the NSFS condition when NSFS starts to support STS.
980980
config.ENDPOINT_SSL_STS_PORT = Number(process.env.ENDPOINT_SSL_STS_PORT) || (process.env.NC_NSFS_NO_DB_ENV === 'true' ? -1 : 7443);
981981
config.ENDPOINT_SSL_IAM_PORT = Number(process.env.ENDPOINT_SSL_IAM_PORT) || -1;
982+
// each fork will get port in range [ENDPOINT_FORK_PORT_BASE, ENDPOINT_FORK_PORT_BASE + number of forks - 1)]
983+
config.ENDPOINT_FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
982984
config.ALLOW_HTTP = false;
983985
config.ALLOW_HTTP_METRICS = true;
984986
config.ALLOW_HTTPS_METRICS = true;

src/endpoint/endpoint.js

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
4343
const prom_reporting = require('../server/analytic_services/prometheus_reporting');
4444
const { PersistentLogger } = require('../util/persistent_logger');
4545
const { get_notification_logger } = require('../util/notifications_util');
46+
const { is_nc_enviorment } = require('../nc/nc_utils');
4647
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
4748
const cluster = /** @type {import('node:cluster').Cluster} */ (
4849
/** @type {unknown} */ (require('node:cluster'))
@@ -57,7 +58,8 @@ const SERVICES_TYPES_ENUM = Object.freeze({
5758
S3: 'S3',
5859
STS: 'STS',
5960
IAM: 'IAM',
60-
METRICS: 'METRICS'
61+
METRICS: 'METRICS',
62+
FORK_HEALTH: 'FORK_HEALTH',
6163
});
6264

6365
const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
@@ -117,11 +119,11 @@ async function main(options = {}) {
117119
const https_metrics_port = options.https_metrics_port || config.EP_METRICS_SERVER_SSL_PORT;
118120
/**
119121
* Please notice that we can run the main in 2 states:
120-
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
122+
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
121123
* is implemented here would be run by this process.
122-
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
123-
* in only relevant to the primary process it should be implemented in
124-
* fork_utils.start_workers because the primary process returns after start_workers
124+
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
125+
* in only relevant to the primary process it should be implemented in
126+
* fork_utils.start_workers because the primary process returns after start_workers
125127
* and the forks will continue executing the code lines in this function
126128
* */
127129
const is_workers_started_from_primary = await fork_utils.start_workers(http_metrics_port, https_metrics_port,
@@ -202,14 +204,29 @@ async function main(options = {}) {
202204
{ ...options, https_port: https_port_s3, http_port: http_port_s3, virtual_hosts, bucket_logger, notification_logger });
203205
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.STS, init_request_sdk, { https_port: https_port_sts, virtual_hosts });
204206
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.IAM, init_request_sdk, { https_port: https_port_iam });
205-
207+
const is_nc = is_nc_enviorment();
208+
// fork health server currently runs only on non containerized enviorment
209+
if (is_nc) {
210+
// current process is the primary and only fork. start the fork server directly with the base port
211+
if (cluster.isPrimary) {
212+
await fork_message_request_handler({
213+
nsfs_config_root: options.nsfs_config_root,
214+
health_port: config.ENDPOINT_FORK_PORT_BASE
215+
});
216+
// current process is a worker so we listen to get the port from the primary process.
217+
} else {
218+
process.on('message', fork_message_request_handler);
219+
//send a message to the primary process that we are ready to receive messages
220+
process.send('ready');
221+
}
222+
}
206223

207224
// START METRICS SERVER
208225
if ((http_metrics_port > 0 || https_metrics_port > 0) && cluster.isPrimary) {
209226
await prom_reporting.start_server(http_metrics_port, https_metrics_port, false, options.nsfs_config_root);
210227
}
211228

212-
// TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
229+
// TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
213230
// there for namespace monitor won't be registered
214231
if (internal_rpc_client && config.NAMESPACE_MONITOR_ENABLED) {
215232
endpoint_stats_collector.instance().set_rpc_client(internal_rpc_client);
@@ -289,8 +306,6 @@ function create_endpoint_handler(server_type, init_request_sdk, { virtual_hosts,
289306
return blob_rest_handler(req, res);
290307
} else if (req.url.startsWith('/total_fork_count')) {
291308
return fork_count_handler(req, res);
292-
} else if (req.url.startsWith('/endpoint_fork_id')) {
293-
return endpoint_fork_id_handler(req, res);
294309
} else if (req.url.startsWith('/_/')) {
295310
// internals non S3 requests
296311
const api = req.url.slice('/_/'.length);
@@ -531,8 +546,37 @@ function unavailable_handler(req, res) {
531546
res.end(reply);
532547
}
533548

549+
/**
550+
* handler for the inidivdual fork server. used to handle requests the get the worker id
551+
* currently used to check if fork is alive by the health script
552+
* @type {EndpointHandler}
553+
*/
554+
function fork_main_handler(req, res) {
555+
endpoint_utils.set_noobaa_server_header(res);
556+
endpoint_utils.prepare_rest_request(req);
557+
if (req.url.startsWith('/endpoint_fork_id')) {
558+
return endpoint_fork_id_handler(req, res);
559+
} else {
560+
return internal_api_error(req, res, `Unknown API call ${req.url}`);
561+
}
562+
}
563+
564+
/**
565+
* fork_message_request_handler is used to handle messages from the primary process.
566+
* the primary process sends a message with the designated port to start the fork server.
567+
* @param {Object} msg
568+
*/
569+
async function fork_message_request_handler(msg) {
570+
await http_utils.start_https_server(msg.health_port,
571+
SERVICES_TYPES_ENUM.FORK_HEALTH,
572+
fork_main_handler,
573+
msg.nsfs_config_root
574+
);
575+
}
576+
534577
exports.main = main;
535578
exports.create_endpoint_handler = create_endpoint_handler;
536579
exports.create_init_request_sdk = create_init_request_sdk;
580+
exports.endpoint_fork_id_handler = endpoint_fork_id_handler;
537581

538582
if (require.main === module) main();

src/manage_nsfs/health.js

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ process.env.AWS_SDK_JS_SUPPRESS_MAINTENANCE_MODE_MESSAGE = '1';
101101
class NSFSHealth {
102102
constructor(options) {
103103
this.https_port = options.https_port;
104+
this.fork_base_port = options.fork_base_port;
104105
this.all_account_details = options.all_account_details;
105106
this.all_bucket_details = options.all_bucket_details;
106107
this.all_connection_details = options.all_connection_details;
@@ -241,10 +242,10 @@ class NSFSHealth {
241242
return service_health;
242243
}
243244

244-
async make_endpoint_health_request(url_path) {
245+
async make_endpoint_health_request(url_path, port = this.https_port) {
245246
const response = await make_https_request({
246-
HOSTNAME,
247-
port: this.https_port,
247+
hostname: HOSTNAME,
248+
port,
248249
path: url_path,
249250
method: 'GET',
250251
rejectUnauthorized: false,
@@ -260,43 +261,37 @@ class NSFSHealth {
260261
let url_path = '/total_fork_count';
261262
const worker_ids = [];
262263
let total_fork_count = 0;
264+
let fork_count_response;
263265
let response;
264266
try {
265-
const fork_count_response = await this.make_endpoint_health_request(url_path);
266-
if (!fork_count_response) {
267-
return {
268-
response: fork_response_code.NOT_RUNNING,
269-
total_fork_count: total_fork_count,
270-
running_workers: worker_ids,
271-
};
272-
}
273-
total_fork_count = fork_count_response.fork_count;
274-
if (total_fork_count > 0) {
275-
url_path = '/endpoint_fork_id';
276-
await P.retry({
277-
attempts: total_fork_count * 2,
278-
delay_ms: 1,
279-
func: async () => {
280-
const fork_id_response = await this.make_endpoint_health_request(url_path);
281-
if (fork_id_response.worker_id && !worker_ids.includes(fork_id_response.worker_id)) {
282-
worker_ids.push(fork_id_response.worker_id);
283-
}
284-
if (worker_ids.length < total_fork_count) {
285-
throw new Error('Number of running forks is less than the expected fork count.');
286-
}
287-
}
288-
});
289-
if (worker_ids.length === total_fork_count) {
290-
response = fork_response_code.RUNNING;
291-
} else {
292-
response = fork_response_code.MISSING_FORKS;
293-
}
294-
} else {
295-
response = fork_response_code.RUNNING;
296-
}
267+
fork_count_response = await this.make_endpoint_health_request(url_path);
297268
} catch (err) {
298-
dbg.log1('Error while pinging endpoint host :' + HOSTNAME + ', port ' + this.https_port, err);
299-
response = fork_response_code.NOT_RUNNING;
269+
dbg.log0('Error while pinging endpoint host :' + HOSTNAME, err);
270+
}
271+
if (!fork_count_response) {
272+
return {
273+
response: fork_response_code.NOT_RUNNING,
274+
total_fork_count: total_fork_count,
275+
running_workers: worker_ids,
276+
};
277+
}
278+
279+
total_fork_count = fork_count_response.fork_count;
280+
url_path = '/endpoint_fork_id';
281+
for (let i = 0; i < total_fork_count; i++) {
282+
const port = this.fork_base_port + i;
283+
try {
284+
const fork_id_response = await this.make_endpoint_health_request(url_path, port);
285+
worker_ids.push(fork_id_response.worker_id);
286+
} catch (err) {
287+
dbg.log0('Error while pinging fork :' + HOSTNAME + ', port ' + port, err);
288+
}
289+
}
290+
if (worker_ids.length < total_fork_count) {
291+
dbg.log0('Number of running forks is less than the expected fork count.');
292+
response = fork_response_code.MISSING_FORKS;
293+
} else {
294+
response = fork_response_code.RUNNING;
300295
}
301296
return {
302297
response: response,
@@ -635,6 +630,7 @@ class NSFSHealth {
635630
async function get_health_status(argv, config_fs) {
636631
try {
637632
const https_port = Number(argv.https_port) || config.ENDPOINT_SSL_PORT;
633+
const fork_base_port = Number(argv.fork_base_port) || config.ENDPOINT_FORK_PORT_BASE;
638634
const deployment_type = argv.deployment_type || 'nc';
639635
const all_account_details = get_boolean_or_string_value(argv.all_account_details);
640636
const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
@@ -643,8 +639,9 @@ async function get_health_status(argv, config_fs) {
643639
const lifecycle = get_boolean_or_string_value(argv.lifecycle);
644640

645641
if (deployment_type === 'nc') {
646-
const health = new NSFSHealth({ https_port,
647-
all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, lifecycle, config_fs });
642+
const health = new NSFSHealth({ https_port, fork_base_port,
643+
all_account_details, all_bucket_details, all_connection_details,
644+
notif_storage_threshold, lifecycle, config_fs });
648645
const health_status = await health.nc_nsfs_health();
649646
write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
650647
} else {

src/nc/nc_utils.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,15 @@ function check_root_account_owns_user(root_account, account) {
2424
return root_account._id === account.owner;
2525
}
2626

27+
/**
28+
* @returns {boolean} true if the current environment is a NooBaa non containerized environment
29+
*/
30+
function is_nc_enviorment() {
31+
return process.env.NC_NSFS_NO_DB_ENV && process.env.NC_NSFS_NO_DB_ENV === 'true';
32+
}
33+
2734
// EXPORTS
2835
exports.generate_id = generate_id;
2936
exports.check_root_account_owns_user = check_root_account_owns_user;
37+
exports.is_nc_enviorment = is_nc_enviorment;
3038

src/test/system_tests/test_utils.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const nb_native = require('../../util/nb_native');
1616
const { CONFIG_TYPES } = require('../../sdk/config_fs');
1717
const native_fs_utils = require('../../util/native_fs_utils');
1818
const { NodeHttpHandler } = require("@smithy/node-http-handler");
19+
const sinon = require('sinon');
1920

2021
const GPFS_ROOT_PATH = process.env.GPFS_ROOT_PATH;
2122
const IS_GPFS = !_.isUndefined(GPFS_ROOT_PATH);
@@ -790,6 +791,28 @@ const run_or_skip_test = cond => {
790791
} else return it.skip;
791792
};
792793

794+
/**
795+
* set_mock_functions sets mock functions used by the health script
796+
* the second param is an object having the name of the mock functions as the keys and
797+
* the value is an array of responses by the order of their call
798+
* @param {Object} Health
799+
* @param {{get_endpoint_response?: Object[], get_service_state?: Object[],
800+
* get_system_config_file?: Object[], get_service_memory_usage?: Object[],
801+
* get_lifecycle_health_status?: Object, get_latest_lifecycle_run_status?: Object}} mock_function_responses
802+
*/
803+
function set_health_mock_functions(Health, mock_function_responses) {
804+
for (const mock_function_name of Object.keys(mock_function_responses)) {
805+
const mock_function_responses_arr = mock_function_responses[mock_function_name];
806+
const obj_to_stub = mock_function_name === 'get_system_config_file' ? Health.config_fs : Health;
807+
808+
if (obj_to_stub[mock_function_name]?.restore) obj_to_stub[mock_function_name]?.restore();
809+
const stub = sinon.stub(obj_to_stub, mock_function_name);
810+
for (let i = 0; i < mock_function_responses_arr.length; i++) {
811+
stub.onCall(i).returns(Promise.resolve(mock_function_responses_arr[i]));
812+
}
813+
}
814+
}
815+
793816
exports.run_or_skip_test = run_or_skip_test;
794817
exports.blocks_exist_on_cloud = blocks_exist_on_cloud;
795818
exports.create_hosts_pool = create_hosts_pool;
@@ -830,3 +853,4 @@ exports.fail_test_if_default_config_dir_exists = fail_test_if_default_config_dir
830853
exports.create_config_dir = create_config_dir;
831854
exports.clean_config_dir = clean_config_dir;
832855
exports.CLI_UNSET_EMPTY_STRING = CLI_UNSET_EMPTY_STRING;
856+
exports.set_health_mock_functions = set_health_mock_functions;

0 commit comments

Comments
 (0)