Skip to content

Commit 7e446ae

Browse files
committed
[Refactor]: worker as a separate entry point for cluster primary
1 parent ec6f0ec commit 7e446ae

File tree

4 files changed

+146
-45
lines changed

4 files changed

+146
-45
lines changed

packages/fastboot-app-server/src/fastboot-app-server.js

Lines changed: 88 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
const assert = require('assert');
44
const cluster = require('cluster');
55
const os = require('os');
6-
7-
const Worker = require('./worker');
6+
const path = require('path');
7+
const serialize = require('./utils/serialization').serialize;
88

99
class FastBootAppServer {
1010
constructor(options) {
@@ -34,37 +34,17 @@ class FastBootAppServer {
3434

3535
this.propagateUI();
3636

37-
if (cluster.isWorker) {
38-
this.worker = new Worker({
39-
ui: this.ui,
40-
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
41-
cache: this.cache,
42-
gzip: this.gzip,
43-
host: this.host,
44-
port: this.port,
45-
username: this.username,
46-
password: this.password,
47-
httpServer: this.httpServer,
48-
beforeMiddleware: this.beforeMiddleware,
49-
afterMiddleware: this.afterMiddleware,
50-
buildSandboxGlobals: this.buildSandboxGlobals,
51-
chunkedResponse: this.chunkedResponse,
52-
});
37+
this.workerCount = options.workerCount ||
38+
(process.env.NODE_ENV === 'test' ? 1 : null) ||
39+
os.cpus().length;
5340

54-
this.worker.start();
55-
} else {
56-
this.workerCount = options.workerCount ||
57-
(process.env.NODE_ENV === 'test' ? 1 : null) ||
58-
os.cpus().length;
41+
this._clusterInitialized = false;
5942

60-
assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
61-
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
62-
}
43+
assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
44+
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
6345
}
6446

6547
start() {
66-
if (cluster.isWorker) { return; }
67-
6848
return this.initializeApp()
6949
.then(() => this.subscribeToNotifier())
7050
.then(() => this.forkWorkers())
@@ -75,6 +55,9 @@ class FastBootAppServer {
7555
})
7656
.catch(err => {
7757
this.ui.writeLine(err.stack);
58+
})
59+
.finally(() => {
60+
this._clusterInitialized = true;
7861
});
7962
}
8063

@@ -138,6 +121,12 @@ class FastBootAppServer {
138121
}
139122
}
140123

