Skip to content

Commit 0c67054

Browse files
committed
lws_wsmsg: buflists for ws msg reassembly
New apis allow very cheap management of multiple incoming fragmented messages into a single queue that doesn't interrupt any ongoing message on the upstream queue.
1 parent c0268b2 commit 0c67054

File tree

4 files changed

+188
-3
lines changed

4 files changed

+188
-3
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,4 @@ doc
6464
/destdir/
6565
/bb1/
6666
/bb3/
67+
/bin/

include/libwebsockets/lws-misc.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,64 @@ LWS_VISIBLE LWS_EXTERN void *
198198
lws_buflist_get_frag_start_or_NULL(struct lws_buflist **head);
199199

200200

201+
struct lws_wsmsg_info;
202+
203+
typedef void (*lws_wsmsg_transfer_cb)(struct lws_wsmsg_info *info);
204+
205+
typedef struct lws_wsmsg_info {
206+
struct lws_buflist **head_upstream; /* the upstream buflist */
207+
struct lws_buflist **private_heads; /* the private reassembly heads */
208+
int private_source_idx; /* which index to use in private_heads */
209+
lws_wsmsg_transfer_cb optional_cb; /* optional transfer callback */
210+
void *opaque; /* optional opaque pointer */
211+
const uint8_t *buf; /* array to add */
212+
size_t len; /* length of bytes in array */
213+
unsigned int ss_flags; /* SS flags for SOM / EOM */
214+
} lws_wsmsg_info_t;
215+
216+
/**
217+
* lws_wsmsg_append() - append to buflist via private buflists
218+
*
219+
* \param info: the info struct, filled in before calling
220+
*
221+
* This api allows you to pass incoming fragments through a private buflist
222+
* until the message it contains is reassembled and complete. At that point, if
223+
* the upstream buflist is not blocked by being in the middle of a message itself,
224+
* the fragments are added to the end of an upstream buflist and the private
225+
* buflist left empty.
226+
*
227+
* As an optimization, if the upstream buflist is not blocked waiting for an EOM,
228+
* the private buflist is empty, and the incoming data represents a complete
229+
* message, then the incoming message is added directly to the upstream buflist.
230+
*
231+
* This method allows potentially many async connections with fragmented messages
232+
* to contribute to a single buflist containing only complete messages. It's
233+
* not possible to add a partial message (without the EOM flag) to the upstream
234+
* buflist; when all parts of it have arrived it will be added.
235+
*
236+
* An array of private buflists is supported so that messages from many different
237+
* connections can be reassembled before moving to the upstream buflist.
238+
*
239+
* Optionally, instead of transfer to another buflist when the message is complete,
240+
* if the optional_cb is provided, this is called instead with the info struct, so
241+
* arbitrary operations can be performed by the user.
242+
*
243+
* The info->opaque pointer is not used by lws, it's there to facilitate passing
244+
* related user parameters in the callback case.
245+
*/
246+
LWS_VISIBLE LWS_EXTERN int
247+
lws_wsmsg_append(lws_wsmsg_info_t *info);
248+
249+
/*
250+
* lws_wsmsg_destroy() - free all allocations in private buflists
251+
*
252+
* \param private_heads: the private buflists
253+
* \param count_private_heads: the number of private buflists
254+
*/
255+
LWS_VISIBLE LWS_EXTERN void
256+
lws_wsmsg_destroy(struct lws_buflist *private_heads[], size_t count_private_heads);
257+
258+
201259

