Commit fd729a3

Merge pull request #6 from wmo-im/mqp-client-side-filters
add client side filtering recipe
2 parents 322271c + 3905e7f commit fd729a3

File tree

1 file changed

+182
-0
lines changed

cookbook/sections/data-publishers.adoc

Lines changed: 182 additions & 0 deletions
@@ -119,5 +119,187 @@ image::images/data-publishers-wcmp2-validate-response.png[WIS2 GDC online valida
119119

120120
A response will be provided with validation results.
121121

122+
=== Advertise client side filters for data subscriptions in WCMP2 and WNM
122123

124+
A key concept of a WCMP2 record is "actionable links"; this means being able to access a dataset or data granule
125+
without any further interactions. For real-time data, a WCMP2 record provides linkages to the WIS2 Global Broker
126+
via the MQTT protocol. At its core, MQTT has two key components:
123127

128+
- topic: the topic to subscribe to
129+
- message payload: the message provided as part of a notification to a given topic
130+
131+
WIS2 defines the WIS2 Topic Hierarchy (WTH) and WIS2 Notification Message (WNM) standards, which respectively provide
a standardized topic structure and a standards-based GeoJSON message payload.
133+
134+
A typical MQTT link in a WCMP2 document is defined as follows:
135+
136+
.Typical WCMP2 MQTT link
137+
[source,json]
138+
----
{
    "rel" : "items",
    "type" : "application/geo+json",
    "title": "WIS2 notification service",
    "href" : "mqtts://example.org",
    "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop"
}
----
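As a rough illustration of what "actionable" means for such a link, the sketch below derives MQTT connection details from the link object shown above. The helper name `mqtt_connection_details` and the fallback ports (8883 for `mqtts`, 1883 for `mqtt`) are assumptions made for this example, not part of WCMP2.

```python
from urllib.parse import urlparse

# Link object copied from the typical WCMP2 MQTT link above.
link = {
    "rel": "items",
    "type": "application/geo+json",
    "title": "WIS2 notification service",
    "href": "mqtts://example.org",
    "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop",
}


def mqtt_connection_details(link: dict) -> dict:
    """Extract broker host, port, TLS flag and topic from a WCMP2 MQTT link.

    Hypothetical helper for illustration only.
    """
    url = urlparse(link["href"])
    if url.scheme not in ("mqtt", "mqtts"):
        raise ValueError(f"Not an MQTT link: {link['href']}")

    use_tls = url.scheme == "mqtts"
    # Assumption: fall back to the conventional MQTT ports when the href has none.
    port = url.port or (8883 if use_tls else 1883)

    return {
        "host": url.hostname,
        "port": port,
        "tls": use_tls,
        "topic": link["channel"],
    }


details = mqtt_connection_details(link)
print(details)
```

The resulting host, port and topic are the values a client would pass to an MQTT library in order to connect and subscribe.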
147+
148+
Given WCMP2, WTH and WNM, a user can subscribe to topics related to data of interest for download and access.
149+
150+
In some cases, a dataset may be organized in a manner that requires further "filtering", because a
data consumer is only interested in a certain subset of the data granules advertised by a given WNM. Examples include (but are not limited to) cases where a data consumer may only be interested in:
152+
153+
- surface weather observations from a certain station, or
154+
- numerical weather prediction forecast data for a certain timestep or weather parameter
155+
156+
To implement this behaviour, add additional properties to both WCMP2 and WNM as follows:
157+
158+
==== Example: Surface weather observations
159+
160+
.Surface weather observations: WCMP2 MQTT link with additional properties
161+
[source,json]
162+
----
{
    "rel" : "items",
    "type" : "application/geo+json",
    "title": "Real-time notifications",
    "href" : "mqtts://globalbroker.meteo.fr:8883",
    "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop",
    "properties": {
        "wigos_station_identifier": {
            "type": "string",
            "title": "WIGOS station identifier"
        }
    }
}
----
177+
178+
.Surface weather observations: WNM additional properties
179+
[source,json]
180+
----
{
    "properties": {
        "wigos_station_identifier": "0-20000-0-71628"
    }
}
----
186+
187+
When implemented by a data producer, a data consumer can:
188+
189+
- subscribe to real-time notifications to the given topic
190+
- perform client side filtering against all incoming WNMs, keeping those with `properties.wigos_station_identifier = "0-20000-0-71628"`
191+
192+
==== Example: Numerical weather prediction based forecast
193+
194+
.Numerical weather prediction: WCMP2 MQTT link with additional properties
195+
[source,json]
196+
----
{
    "rel" : "items",
    "type" : "application/geo+json",
    "title": "Real-time notifications",
    "href" : "mqtts://globalbroker.meteo.fr:8883",
    "channel": "origin/a/wis2/ca-eccc-msc/data/core/weather/prediction/forecast/medium-range/deterministic/global",
    "properties": {
        "model_run": {
            "type": "string",
            "title": "Model run",
            "enum": [
                "00",
                "12"
            ],
            "example": "00"
        },
        "forecast_hour": {
            "type": "string",
            "title": "Forecast hour",
            "example": "004"
        }
    }
}
----
221+
222+
.Numerical weather prediction: WNM additional properties
223+
[source,json]
224+
----
{
    "properties": {
        "model_run": "00",
        "forecast_hour": "004"
    }
}
----
231+
232+
A data producer would extend WCMP2 and WNM as follows:
233+
234+
- WCMP2: add a link `properties` object for MQTT links, where each key of the link `properties` object is a https://json-schema.org/understanding-json-schema/reference/object#properties[JSON Schema property definition].
235+
- WNM: add additional properties (key: value pairs) in the `properties` object as desired
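A data producer may want to check that the values placed in a WNM agree with the definitions advertised in the WCMP2 link. The sketch below is a deliberately small, hand-rolled check covering only `type` and `enum`; it is not a full JSON Schema validator. The function name `check_wnm_properties`, and the choice to report an advertised property that is absent from the WNM, are assumptions made for this example.

```python
# Link `properties` definitions, copied from the numerical weather prediction
# WCMP2 example above.
link_properties = {
    "model_run": {
        "type": "string",
        "title": "Model run",
        "enum": ["00", "12"],
        "example": "00",
    },
    "forecast_hour": {
        "type": "string",
        "title": "Forecast hour",
        "example": "004",
    },
}

# Hypothetical mapping covering only a few JSON Schema types.
# Note: Python's bool is a subclass of int; a stricter check would exclude it.
JSON_TYPES = {"string": str, "number": (int, float), "integer": int, "boolean": bool}


def check_wnm_properties(wnm: dict, definitions: dict) -> list:
    """Return a list of problems; an empty list means no problem was found."""
    problems = []
    props = wnm.get("properties", {})

    for name, definition in definitions.items():
        # Assumption: every advertised property is expected in each WNM.
        if name not in props:
            problems.append(f"{name}: missing from WNM properties")
            continue

        value = props[name]
        expected = JSON_TYPES.get(definition.get("type"))
        if expected is not None and not isinstance(value, expected):
            problems.append(f"{name}: expected type {definition['type']}")
        elif "enum" in definition and value not in definition["enum"]:
            problems.append(f"{name}: {value!r} not in {definition['enum']}")

    return problems


wnm = {"properties": {"model_run": "00", "forecast_hour": "004"}}
print(check_wnm_properties(wnm, link_properties))
```

For production use, a complete JSON Schema validation library would be a more robust choice than this sketch.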
236+
237+
When implemented by a data producer, a data consumer can:
238+
239+
- subscribe to real-time notifications to the given topic
240+
- perform client side filtering against all incoming WNMs with `properties.model_run = "00" and properties.forecast_hour = "004"`
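The filter condition above can be sketched as a small function, independent of any MQTT library. The name `matches_filters` and the sample messages are hypothetical and exist only for this illustration; real WNMs carry many more members than the `properties` shown here.

```python
def matches_filters(wnm: dict, filters: dict) -> bool:
    """Return True when every filter key/value pair is present in the WNM properties."""
    props = wnm.get("properties", {})
    return all(props.get(key) == value for key, value in filters.items())


# Filter values taken from the numerical weather prediction example above.
wanted = {"model_run": "00", "forecast_hour": "004"}

# Hypothetical, heavily abbreviated incoming notifications.
incoming = [
    {"properties": {"model_run": "00", "forecast_hour": "004"}},
    {"properties": {"model_run": "12", "forecast_hour": "004"}},
    {"properties": {"model_run": "00", "forecast_hour": "006"}},
]

selected = [wnm for wnm in incoming if matches_filters(wnm, wanted)]
print(len(selected))
```

Because the comparison is a plain equality test on strings, values such as `"004"` and `"4"` do not match; consumers should use the exact form advertised in the WCMP2 link `properties`.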
241+
242+
A sample Python script can be found below. The script connects to the Météo-France Global Broker and subscribes to surface weather
observation notifications from Environment and Climate Change Canada, Meteorological Service of Canada. It then performs client side
filtering by checking, for each WNM, whether the `properties.wigos_station_identifier` value matches a particular station (`0-20000-0-71628`).
245+
246+
.Sample Python script to perform client side filtering
247+
[source,python]
248+
----
import json
import ssl

from paho.mqtt import client as mqtt_client

broker = 'globalbroker.meteo.fr'
port = 8883
username = 'everyone'
password = 'everyone'
topic = 'cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop'

# WIGOS station identifier to keep; all other notifications are ignored
wsi_to_filter = '0-20000-0-71628'


def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            print(f'Connected to {broker}')
        else:
            print(f'Failed to connect: {reason_code}')

    def on_log(client, userdata, level, message):
        print("LOG:", message)

    client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2,
                                client_id='s123')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.on_log = on_log
    # Use TLS with the system's default certificate authorities
    client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
    client.connect(broker, port)

    return client


def subscribe(client: mqtt_client.Client):
    def on_message(client, userdata, message):
        message_dict = json.loads(message.payload.decode())

        print('Performing client side filtering')
        wsi = message_dict['properties'].get('wigos_station_identifier')

        # Only report notifications for the station of interest
        if wsi == wsi_to_filter:
            print(f'Topic: {message.topic}')
            print(f'Payload: {message.payload.decode()}')

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()
----
