Skip to content

Commit 646b76a

Browse files
authored
Support for Status Codes in http streaming (#3233)
Drogon patch delaying the response header write as late as possible
1 parent d6ea460 commit 646b76a

File tree

3 files changed

+149
-13
lines changed

3 files changed

+149
-13
lines changed

src/drogon_http_async_writer_impl.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,23 @@ namespace ovms {
2727
void DrogonHttpAsyncWriterImpl::OverwriteResponseHeader(const std::string& key, const std::string& value) {
2828
this->additionalHeaders[key] = value;
2929
}
30+
void DrogonHttpAsyncWriterImpl::sendHeaderIfFirstResponse(HTTPStatusCode status) {
31+
if (firstResponse) {
32+
firstResponse = false;
33+
this->responsePtr->setCustomStatusCode(int(status));
34+
this->stream->sendHeader(this->responsePtr->renderHeaderToString());
35+
}
36+
}
3037
void DrogonHttpAsyncWriterImpl::PartialReplyWithStatus(std::string message, HTTPStatusCode status) {
3138
if (this->isDisconnected) {
3239
return;
3340
}
41+
this->sendHeaderIfFirstResponse(status);
3442
if (!this->stream->send(message))
3543
this->isDisconnected = true;
3644
}
3745
void DrogonHttpAsyncWriterImpl::PartialReplyBegin(std::function<void()> actualWorkloadCallback) {
38-
auto resp = drogon::HttpResponse::newAsyncStreamResponse(
46+
this->responsePtr = drogon::HttpResponse::newAsyncStreamResponse(
3947
[this, actualWorkloadCallback = std::move(actualWorkloadCallback)](drogon::ResponseStreamPtr stream) {
4048
this->stream = std::move(stream);
4149
this->pool.Schedule([actualWorkloadCallback = std::move(actualWorkloadCallback)] {
@@ -52,23 +60,22 @@ void DrogonHttpAsyncWriterImpl::PartialReplyBegin(std::function<void()> actualWo
5260
// Convert headers to drogon format
5361
for (const auto& [key, value] : this->additionalHeaders) {
5462
if (key == "Content-Type") {
55-
resp->setContentTypeString(value);
63+
this->responsePtr->setContentTypeString(value);
5664
continue;
5765
}
58-
resp->addHeader(key, value);
66+
this->responsePtr->addHeader(key, value);
5967
}
60-
this->drogonResponseInitializeCallback(resp);
68+
69+
// Originally this also sent http response header (with status code)
70+
// We have drogon patch that delays it till first streaming response
71+
this->drogonResponseInitializeCallback(this->responsePtr);
6172
}
6273
void DrogonHttpAsyncWriterImpl::PartialReplyEnd() {
6374
this->stream->close();
6475
}
6576
// Used by graph executor impl
6677
void DrogonHttpAsyncWriterImpl::PartialReply(std::string message) {
67-
if (this->IsDisconnected()) {
68-
return;
69-
}
70-
if (!this->stream->send(message))
71-
this->isDisconnected = true;
78+
return PartialReplyWithStatus(std::move(message), HTTPStatusCode::OK);
7279
}
7380
// Used by calculator via HttpClientConnection
7481
bool DrogonHttpAsyncWriterImpl::IsDisconnected() const {

src/drogon_http_async_writer_impl.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class DrogonHttpAsyncWriterImpl : public HttpAsyncWriter {
3636
bool isDisconnected = false;
3737
std::unordered_map<std::string, std::string> additionalHeaders;
3838
const drogon::HttpRequestPtr requestPtr{nullptr};
39+
drogon::HttpResponsePtr responsePtr{nullptr};
3940

4041
public:
4142
DrogonHttpAsyncWriterImpl(
@@ -58,6 +59,10 @@ class DrogonHttpAsyncWriterImpl : public HttpAsyncWriter {
5859
// Used by calculator via HttpClientConnection
5960
bool IsDisconnected() const override;
6061
void RegisterDisconnectionCallback(std::function<void()> onDisconnectedCallback) override;
62+
63+
private:
64+
void sendHeaderIfFirstResponse(HTTPStatusCode status);
65+
bool firstResponse{true};
6166
};
6267

6368
} // namespace ovms

third_party/drogon/ovms_drogon_trantor.patch

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,49 @@
1+
diff --git a/lib/inc/drogon/HttpResponse.h b/lib/inc/drogon/HttpResponse.h
2+
index 620577e5..30798b59 100644
3+
--- a/lib/inc/drogon/HttpResponse.h
4+
+++ b/lib/inc/drogon/HttpResponse.h
5+
@@ -74,8 +74,8 @@ inline HttpResponsePtr toResponse<Json::Value &>(Json::Value &pJson)
6+
class DROGON_EXPORT ResponseStream
7+
{
8+
public:
9+
- explicit ResponseStream(trantor::AsyncStreamPtr asyncStream)
10+
- : asyncStream_(std::move(asyncStream))
11+
+ explicit ResponseStream(trantor::AsyncStreamPtr asyncStream, trantor::TcpConnectionPtr tcpConnection)
12+
+ : asyncStream_(std::move(asyncStream)), tcpConnection_(std::move(tcpConnection))
13+
{
14+
}
15+
16+
@@ -84,6 +84,11 @@ class DROGON_EXPORT ResponseStream
17+
close();
18+
}
19+
20+
+ void sendHeader(const std::string& data)
21+
+ {
22+
+ tcpConnection_->send(data);
23+
+ }
24+
+
25+
bool send(const std::string &data)
26+
{
27+
if (!asyncStream_)
28+
@@ -109,6 +114,7 @@ class DROGON_EXPORT ResponseStream
29+
30+
private:
31+
trantor::AsyncStreamPtr asyncStream_;
32+
+ trantor::TcpConnectionPtr tcpConnection_;
33+
};
34+
35+
using ResponseStreamPtr = std::unique_ptr<ResponseStream>;
36+
@@ -315,6 +321,8 @@ class DROGON_EXPORT HttpResponse
37+
return body();
38+
}
39+
40+
+ virtual std::string renderHeaderToString() = 0;
41+
+
42+
/// Return the string of http version of request, such as HTTP/1.0,
43+
/// HTTP/1.1, etc.
44+
virtual const char *versionString() const = 0;
145
diff --git a/lib/src/HttpAppFrameworkImpl.cc b/lib/src/HttpAppFrameworkImpl.cc
2-
index 4c49c259..058203c8 100644
46+
index 623d906a..bd61837d 100644
347
--- a/lib/src/HttpAppFrameworkImpl.cc
448
+++ b/lib/src/HttpAppFrameworkImpl.cc
549
@@ -525,15 +525,15 @@ void HttpAppFrameworkImpl::run()
@@ -27,11 +71,53 @@ index 4c49c259..058203c8 100644
2771
if (runAsDaemon_)
2872
{
2973
// go daemon!
74+
diff --git a/lib/src/HttpResponseImpl.cc b/lib/src/HttpResponseImpl.cc
75+
index 5e9e46f1..533d3581 100644
76+
--- a/lib/src/HttpResponseImpl.cc
77+
+++ b/lib/src/HttpResponseImpl.cc
78+
@@ -726,6 +726,12 @@ std::shared_ptr<trantor::MsgBuffer> HttpResponseImpl::renderToBuffer()
79+
return httpString;
80+
}
81+
82+
+std::string HttpResponseImpl::renderHeaderToString()
83+
+{
84+
+ auto msg = this->renderToBuffer();
85+
+ return std::string(msg->peek(), msg->readableBytes());
86+
+}
87+
+
88+
std::shared_ptr<trantor::MsgBuffer> HttpResponseImpl::
89+
renderHeaderForHeadMethod()
90+
{
91+
diff --git a/lib/src/HttpResponseImpl.h b/lib/src/HttpResponseImpl.h
92+
index d6b949c8..fadb6e8b 100644
93+
--- a/lib/src/HttpResponseImpl.h
94+
+++ b/lib/src/HttpResponseImpl.h
95+
@@ -177,6 +177,8 @@ class DROGON_EXPORT HttpResponseImpl : public HttpResponse
96+
97+
void addHeader(const char *start, const char *colon, const char *end);
98+
99+
+ std::string renderHeaderToString() override;
100+
+
101+
void addCookie(const std::string &key, const std::string &value) override
102+
{
103+
cookies_[key] = Cookie(key, value);
30104
diff --git a/lib/src/HttpServer.cc b/lib/src/HttpServer.cc
31-
index 51506f95..ef784a68 100644
105+
index 257976b1..1d407514 100644
32106
--- a/lib/src/HttpServer.cc
33107
+++ b/lib/src/HttpServer.cc
34-
@@ -978,7 +978,10 @@ void HttpServer::sendResponse(const TcpConnectionPtr &conn,
108+
@@ -965,24 +965,36 @@ void HttpServer::sendResponse(const TcpConnectionPtr &conn,
109+
auto respImplPtr = static_cast<HttpResponseImpl *>(response.get());
110+
if (!isHeadMethod)
111+
{
112+
- auto httpString = respImplPtr->renderToBuffer();
113+
- conn->send(httpString);
114+
+ // It used to send http response header before we even have any streaming response or http status
115+
+ // It is moved to ResponseStream API
116+
+ // auto httpString = respImplPtr->renderToBuffer();
117+
+ // conn->send(httpString);
118+
+
119+
if (!respImplPtr->contentLengthIsAllowed())
120+
return;
35121
auto &asyncStreamCallback = respImplPtr->asyncStreamCallback();
36122
if (asyncStreamCallback)
37123
{
@@ -43,7 +129,24 @@ index 51506f95..ef784a68 100644
43129
{
44130
asyncStreamCallback(
45131
std::make_unique<ResponseStream>(conn->sendAsyncStream(
46-
@@ -1062,7 +1065,11 @@ void HttpServer::sendResponses(
132+
- respImplPtr->asyncStreamKickoffDisabled())));
133+
+ respImplPtr->asyncStreamKickoffDisabled()), conn));
134+
}
135+
else
136+
{
137+
LOG_INFO << "Chunking Set CloseConnection !!!";
138+
}
139+
}
140+
+ else
141+
+ {
142+
+ // This is called for non-streaming scenarios, we do not need to delay
143+
+ auto httpString = respImplPtr->renderToBuffer();
144+
+ conn->send(httpString);
145+
+ }
146+
auto &streamCallback = respImplPtr->streamCallback();
147+
const std::string &sendfileName = respImplPtr->sendfileName();
148+
if (streamCallback || !sendfileName.empty())
149+
@@ -1056,11 +1068,16 @@ void HttpServer::sendResponses(
47150
{
48151
conn->send(buffer);
49152
buffer.retrieveAll();
@@ -56,6 +159,12 @@ index 51506f95..ef784a68 100644
56159
{
57160
asyncStreamCallback(
58161
std::make_unique<ResponseStream>(conn->sendAsyncStream(
162+
- respImplPtr->asyncStreamKickoffDisabled())));
163+
+ respImplPtr->asyncStreamKickoffDisabled()),
164+
+ conn));
165+
}
166+
else
167+
{
59168
diff --git a/lib/src/Utilities.cc b/lib/src/Utilities.cc
60169
index c6601f61..8c55ed15 100644
61170
--- a/lib/src/Utilities.cc
@@ -160,3 +269,18 @@ index 2aff5a0..145f9eb 100755
160269
}
161270
}
162271
int Socket::accept(InetAddress *peeraddr)
272+
diff --git a/trantor/trantor/net/inner/TcpConnectionImpl.cc b/trantor/trantor/net/inner/TcpConnectionImpl.cc
273+
index 6a75707..4b436b3 100644
274+
--- a/trantor/trantor/net/inner/TcpConnectionImpl.cc
275+
+++ b/trantor/trantor/net/inner/TcpConnectionImpl.cc
276+
@@ -391,7 +391,9 @@ void TcpConnectionImpl::sendInLoop(const char *buffer, size_t length)
277+
return;
278+
}
279+
ssize_t sendLen = 0;
280+
- if (!ioChannelPtr_->isWriting() && writeBufferList_.empty())
281+
+
282+
+ // Always send directly to avoid order issue: streaming response before unary one
283+
+ //if (!ioChannelPtr_->isWriting() && writeBufferList_.empty())
284+
{
285+
// send directly
286+
sendLen = writeInLoop(buffer, length);

0 commit comments

Comments
 (0)