Skip to content

Commit 0529c02

Browse files
author
Eryk Solecki
committed
Topic router stream types
1 parent 74df4c4 commit 0529c02

File tree

2 files changed

+27
-23
lines changed

2 files changed

+27
-23
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
"devDependencies": {
7878
"@istanbuljs/nyc-config-typescript": "^1.0.2",
7979
"@npmcli/run-script": "4.2.1",
80-
"@types/node": "15.12.5",
80+
"@types/node": "^18.15.11",
8181
"@typescript-eslint/eslint-plugin": "^5.41.0",
8282
"@typescript-eslint/parser": "^5.41.0",
8383
"build-if-changed": "^1.5.5",

packages/host/src/lib/serviceDiscovery/topicRouter.ts

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { ObjLogger } from "@scramjet/obj-logger";
2-
import { APIExpose, OpResponse, ParsedMessage } from "@scramjet/types";
2+
import { APIExpose, OpResponse } from "@scramjet/types";
33
import { ReasonPhrases } from "http-status-codes";
44
import { ServiceDiscovery } from "./sd-adapter";
5-
import { IncomingMessage } from "http";
5+
import { IncomingMessage } from "http";
66
import { StreamOrigin } from "./streamHandler";
77
import { TopicState } from "./topicHandler";
88
import { WorkState } from "./streamHandler";
@@ -20,12 +20,12 @@ type TopicsPostRes = {
2020
}
2121
type TopicDeleteReq = {}
2222

23-
type TopicDownstreamReq = IncomingMessage & {
24-
headers: {
25-
"content-type": string,
23+
type TopicStreamReq = IncomingMessage & {
24+
headers?: {
25+
"content-type"?: string,
2626
cpm?: string
2727
},
28-
params: { topic: string }
28+
params?: { topic?: string }
2929
}
3030

3131
class TopicRouter {
@@ -36,7 +36,7 @@ class TopicRouter {
3636
this.serviceDiscovery = serviceDiscovery;
3737
apiServer.get(`${apiBaseUrl}/topics`, () => this.serviceDiscovery.getTopics());
3838
apiServer.op("post", `${apiBaseUrl}/topics`, this.topicsPost)
39-
apiServer.get(`${apiBaseUrl}/topic/:topic`, () => this.deleteTopic);
39+
apiServer.op("delete", `${apiBaseUrl}/topic/:topic`, () => this.deleteTopic);
4040
apiServer.downstream(`${apiBaseUrl}/topic/:topic`, this.topicDownstream, { checkContentType: false });
4141
apiServer.upstream(`${apiBaseUrl}/topic/:topic`, this.topicUpstream);
4242
}
@@ -63,16 +63,16 @@ class TopicRouter {
6363
}
6464
}
6565

66-
async topicDownstream(req: TopicDownstreamReq) {
66+
async topicDownstream(req: TopicStreamReq) {
6767
const { "content-type": contentType = "", cpm } = req.headers;
68-
const { topic: topicName } = req.params;
68+
const { topic: name = "" } = req.params || {};
6969
if (!isContentType(contentType)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Unsupported content-type" }
70-
if (!TopicName.validate(topicName)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Topic name incorrect format" }
70+
if (!TopicName.validate(name)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Topic name incorrect format" }
7171

72-
const name = new TopicName(topicName);
72+
const topicName = new TopicName(name);
7373
this.logger.debug(`Incoming topic '${name}' request`);
7474

75-
let topic = this.serviceDiscovery.topicsController.get(name);
75+
let topic = this.serviceDiscovery.topicsController.get(topicName);
7676
if (topic) {
7777
const topicContentType = topic.options().contentType;
7878
if (contentType !== topicContentType) {
@@ -82,35 +82,39 @@ class TopicRouter {
8282
};
8383
}
8484
} else {
85-
topic = new Topic(name, contentType, { id: "TopicDownstream", type: "hub" });
85+
// FIXME: Single responsibility rule validation
86+
topic = new Topic(topicName, contentType, { id: "TopicDownstream", type: "hub" });
8687
}
8788
req.pipe(topic, { end: false });
8889

8990
if (!cpm) {
9091
await this.serviceDiscovery.update({
91-
provides: topic, contentType: contentType, topicName: topic
92+
provides: topic.id(), contentType: contentType, topicName: topic.id()
9293
});
9394
} else {
9495
this.logger.debug(`Incoming Downstream CPM request for topic '${topic}'`);
9596
}
96-
return {};
97+
return { opStatus: ReasonPhrases.OK };
9798
}
9899

99-
async topicUpstream(req: ParsedMessage) {
100+
async topicUpstream(req: TopicStreamReq) {
101+
const { "content-type": contentType = "", cpm } = req.headers;
102+
const { topic: name = "" } = req.params || {};
103+
if (!isContentType(contentType)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Unsupported content-type" }
104+
if (!TopicName.validate(name)) return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Topic name incorrect format" }
105+
106+
const topicName = new TopicName(name);
100107
//TODO: what should be the default content type and where to store this information?
101-
const contentType = req.headers["content-type"] || "application/x-ndjson";
102-
const { topic } = req.params || {};
103-
const { cpm } = req.headers;
104108

105109
if (!cpm) {
106110
await this.serviceDiscovery.update({
107-
requires: topic, contentType, topicName: topic
111+
requires: name, contentType, topicName: topicName.toString()
108112
});
109113
} else {
110-
this.logger.debug(`Incoming Upstream CPM request for topic '${topic}'`);
114+
this.logger.debug(`Incoming Upstream CPM request for topic '${name}'`);
111115
}
112116

113-
return this.serviceDiscovery.createTopicIfNotExist({ topic, contentType });
117+
return this.serviceDiscovery.createTopicIfNotExist({ topic: topicName, contentType });
114118
}
115119
}
116120

0 commit comments

Comments
 (0)