Open
Description
I'm benchmarking a simple echo server with nil payloads to compare RSocket against HTTP: I want to see the per-request overhead for small payloads and how much throughput I can squeeze out of RSocket.
For http I'm using fasthttp and it can do ~367000 req/s with 100 concurrent requests.
With RSocket, the best I could achieve was ~400000 req/s, and to get there I had to open 100 connections.
In both situations the CPU was 100%.
Is it possible to increase performance, ideally with fewer open connections?
// TestHTTP measures round-trip throughput of a fasthttp echo server on :8081.
//
// It sends n GET requests from a fixed pool of worker goroutines, so exactly
// `workers` requests are in flight at any time, and prints the elapsed
// wall-clock time. Failures are reported through t rather than by panicking
// inside a goroutine, which would take down the whole test binary.
func TestHTTP(t *testing.T) {
	go func() {
		echo := func(ctx *fasthttp.RequestCtx) { ctx.Response.SetBody(ctx.Request.Body()) }
		// ListenAndServe only returns on failure (e.g. the port is taken);
		// surface that instead of letting every client request fail obscurely.
		if err := fasthttp.ListenAndServe(":8081", echo); err != nil {
			t.Errorf("echo server: %v", err)
		}
	}()
	// Crude wait for the listener to come up before the clients start.
	time.Sleep(time.Second)

	client := &fasthttp.Client{
		ReadTimeout:                   time.Second * 5,
		WriteTimeout:                  time.Second * 5,
		MaxIdleConnDuration:           time.Hour,
		NoDefaultUserAgentHeader:      true,
		DisableHeaderNamesNormalizing: true,
		DisablePathNormalizing:        true,
		Dial:                          (&fasthttp.TCPDialer{Concurrency: 4096, DNSCacheDuration: time.Hour}).Dial,
	}

	const (
		n         = 10_000_000
		workers   = 100 // number of concurrent in-flight requests
		perWorker = n / workers
	)

	var wg sync.WaitGroup
	wg.Add(workers)
	now := time.Now()
	for w := 0; w < workers; w++ {
		// One long-lived goroutine per concurrency slot: spawning a goroutine
		// per request would add scheduler overhead to the number being measured.
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				req := fasthttp.AcquireRequest()
				req.SetRequestURI("http://localhost:8081/")
				req.Header.SetMethod(fasthttp.MethodGet)
				resp := fasthttp.AcquireResponse()
				err := client.Do(req, resp)
				// Return both objects to their pools on every path, including errors.
				fasthttp.ReleaseRequest(req)
				fasthttp.ReleaseResponse(resp)
				if err != nil {
					t.Errorf("ERR Connection error: %v", err)
					return
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println(time.Since(now))
	// Originally reported (goroutine-per-request variant):
	// 10_000_000/27,181s = 367904 req/s
}
// TestRR measures request/response throughput of an RSocket echo server on
// TCP port 9000.
//
// It opens `cons` client connections; each sends n requests with at most 1000
// in flight on that connection and prints the shared success counter together
// with its own elapsed time. The final line printed is the total number of
// successful responses.
func TestRR(t *testing.T) {
	go func() {
		err := rsocket.
			Receive().
			Acceptor(func(ctx context.Context, setup payload.SetupPayload, socket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
				return rsocket.NewAbstractSocket(
					// Echo handler: complete the mono with the request payload itself.
					rsocket.RequestResponse(func(request payload.Payload) (response mono.Mono) {
						return mono.Create(func(ctx context.Context, s mono.Sink) {
							s.Success(request)
						})
					}),
				), nil
			}).
			Transport(rsocket.TCPServer().SetHostAndPort("0.0.0.0", 9000).Build()).
			Serve(context.Background())
		// Surface startup failures (e.g. port already bound) instead of dropping them.
		if err != nil {
			t.Errorf("rsocket server: %v", err)
		}
	}()
	// Crude wait for the listener to come up before the clients connect.
	time.Sleep(time.Second)

	const cons = 100
	count := int64(0)
	conWg := sync.WaitGroup{}
	conWg.Add(cons)
	for c := 0; c < cons; c++ {
		go func() {
			defer conWg.Done()

			p := payload.New(nil, nil)
			client, err := rsocket.
				Connect().
				Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 9000).Build()).
				Start(context.Background())
			if err != nil {
				// Report through t: a panic in this goroutine would abort the test binary.
				t.Errorf("connect: %v", err)
				return
			}
			defer client.Close()

			now := time.Now()
			limit := make(chan struct{}, 1000) // caps in-flight requests on this connection
			const n = 100_000
			wg := sync.WaitGroup{}
			wg.Add(n)
			// release frees one in-flight slot and marks one request as finished.
			release := func() {
				<-limit
				wg.Done()
			}
			for i := 0; i < n; i++ {
				limit <- struct{}{}
				client.
					RequestResponse(p).
					DoOnSuccess(func(input payload.Payload) error {
						atomic.AddInt64(&count, 1)
						release()
						return nil
					}).
					// Without this, a failed request would never free its slot
					// or call wg.Done, and wg.Wait below would block forever.
					DoOnError(func(e error) {
						t.Errorf("request failed: %v", e)
						release()
					}).
					Subscribe(context.Background())
			}
			wg.Wait()
			// Other connections are still incrementing count, so read it atomically.
			fmt.Println(atomic.LoadInt64(&count), time.Since(now))
		}()
	}
	conWg.Wait()
	fmt.Println(atomic.LoadInt64(&count))
	// 10_000_000/24.923s = 401235 req/s
}