124+
/**
125+
* send message to worker
126+
*
127+
* @method broadcast
128+
* @param {Object} message
129+
*/
141130
broadcast(message) {
142131
let workers = cluster.workers;
143132

@@ -153,6 +142,10 @@ class FastBootAppServer {
153142
forkWorkers() {
154143
let promises = [];
155144

145+
// https://nodejs.org/api/cluster.html#cluster_cluster_setupprimary_settings
146+
// Note: cluster.setupPrimary in v16.0.0
147+
cluster.setupMaster(this.clusterSetupPrimary());
148+
156149
for (let i = 0; i < this.workerCount; i++) {
157150
promises.push(this.forkWorker());
158151
}
@@ -161,31 +154,53 @@ class FastBootAppServer {
161154
}
162155

163156
forkWorker() {
164-
let env = this.buildWorkerEnv();
165-
let worker = cluster.fork(env);
157+
let worker = cluster.fork(this.buildWorkerEnv());
166158

167-
this.ui.writeLine(`forked worker ${worker.process.pid}`);
159+
this.ui.writeLine(`Worker ${worker.process.pid} forked`);
160+
161+
let firstBootResolve;
162+
let firstBootReject;
163+
const firstBootPromise = new Promise((resolve, reject) => {
164+
firstBootResolve = resolve;
165+
firstBootReject = reject;
166+
});
167+
168+
if (this._clusterInitialized) {
169+
firstBootResolve();
170+
}
171+
172+
worker.on('online', () => {
173+
this.ui.writeLine(`Worker ${worker.process.pid} online.`);
174+
});
175+
176+
worker.on('message', (message) => {
177+
if (message.event === 'http-online') {
178+
this.ui.writeLine(`Worker ${worker.process.pid} healthy.`);
179+
firstBootResolve();
180+
}
181+
});
168182

169183
worker.on('exit', (code, signal) => {
184+
let error;
170185
if (signal) {
171-
this.ui.writeLine(`worker was killed by signal: ${signal}`);
186+
error = new Error(`Worker ${worker.process.pid} killed by signal: ${signal}`);
172187
} else if (code !== 0) {
173-
this.ui.writeLine(`worker exited with error code: ${code}`);
188+
error = new Error(`Worker ${worker.process.pid} exited with error code: ${code}`);
174189
} else {
175-
this.ui.writeLine(`worker exited`);
190+
error = new Error(`Worker ${worker.process.pid} exited gracefully. It should only exit when told to do so.`);
176191
}
177192

178-
this.forkWorker();
193+
if (!this._clusterInitialized) {
194+
// Do not respawn for a failed first launch.
195+
firstBootReject(error);
196+
} else {
197+
// Do respawn if you've ever successfully been initialized.
198+
this.ui.writeLine(error);
199+
this.forkWorker();
200+
}
179201
});
180202

181-
return new Promise(resolve => {
182-
this.ui.writeLine('worker online');
183-
worker.on('message', message => {
184-
if (message.event === 'http-online') {
185-
resolve();
186-
}
187-
});
188-
});
203+
return firstBootPromise;
189204
}
190205

191206
buildWorkerEnv() {
@@ -198,6 +213,36 @@ class FastBootAppServer {
198213
return env;
199214
}
200215

216+
/**
217+
* Extension point to allow configuring the default fork configuration.
218+
*
219+
* @method clusterSetupPrimary
220+
* @returns {Object}
221+
* @public
222+
*/
223+
clusterSetupPrimary() {
224+
const workerOptions = {
225+
ui: this.ui,
226+
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
227+
cache: this.cache,
228+
gzip: this.gzip,
229+
host: this.host,
230+
port: this.port,
231+
username: this.username,
232+
password: this.password,
233+
httpServer: this.httpServer,
234+
beforeMiddleware: this.beforeMiddleware,
235+
afterMiddleware: this.afterMiddleware,
236+
buildSandboxGlobals: this.buildSandboxGlobals,
237+
chunkedResponse: this.chunkedResponse,
238+
};
239+
240+
const workerPath = this.workerPath || path.join(__dirname, './worker-start.js');
241+
return {
242+
exec: workerPath,
243+
args: [serialize(workerOptions)]
244+
};
245+
}
201246
}
202247

203248
module.exports = FastBootAppServer;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
3+
/**
4+
* The purpose of this module is to provide a serialization layer for passing arguments to a new Worker instance
5+
* This allows us to completely separate the cluster worker from the cluster primary
6+
*/
7+
8+
function circularReplacer() {
9+
const seen = new WeakSet();
10+
11+
return (key, value) => {
12+
if (typeof value === 'object' && value !== null) {
13+
if (seen.has(value)) {
14+
return;
15+
}
16+
17+
seen.add(value);
18+
}
19+
20+
return value;
21+
}
22+
}
23+
24+
function serialize(object) {
25+
let data = encodeURIComponent(JSON.stringify(object, circularReplacer()));
26+
let buff = new Buffer.from(data);
27+
return buff.toString('base64');
28+
}
29+
30+
function deserialize(string) {
31+
let buff = new Buffer.from(string, 'base64');
32+
return JSON.parse(decodeURIComponent(buff.toString('ascii')));
33+
}
34+
35+
module.exports = {
36+
serialize,
37+
deserialize
38+
};
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
'use strict';
2+
3+
const ClusterWorker = require('./worker');
4+
const worker = new ClusterWorker();
5+
6+
worker.start();

packages/fastboot-app-server/src/worker.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,19 @@
33
const FastBoot = require('fastboot');
44
const fastbootMiddleware = require('fastboot-express-middleware');
55
const ExpressHTTPServer = require('./express-http-server');
6+
const deserialize = require('./utils/serialization').deserialize;
7+
const UI = require('./ui');
68

79
class Worker {
8-
constructor(options) {
10+
constructor(argOptions) {
11+
this.forkOptions = deserialize(process.argv[2])
12+
// Define the enumerated options set.
13+
// Combination of any launch options and any directly passed options.
14+
const options = Object.assign({}, this.forkOptions, argOptions);
15+
16+
this.ui = new UI();
917
this.distPath = options.distPath;
1018
this.httpServer = options.httpServer;
11-
this.ui = options.ui;
1219
this.cache = options.cache;
1320
this.gzip = options.gzip;
1421
this.host = options.host;
@@ -57,6 +64,11 @@ class Worker {
5764
process.on('message', message => this.handleMessage(message));
5865
}
5966

67+
/**
68+
* received messages from primary
69+
* @method handleMessage
70+
* @param {Object} message
71+
*/
6072
handleMessage(message) {
6173
switch (message.event) {
6274
case 'reload':

0 commit comments

Comments
 (0)