Skip to content

Commit 8eb2f11

Browse files
alexmalyshevlehecka
authored andcommitted
Add ConnectionAcceptor::stop() (#406)
Its behavior is to have the server stop listening, whatever server may be inside the ConnectionAcceptor... This is very helpful towards having sane shutdown semantics. Moves us closer to having ~RSocketServer not race as hard.
1 parent 59e070a commit 8eb2f11

File tree

4 files changed

+42
-17
lines changed

4 files changed

+42
-17
lines changed

experimental/rsocket-src/RSocketServer.cpp

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ RSocketServer::RSocketServer(
4747
acceptor_(ProtocolVersion::Unknown) {}
4848

4949
RSocketServer::~RSocketServer() {
50+
// Stop accepting new connections.
51+
lazyAcceptor_->stop();
52+
53+
// FIXME(alexanderm): This is where we /should/ close the FrameTransports
54+
// sitting around in the ServerConnectionAcceptor, but we can't yet...
55+
56+
// Asynchronously close all existing ReactiveSockets. If there are none, then
57+
// we can do an early exit.
5058
{
5159
auto locked = sockets_.lock();
5260
if (locked->empty()) {
@@ -61,35 +69,34 @@ RSocketServer::~RSocketServer() {
6169
}
6270
}
6371

72+
// Wait for all ReactiveSockets to close.
6473
shutdown_->wait();
6574
DCHECK(sockets_.lock()->empty());
75+
76+
// All requests are fully finished, worker threads can be safely killed off.
6677
}
6778

6879
void RSocketServer::start(OnAccept onAccept) {
6980
if (connectionHandler_) {
7081
throw std::runtime_error("RSocketServer::start() already called.");
7182
}
7283

73-
LOG(INFO) << "RSocketServer => initialize connection acceptor on start";
84+
LOG(INFO) << "Initializing connection acceptor on start";
7485

75-
LOG(INFO) << "RSocketServer => initialize connection acceptor on start";
7686
connectionHandler_ =
7787
std::make_unique<RSocketServerConnectionHandler>(this, onAccept);
7888

7989
lazyAcceptor_
8090
->start([this](
8191
std::unique_ptr<DuplexConnection> conn,
8292
folly::Executor& executor) {
83-
LOG(INFO) << "RSocketServer => received new connection";
93+
LOG(INFO) << "Going to accept duplex connection";
8494

85-
LOG(INFO) << "RSocketServer => going to accept duplex connection";
86-
// the callbacks above are wired up, now accept the connection
8795
// FIXME(alexanderm): This isn't thread safe
8896
acceptor_.accept(std::move(conn), connectionHandler_);
8997
})
9098
.onError([](const folly::exception_wrapper& ex) {
91-
LOG(FATAL) << "RSocketServer => failed to start HttpAcceptor: "
92-
<< ex.what();
99+
LOG(FATAL) << "Failed to start ConnectionAcceptor: " << ex.what();
93100
});
94101
}
95102

experimental/rsocket-src/transports/TcpConnectionAcceptor.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,13 @@ TcpConnectionAcceptor::TcpConnectionAcceptor(Options options)
6060
: options_(std::move(options)) {}
6161

6262
TcpConnectionAcceptor::~TcpConnectionAcceptor() {
63-
LOG(INFO) << "Shutting down TCP listener";
64-
65-
// Need to terminate ServerSocket before the EventBase. The socket will
66-
// access the EventBase in its destructor.
67-
serverThread_->getEventBase()->runInEventBaseThread(
68-
[this] { serverSocket_.reset(); });
69-
serverThread_.reset();
63+
if (serverThread_) {
64+
stop();
65+
}
7066
}
7167

68+
////////////////////////////////////////////////////////////////////////////////
69+
7270
folly::Future<folly::Unit> TcpConnectionAcceptor::start(
7371
std::function<void(std::unique_ptr<DuplexConnection>, folly::EventBase&)>
7472
acceptor) {
@@ -116,4 +114,12 @@ folly::Future<folly::Unit> TcpConnectionAcceptor::start(
116114
LOG(INFO) << "ConnectionAcceptor => leave start";
117115
return folly::unit;
118116
}
117+
118+
void TcpConnectionAcceptor::stop() {
119+
LOG(INFO) << "Shutting down TCP listener";
120+
121+
serverThread_->getEventBase()->runInEventBaseThread(
122+
[this] { serverSocket_.reset(); });
123+
serverThread_.reset();
124+
}
119125
}

experimental/rsocket/ConnectionAcceptor.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ using OnDuplexConnectionAccept = std::function<
2020
*
2121
* Built-in implementations can be found in rsocket/transports/, such as
2222
* rsocket/transports/TcpConnectionAcceptor.h
23+
*
24+
* TODO: Add way of specifying number of worker threads.
2325
*/
2426
class ConnectionAcceptor {
2527
public:
@@ -43,6 +45,13 @@ class ConnectionAcceptor {
4345
*/
4446
virtual folly::Future<folly::Unit> start(
4547
OnDuplexConnectionAccept onAccept) = 0;
46-
// TODO need to add numThreads option (either overload or arg with default=1)
48+
49+
/**
50+
* Stop listening for new connections.
51+
*
52+
* This can only be called once. Must be called in or before
53+
* the implementation's destructor. Must be synchronous.
54+
*/
55+
virtual void stop() = 0;
4756
};
4857
}

experimental/rsocket/transports/TcpConnectionAcceptor.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ class TcpConnectionAcceptor : public ConnectionAcceptor {
4040

4141
/**
4242
* Bind an AsyncServerSocket and start accepting TCP connections.
43-
*
44-
* This can only be called once.
4543
*/
4644
folly::Future<folly::Unit> start(
4745
std::function<void(
4846
std::unique_ptr<reactivesocket::DuplexConnection>,
4947
folly::EventBase&)>) override;
5048

49+
/**
50+
* Shutdown the AsyncServerSocket and associated listener thread.
51+
*/
52+
void stop() override;
53+
5154
private:
5255
class SocketCallback;
5356

0 commit comments

Comments
 (0)