Skip to content

Commit a9bc269

Browse files
authored
Fix benchmarks (#418)
* move setup and resume frame handling from StandardReactiveSocket to ConnectionAutomaton * fixing benchmarks build
1 parent 8f801d4 commit a9bc269

File tree

4 files changed

+79
-52
lines changed

4 files changed

+79
-52
lines changed

benchmarks/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ function(benchmark name file)
2929
${name}
3030
rsocket_experimental
3131
ReactiveSocket
32+
yarpl
3233
${GOOGLE_BENCHMARK_LIBS}
3334
${FOLLY_LIBRARIES}
3435
${GFLAGS_LIBRARY}

benchmarks/RequestResponseLatency.cpp

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <src/SubscriptionBase.h>
1111
#include "rsocket/RSocket.h"
1212
#include "rsocket/transports/TcpConnectionFactory.h"
13+
#include "yarpl/Flowable.h"
1314

1415
using namespace ::reactivesocket;
1516
using namespace ::folly;
@@ -58,24 +59,32 @@ class BM_Subscription : public SubscriptionBase {
5859
std::atomic_bool cancelled_;
5960
};
6061

61-
class BM_RequestHandler : public DefaultRequestHandler
62+
class BM_RequestHandler : public RSocketRequestHandler
6263
{
6364
public:
64-
void handleRequestResponse(
65-
Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
66-
{
67-
LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;
68-
69-
response->onSubscribe(
70-
std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
71-
}
72-
73-
std::shared_ptr<StreamState> handleSetupPayload(
74-
ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
75-
{
76-
LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
77-
return nullptr;
78-
}
65+
// TODO(lehecka): enable when we have support for request-response
66+
yarpl::Reference<yarpl::flowable::Flowable<reactivesocket::Payload>>
67+
handleRequestStream(
68+
reactivesocket::Payload request,
69+
reactivesocket::StreamId streamId) override {
70+
CHECK(false) << "not implemented";
71+
}
72+
73+
// void handleRequestResponse(
74+
// Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
75+
// {
76+
// LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;
77+
78+
// response->onSubscribe(
79+
// std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
80+
// }
81+
82+
// std::shared_ptr<StreamState> handleSetupPayload(
83+
// ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
84+
// {
85+
// LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
86+
// return nullptr;
87+
// }
7988
};
8089

8190
class BM_Subscriber
@@ -194,8 +203,11 @@ class BM_RsFixture : public benchmark::Fixture
194203

195204
BENCHMARK_F(BM_RsFixture, BM_RequestResponse_Latency)(benchmark::State &state)
196205
{
197-
auto clientRs = RSocket::createClient(
198-
std::make_unique<TcpConnectionFactory>(host_, port_));
206+
folly::SocketAddress address;
207+
address.setFromHostPort(host_, port_);
208+
209+
auto clientRs = RSocket::createClient(std::make_unique<TcpConnectionFactory>(
210+
std::move(address)));
199211
int reqs = 0;
200212

201213
auto rs = clientRs->connect().get();

benchmarks/RequestResponseThroughput.cpp

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <src/SubscriptionBase.h>
1111
#include "rsocket/RSocket.h"
1212
#include "rsocket/transports/TcpConnectionFactory.h"
13+
#include "yarpl/Flowable.h"
1314

1415
using namespace ::reactivesocket;
1516
using namespace ::folly;
@@ -58,24 +59,32 @@ class BM_Subscription : public SubscriptionBase {
5859
std::atomic_bool cancelled_;
5960
};
6061

61-
class BM_RequestHandler : public DefaultRequestHandler
62+
class BM_RequestHandler : public RSocketRequestHandler
6263
{
6364
public:
64-
void handleRequestResponse(
65-
Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
66-
{
67-
LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;
68-
69-
response->onSubscribe(
70-
std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
71-
}
72-
73-
std::shared_ptr<StreamState> handleSetupPayload(
74-
ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
75-
{
76-
LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
77-
return nullptr;
78-
}
65+
// TODO(lehecka): enable when we have support for request-response
66+
yarpl::Reference<yarpl::flowable::Flowable<reactivesocket::Payload>>
67+
handleRequestStream(
68+
reactivesocket::Payload request,
69+
reactivesocket::StreamId streamId) override {
70+
CHECK(false) << "not implemented";
71+
}
72+
73+
// void handleRequestResponse(
74+
// Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
75+
// {
76+
// LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;
77+
78+
// response->onSubscribe(
79+
// std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
80+
// }
81+
82+
// std::shared_ptr<StreamState> handleSetupPayload(
83+
// ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
84+
// {
85+
// LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
86+
// return nullptr;
87+
// }
7988
};
8089

8190
class BM_Subscriber
@@ -193,8 +202,11 @@ class BM_RsFixture : public benchmark::Fixture
193202

194203
BENCHMARK_DEFINE_F(BM_RsFixture, BM_RequestResponse_Throughput)(benchmark::State &state)
195204
{
205+
folly::SocketAddress address;
206+
address.setFromHostPort(host_, port_);
207+
196208
auto clientRs = RSocket::createClient(std::make_unique<TcpConnectionFactory>(
197-
host_, port_));
209+
std::move(address)));
198210
int reqs = 0;
199211
int numSubscribers = state.range(0);
200212
int mask = numSubscribers - 1;

benchmarks/StreamThroughput.cpp

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
#include <src/NullRequestHandler.h>
1010
#include <src/SubscriptionBase.h>
1111
#include "rsocket/RSocket.h"
12+
#include "rsocket/OldNewBridge.h"
1213
#include "rsocket/transports/TcpConnectionFactory.h"
14+
#include "yarpl/Flowables.h"
1315

1416
using namespace ::reactivesocket;
1517
using namespace ::folly;
@@ -59,23 +61,17 @@ class BM_Subscription : public SubscriptionBase {
5961
std::atomic_bool cancelled_;
6062
};
6163

62-
class BM_RequestHandler : public DefaultRequestHandler
64+
class BM_RequestHandler : public RSocketRequestHandler
6365
{
6466
public:
65-
void handleRequestStream(
66-
Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
67-
{
68-
LOG(INFO) << "BM_RequestHandler.handleRequestStream " << request;
69-
70-
response->onSubscribe(
71-
std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
72-
}
73-
74-
std::shared_ptr<StreamState> handleSetupPayload(
75-
ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
76-
{
77-
LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
78-
return nullptr;
67+
yarpl::Reference<yarpl::flowable::Flowable<reactivesocket::Payload>>
68+
handleRequestStream(
69+
reactivesocket::Payload request,
70+
reactivesocket::StreamId streamId) override {
71+
CHECK(false) << "not implemented";
72+
// TODO(lehecka) need to implement new operator fromGenerator
73+
// return yarpl::flowable::Flowables::fromGenerator<reactivesocket::Payload>(
74+
// []{return Payload(std::string(MESSAGE_LENGTH, 'a')); });
7975
}
8076
};
8177

@@ -206,7 +202,11 @@ class BM_RsFixture : public benchmark::Fixture
206202

207203
BENCHMARK_DEFINE_F(BM_RsFixture, BM_Stream_Throughput)(benchmark::State &state)
208204
{
209-
auto clientRs = RSocket::createClient(TcpConnectionFactory::create(host_, port_));
205+
folly::SocketAddress address;
206+
address.setFromHostPort(host_, port_);
207+
208+
auto clientRs = RSocket::createClient(std::make_unique<TcpConnectionFactory>(
209+
std::move(address)));
210210

211211
auto s = std::make_shared<BM_Subscriber>(state.range(0));
212212

@@ -215,7 +215,9 @@ BENCHMARK_DEFINE_F(BM_RsFixture, BM_Stream_Throughput)(benchmark::State &state)
215215
.then(
216216
[s](std::shared_ptr<RSocketRequester> rs)
217217
{
218-
rs->requestStream(Payload("BM_Stream"), s);
218+
rs->requestStream(Payload("BM_Stream"))->subscribe(
219+
yarpl::Reference<yarpl::flowable::Subscriber<Payload>>(
220+
new NewToOldSubscriber(s)));
219221
});
220222

221223
while (state.KeepRunning())

0 commit comments

Comments
 (0)