diff --git a/config.js b/config.js index 6338dd7d9a..a7c0d6b6a6 100644 --- a/config.js +++ b/config.js @@ -236,8 +236,8 @@ config.ROOT_KEY_MOUNT = '/etc/noobaa-server/root_keys'; config.DB_TYPE = /** @type {nb.DBType} */ (process.env.DB_TYPE || 'postgres'); -config.POSTGRES_DEFAULT_MAX_CLIENTS = 10; -config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 10; +config.POSTGRES_DEFAULT_MAX_CLIENTS = 5; +config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 5; /////////////////// // SYSTEM CONFIG // @@ -250,6 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool'; // config.SPILLOVER_TIER_NAME = 'bucket-spillover-tier'; config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true; config.BUCKET_AUTOCONF_TIER2_ENABLED = false; +config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5; ////////////////////////// // MD AGGREGATOR CONFIG // diff --git a/src/server/bg_services/cluster_hb.js b/src/server/bg_services/cluster_hb.js index f5651f3b37..3d1d58c582 100644 --- a/src/server/bg_services/cluster_hb.js +++ b/src/server/bg_services/cluster_hb.js @@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) { update: { clusters: [update] } - }); + }, false); }); }) .then(() => { diff --git a/src/server/bg_services/md_aggregator.js b/src/server/bg_services/md_aggregator.js index 8958af7ef0..5d3372523c 100644 --- a/src/server/bg_services/md_aggregator.js +++ b/src/server/bg_services/md_aggregator.js @@ -68,7 +68,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) { }); if (changes) { const update = _.omit(changes, 'more_updates'); - await system_store.make_changes({ update }); + await system_store.make_changes({ update }, false); update_range = !changes.more_updates; if (update_range) { await system_store.make_changes({ @@ -78,7 +78,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) { global_last_update: range.till_time, }] } - }); + }, false); } await P.delay(delay); } else { @@ -206,7 +206,7 @@ function find_next_range({ }, })) } - }); + }, false); } // on normal operation the time_diff to close can be closed within a single MD_AGGREGATOR_INTERVAL diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js index 87a4f28e08..33c8c0bbd6 100644 --- a/src/server/system_services/system_store.js +++ b/src/server/system_services/system_store.js @@ -351,6 +351,7 @@ class SystemStore extends EventEmitter { this.is_finished_initial_load = false; this.START_REFRESH_THRESHOLD = 10 * 60 * 1000; this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000; + this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5; this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD }); for (const col of COLLECTIONS) { db_client.instance().define_collection(col); @@ -393,17 +394,21 @@ class SystemStore extends EventEmitter { if (this.data) { load_time = this.data.time; } + let res; const since_load = Date.now() - load_time; if (since_load < this.START_REFRESH_THRESHOLD) { - return this.data; + res = this.data; } else if (since_load < this.FORCE_REFRESH_THRESHOLD) { dbg.warn(`system_store.refresh: system_store.data.time > START_REFRESH_THRESHOLD, since_load = ${since_load}, START_REFRESH_THRESHOLD = ${this.START_REFRESH_THRESHOLD}`); this.load().catch(_.noop); - return this.data; + res = this.data; } else { dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`); - return this.load(); + res = this.load(); } + //call refresh periodically + P.delay_unblocking(this.START_REFRESH_THRESHOLD).then(this.refresh); + return res; } async load(since) { @@ -411,26 +416,29 @@ class SystemStore extends EventEmitter { // because it might not see the latest changes if we don't reload right after make_changes. return this._load_serial.surround(async () => { try { - dbg.log3('SystemStore: loading ...'); + dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since); + + const new_data = new SystemStoreData(); // If we get a load request with an timestamp older then our last update time // we ensure we load everyting from that timestamp by updating our last_update_time. if (!_.isUndefined(since) && since < this.last_update_time) { - dbg.log0('SystemStore.load: Got load request with a timestamp older then my last update time'); + dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time); this.last_update_time = since; } this.master_key_manager.load_root_key(); - const new_data = new SystemStoreData(); let millistamp = time_utils.millistamp(); await this._register_for_changes(); await this._read_new_data_from_db(new_data); const secret = await os_utils.read_server_secret(); this._server_secret = secret; - dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp)); - dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length)); - dbg.log1('SystemStore: fetch data', util.inspect(new_data, { - depth: 4 - })); + if (dbg.should_log(1)) { //param should match below logs' level + dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp)); + dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length)); + dbg.log1('SystemStore: fetch data', util.inspect(new_data, { + depth: 4 + })); + } this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data); this.data = _.cloneDeep(this.old_db_data); millistamp = time_utils.millistamp(); @@ -506,7 +514,7 @@ class SystemStore extends EventEmitter { } }; await db_client.instance().connect(); - await P.map(COLLECTIONS, async col => { + await P.map_with_concurrency(this.SYSTEM_STORE_LOAD_CONCURRENCY, COLLECTIONS, async col => { const res = await db_client.instance().collection(col.name) .find(newly_updated_query, { projection: { last_update: 0 } @@ -598,7 +606,7 @@ class SystemStore extends EventEmitter { * @property {Object} [remove] * */ - async make_changes(changes) { + async make_changes(changes, publish = true) { // Refreshing must be done outside the semapore lock because refresh // might call load that is locking on the same semaphore. await this.refresh(); @@ -611,7 +619,7 @@ class SystemStore extends EventEmitter { if (any_news) { if (this.is_standalone) { await this.load(last_update); - } else { + } else if (publish) { // notify all the cluster (including myself) to reload await server_rpc.client.redirector.publish_to_cluster({ method_api: 'server_inter_process_api',