Performance tuning #130

Open
@Codebreaker101

Description

I'm benchmarking a simple nil echo server to compare RSocket and HTTP overhead for small payloads and to see how much I can squeeze out of RSocket.

For HTTP I'm using fasthttp, which does ~367000 req/s with 100 concurrent requests.
With RSocket, to get the best performance I could, I had to open 100 connections, which reaches ~400000 req/s.
In both cases the CPU was at 100%.

Is it possible to increase performance further, ideally with fewer open connections?

package bench_test

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/mono"
	"github.com/valyala/fasthttp"
)

func TestHTTP(t *testing.T) {
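	// HTTP echo server: reply with the request body (the client below sends plain GETs
	// and keeps at most 100 requests in flight).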
	go fasthttp.ListenAndServe(":8081", func(ctx *fasthttp.RequestCtx) { ctx.Response.SetBody(ctx.Request.Body()) })

	time.Sleep(time.Second)
	client := &fasthttp.Client{
		ReadTimeout:                   time.Second * 5,
		WriteTimeout:                  time.Second * 5,
		MaxIdleConnDuration:           time.Hour,
		NoDefaultUserAgentHeader:      true,
		DisableHeaderNamesNormalizing: true,
		DisablePathNormalizing:        true,
		Dial:                          (&fasthttp.TCPDialer{Concurrency: 4096, DNSCacheDuration: time.Hour}).Dial,
	}

	limit := make(chan struct{}, 100)
	wg := sync.WaitGroup{}
	const n = 10_000_000
	wg.Add(n)
	now := time.Now()
	for i := 0; i < n; i++ {
		limit <- struct{}{}
		go func() {
			req := fasthttp.AcquireRequest()
			req.SetRequestURI("http://localhost:8081/")
			req.Header.SetMethod(fasthttp.MethodGet)
			resp := fasthttp.AcquireResponse()
			err := client.Do(req, resp)
			fasthttp.ReleaseRequest(req)
			if err != nil {
				panic(fmt.Sprintf("ERR Connection error: %v\n", err))
			}
			fasthttp.ReleaseResponse(resp)
			<-limit
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println(time.Since(now))
	// 10_000_000/27.181s = 367904 req/s
}
func TestRR(t *testing.T) {
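	// RSocket nil-echo server: every request/response call is answered with the request payload.
	// The client side below opens 100 connections, each firing 100_000 requests with up to 1000 in flight.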
	go func() {
		rsocket.
			Receive().
			Acceptor(func(ctx context.Context, setup payload.SetupPayload, socket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
				return rsocket.NewAbstractSocket(
					rsocket.RequestResponse(func(request payload.Payload) (response mono.Mono) {
						return mono.Create(func(ctx context.Context, s mono.Sink) {
							s.Success(request)
						})
					}),
				), nil
			}).
			Transport(rsocket.TCPServer().SetHostAndPort("0.0.0.0", 9000).Build()).
			Serve(context.Background())
	}()
	time.Sleep(time.Second)

	const cons = 100
	count := int64(0)
	conWg := sync.WaitGroup{}
	conWg.Add(cons)
	for i := 0; i < cons; i++ {
		go func() {
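			// One client connection per goroutine, reusing a single empty payload for every request.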
			p := payload.New(nil, nil)

			client, err := rsocket.
				Connect().
				Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 9000).Build()).
				Start(context.Background())
			if err != nil {
				panic(err)
			}

			now := time.Now()
			limit := make(chan struct{}, 1000)
			const n = 100_000
			wg := sync.WaitGroup{}
			wg.Add(n)
			for i := 0; i < n; i++ {
				limit <- struct{}{}

				client.
					RequestResponse(p).
					DoOnSuccess(func(input payload.Payload) error {
						atomic.AddInt64(&count, 1)
						<-limit
						wg.Done()
						return nil
					}).
					Subscribe(context.Background())
			}

			wg.Wait()
			fmt.Println(atomic.LoadInt64(&count), time.Since(now))
			conWg.Done()
		}()
	}
	conWg.Wait()
	fmt.Println(count)
	// 10_000_000/24.923s = 401235 req/s
}
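One thing I haven't verified: the handler above builds a new mono.Create callback for every request. A variant using mono.Just from the rx/mono package expresses the same nil echo without that per-request closure; whether it actually changes the numbers here is untested:

go func() {
	rsocket.
		Receive().
		Acceptor(func(ctx context.Context, setup payload.SetupPayload, socket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			return rsocket.NewAbstractSocket(
				// Same echo semantics as above, but the response is built with
				// mono.Just instead of a per-request mono.Create callback.
				rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
					return mono.Just(request)
				}),
			), nil
		}).
		Transport(rsocket.TCPServer().SetHostAndPort("0.0.0.0", 9000).Build()).
		Serve(context.Background())
}()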
