Skip to content

Commit 24e11b7

Browse files
authored
System store phase1 (#9148)
* system store - reduce load time (part of performance effort 4.20) Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com> * revent some of previous changes Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com> --------- Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent d330a90 commit 24e11b7

File tree

3 files changed

+17
-12
lines changed

3 files changed

+17
-12
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
250250
// config.SPILLOVER_TIER_NAME = 'bucket-spillover-tier';
251251
config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
252252
config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
253+
config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;
253254

254255
//////////////////////////
255256
// MD AGGREGATOR CONFIG //

src/server/bg_services/cluster_hb.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) {
9393
update: {
9494
clusters: [update]
9595
}
96-
});
96+
}, false);
9797
});
9898
})
9999
.then(() => {

src/server/system_services/system_store.js

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ class SystemStore extends EventEmitter {
351351
this.is_finished_initial_load = false;
352352
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
353353
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
354+
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
354355
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
355356
for (const col of COLLECTIONS) {
356357
db_client.instance().define_collection(col);
@@ -411,26 +412,29 @@ class SystemStore extends EventEmitter {
411412
// because it might not see the latest changes if we don't reload right after make_changes.
412413
return this._load_serial.surround(async () => {
413414
try {
414-
dbg.log3('SystemStore: loading ...');
415+
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);
416+
417+
const new_data = new SystemStoreData();
415418

416419
// If we get a load request with an timestamp older then our last update time
417420
// we ensure we load everyting from that timestamp by updating our last_update_time.
418421
if (!_.isUndefined(since) && since < this.last_update_time) {
419-
dbg.log0('SystemStore.load: Got load request with a timestamp older then my last update time');
422+
dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time);
420423
this.last_update_time = since;
421424
}
422425
this.master_key_manager.load_root_key();
423-
const new_data = new SystemStoreData();
424426
let millistamp = time_utils.millistamp();
425427
await this._register_for_changes();
426428
await this._read_new_data_from_db(new_data);
427429
const secret = await os_utils.read_server_secret();
428430
this._server_secret = secret;
429-
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
430-
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
431-
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
432-
depth: 4
433-
}));
431+
if (dbg.should_log(1)) { //param should match below logs' level
432+
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
433+
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
434+
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
435+
depth: 4
436+
}));
437+
}
434438
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
435439
this.data = _.cloneDeep(this.old_db_data);
436440
millistamp = time_utils.millistamp();
@@ -506,7 +510,7 @@ class SystemStore extends EventEmitter {
506510
}
507511
};
508512
await db_client.instance().connect();
509-
await P.map(COLLECTIONS, async col => {
513+
await P.map_with_concurrency(this.SYSTEM_STORE_LOAD_CONCURRENCY, COLLECTIONS, async col => {
510514
const res = await db_client.instance().collection(col.name)
511515
.find(newly_updated_query, {
512516
projection: { last_update: 0 }
@@ -598,7 +602,7 @@ class SystemStore extends EventEmitter {
598602
* @property {Object} [remove]
599603
*
600604
*/
601-
async make_changes(changes) {
605+
async make_changes(changes, publish = true) {
602606
// Refreshing must be done outside the semapore lock because refresh
603607
// might call load that is locking on the same semaphore.
604608
await this.refresh();
@@ -611,7 +615,7 @@ class SystemStore extends EventEmitter {
611615
if (any_news) {
612616
if (this.is_standalone) {
613617
await this.load(last_update);
614-
} else {
618+
} else if (publish) {
615619
// notify all the cluster (including myself) to reload
616620
await server_rpc.client.redirector.publish_to_cluster({
617621
method_api: 'server_inter_process_api',

0 commit comments

Comments
 (0)