Skip to content
This repository was archived by the owner on Aug 5, 2024. It is now read-only.

Commit d5706f0

Browse files
committed
mqtt support
1 parent 55855cd commit d5706f0

File tree

10 files changed

+220
-18
lines changed

10 files changed

+220
-18
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"fastify": "^4.9.2",
4242
"http-errors": "^2.0.0",
4343
"json5": "^2.2.3",
44+
"mqtt": "^4.3.7",
4445
"node-fetch": "^2.6.7",
4546
"openapi-types": "^12.1.0",
4647
"pino-pretty": "^9.1.1",

src/apidevices.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ export async function sendJSON(id: DeviceId, topic: string, json: any) {
114114
await pubToDevice(id, msg)
115115
}
116116

117-
async function sendBinary(id: DeviceId, topic: string, buf: Buffer) {
117+
export async function sendBinary(id: DeviceId, topic: string, buf: Buffer) {
118118
if (buf.length > MAX_WSSK_SIZE)
119119
throwStatus(
120120
413,

src/azure/appinsights.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export async function setup() {
4141
registerMessageSink({
4242
name: "app insights events",
4343
topicName: "tev",
44-
ingest: async (message, device) => {
44+
ingest: async (topic, message, device) => {
4545
const {
4646
n: name,
4747
p: properties,
@@ -58,7 +58,7 @@ export async function setup() {
5858
registerMessageSink({
5959
name: "app insights metrics",
6060
topicName: "tme",
61-
ingest: async (message, device) => {
61+
ingest: async (topic, message, device) => {
6262
const {
6363
n: name,
6464
v: value,

src/azure/eventhub.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ export async function setup() {
2121
registerMessageSink({
2222
name: "event hub",
2323
topicName: "*",
24-
ingest: async (message, device) => {
24+
ingest: async (topic, message, device) => {
2525
const batch = await producer.createBatch()
2626
const correlationId = device.sessionId
2727
const body = {
2828
context: {
2929
deviceId: device.id,
3030
deviceName: device.dev.name,
31+
topic,
3132
},
3233
data: message,
3334
}

src/azure/storagequeue.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,20 @@ export async function setup() {
1414
return
1515
}
1616

17+
console.log("registering Azure Storage Queue message sink")
1718
const queueServiceClient = QueueServiceClient.fromConnectionString(connStr)
1819
await queueServiceClient.createQueue("messages")
1920
const queueClient = queueServiceClient.getQueueClient("messages")
2021
registerMessageSink({
2122
name: "storage queue",
2223
topicName: "*",
23-
ingest: async (message, device) => {
24+
ingest: async (topic, message, device) => {
2425
const correlationId = device.sessionId
2526
const body = {
2627
context: {
2728
deviceId: device.id,
2829
deviceName: device.dev.name,
30+
topic,
2931
correlationId,
3032
},
3133
data: message,

src/env.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export async function setup() {
66
registerMessageSink({
77
name: "environment variables",
88
topicName: "env",
9-
ingest: async (message: EnvironmentFromDevice, device) => {
9+
ingest: async (topic: string, message: EnvironmentFromDevice, device) => {
1010
const { dev, id } = device
1111
const { envJSON } = dev
1212
const { fields } = message

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { setup as eventHubSetup } from "./azure/eventhub"
1717
import { setup as storageSetup, defaultPartition } from "./storage"
1818
import { setup as envSetup } from "./env"
1919
import { setup as storageQueueSetup } from "./azure/storagequeue"
20+
import { setup as mqttSetup } from "./mqtt"
2021

2122
async function initAuth(server: FastifyInstance) {
2223
console.log(`starting gateway...`)
@@ -126,6 +127,7 @@ async function main() {
126127
await storageSetup()
127128
await eventHubSetup()
128129
await storageQueueSetup()
130+
await mqttSetup()
129131
await envSetup()
130132
await initAuth(server)
131133
await wsskInit(server)

src/messages.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ export interface MessageSink {
1616
* Ingest incoming message
1717
* @param message
1818
*/
19-
ingest(message: Message, device: ConnectedDevice): Promise<void>
19+
ingest(
20+
topic: string,
21+
message: Message,
22+
device: ConnectedDevice
23+
): Promise<void>
2024
}
2125

2226
const messageSinks: MessageSink[] = []
@@ -36,19 +40,19 @@ export function registerMessageSink(sink: MessageSink) {
3640
* @returns
3741
*/
3842
export async function ingestMessage(
39-
topicName: string,
43+
topic: string,
4044
message: Message,
4145
device: ConnectedDevice
4246
) {
4347
if (!message) return
4448

4549
// collect sinks interrested
46-
let sinks = messageSinks.filter(({ topicName: type }) => type === topicName)
50+
let sinks = messageSinks.filter(({ topicName: type }) => type === topic)
4751
if (!sinks.length)
4852
sinks = messageSinks.filter(({ topicName }) => topicName === "*")
4953

5054
// dispatch
51-
await Promise.all(sinks.map(sink => sink.ingest(message, device)))
55+
await Promise.all(sinks.map(sink => sink.ingest(topic, message, device)))
5256
}
5357

5458
export type LogSink = (logs: string[], device: ConnectedDevice) => Promise<void>

src/mqtt.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { connect } from "mqtt" // import connect from mqtt
2+
import { serverTelemetry } from "./azure/appinsights"
3+
import { registerMessageSink } from "./messages"
4+
import { createSecretClient } from "./secrets"
5+
import { pubToDevice } from "./devutil"
6+
import { defaultPartition, getDevice } from "./storage"
7+
import { sendBinary, sendJSON } from "./apidevices"
8+
import { DeviceId } from "./schema"
9+
10+
export async function setup() {
11+
const secrets = createSecretClient()
12+
const connectionStringSecretName =
13+
process.env.DEVS_MQTT_CONNECTION_STRING_SECRET || "mqttConnectionString"
14+
const connStrSecret = await secrets.getSecret(connectionStringSecretName)
15+
const connStr = connStrSecret.value
16+
if (!connStr) {
17+
console.log("no MQTT connection string secret, skipping registration")
18+
return
19+
}
20+
21+
console.log("registering MQTT broker")
22+
const telemetry = serverTelemetry()
23+
const client = connect(connStr) // create a client
24+
25+
// device to mqtt
26+
registerMessageSink({
27+
name: "mqtt",
28+
topicName: "*",
29+
ingest: async (topic, message, device) => {
30+
if (!client.connected) return
31+
32+
const mqTopic = `devs/from/${device.id}/${topic}`
33+
client.publish(
34+
mqTopic,
35+
Buffer.from(JSON.stringify(message), "utf-8"),
36+
{ qos: 0 },
37+
err => {
38+
if (err) {
39+
console.error(err)
40+
telemetry?.trackException({ exception: err })
41+
}
42+
}
43+
)
44+
},
45+
})
46+
47+
// dispatch messages to devices
48+
client.on("connect", () => {
49+
// devs/deviceid/json/topic
50+
client.subscribe("devs/to/+/#", err => {
51+
if (err) {
52+
console.error(err)
53+
telemetry?.trackException({ exception: err })
54+
}
55+
})
56+
})
57+
client.on("message", async function (topic, message) {
58+
const { deviceId, msgTopic } =
59+
/^devs\/(?<deviceId>.+?)\/(?<msgTopic>.+)$/.exec(topic)?.groups ||
60+
{}
61+
if (!deviceId || !msgTopic) return // unknown topic
62+
const did: DeviceId = {
63+
partitionKey: defaultPartition,
64+
rowKey: deviceId,
65+
}
66+
const dev = await getDevice(did)
67+
if (!dev) return // unknown device
68+
await sendJSON(did, msgTopic, JSON.parse(message.toString("utf-8")))
69+
})
70+
}

0 commit comments

Comments
 (0)