Skip to content

Commit f516c81

Browse files
patuwwya-tylenda
authored andcommitted
Get HTTP platform connection status
1 parent 85afff9 commit f516c81

File tree

3 files changed

+28
-17
lines changed

3 files changed

+28
-17
lines changed

packages/host/src/lib/cpm-connector.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { TypedEmitter, generateSTHKey, normalizeUrl } from "@scramjet/utility";
2323
import { ObjLogger } from "@scramjet/obj-logger";
2424
import { ReasonPhrases } from "http-status-codes";
2525
import { DuplexStream } from "@scramjet/api-server";
26+
import { VerserClientConnection } from "@scramjet/verser/src/types";
2627

2728
type STHInformation = {
2829
id?: string;
@@ -223,7 +224,7 @@ export class CPMConnector extends TypedEmitter<Events> {
223224
* @returns {string} Host id.
224225
*/
225226
getId(): string | undefined {
226-
return this.info.id;
227+
return this.config.id;
227228
}
228229

229230
/**
@@ -249,6 +250,8 @@ export class CPMConnector extends TypedEmitter<Events> {
249250
};
250251
}
251252

253+
await this.setLoadCheckMessageSender();
254+
252255
StringStream.from(duplex.input as Readable)
253256
.JSONParse()
254257
.map(async (message: EncodedControlMessage) => {
@@ -292,9 +295,10 @@ export class CPMConnector extends TypedEmitter<Events> {
292295
[CPMMessageCode.NETWORK_INFO, await this.getNetworkInfo()]
293296
);
294297

295-
this.emit("connect");
296298

297-
await this.setLoadCheckMessageSender();
299+
300+
301+
this.emit("connect");
298302

299303
return new Promise((resolve, reject) => {
300304
duplex.on("end", () => {
@@ -332,11 +336,17 @@ export class CPMConnector extends TypedEmitter<Events> {
332336
this.verserClient.updateHeaders({ "x-sth-id": this.info.id });
333337
}
334338

335-
let connection;
339+
let connection: VerserClientConnection;
336340

337341
try {
338342
this.logger.trace("Connecting to Manager", this.cpmUrl, this.cpmId);
339343
connection = await this.verserClient.connect();
344+
345+
connection.socket
346+
.once("close", async () => {
347+
this.logger.warn("CLOSE STATUS", connection.res.statusCode)
348+
await this.handleConnectionClose(connection.res.statusCode || -1);
349+
});
340350
} catch (error: any) {
341351
this.logger.error("Can not connect to Manager", this.cpmUrl, this.cpmId, error.message);
342352

@@ -345,12 +355,7 @@ export class CPMConnector extends TypedEmitter<Events> {
345355
return;
346356
}
347357

348-
this.logger.info("Connected to Manager");
349-
350-
connection.socket
351-
.once("close", async () => {
352-
await this.handleConnectionClose();
353-
});
358+
this.logger.info("Connected...");
354359

355360
/**
356361
* @TODO: Distinguish existing `connect` request and started communication (Manager handled this host
@@ -361,7 +366,7 @@ export class CPMConnector extends TypedEmitter<Events> {
361366
this.connected = true;
362367
this.connectionAttempts = 0;
363368

364-
connection.req.once("error", async (error: any) => {
369+
connection.res.once("error", async (error: any) => {
365370
this.logger.error("Request error", error);
366371

367372
try {
@@ -386,7 +391,7 @@ export class CPMConnector extends TypedEmitter<Events> {
386391
* Handles connection close.
387392
* Tries to reconnect.
388393
*/
389-
async handleConnectionClose() {
394+
async handleConnectionClose(connectionStatusCode: number) {
390395
this.handleCommunicationRequestEnd();
391396

392397
this.connection?.removeAllListeners();
@@ -400,6 +405,10 @@ export class CPMConnector extends TypedEmitter<Events> {
400405
clearInterval(this.loadInterval);
401406
}
402407

408+
if (connectionStatusCode === 403) {
409+
this.isAbandoned = true;
410+
}
411+
403412
await this.reconnect();
404413
}
405414

@@ -426,9 +435,9 @@ export class CPMConnector extends TypedEmitter<Events> {
426435
this.isReconnecting = true;
427436

428437
await new Promise<void>((resolve, reject) => {
429-
setTimeout(async () => {
430-
this.logger.info("Connection lost, retrying", this.connectionAttempts);
438+
this.logger.info("Connection lost, retrying", this.connectionAttempts);
431439

440+
setTimeout(async () => {
432441
await this.connect().then(resolve, reject);
433442
}, this.config.reconnectionDelay);
434443
});

packages/verser/src/lib/verser-client.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const BPMux = require("bpmux").BPMux;
1313

1414
type Events = {
1515
error: (err: Error) => void;
16+
close: (reason: string) => void;
1617
};
1718

1819
/**
@@ -112,11 +113,12 @@ export class VerserClient extends TypedEmitter<Events> {
112113
reject(err);
113114
});
114115

115-
connectRequest.on("connect", (req, socket) => {
116+
connectRequest.once("connect", (res, socket, head) => {
117+
this.logger.info("HEAD", head.toString());
116118
this.socket = socket;
117119
this.mux();
118120

119-
resolve({ req, socket });
121+
resolve({ res, socket });
120122
});
121123

122124
connectRequest.flushHeaders();

packages/verser/src/types/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ export type VerserClientConnection = {
4040
/**
4141
* Connection request object.
4242
*/
43-
req: IncomingMessage;
43+
res: IncomingMessage;
4444
};
4545

4646
export type VerserRequestResult = { incomingMessage: IncomingMessage; clientRequest: ClientRequest }

0 commit comments

Comments
 (0)