|
| 1 | +## Introduction |
| 2 | + |
| 3 | +This spec enhances the standard MQTT 5.0 protocol with native support for queues and streams while maintaining compatibility with the MQTT 5.0 specification. |
| 4 | + |
| 5 | +The key idea is to use specially formatted topic names to indicate queue and stream subscriptions, and to leverage MQTT 5.0's User Properties for configuring queue/stream behavior. |
| 6 | + |
| 7 | +## Motivation |
| 8 | + |
| 9 | +While MQTT 5.0 offers robust publish/subscribe functionality, it lacks built-in support for queueing and streaming semantics. These are often implemented at the application level, leading to inconsistencies and increased complexity. This spec seeks to address this by providing standardized mechanisms for: |
| 10 | + |
| 11 | +- Queueing: |
| 12 | + |
| 13 | + Provides traditional first-in-first-out (FIFO) and store-and-forward queues. |
| 14 | + |
| 15 | +- Streaming: |
| 16 | + |
| 17 | + Implements data streaming functionality similar to Apache Kafka and AWS Kinesis. |
| 18 | + |
| 19 | +## MQTT Queue Subscription |
| 20 | + |
| 21 | +### Syntax |
| 22 | + |
| 23 | +Queue subscriptions are identified by a specific topic name format: |
| 24 | + |
| 25 | +`$queue/{queue_name}/{topic_filter}` |
| 26 | + |
| 27 | +Where: |
| 28 | + |
| 29 | +`$queue` is a reserved prefix indicating a queue subscription. |
| 30 | + |
| 31 | +`{queue_name}` is a unique identifier for the queue within the broker. |
| 32 | + |
| 33 | +`{topic_filter}` is a standard MQTT topic filter, specifying the messages that will be routed to the queue. |
| 34 | + |
| 35 | +### Queue Semantics |
| 36 | + |
| 37 | +- Messages matching the `{topic_filter}` are added to the queue. |
| 38 | + |
| 39 | +- Consumers subscribe to the queue (using the `$queue/...` syntax) to receive messages. |
| 40 | + |
| 41 | +- Messages are typically delivered to only one consumer (e.g., using a first-in, first-out delivery mechanism). This is similar to how shared subscriptions work, but the key difference is that with queues, the broker also manages message retention. |
| 42 | + |
| 43 | +- The broker is responsible for persisting messages in the queue, according to the configured retention policy. |
| 44 | + |
| 45 | +- If no consumers are online, messages are retained in the queue until a consumer becomes available or the retention policy expires. |
| 46 | + |
| 47 | +### Queue Properties |
| 48 | + |
| 49 | +Queue properties are specified using MQTT 5.0 User Properties within the SUBSCRIBE packet. The following properties are defined: |
| 50 | + |
| 51 | +- `size (integer)`: Maximum number of messages the queue can hold. If this limit is reached, the broker's behavior (e.g., rejecting new messages, discarding oldest messages) is implementation-specific, but the behavior SHOULD be documented. |
| 52 | + |
| 53 | +- `retention (string)`: Duration for which messages are retained in the queue. The format SHOULD follow the ISO 8601 duration format (e.g., "PT1H", "P1D", "PT2H30M"). If not provided, the broker SHOULD use a default retention policy. |
| 54 | + |
| 55 | +- `deliveryMode (string, optional)`: "exclusive" or "shared". |
| 56 | + |
| 57 | + "exclusive": Only one consumer can subscribe to the queue at a time. Subsequent subscriptions are rejected. This is the default. |
| 58 | + |
| 59 | + "shared": Multiple consumers can subscribe to the queue. Messages are distributed among consumers (similar to shared subscriptions). |
| 60 | + |
| 61 | +- description (string): Human-readable description of the queue. |
| 62 | + |
| 63 | +### Examples |
| 64 | + |
| 65 | +Assume messages are published to the topic `sensors/temperature/rooms/1`. |
| 66 | + |
| 67 | +- `$queue/sensors-data/sensors/#`: A queue named "sensors-data" that receives all sensor data. |
| 68 | + |
| 69 | + User Properties: size=1000, retention=PT1H, description=Temperature and humidity readings. |
| 70 | + |
| 71 | +- `$queue/rooms-data/sensors/+/rooms/+`: A queue named "rooms-data" that receives data for specific rooms. |
| 72 | + |
| 73 | + User Properties: size=500, retention=P1D, deliveryMode=shared. |
| 74 | + |
| 75 | +- `$queue/temperature-data/sensors/temperature/#`: A queue named "temperature-data" that receives only temperature data. |
| 76 | + |
| 77 | + User Properties: retention=PT12H. |
| 78 | + |
| 79 | +- `$queue/room1-temperature/sensors/temperature/rooms/1`: A queue named "room1-temperature" that receives temperature data only for room 1. |
| 80 | + |
| 81 | +## MQTT Stream Subscription |
| 82 | + |
| 83 | +### Syntax |
| 84 | + |
| 85 | +Stream subscriptions use the following topic name format: |
| 86 | + |
| 87 | +`$stream/{stream_name}/{topic_filter}` |
| 88 | + |
| 89 | +Where: |
| 90 | + |
| 91 | +- `$stream` is a reserved prefix indicating a stream subscription. |
| 92 | + |
| 93 | +- `{stream_name}` is a unique identifier for the stream. |
| 94 | + |
| 95 | +- `{topic_filter}` is a standard MQTT topic filter, specifying the messages that are included in the stream. |
| 96 | + |
| 97 | +### Stream Semantics |
| 98 | + |
| 99 | +- Messages matching the `{topic_filter}` are appended to the stream. |
| 100 | + |
| 101 | +- Consumers subscribe to the stream (using the `$stream/...` syntax) to receive messages. |
| 102 | + |
| 103 | +- Messages are delivered to consumers in the order they are added to the stream. |
| 104 | + |
| 105 | +- The broker persists messages in the stream, according to the configured retention policy. |
| 106 | + |
| 107 | +- Consumers can specify an offset to start consuming from a specific point in the stream, enabling replay functionality. |
| 108 | + |
| 109 | +- Multiple consumers can read from the same stream, each with its own offset. |
| 110 | + |
| 111 | +### Stream Properties |
| 112 | + |
| 113 | +Stream properties are specified using MQTT 5.0 User Properties in the SUBSCRIBE packet: |
| 114 | + |
| 115 | +- retention (string): Duration for which messages are retained in the stream. The format SHOULD follow the ISO 8601 duration format (e.g., "PT1H", "P1D", "PT2H30M"). If not provided, the broker SHOULD use a default retention policy. |
| 116 | + |
| 117 | +- partitions (integer, optional): Number of partitions in the stream. Partitions allow for parallel processing of the stream. If not specified, the broker SHOULD use a default number of partitions (e.g., 0). Partitioning details (how messages are assigned to partitions) are broker-specific. |
| 118 | + |
| 119 | +- offset (string, optional): The starting offset for the consumer. Valid values are: |
| 120 | + |
| 121 | + "earliest": Start consuming from the beginning of the stream. |
| 122 | + |
| 123 | + "latest": Start consuming from the most recently added message. |
| 124 | + |
| 125 | + A numeric value: The absolute offset. |
| 126 | + |
| 127 | + If not provided, the broker SHOULD use the "latest" offset. |
| 128 | + |
| 129 | +- description (string): Human-readable description of the stream. |
| 130 | + |
| 131 | +### Examples |
| 132 | + |
| 133 | +Assume messages are published to the topic sensors/temperature/rooms/1. |
| 134 | + |
| 135 | +- `$stream/sensors-data/sensors/#`: A stream named "sensors-data" containing all sensor data. |
| 136 | + |
| 137 | + User Properties: retention=P1D, partitions=3, description=Aggregated sensor readings. |
| 138 | + |
| 139 | +- `$stream/rooms-data/sensors/+/rooms/+`: A stream named "rooms-data" containing data for specific rooms. |
| 140 | + |
| 141 | + User Properties: retention=P7D, offset=earliest. |
| 142 | + |
| 143 | +- `$stream/temperature-data/sensors/temperature/#`: A stream named "temperature-data" containing only temperature data. |
| 144 | + |
| 145 | + User Properties: retention=PT12H, offset=1000. |
| 146 | + |
| 147 | +- `$stream/room1-temperature/sensors/temperature/rooms/1`: A stream named "room1-temperature" containing temperature data only for room 1. |
| 148 | + |
| 149 | +## MQTT Shared Subscription |
| 150 | + |
| 151 | +### Syntax |
| 152 | + |
| 153 | +Shared subscriptions follow the MQTT 5.0 specification: |
| 154 | + |
| 155 | +`$share/{group_name}/{topic_filter}` |
| 156 | + |
| 157 | +Where: |
| 158 | + |
| 159 | +- `$share` is the prefix indicating a shared subscription. |
| 160 | + |
| 161 | +- `{group_name}` is the name of the shared subscription group. |
| 162 | + |
| 163 | +- `{topic_filter}` is the MQTT topic filter. |
| 164 | + |
| 165 | +### Shared Subscription Semantics |
| 166 | + |
| 167 | +- MQTT 5.0 Shared subscriptions allow multiple clients to subscribe to the same topic filter, with the broker distributing messages among the subscribers in the group. |
| 168 | + |
| 169 | +- This provides load balancing and scalability for message consumption. |
| 170 | + |
| 171 | +### Examples |
| 172 | + |
| 173 | +For messages published to `sensors/temperature/rooms/1`: |
| 174 | + |
| 175 | +- `$share/sensors-group/sensors/#`: Messages are shared among clients in the "sensors-group". |
| 176 | + |
| 177 | +- `$share/temperature-group/+/temperature/#`: Messages are shared among clients in the "temperature-group" for all temperature topics. |
| 178 | + |
| 179 | +## Broker Behavior |
| 180 | + |
| 181 | +### Queue and Stream Declaration |
| 182 | + |
| 183 | +- The broker creates a queue or stream when it receives a SUBSCRIBE packet with a topic name matching the $queue/... or $stream/... syntax. |
| 184 | + |
| 185 | +- If a queue or stream with the given name already exists, the broker SHOULD validate that the properties in the new SUBSCRIBE packet are compatible with the existing configuration. If there are conflicts, the broker SHOULD return a SUBACK with an appropriate reason code. |
| 186 | + |
| 187 | +### Message Routing |
| 188 | + |
| 189 | +When a PUBLISH message is received, the broker routes the message to any matching queue and stream subscriptions, in addition to regular MQTT subscriptions. |
| 190 | + |
| 191 | +- For queue subscriptions, the broker adds the message to the queue according to the queue's size and retention properties. |
| 192 | + |
| 193 | +- For stream subscriptions, the broker adds the message to the stream, handling partitioning if configured. |
| 194 | + |
| 195 | +### Message Delivery |
| 196 | + |
| 197 | +- For queue subscriptions, the broker delivers messages from the queue to the consumers. The delivery mechanism (e.g., FIFO, round-robin) is broker-specific, but SHOULD be documented. |
| 198 | + |
| 199 | +- For stream subscriptions, the broker delivers messages from the stream to the consumers, starting from their specified offset. The broker MUST maintain the order of messages within a partition. |
| 200 | + |
| 201 | +### Error Handling |
| 202 | + |
| 203 | +- The broker SHOULD use MQTT 5.0 reason codes in SUBACK and PUBACK packets to indicate success or failure of queue/stream operations. |
| 204 | + |
| 205 | +- If a queue or stream cannot be created or a subscription fails, the broker SHOULD return a SUBACK with a specific reason code (e.g., "Queue Full", "Invalid Offset"). |
| 206 | + |
| 207 | +- If a PUBLISH message cannot be added to a queue (e.g., due to size limits), the broker's behavior is implementation-specific (e.g., sending a PUBACK with a failure reason code, discarding the message). This behavior SHOULD be documented. |
| 208 | + |
| 209 | +## Client Behavior |
| 210 | + |
| 211 | +### Subscribing to Queues and Streams |
| 212 | + |
| 213 | +- Clients subscribe to queues and streams using the `$queue/...` and `$stream/...` topic name formats in the SUBSCRIBE packet. |
| 214 | + |
| 215 | +- Clients use MQTT 5.0 User Properties in the SUBSCRIBE packet to specify queue/stream properties. |
| 216 | + |
| 217 | +### Consuming Messages |
| 218 | + |
| 219 | +Clients receive messages from queues and streams as regular PUBLISH messages. |
| 220 | + |
| 221 | +For streams, clients are responsible for managing their offset if they need to consume messages from a specific point or replay messages. The client can close the subscription and re-subscribe with a different offset. |
| 222 | + |
| 223 | +## Future Considerations |
| 224 | + |
| 225 | +- Queue/Stream Management API: Standardize a management API for creating, deleting, and configuring queues and streams, potentially using HTTP or MQTT control topics. |
| 226 | + |
| 227 | +- Message Batch: Add support for batching related messages within streams, ensuring they are delivered together to consumers. |
| 228 | + |
| 229 | +- Transaction Support: Investigate adding transactional capabilities for message publishing and consumption, ensuring atomicity. |
| 230 | + |
0 commit comments