Skip to content

Commit 4b2fe6d

Browse files
committed
system store - reduce load time (part of performance effort 4.20)
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent bb57299 commit 4b2fe6d

File tree

4 files changed

+27
-18
lines changed

4 files changed

+27
-18
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
250250
// config.SPILLOVER_TIER_NAME = 'bucket-spillover-tier';
251251
config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
252252
config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
253+
config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;
253254

254255
//////////////////////////
255256
// MD AGGREGATOR CONFIG //

src/server/bg_services/cluster_hb.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) {
9393
update: {
9494
clusters: [update]
9595
}
96-
});
96+
}, false);
9797
});
9898
})
9999
.then(() => {

src/server/bg_services/md_aggregator.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
6868
});
6969
if (changes) {
7070
const update = _.omit(changes, 'more_updates');
71-
await system_store.make_changes({ update });
71+
await system_store.make_changes({ update }, false);
7272
update_range = !changes.more_updates;
7373
if (update_range) {
7474
await system_store.make_changes({
@@ -78,7 +78,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
7878
global_last_update: range.till_time,
7979
}]
8080
}
81-
});
81+
}, false);
8282
}
8383
await P.delay(delay);
8484
} else {
@@ -206,7 +206,7 @@ function find_next_range({
206206
},
207207
}))
208208
}
209-
});
209+
}, false);
210210
}
211211

212212
// on normal operation the time_diff to close can be closed within a single MD_AGGREGATOR_INTERVAL

src/server/system_services/system_store.js

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ class SystemStore extends EventEmitter {
351351
this.is_finished_initial_load = false;
352352
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
353353
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
354+
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
354355
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
355356
for (const col of COLLECTIONS) {
356357
db_client.instance().define_collection(col);
@@ -393,44 +394,51 @@ class SystemStore extends EventEmitter {
393394
if (this.data) {
394395
load_time = this.data.time;
395396
}
397+
let res;
396398
const since_load = Date.now() - load_time;
397399
if (since_load < this.START_REFRESH_THRESHOLD) {
398-
return this.data;
400+
res = this.data;
399401
} else if (since_load < this.FORCE_REFRESH_THRESHOLD) {
400402
dbg.warn(`system_store.refresh: system_store.data.time > START_REFRESH_THRESHOLD, since_load = ${since_load}, START_REFRESH_THRESHOLD = ${this.START_REFRESH_THRESHOLD}`);
401403
this.load().catch(_.noop);
402-
return this.data;
404+
res = this.data;
403405
} else {
404406
dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`);
405-
return this.load();
407+
res = this.load();
406408
}
409+
//call refresh periodically
410+
P.delay_unblocking(this.START_REFRESH_THRESHOLD).then(this.refresh);
411+
return res;
407412
}
408413

409414
async load(since) {
410415
// serializing load requests since we have to run a fresh load after the previous one will finish
411416
// because it might not see the latest changes if we don't reload right after make_changes.
412417
return this._load_serial.surround(async () => {
413418
try {
414-
dbg.log3('SystemStore: loading ...');
419+
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);
420+
421+
const new_data = new SystemStoreData();
415422

416423
// If we get a load request with an timestamp older then our last update time
417424
// we ensure we load everyting from that timestamp by updating our last_update_time.
418425
if (!_.isUndefined(since) && since < this.last_update_time) {
419-
dbg.log0('SystemStore.load: Got load request with a timestamp older then my last update time');
426+
dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time);
420427
this.last_update_time = since;
421428
}
422429
this.master_key_manager.load_root_key();
423-
const new_data = new SystemStoreData();
424430
let millistamp = time_utils.millistamp();
425431
await this._register_for_changes();
426432
await this._read_new_data_from_db(new_data);
427433
const secret = await os_utils.read_server_secret();
428434
this._server_secret = secret;
429-
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
430-
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
431-
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
432-
depth: 4
433-
}));
435+
if (dbg.should_log(1)) { //param should match below logs' level
436+
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
437+
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
438+
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
439+
depth: 4
440+
}));
441+
}
434442
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
435443
this.data = _.cloneDeep(this.old_db_data);
436444
millistamp = time_utils.millistamp();
@@ -506,7 +514,7 @@ class SystemStore extends EventEmitter {
506514
}
507515
};
508516
await db_client.instance().connect();
509-
await P.map(COLLECTIONS, async col => {
517+
await P.map_with_concurrency(this.SYSTEM_STORE_LOAD_CONCURRENCY, COLLECTIONS, async col => {
510518
const res = await db_client.instance().collection(col.name)
511519
.find(newly_updated_query, {
512520
projection: { last_update: 0 }
@@ -598,7 +606,7 @@ class SystemStore extends EventEmitter {
598606
* @property {Object} [remove]
599607
*
600608
*/
601-
async make_changes(changes) {
609+
async make_changes(changes, publish = true) {
602610
// Refreshing must be done outside the semapore lock because refresh
603611
// might call load that is locking on the same semaphore.
604612
await this.refresh();
@@ -611,7 +619,7 @@ class SystemStore extends EventEmitter {
611619
if (any_news) {
612620
if (this.is_standalone) {
613621
await this.load(last_update);
614-
} else {
622+
} else if (publish) {
615623
// notify all the cluster (including myself) to reload
616624
await server_rpc.client.redirector.publish_to_cluster({
617625
method_api: 'server_inter_process_api',

0 commit comments

Comments
 (0)