Skip to content

Commit b89f054

Browse files
committed
system store - reduce load time (part of performance effort 4.20)
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent bb57299 commit b89f054

File tree

4 files changed

+22
-15
lines changed

4 files changed

+22
-15
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
250250
// config.SPILLOVER_TIER_NAME = 'bucket-spillover-tier';
251251
config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
252252
config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
253+
config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;
253254

254255
//////////////////////////
255256
// MD AGGREGATOR CONFIG //

src/server/bg_services/cluster_hb.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) {
9393
update: {
9494
clusters: [update]
9595
}
96-
});
96+
}, false);
9797
});
9898
})
9999
.then(() => {

src/server/bg_services/md_aggregator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
6868
});
6969
if (changes) {
7070
const update = _.omit(changes, 'more_updates');
71-
await system_store.make_changes({ update });
71+
await system_store.make_changes({ update }, false);
7272
update_range = !changes.more_updates;
7373
if (update_range) {
7474
await system_store.make_changes({
@@ -78,7 +78,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
7878
global_last_update: range.till_time,
7979
}]
8080
}
81-
});
81+
}, false);
8282
}
8383
await P.delay(delay);
8484
} else {
@@ -206,7 +206,7 @@ function find_next_range({
206206
},
207207
}))
208208
}
209-
});
209+
}, false);
210210
}
211211

212212
// on normal operation the time_diff to close can be closed within a single MD_AGGREGATOR_INTERVAL

src/server/system_services/system_store.js

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ class SystemStore extends EventEmitter {
351351
this.is_finished_initial_load = false;
352352
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
353353
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
354+
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
354355
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
355356
for (const col of COLLECTIONS) {
356357
db_client.instance().define_collection(col);
@@ -359,6 +360,8 @@ class SystemStore extends EventEmitter {
359360
js_utils.deep_freeze(COLLECTIONS_BY_NAME);
360361
this.refresh_middleware = () => this.refresh();
361362
this.initial_load();
363+
//call refresh() periodically
364+
setInterval(this.refresh, this.START_REFRESH_THRESHOLD);
362365
}
363366

364367
[util.inspect.custom]() { return 'SystemStore'; }
@@ -411,26 +414,29 @@ class SystemStore extends EventEmitter {
411414
// because it might not see the latest changes if we don't reload right after make_changes.
412415
return this._load_serial.surround(async () => {
413416
try {
414-
dbg.log3('SystemStore: loading ...');
417+
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);
418+
419+
const new_data = new SystemStoreData();
415420

416421
// If we get a load request with an timestamp older then our last update time
417422
// we ensure we load everyting from that timestamp by updating our last_update_time.
418423
if (!_.isUndefined(since) && since < this.last_update_time) {
419-
dbg.log0('SystemStore.load: Got load request with a timestamp older then my last update time');
424+
dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time);
420425
this.last_update_time = since;
421426
}
422427
this.master_key_manager.load_root_key();
423-
const new_data = new SystemStoreData();
424428
let millistamp = time_utils.millistamp();
425429
await this._register_for_changes();
426430
await this._read_new_data_from_db(new_data);
427431
const secret = await os_utils.read_server_secret();
428432
this._server_secret = secret;
429-
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
430-
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
431-
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
432-
depth: 4
433-
}));
433+
if (dbg.should_log(1)) { //param should match below logs' level
434+
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
435+
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
436+
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
437+
depth: 4
438+
}));
439+
}
434440
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
435441
this.data = _.cloneDeep(this.old_db_data);
436442
millistamp = time_utils.millistamp();
@@ -506,7 +512,7 @@ class SystemStore extends EventEmitter {
506512
}
507513
};
508514
await db_client.instance().connect();
509-
await P.map(COLLECTIONS, async col => {
515+
await P.map_with_concurrency(this.SYSTEM_STORE_LOAD_CONCURRENCY, COLLECTIONS, async col => {
510516
const res = await db_client.instance().collection(col.name)
511517
.find(newly_updated_query, {
512518
projection: { last_update: 0 }
@@ -598,7 +604,7 @@ class SystemStore extends EventEmitter {
598604
* @property {Object} [remove]
599605
*
600606
*/
601-
async make_changes(changes) {
607+
async make_changes(changes, publish = true) {
602608
// Refreshing must be done outside the semapore lock because refresh
603609
// might call load that is locking on the same semaphore.
604610
await this.refresh();
@@ -611,7 +617,7 @@ class SystemStore extends EventEmitter {
611617
if (any_news) {
612618
if (this.is_standalone) {
613619
await this.load(last_update);
614-
} else {
620+
} else if (publish) {
615621
// notify all the cluster (including myself) to reload
616622
await server_rpc.client.redirector.publish_to_cluster({
617623
method_api: 'server_inter_process_api',

0 commit comments

Comments
 (0)