NC Health | add dedicated servers for each fork for health checks #9045

Merged · 1 commit · Jun 10, 2025
4 changes: 4 additions & 0 deletions config.js
@@ -1006,6 +1006,8 @@ config.ENDPOINT_SSL_PORT = Number(process.env.ENDPOINT_SSL_PORT) || 6443;
// Remove the NSFS condition when NSFS starts to support STS.
config.ENDPOINT_SSL_STS_PORT = Number(process.env.ENDPOINT_SSL_STS_PORT) || (process.env.NC_NSFS_NO_DB_ENV === 'true' ? -1 : 7443);
config.ENDPOINT_SSL_IAM_PORT = Number(process.env.ENDPOINT_SSL_IAM_PORT) || -1;
// each fork will get a port in the range [ENDPOINT_FORK_PORT_BASE, ENDPOINT_FORK_PORT_BASE + number of forks - 1]
// e.g. with the default base 6002 and 3 forks, the forks listen on 6002, 6003 and 6004
config.ENDPOINT_FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
config.ALLOW_HTTP = false;
Comment on lines +1009 to 1011
🛠️ Refactor suggestion

Guard against port-range collisions for fork servers

ENDPOINT_FORK_PORT_BASE defaults to 6002, while other well-known ports (S3 = 6001, metrics = 7001-7004, etc.) are nearby.
Before the primary starts assigning ports (base + fork_index), you should verify that none of them overlaps with:

  • ENDPOINT_PORT / ENDPOINT_SSL_PORT
  • metrics ports
  • any value explicitly supplied via env-vars or config.json

A quick sanity check during startup will save you from cryptic “EADDRINUSE” crashes.

+assert(_.difference(
+    _.range(config.ENDPOINT_FORK_PORT_BASE,
+            config.ENDPOINT_FORK_PORT_BASE + config.ENDPOINT_FORKS),
+    [
+        config.ENDPOINT_PORT,
+        config.ENDPOINT_SSL_PORT,
+        config.EP_METRICS_SERVER_PORT,
+        config.EP_METRICS_SERVER_SSL_PORT
+    ]).length === config.ENDPOINT_FORKS,
+   'Fork-health port range overlaps with an existing service');

Committable suggestion skipped: line range outside the PR's diff.
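
For reference, a self-contained version of this startup check might look like the sketch below (assert_fork_port_range_is_free is an illustrative helper, not part of this PR; the config keys are the ones referenced above):

const assert = require('assert');

// Fail fast at startup if any fork-health port collides with a known service port.
function assert_fork_port_range_is_free(cfg, fork_count) {
    const reserved = new Set([
        cfg.ENDPOINT_PORT,
        cfg.ENDPOINT_SSL_PORT,
        cfg.EP_METRICS_SERVER_PORT,
        cfg.EP_METRICS_SERVER_SSL_PORT,
    ]);
    for (let i = 0; i < fork_count; i += 1) {
        const port = cfg.ENDPOINT_FORK_PORT_BASE + i;
        assert(!reserved.has(port), `Fork-health port ${port} overlaps with an existing service`);
    }
}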

config.ALLOW_HTTP_METRICS = true;
config.ALLOW_HTTPS_METRICS = true;
@@ -1018,6 +1020,8 @@ config.VIRTUAL_HOSTS = process.env.VIRTUAL_HOSTS || '';

config.NC_HEALTH_ENDPOINT_RETRY_COUNT = 3;
config.NC_HEALTH_ENDPOINT_RETRY_DELAY = 10;
config.NC_FORK_SERVER_TIMEOUT = 5; // 5 minutes
config.NC_FORK_SERVER_RETRIES = 10;


/** @type {'file' | 'executable'} */
63 changes: 54 additions & 9 deletions src/endpoint/endpoint.js
@@ -43,6 +43,7 @@ const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
const prom_reporting = require('../server/analytic_services/prometheus_reporting');
const { PersistentLogger } = require('../util/persistent_logger');
const { get_notification_logger } = require('../util/notifications_util');
const { is_nc_environment } = require('../nc/nc_utils');
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
const cluster = /** @type {import('node:cluster').Cluster} */ (
/** @type {unknown} */ (require('node:cluster'))
@@ -57,7 +58,8 @@ const SERVICES_TYPES_ENUM = Object.freeze({
S3: 'S3',
STS: 'STS',
IAM: 'IAM',
METRICS: 'METRICS'
METRICS: 'METRICS',
FORK_HEALTH: 'FORK_HEALTH',
});

const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
@@ -117,11 +119,11 @@ async function main(options = {}) {
const https_metrics_port = options.https_metrics_port || config.EP_METRICS_SERVER_SSL_PORT;
/**
 * Please notice that we can run the main in 2 states:
 * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
 *    is implemented here would be run by this process.
 * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
 *    is only relevant to the primary process it should be implemented in
 *    fork_utils.start_workers because the primary process returns after start_workers
 *    and the forks will continue executing the code lines in this function
 * */
const is_workers_started_from_primary = await fork_utils.start_workers(http_metrics_port, https_metrics_port,
@@ -202,14 +204,29 @@ async function main(options = {})
{ ...options, https_port: https_port_s3, http_port: http_port_s3, virtual_hosts, bucket_logger, notification_logger });
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.STS, init_request_sdk, { https_port: https_port_sts, virtual_hosts });
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.IAM, init_request_sdk, { https_port: https_port_iam });

const is_nc = is_nc_environment();
// the fork health server currently runs only in a non-containerized environment
if (is_nc) {
// current process is the primary and only fork. start the fork server directly with the base port
if (cluster.isPrimary) {
await fork_message_request_handler({
nsfs_config_root: options.nsfs_config_root,
health_port: config.ENDPOINT_FORK_PORT_BASE
});
// current process is a worker so we listen to get the port from the primary process.
} else {
process.on('message', fork_message_request_handler);
// send a message to the primary process that we are ready to receive messages
process.send({ready_to_start_fork_server: true});
}
}

// START METRICS SERVER
if ((http_metrics_port > 0 || https_metrics_port > 0) && cluster.isPrimary) {
await prom_reporting.start_server(http_metrics_port, https_metrics_port, false, options.nsfs_config_root);
}

    // TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
    // therefore the namespace monitor won't be registered
if (internal_rpc_client && config.NAMESPACE_MONITOR_ENABLED) {
endpoint_stats_collector.instance().set_rpc_client(internal_rpc_client);
@@ -289,8 +306,6 @@ function create_endpoint_handler(server_type, init_request_sdk, { virtual_hosts,
return blob_rest_handler(req, res);
} else if (req.url.startsWith('/total_fork_count')) {
return fork_count_handler(req, res);
} else if (req.url.startsWith('/_/')) {
// internals non S3 requests
const api = req.url.slice('/_/'.length);
@@ -531,8 +546,38 @@ function unavailable_handler(req, res) {
res.end(reply);
}

/**
 * handler for the individual fork server. used to handle requests to get the worker id
 * currently used by the health script to check if a fork is alive
* @param {EndpointRequest} req
* @param {import('http').ServerResponse} res
*/
function fork_main_handler(req, res) {
endpoint_utils.set_noobaa_server_header(res);
endpoint_utils.prepare_rest_request(req);
if (req.url.startsWith('/endpoint_fork_id')) {
return endpoint_fork_id_handler(req, res);
} else {
return internal_api_error(req, res, `Unknown API call ${req.url}`);
}
}
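
For illustration only (not part of this diff), probing a single fork's health server from Node could look like the sketch below; it mirrors the make_endpoint_health_request call added to health.js further down, including rejectUnauthorized: false for the self-signed certificate, and assumes the fork replies with JSON such as { worker_id: 2 }:

const https = require('https');

// Ask the fork listening on `port` for its worker id.
function probe_fork(port) {
    return new Promise((resolve, reject) => {
        const req = https.request({
            hostname: 'localhost',
            port,
            path: '/endpoint_fork_id',
            method: 'GET',
            rejectUnauthorized: false, // the fork server uses a self-signed cert
        }, res => {
            let body = '';
            res.on('data', chunk => { body += chunk; });
            res.on('end', () => {
                try { resolve(JSON.parse(body)); } catch (e) { reject(e); }
            });
        });
        req.on('error', reject);
        req.end();
    });
}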

/**
* fork_message_request_handler is used to handle messages from the primary process.
* the primary process sends a message with the designated port to start the fork server.
* @param {Object} msg
*/
async function fork_message_request_handler(msg) {
await http_utils.start_https_server(msg.health_port,
SERVICES_TYPES_ENUM.FORK_HEALTH,
fork_main_handler,
msg.nsfs_config_root
);
}
Comment on lines +570 to +576
🛠️ Refactor suggestion

Validate message payload before boot-strapping a health server

fork_message_request_handler trusts msg.health_port blindly.
Protect against malformed or repeated messages (cluster control traffic, user error) to avoid EADDRINUSE crashes:

async function fork_message_request_handler(msg) {
-    await http_utils.start_https_server(msg.health_port,
+    if (!msg || typeof msg.health_port !== 'number') return;
+    await http_utils.start_https_server(msg.health_port,
         SERVICES_TYPES_ENUM.FORK_HEALTH,
📝 Committable suggestion


Suggested change
async function fork_message_request_handler(msg) {
    if (!msg || typeof msg.health_port !== 'number') return;
    await http_utils.start_https_server(msg.health_port,
        SERVICES_TYPES_ENUM.FORK_HEALTH,
        fork_main_handler,
        msg.nsfs_config_root
    );
}
🤖 Prompt for AI Agents
In src/endpoint/endpoint.js around lines 570 to 576, the function
fork_message_request_handler uses msg.health_port without validation, which can
cause EADDRINUSE errors if the port is invalid or already in use. Add validation
to check that msg.health_port is a valid, unused port number before calling
start_https_server. Also, implement logic to handle repeated or malformed
messages gracefully to prevent server crashes.


exports.main = main;
exports.create_endpoint_handler = create_endpoint_handler;
exports.create_init_request_sdk = create_init_request_sdk;
exports.endpoint_fork_id_handler = endpoint_fork_id_handler;

if (require.main === module) main();
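
The worker side of the handshake is visible in main() above: each fork registers a process.on('message') handler and then reports { ready_to_start_fork_server: true }. The primary-side counterpart lives in fork_utils.start_workers, which is outside this diff; a minimal sketch of what it presumably does (the function name and the sequential index assignment are assumptions):

// Sketch: on the primary, answer each worker's readiness message with a
// dedicated health port derived from the configured base port.
function wire_fork_health_ports(cluster, config, nsfs_config_root) {
    let next_index = 0;
    cluster.on('message', (worker, msg) => {
        if (msg && msg.ready_to_start_fork_server) {
            worker.send({
                nsfs_config_root,
                health_port: config.ENDPOINT_FORK_PORT_BASE + next_index,
            });
            next_index += 1;
        }
    });
}

A production implementation would presumably also need to hand a re-spawned fork its dead predecessor's port, since the health script below always probes the contiguous range [base, base + fork_count - 1].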
75 changes: 36 additions & 39 deletions src/manage_nsfs/health.js
@@ -101,6 +101,7 @@ process.env.AWS_SDK_JS_SUPPRESS_MAINTENANCE_MODE_MESSAGE = '1';
class NSFSHealth {
constructor(options) {
this.https_port = options.https_port;
this.fork_base_port = options.fork_base_port;
this.all_account_details = options.all_account_details;
this.all_bucket_details = options.all_bucket_details;
this.all_connection_details = options.all_connection_details;
@@ -241,10 +242,10 @@ class NSFSHealth {
return service_health;
}

    async make_endpoint_health_request(url_path, port = this.https_port) {
        const response = await make_https_request({
            hostname: HOSTNAME,
            port,
path: url_path,
method: 'GET',
rejectUnauthorized: false,
@@ -260,43 +261,37 @@
let url_path = '/total_fork_count';
const worker_ids = [];
let total_fork_count = 0;
let fork_count_response;
let response;
        try {
            fork_count_response = await this.make_endpoint_health_request(url_path);
        } catch (err) {
            dbg.log0('Error while pinging endpoint host :' + HOSTNAME, err);
        }
if (!fork_count_response) {
return {
response: fork_response_code.NOT_RUNNING,
total_fork_count: total_fork_count,
running_workers: worker_ids,
};
}

total_fork_count = fork_count_response.fork_count;
url_path = '/endpoint_fork_id';
for (let i = 0; i < total_fork_count; i++) {
const port = this.fork_base_port + i;
try {
const fork_id_response = await this.make_endpoint_health_request(url_path, port);
worker_ids.push(fork_id_response.worker_id);
} catch (err) {
dbg.log0('Error while pinging fork :' + HOSTNAME + ', port ' + port, err);
}
}
if (worker_ids.length < total_fork_count) {
dbg.log0('Number of running forks is less than the expected fork count.');
response = fork_response_code.MISSING_FORKS;
} else {
response = fork_response_code.RUNNING;
}
return {
response: response,
@@ -637,6 +632,7 @@ class NSFSHealth {
async function get_health_status(argv, config_fs) {
try {
const https_port = Number(argv.https_port) || config.ENDPOINT_SSL_PORT;
const fork_base_port = Number(argv.fork_base_port) || config.ENDPOINT_FORK_PORT_BASE;
const deployment_type = argv.deployment_type || 'nc';
const all_account_details = get_boolean_or_string_value(argv.all_account_details);
const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
Expand All @@ -645,8 +641,9 @@ async function get_health_status(argv, config_fs) {
const lifecycle = get_boolean_or_string_value(argv.lifecycle);

if (deployment_type === 'nc') {
const health = new NSFSHealth({ https_port, fork_base_port,
all_account_details, all_bucket_details, all_connection_details,
notif_storage_threshold, lifecycle, config_fs });
const health_status = await health.nc_nsfs_health();
write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
} else {
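
With total_fork_count = 3 and the default base port, the health script probes /endpoint_fork_id on ports 6002, 6003 and 6004, and reports MISSING_FORKS if any probe fails. Assuming the standard NC CLI wrapper forwards flags to argv unchanged (the flag name below follows the argv key read in get_health_status), the new port base can be overridden like this:

noobaa-cli diagnose health --fork_base_port 6002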
8 changes: 8 additions & 0 deletions src/nc/nc_utils.js
@@ -24,7 +24,15 @@ function check_root_account_owns_user(root_account, account) {
return root_account._id === account.owner;
}

/**
 * @returns {boolean} true if the current environment is a NooBaa non-containerized environment
*/
function is_nc_environment() {
    return process.env.NC_NSFS_NO_DB_ENV === 'true';
}

// EXPORTS
exports.generate_id = generate_id;
exports.check_root_account_owns_user = check_root_account_owns_user;
exports.is_nc_environment = is_nc_environment;
