Skip to content

Commit 06f5659

Browse files
committed
WSocket - TCP window control
In case if there are pending messages in the Q we clamp window size gradually to push sending party back. The larger the Q grows then more window is closed. This works pretty well for messages sized about or more than TCP windows size (5,7k default for Arduino). It could prevent Q overflow and sieze incoming data flow without blocking the entie network stack. Mostly usefull with websocket worker where AsyncTCP thread is not blocked by user callbacks.
1 parent 3c1fc59 commit 06f5659

File tree

2 files changed

+74
-15
lines changed

2 files changed

+74
-15
lines changed

src/AsyncWSocket.cpp

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,10 @@ WSocketClient::WSocketClient(uint32_t id, AsyncWebServerRequest *request, WSocke
166166
_client->setNoDelay(true);
167167
// set AsyncTCP callbacks
168168
_client->onAck( [](void *r, AsyncClient *c, size_t len, uint32_t rtt) { (void)c; reinterpret_cast<WSocketClient*>(r)->_clientSend(len); }, this );
169-
//_client->onAck( [](void *r, AsyncClient *c, size_t len, uint32_t rtt) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onAck(len, rtt); }, this );
170169
_client->onDisconnect( [](void *r, AsyncClient *c) { reinterpret_cast<WSocketClient*>(r)->_onDisconnect(c); }, this );
171170
_client->onTimeout( [](void *r, AsyncClient *c, uint32_t time) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onTimeout(time); }, this );
172171
_client->onData( [](void *r, AsyncClient *c, void *buf, size_t len) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onData(buf, len); }, this );
173-
_client->onPoll( [](void *r, AsyncClient *c) { (void)c; reinterpret_cast<WSocketClient*>(r)->_keepalive(); reinterpret_cast<WSocketClient*>(r)->_clientSend(); }, this );
172+
_client->onPoll( [](void *r, AsyncClient *c) { reinterpret_cast<WSocketClient*>(r)->_onPoll(c); }, this );
174173
_client->onError( [](void *r, AsyncClient *c, int8_t error) { (void)c; log_e("err:%d", error); }, this );
175174
// bind URL hash
176175
setURLHash(request->url().c_str());
@@ -212,9 +211,9 @@ void WSocketClient::_clientSend(size_t acked_bytes){
212211
// Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
213212
// That way we could balance on having half the buffer in-flight while another half is filling up and minimizing events in asynctcp's Q
214213
if (acked_bytes){
215-
auto sock_space = _client->space();
216214
//log_d("ack:%u/%u, sock space:%u", acked_bytes, _in_flight, sock_space);
217215
_in_flight -= std::min(acked_bytes, _in_flight);
216+
auto sock_space = _client->space();
218217
if (!sock_space){
219218
return;
220219
}
@@ -236,10 +235,10 @@ void WSocketClient::_clientSend(size_t acked_bytes){
236235
if (!lock.try_lock())
237236
return;
238237

239-
auto sock_space = _client->space();
238+
//auto sock_space = _client->space();
240239
//log_d("no ack infl:%u, space:%u, data pending:%u", _in_flight, sock_space, (uint32_t)(_outFrame.len - _outFrame.index));
241240
//
242-
if (!sock_space)
241+
if (!_client->space())
243242
return;
244243
}
245244

@@ -277,7 +276,6 @@ void WSocketClient::_clientSend(size_t acked_bytes){
277276

278277
if (_outFrame.index == _outFrame.len){
279278
// if we complete writing entire message, send the frame right away
280-
// increment in-flight counter and take the credit
281279
if (!_client->send())
282280
_client->abort();
283281

@@ -296,7 +294,7 @@ void WSocketClient::_clientSend(size_t acked_bytes){
296294
// no use case for this for now
297295
//_sendEvent(event_t::msgSent);
298296

299-
// if there are free in-flight credits and buffer space available try to pull next msg from Q
297+
// if there is still buffer space available try to pull next msg from Q
300298
if (_client->space() > WS_MAX_HEADER_SIZE && _evictOutQueue()){
301299
// generate header and add to the socket buffer
302300
_in_flight += webSocketSendHeader(_client, _outFrame);
@@ -369,10 +367,12 @@ void WSocketClient::_onDisconnect(AsyncClient *c) {
369367
}
370368

371369
void WSocketClient::_onData(void *pbuf, size_t plen) {
372-
//log_d("_onData, len:%u\n", plen);
370+
//log_d("_onData, 0x%08" PRIx32 " len:%u", (uint32_t)pbuf, plen);
373371
if (!pbuf || !plen || _connection == conn_state_t::disconnected) return;
374372
char *data = (char *)pbuf;
375373

374+
size_t pb_len = plen;
375+
376376
while (plen){
377377
if (!_inFrame.msg){
378378
// it's a new frame, need to parse header data
@@ -401,6 +401,7 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
401401

402402
// todo: for now assume object will consume all the payload provided
403403
_inFrame.msg->addChunk(data, payload_len, _inFrame.index);
404+
_inFrame.index += payload_len;
404405
data += payload_len;
405406
plen -= payload_len;
406407
}
@@ -461,6 +462,9 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
461462
// just reply to ping, does user needs this ping message?
462463
_messageQueueOut.emplace_front( std::make_shared<WSMessageContainer<std::string>>(WSFrameType_t::pong, true, _inFrame.msg->getData()) );
463464
_inFrame.msg.reset();
465+
// send frame is no other message is in progress
466+
if (!_outFrame.msg)
467+
_clientSend();
464468
break;
465469
}
466470

@@ -479,6 +483,40 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
479483
}
480484
}
481485
}
486+
487+
/*
488+
Applying TCP window control here. In case if there are pending messages in the Q
489+
we clamp window size gradually to push sending party back. The larger the Q grows
490+
then more window is closed. This works pretty well for messages sized about or more
491+
than TCP windows size (5,7k default for Arduino). It could prevent Q overflow and
492+
sieze incoming data flow without blocking the entie network stack. Mostly usefull
493+
with websocket worker where AsyncTCP thread is not blocked by user callbacks.
494+
*/
495+
if (_messageQueueIn.size()){
496+
_client->ackLater();
497+
size_t reduce_size = pb_len * _messageQueueIn.size() / _max_qcap;
498+
_client->ack(pb_len - reduce_size);
499+
_pending_ack += reduce_size;
500+
//log_d("delay ack:%u, total pending:%u", reduce_size, _pending_ack);
501+
}
502+
}
503+
504+
void WSocketClient::_onPoll(AsyncClient *c){
505+
/*
506+
Window control - we open window deproportionally to Q size letting data flow a bit
507+
*/
508+
if (_pending_ack){
509+
size_t to_keep = _pending_ack * (_messageQueueIn.size() + 1) / _max_qcap;
510+
_client->ack(_pending_ack - to_keep);
511+
size_t bak = _pending_ack;
512+
_pending_ack = to_keep;
513+
//log_d("poll ack:%u, left:%u\n", bak - _pending_ack, _pending_ack);
514+
}
515+
_keepalive();
516+
// call send if no other message is in progress and Q is not empty somehow,
517+
// otherwise rely on ack events
518+
if (!_outFrame.msg && _messageQueueOut.size())
519+
_clientSend();
482520
}
483521

484522
std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, WSMessageFrame& frame){
@@ -522,8 +560,8 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
522560
if (masked && len >= offset + 4) {
523561
// mask bytes order are LSB, so we can copy it as-is
524562
frame.mask = *reinterpret_cast<uint32_t*>(data + offset);
525-
Serial.printf("mask key at %u, :0x", offset);
526-
Serial.println(frame.mask, HEX);
563+
//Serial.printf("mask key at %u, :0x", offset);
564+
//Serial.println(frame.mask, HEX);
527565
offset += 4;
528566
}
529567

@@ -629,22 +667,37 @@ WSocketClient::err_t WSocketClient::enqueueMessage(WSMessagePtr mptr){
629667
#endif
630668
_messageQueueOut.emplace_back( std::move(mptr) );
631669
}
632-
_clientSend();
670+
// send frame if no other message is in progress
671+
if (!_outFrame.msg)
672+
_clientSend();
633673
return err_t::ok;
634674
}
635675

636676
return err_t::outQfull;
637677
}
638678

639679
WSMessagePtr WSocketClient::dequeueMessage(){
640-
#ifdef ESP32
641-
std::unique_lock<std::recursive_mutex> lock(_inQlock);
642-
#endif
643680
WSMessagePtr msg;
644681
if (_messageQueueIn.size()){
682+
#ifdef ESP32
683+
std::unique_lock<std::recursive_mutex> lock(_inQlock);
684+
#endif
645685
msg.swap(_messageQueueIn.front());
646686
_messageQueueIn.pop_front();
647687
}
688+
/*
689+
Window control - we open window deproportionally to Q size letting data flow once a message is deQ'd
690+
*/
691+
if (_pending_ack){
692+
if (!_messageQueueIn.size()){
693+
_client->ack(0xffff); // on empty Q we ack whatever is left (max TCP win size)
694+
} else {
695+
size_t ackpart =_pending_ack * (_max_qcap - _messageQueueIn.size()) / _max_qcap;
696+
//log_d("ackdq:%u/%u", ackpart, _pending_ack);
697+
_client->ack(ackpart);
698+
_pending_ack -= ackpart;
699+
}
700+
}
648701
return msg;
649702
}
650703

@@ -671,7 +724,9 @@ WSocketClient::err_t WSocketClient::close(uint16_t code, const char *message){
671724
_messageQueueOut.emplace_front( std::make_shared<WSMessageClose>(code, message) );
672725
else
673726
_messageQueueOut.emplace_front( std::make_shared<WSMessageClose>(code) );
674-
_clientSend();
727+
// send frame if no other message is in progress
728+
if (!_outFrame.msg)
729+
_clientSend();
675730
return err_t::ok;
676731
}
677732

src/AsyncWSocket.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,8 @@ class WSocketClient {
567567

568568
// amount of sent data in-flight, i.e. copied to socket buffer, but not acked yet from lwip side
569569
size_t _in_flight{0};
570+
// counter for consumed data from tcp_pcbs, but delayd ack to hold the window
571+
size_t _pending_ack{0};
570572

571573
// keepalive
572574
unsigned long _keepAlivePeriod{0}, _lastPong;
@@ -606,6 +608,7 @@ class WSocketClient {
606608
void _onTimeout(uint32_t time);
607609
void _onDisconnect(AsyncClient *c);
608610
void _onData(void *pbuf, size_t plen);
611+
void _onPoll(AsyncClient *c);
609612
};
610613

611614
/**
@@ -658,6 +661,7 @@ class WSocketServer : public AsyncWebHandler {
658661

659662
/**
660663
* @copydoc WSClient::setOverflowPolicy(overflow_t policy)
664+
* @note default is 'disconnect'
661665
*/
662666
void setOverflowPolicy(WSocketClient::overflow_t policy){ _overflow_policy = policy; }
663667
WSocketClient::overflow_t getOverflowPolicy() const { return _overflow_policy; }

0 commit comments

Comments
 (0)