Skip to content
This repository was archived by the owner on Aug 5, 2024. It is now read-only.

Commit c5b7bfd

Browse files
committed
surface mqtt info in openapi
1 parent bc4c235 commit c5b7bfd

File tree

6 files changed

+64
-24
lines changed

6 files changed

+64
-24
lines changed

local.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ DEVS_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devsto
66
DEVS_PASSWORDS_SECRET="DEVS_LOCAL_USER_PASSWORD"
77
DEVS_LOCAL_USER_PASSWORD="devstoreaccount1:Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
88
DEVS_CONNECTION_STRING="AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;ApiRoot=$WEBSITE_PROTOCOL://$WEBSITE_HOSTNAME"
9-
DEVS_MQTT_CONNETION_STRING="mqtt://public.mqtthq.com:1883"
9+
DEVS_MQTT_SERVER="mqtt://public.mqtthq.com:1883"

src/apidevices.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
stringifyMeta,
3232
updateDevice,
3333
} from "./storage"
34+
import { mqttTopicPrefix } from "./mqtt"
3435

3536
const CONNECTED_TIMEOUT = 2 * 60 * 1000
3637
export const MAX_WSSK_SIZE = 230 // for to-device JSON and binary messages
@@ -45,6 +46,7 @@ function externalDevice(info: DeviceInfo) {
4546
conn,
4647
scriptId: info.scriptId,
4748
scriptVersion: info.scriptVersion,
49+
mqttTopic: mqttTopicPrefix(info),
4850
deployedHash: info.deployedHash,
4951
lastAct: info.lastAct ? new Date(info.lastAct).toISOString() : "",
5052
meta: tryParseJSON(info.metaJSON),

src/apigateway.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { FastifyInstance } from "fastify"
2+
import { mqttServer } from "./mqtt"
3+
4+
export async function initGatewayRoutes(server: FastifyInstance) {
5+
server.get("/info", async req => {
6+
return {
7+
mqttServer: mqttServer(),
8+
}
9+
})
10+
}

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { setup as storageSetup, defaultPartition } from "./storage"
1818
import { setup as envSetup } from "./env"
1919
import { setup as storageQueueSetup } from "./azure/storagequeue"
2020
import { setup as mqttSetup } from "./mqtt"
21+
import { initGatewayRoutes } from "./apigateway"
2122

2223
async function initAuth(server: FastifyInstance) {
2324
console.log(`starting gateway...`)
@@ -162,6 +163,7 @@ async function main() {
162163

163164
await initHubRoutes(server)
164165
await initScriptRoutes(server)
166+
await initGatewayRoutes(server)
165167

166168
server.all("*", async req => {
167169
throwStatus(404, "no such API")

src/mqtt.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,36 @@
1-
import { connect } from "mqtt" // import connect from mqtt
1+
import { MqttClient, connect } from "mqtt" // import connect from mqtt
22
import { serverTelemetry } from "./azure/appinsights"
33
import { registerMessageSink } from "./messages"
44
import { getSecret } from "./secrets"
55
import { defaultPartition, getDevice } from "./storage"
66
import { sendJSON } from "./apidevices"
77
import { DeviceId } from "./schema"
88

9+
let client: MqttClient
10+
let serverUrl: string
11+
12+
export function mqttTopicPrefix(deviceid: DeviceId) {
13+
return client ? `devs/${deviceid.rowKey}` : undefined
14+
}
15+
16+
export function mqttServer() {
17+
return serverUrl
18+
}
19+
920
export async function setup() {
10-
const connStr = await getSecret(
21+
serverUrl = await getSecret(
1122
"mqttConnectionString",
12-
"DEVS_MQTT_CONNECTION_STRING_SECRET",
13-
"DEVS_MQTT_CONNETION_STRING"
23+
"DEVS_MQTT_SERVER_SECRET",
24+
"DEVS_MQTT_SERVER"
1425
)
15-
if (!connStr) {
26+
if (!serverUrl) {
1627
console.log("no MQTT connection string secret, skipping registration")
1728
return
1829
}
1930

20-
console.log("registering MQTT broker")
31+
console.log(`MQTT server: ${serverUrl}`)
2132
const telemetry = serverTelemetry()
22-
const client = connect(connStr) // create a client
33+
client = connect(serverUrl) // create a client
2334

2435
// device to mqtt
2536
registerMessageSink({
@@ -28,7 +39,7 @@ export async function setup() {
2839
ingest: async (topic, message, device) => {
2940
if (!client.connected) return
3041

31-
const mqTopic = `devs/from/${device.dev.rowKey}/${topic}`
42+
const mqTopic = `${mqttTopicPrefix(device.dev)}/from/${topic}`
3243
client.publish(
3344
mqTopic,
3445
Buffer.from(JSON.stringify(message), "utf-8"),
@@ -46,18 +57,18 @@ export async function setup() {
4657
// dispatch messages to devices
4758
client.on("connect", () => {
4859
// devs/deviceid/json/topic
49-
client.subscribe("devs/to/+/#", err => {
60+
client.subscribe("devs/+/to/#", err => {
5061
if (err) {
5162
console.error(err)
5263
telemetry?.trackException({ exception: err })
5364
} else {
54-
console.log(`mqtt: subscribe to 'devs/to/+/#'`)
65+
console.log(`mqtt: subscribe to 'devs/+/to/#'`)
5566
}
5667
})
5768
})
5869
client.on("message", async function (topic, message) {
5970
const { deviceId, msgTopic } =
60-
/^devs\/to\/(?<deviceId>.+?)\/(?<msgTopic>.+)$/.exec(topic)
71+
/^devs\/(?<deviceId>.+?)\/to\/(?<msgTopic>.+)$/.exec(topic)
6172
?.groups || {}
6273
if (!deviceId || !msgTopic) return // unknown topic
6374
const did: DeviceId = {

src/swagger/openapi.ts

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ export function generateOpenApiSpec() {
5555

5656
const methodParam = sString("Method name", "Must match the value on device")
5757

58+
const gatewaySchema = define(
59+
"GatewaySchema",
60+
sObj({
61+
mqttServer: sString(
62+
"MQTT server",
63+
"URL(:PORT) of the MQTT server, if any"
64+
),
65+
})
66+
)
67+
5868
const eventSchema = define(
5969
"EventSchema",
6070
sObj({
@@ -91,25 +101,17 @@ export function generateOpenApiSpec() {
91101
"Last Connected",
92102
"When was device last connected"
93103
),
104+
mqttTopic: sString(
105+
"MQTT Topic prefix",
106+
"Topic prefix to filter MQTT messages from/to this device"
107+
),
94108
...allNonOptional(devProps),
95109
deployedHash: sString(
96110
"SHA256 Hash of Currently Deployed Script",
97111
"This property persists when the device is disconnected, so may not be up to date"
98112
),
99113
})
100114
)
101-
const serviceSchema = sString(
102-
"Service Name",
103-
"Type of sensor (eg. 'temperature')"
104-
)
105-
const sensorIdSchema = sString(
106-
"Sensor ID",
107-
"Unique identifier of the sensor"
108-
)
109-
const serviceIdxSchema = sNumber(
110-
"Service Index",
111-
"Typically 0; used when there is more than one instance of service in the sensor"
112-
)
113115

114116
post(
115117
"/hooks",
@@ -130,6 +132,19 @@ export function generateOpenApiSpec() {
130132
)
131133
)
132134

135+
get(
136+
"/info",
137+
action(
138+
"Information",
139+
"Information about the Gateway",
140+
"Various information about the gateway server",
141+
[],
142+
{
143+
"200": response("Information", gatewaySchema),
144+
}
145+
)
146+
)
147+
133148
get(
134149
"/devices",
135150
action(

0 commit comments

Comments
 (0)