Skip to content

system store - reduce load time (part of performance effort 4.20) #9096

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ config.ROOT_KEY_MOUNT = '/etc/noobaa-server/root_keys';

config.DB_TYPE = /** @type {nb.DBType} */ (process.env.DB_TYPE || 'postgres');

config.POSTGRES_DEFAULT_MAX_CLIENTS = 10;
config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 10;
config.POSTGRES_DEFAULT_MAX_CLIENTS = 5;
config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 5;

///////////////////
// SYSTEM CONFIG //
Expand All @@ -250,6 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
// config.SPILLOVER_TIER_NAME = 'bucket-spillover-tier';
config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;

//////////////////////////
// MD AGGREGATOR CONFIG //
Expand Down
2 changes: 1 addition & 1 deletion src/server/bg_services/cluster_hb.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) {
update: {
clusters: [update]
}
});
}, false);
});
})
.then(() => {
Expand Down
6 changes: 3 additions & 3 deletions src/server/bg_services/md_aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
});
if (changes) {
const update = _.omit(changes, 'more_updates');
await system_store.make_changes({ update });
await system_store.make_changes({ update }, false);
update_range = !changes.more_updates;
if (update_range) {
await system_store.make_changes({
Expand All @@ -78,7 +78,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
global_last_update: range.till_time,
}]
}
});
}, false);
}
await P.delay(delay);
} else {
Expand Down Expand Up @@ -206,7 +206,7 @@ function find_next_range({
},
}))
}
});
}, false);
}

// on normal operation the time_diff to close can be closed within a single MD_AGGREGATOR_INTERVAL
Expand Down
36 changes: 22 additions & 14 deletions src/server/system_services/system_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ class SystemStore extends EventEmitter {
this.is_finished_initial_load = false;
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
for (const col of COLLECTIONS) {
db_client.instance().define_collection(col);
Expand Down Expand Up @@ -393,44 +394,51 @@ class SystemStore extends EventEmitter {
if (this.data) {
load_time = this.data.time;
}
let res;
const since_load = Date.now() - load_time;
if (since_load < this.START_REFRESH_THRESHOLD) {
return this.data;
res = this.data;
} else if (since_load < this.FORCE_REFRESH_THRESHOLD) {
dbg.warn(`system_store.refresh: system_store.data.time > START_REFRESH_THRESHOLD, since_load = ${since_load}, START_REFRESH_THRESHOLD = ${this.START_REFRESH_THRESHOLD}`);
this.load().catch(_.noop);
return this.data;
res = this.data;
} else {
dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`);
return this.load();
res = this.load();
}
//call refresh periodically
P.delay_unblocking(this.START_REFRESH_THRESHOLD).then(this.refresh);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can cause a lot of refresh() calls to be queued. I believe that refresh() is called on every RPC request as a middleware (see here), so I don't think we should schedule another refresh.

return res;
}

async load(since) {
// serializing load requests since we have to run a fresh load after the previous one finishes,
// because it might not see the latest changes if we don't reload right after make_changes.
return this._load_serial.surround(async () => {
try {
dbg.log3('SystemStore: loading ...');
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);

const new_data = new SystemStoreData();

// If we get a load request with a timestamp older than our last update time
// we ensure we load everything from that timestamp by updating our last_update_time.
if (!_.isUndefined(since) && since < this.last_update_time) {
dbg.log0('SystemStore.load: Got load request with a timestamp older then my last update time');
dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time);
this.last_update_time = since;
}
this.master_key_manager.load_root_key();
const new_data = new SystemStoreData();
let millistamp = time_utils.millistamp();
await this._register_for_changes();
await this._read_new_data_from_db(new_data);
const secret = await os_utils.read_server_secret();
this._server_secret = secret;
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
depth: 4
}));
if (dbg.should_log(1)) { //param should match below logs' level
dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
depth: 4
}));
}
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
this.data = _.cloneDeep(this.old_db_data);
millistamp = time_utils.millistamp();
Expand Down Expand Up @@ -506,7 +514,7 @@ class SystemStore extends EventEmitter {
}
};
await db_client.instance().connect();
await P.map(COLLECTIONS, async col => {
await P.map_with_concurrency(this.SYSTEM_STORE_LOAD_CONCURRENCY, COLLECTIONS, async col => {
const res = await db_client.instance().collection(col.name)
.find(newly_updated_query, {
projection: { last_update: 0 }
Expand Down Expand Up @@ -598,7 +606,7 @@ class SystemStore extends EventEmitter {
* @property {Object} [remove]
*
*/
async make_changes(changes) {
async make_changes(changes, publish = true) {
// Refreshing must be done outside the semaphore lock because refresh
// might call load that is locking on the same semaphore.
await this.refresh();
Expand All @@ -611,7 +619,7 @@ class SystemStore extends EventEmitter {
if (any_news) {
if (this.is_standalone) {
await this.load(last_update);
} else {
} else if (publish) {
// notify all the cluster (including myself) to reload
await server_rpc.client.redirector.publish_to_cluster({
method_api: 'server_inter_process_api',
Expand Down