202260
/*
203261
* Optional helpers for closely-managed stream flow control. These are useful

lib/core/buflist.c

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,3 +338,127 @@ lws_flow_req(lws_flow_t *flow)
338338
flow->state != LWSDLOFLOW_STATE_READ ? LWS_SRET_OK :
339339
LWS_SRET_WANT_INPUT;
340340
}
341+
342+
343+
static void
344+
lws_wsmsg_transfer(lws_wsmsg_info_t *info)
345+
{
346+
struct lws_buflist *bl = info->private_heads[info->private_source_idx],
347+
*ubl = *info->head_upstream;
348+
349+
/*
350+
* If we arrived at a complete message, and the upstream is
351+
* not blocked awaiting EOM, transfer the segments to the
352+
* upstream, emptying the private buflist
353+
*/
354+
355+
if (!bl) {
356+
lwsl_notice("%s: denied: no content to transfer\n", __func__);
357+
return;
358+
}
359+
360+
while (bl && bl->next)
361+
bl = bl->next;
362+
363+
if (bl->awaiting_eom) {
364+
lwsl_notice("%s: denied: head awaiting EOM\n", __func__);
365+
return;
366+
}
367+
368+
if (!*info->head_upstream) {
369+
/*
370+
* If the upstream is empty, create it by pointing
371+
* it to the whole private chain, taking ownership
372+
*/
373+
374+
*info->head_upstream = info->private_heads[info->private_source_idx];
375+
info->private_heads[info->private_source_idx] = NULL;
376+
377+
lwsl_notice("%s: transferred: head -> head_upstream\n", __func__);
378+
379+
return;
380+
}
381+
382+
383+
/* find the end of the existing upstream */
384+
385+
while (ubl && ubl->next)
386+
ubl = ubl->next;
387+
388+
if (ubl->awaiting_eom) {
389+
lwsl_notice("%s: denied: no content to transfer\n", __func__);
390+
return;
391+
}
392+
393+
/*
394+
* Add the private buflist on to the end of
395+
* the upstream buflist, taking ownership
396+
*/
397+
398+
ubl->next = info->private_heads[info->private_source_idx];
399+
info->private_heads[info->private_source_idx] = NULL; /* now it transferred upstream, private owns nothing */
400+
}
401+
402+
int
403+
lws_wsmsg_append(lws_wsmsg_info_t *info)
404+
{
405+
struct lws_buflist *bl;
406+
407+
/*
408+
* if there's nothing already stored, the new message is complete,
409+
* and the upstream is either empty, or is not blocked awaiting EOM,
410+
* then just apply the message directly to the upstream.
411+
*/
412+
413+
if (!info->private_heads[info->private_source_idx] &&
414+
(info->ss_flags == (LWSSS_FLAG_SOM | LWSSS_FLAG_EOM)) &&
415+
(!(*info->head_upstream) || !(*info->head_upstream)->awaiting_eom)) {
416+
417+
lwsl_notice("%s: directly applying upstream\n", __func__);
418+
419+
if (lws_buflist_append_segment(info->head_upstream, info->buf, info->len) < 0)
420+
return -1;
421+
422+
/*
423+
* Let's tag the tail buflist we just added,
424+
* with extra information useful for debugging
425+
*/
426+
427+
bl = *info->head_upstream;
428+
} else {
429+
/*
430+
* Otherwise, apply the message to the private buflist first
431+
*/
432+
433+
lwsl_notice("%s: applying via private buflist\n", __func__);
434+
435+
if (lws_buflist_append_segment(&info->private_heads[info->private_source_idx],
436+
info->buf, info->len) < 0)
437+
return -1;
438+
439+
bl = info->private_heads[info->private_source_idx];
440+
}
441+
442+
while (bl && bl->next)
443+
bl = bl->next;
444+
445+
if (!bl)
446+
return 0;
447+
448+
bl->awaiting_eom = !(info->ss_flags & LWSSS_FLAG_EOM);
449+
bl->src_channel = (unsigned char)info->private_source_idx;
450+
451+
lws_wsmsg_transfer(info);
452+
453+
return 0;
454+
}
455+
456+
void
457+
lws_wsmsg_destroy(struct lws_buflist *private_heads[], size_t count_private_heads)
458+
{
459+
size_t m = 0;
460+
461+
while (m < count_private_heads)
462+
lws_buflist_destroy_all_segments(&private_heads[m++]);
463+
}
464+

lib/core/private-lib-core.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -436,9 +436,11 @@ typedef struct lws_ss_sinks {
436436
#endif
437437

438438
typedef struct lws_buflist {
439-
struct lws_buflist *next;
440-
size_t len;
441-
size_t pos;
439+
struct lws_buflist *next;
440+
size_t len;
441+
size_t pos;
442+
unsigned char awaiting_eom;
443+
unsigned char src_channel;
442444
} lws_buflist_t;
443445

444446

0 commit comments

Comments
 (0)