Skip to content

Commit dece7fa

Browse files
committed
feat: rewrite keepalive_ready feature based on reader_state table
1 parent 91935c4 commit dece7fa

File tree

1 file changed

+27
-38
lines changed

1 file changed

+27
-38
lines changed

lib/resty/http.lua

+27-38
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ function _M.new(_)
135135
return nil, err
136136
end
137137
return setmetatable({
138-
sock = sock, keepalive_supported = true, keepalive_ready = false
138+
sock = sock, keepalive_supported = true, reader_state = { keepalive_ready = false, mark_keepalive_ready_on_body_read = true }
139139
}, mt)
140140
end
141141

@@ -211,7 +211,7 @@ function _M.set_keepalive(self, ...)
211211
end
212212

213213
if self.keepalive_supported == true then
214-
if not self.keepalive_ready then
214+
if not self.reader_state.keepalive_ready then
215215
return nil, "response not fully read"
216216
end
217217

@@ -435,18 +435,7 @@ end
435435
_M.transfer_encoding_is_chunked = transfer_encoding_is_chunked
436436

437437

438-
local function _reader_keepalive_ready_mark(http_client)
439-
return function()
440-
http_client.keepalive_ready = true
441-
end
442-
end
443-
444-
local function _reader_keepalive_ready_no_op()
445-
return function() end
446-
end
447-
448-
449-
local function _chunked_body_reader(keepalive_ready_callback, sock, default_chunk_size)
438+
local function _chunked_body_reader(reader_state, sock, default_chunk_size)
450439
return co_wrap(function(max_chunk_size)
451440
local remaining = 0
452441
local length
@@ -505,12 +494,14 @@ local function _chunked_body_reader(keepalive_ready_callback, sock, default_chun
505494

506495
until length == 0
507496

508-
keepalive_ready_callback()
497+
if reader_state.mark_keepalive_ready_on_body_read then
498+
reader_state.keepalive_ready = true
499+
end
509500
end)
510501
end
511502

512503

513-
local function _body_reader(keepalive_ready_callback, sock, content_length, default_chunk_size)
504+
local function _body_reader(reader_state, sock, content_length, default_chunk_size)
514505
return co_wrap(function(max_chunk_size)
515506
max_chunk_size = max_chunk_size or default_chunk_size
516507

@@ -540,8 +531,9 @@ local function _body_reader(keepalive_ready_callback, sock, content_length, defa
540531
elseif not max_chunk_size then
541532
-- We have a length and potentially keep-alive, but want everything.
542533
co_yield(sock:receive(content_length))
543-
keepalive_ready_callback()
544-
534+
if reader_state.mark_keepalive_ready_on_body_read then
535+
reader_state.keepalive_ready = true
536+
end
545537
else
546538
-- We have a length and potentially a keep-alive, and wish to stream
547539
-- the response.
@@ -569,7 +561,9 @@ local function _body_reader(keepalive_ready_callback, sock, content_length, defa
569561
end
570562

571563
until length == 0
572-
keepalive_ready_callback()
564+
if reader_state.mark_keepalive_ready_on_body_read then
565+
reader_state.keepalive_ready = true
566+
end
573567
end
574568
end)
575569
end
@@ -608,10 +602,11 @@ local function _read_body(res)
608602
end
609603

610604

611-
local function _trailer_reader(keepalive_ready_callback, sock)
605+
local function _trailer_reader(reader_state, sock)
612606
return co_wrap(function()
613607
co_yield(_receive_headers(sock))
614-
keepalive_ready_callback()
608+
-- We can always pool after reading trailers
609+
reader_state.keepalive_ready = true
615610
end)
616611
end
617612

@@ -677,7 +672,7 @@ function _M.send_request(self, params)
677672
setmetatable(params, { __index = DEFAULT_PARAMS })
678673

679674
-- Sending a new request makes keepalive disabled until its response is fully read
680-
self.keepalive_ready = false
675+
self.reader_state.keepalive_ready = false
681676

682677
local sock = self.sock
683678
local body = params.body
@@ -830,23 +825,16 @@ function _M.read_response(self, params)
830825
local trailer_reader
831826
local has_body = false
832827
local has_trailer = false
833-
local body_reader_keepalive_ready_callback
834828

835-
if res_headers["Trailer"] then
836-
has_trailer = true
837-
-- If there are trailers - fully reading response body doesn't mean socket is ready to be pooled
838-
body_reader_keepalive_ready_callback = _reader_keepalive_ready_no_op()
839-
else
840-
-- If there are no trailers - fully reading response body means socket is ready to be pooled
841-
body_reader_keepalive_ready_callback = _reader_keepalive_ready_mark(self)
842-
end
829+
has_trailer = (res_headers["Trailer"] ~= nil)
830+
self.reader_state.mark_keepalive_ready_on_body_read = not has_trailer
843831

844832
-- Receive the body_reader
845833
if _should_receive_body(params.method, status) then
846834
has_body = true
847835

848836
if version == 1.1 and transfer_encoding_is_chunked(res_headers) then
849-
body_reader, err = _chunked_body_reader(body_reader_keepalive_ready_callback, sock)
837+
body_reader, err = _chunked_body_reader(self.reader_state, sock)
850838
else
851839
local length
852840
ok, length = pcall(tonumber, res_headers["Content-Length"])
@@ -855,17 +843,17 @@ function _M.read_response(self, params)
855843
length = nil
856844
end
857845

858-
body_reader, err = _body_reader(body_reader_keepalive_ready_callback, sock, length)
846+
body_reader, err = _body_reader(self.reader_state, sock, length)
859847
end
860848
else
861849
if not has_trailer then
862850
-- If there's no body and no trailer - it's ready for keep-alive
863-
self.keepalive_ready = true
851+
self.reader_state.keepalive_ready = true
864852
end
865853
end
866854

867855
if has_trailer then
868-
trailer_reader, err = _trailer_reader(_reader_keepalive_ready_mark(self), sock)
856+
trailer_reader, err = _trailer_reader(self.reader_state, sock)
869857
end
870858

871859
if err then
@@ -1024,14 +1012,15 @@ function _M.get_client_body_reader(_, chunksize, sock)
10241012
end
10251013
end
10261014

1027-
local reader_keep_alive_ready_callback = _reader_keepalive_ready_no_op()
1015+
-- Reading the request body has nothing to do with pooling the upstream server socket
1016+
local request_body_reader_state = { mark_keepalive_ready_on_body_read = false }
10281017
local headers = ngx_req_get_headers()
10291018
local length = headers.content_length
10301019
if length then
1031-
return _body_reader(reader_keep_alive_ready_callback, sock, tonumber(length), chunksize)
1020+
return _body_reader(request_body_reader_state, sock, tonumber(length), chunksize)
10321021
elseif transfer_encoding_is_chunked(headers) then
10331022
-- Not yet supported by ngx_lua but should just work...
1034-
return _chunked_body_reader(reader_keep_alive_ready_callback, sock, chunksize)
1023+
return _chunked_body_reader(request_body_reader_state, sock, chunksize)
10351024
else
10361025
return nil
10371026
end

0 commit comments

Comments
 (0)