Skip to content

Commit 05224e3

Browse files
committed
add replication outbound msgs
1 parent d052869 commit 05224e3

File tree

36 files changed

+1999
-893
lines changed

36 files changed

+1999
-893
lines changed

api/grpcclient/client.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package grpcclient
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/metadata"
12+
13+
"github.com/ava-labs/avalanchego/ids"
14+
)
15+
16+
// NewChainClient returns a grpc.ClientConn that sets the chain-id header for
17+
// all requests
18+
func NewChainClient(uri string, chainID ids.ID, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
19+
dialOpts := []grpc.DialOption{
20+
grpc.WithUnaryInterceptor(SetChainIDHeaderUnaryClientInterceptor(chainID)),
21+
grpc.WithStreamInterceptor(SetChainIDHeaderStreamClientInterceptor(chainID)),
22+
}
23+
24+
dialOpts = append(dialOpts, opts...)
25+
26+
conn, err := grpc.NewClient(uri, dialOpts...)
27+
if err != nil {
28+
return nil, fmt.Errorf("failed to initialize chain grpc client: %w", err)
29+
}
30+
31+
return conn, nil
32+
}
33+
34+
// SetChainIDHeaderUnaryClientInterceptor sets the chain-id header for unary
35+
// requests
36+
func SetChainIDHeaderUnaryClientInterceptor(chainID ids.ID) grpc.UnaryClientInterceptor {
37+
return func(
38+
ctx context.Context,
39+
method string,
40+
req any,
41+
reply any,
42+
cc *grpc.ClientConn,
43+
invoker grpc.UnaryInvoker,
44+
opts ...grpc.CallOption,
45+
) error {
46+
ctx = newContextWithChainIDHeader(ctx, chainID)
47+
return invoker(ctx, method, req, reply, cc, opts...)
48+
}
49+
}
50+
51+
// SetChainIDHeaderStreamClientInterceptor sets the chain-id header for
52+
// streaming requests
53+
func SetChainIDHeaderStreamClientInterceptor(chainID ids.ID) grpc.StreamClientInterceptor {
54+
return func(
55+
ctx context.Context,
56+
desc *grpc.StreamDesc,
57+
cc *grpc.ClientConn,
58+
method string,
59+
streamer grpc.Streamer,
60+
opts ...grpc.CallOption,
61+
) (grpc.ClientStream, error) {
62+
ctx = newContextWithChainIDHeader(ctx, chainID)
63+
return streamer(ctx, desc, cc, method, opts...)
64+
}
65+
}
66+
67+
// newContextWithChainHeader sets the chain-id header which the server uses
68+
// to route the client grpc request
69+
func newContextWithChainIDHeader(ctx context.Context, chainID ids.ID) context.Context {
70+
return metadata.AppendToOutgoingContext(ctx, "chain-id", chainID.String())
71+
}

api/server/http2_router.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package server
5+
6+
import (
7+
"net/http"
8+
"sync"
9+
10+
"github.com/ava-labs/avalanchego/ids"
11+
)
12+
13+
var _ http.Handler = (*http2Router)(nil)
14+
15+
type http2Router struct {
16+
lock sync.RWMutex
17+
handlers map[string]http.Handler
18+
}
19+
20+
func newHTTP2Router() *http2Router {
21+
return &http2Router{
22+
handlers: make(map[string]http.Handler),
23+
}
24+
}
25+
26+
func (h *http2Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
27+
// the chain-id header must be set to route the request to the correct chain
28+
// http2 handler
29+
chainID := r.Header.Get("chain-id")
30+
if len(chainID) == 0 {
31+
w.WriteHeader(http.StatusBadRequest)
32+
return
33+
}
34+
35+
h.lock.RLock()
36+
handler, ok := h.handlers[chainID]
37+
h.lock.RUnlock()
38+
if !ok {
39+
w.WriteHeader(http.StatusNotFound)
40+
return
41+
}
42+
43+
handler.ServeHTTP(w, r)
44+
}
45+
46+
func (h *http2Router) Add(chainID ids.ID, handler http.Handler) bool {
47+
h.lock.Lock()
48+
defer h.lock.Unlock()
49+
50+
chainIDStr := chainID.String()
51+
if _, ok := h.handlers[chainIDStr]; ok {
52+
return false
53+
}
54+
55+
h.handlers[chainIDStr] = handler
56+
return true
57+
}

api/server/http2_router_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package server
5+
6+
import (
7+
"net/http"
8+
"net/http/httptest"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/ava-labs/avalanchego/ids"
14+
)
15+
16+
func TestHTTP2RouterAdd(t *testing.T) {
17+
require := require.New(t)
18+
h := newHTTP2Router()
19+
handler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
20+
21+
require.True(h.Add(ids.Empty, handler))
22+
require.False(h.Add(ids.Empty, handler))
23+
}
24+
25+
func TestHTTP2RouterServeHTTP(t *testing.T) {
26+
tests := []struct {
27+
name string
28+
chainIDs []ids.ID
29+
header http.Header
30+
wantCode int
31+
}{
32+
{
33+
name: "missing chain-id header",
34+
wantCode: http.StatusBadRequest,
35+
},
36+
{
37+
name: "unknown referenced chain-id",
38+
header: http.Header{
39+
http.CanonicalHeaderKey("chain-id"): []string{ids.Empty.String()},
40+
},
41+
wantCode: http.StatusNotFound,
42+
},
43+
{
44+
name: "valid handler",
45+
chainIDs: []ids.ID{ids.Empty},
46+
header: http.Header{
47+
http.CanonicalHeaderKey("chain-id"): []string{ids.Empty.String()},
48+
},
49+
wantCode: http.StatusOK,
50+
},
51+
}
52+
53+
for _, tt := range tests {
54+
t.Run(tt.name, func(t *testing.T) {
55+
require := require.New(t)
56+
57+
h := newHTTP2Router()
58+
handler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
59+
writer := httptest.NewRecorder()
60+
request := httptest.NewRequest(http.MethodPost, "/", nil)
61+
request.Header = tt.header
62+
63+
for _, chainID := range tt.chainIDs {
64+
require.True(h.Add(chainID, handler))
65+
}
66+
67+
h.ServeHTTP(writer, request)
68+
require.Equal(tt.wantCode, writer.Code)
69+
})
70+
}
71+
}

api/server/server.go

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ import (
1212
"path"
1313
"time"
1414

15-
"github.com/NYTimes/gziphandler"
1615
"github.com/prometheus/client_golang/prometheus"
1716
"github.com/rs/cors"
1817
"go.uber.org/zap"
1918
"golang.org/x/net/http2"
19+
"golang.org/x/net/http2/h2c"
2020

2121
"github.com/ava-labs/avalanchego/api"
2222
"github.com/ava-labs/avalanchego/ids"
@@ -88,7 +88,8 @@ type server struct {
8888
metrics *metrics
8989

9090
// Maps endpoints to handlers
91-
router *router
91+
router *router
92+
http2Router *http2Router
9293

9394
srv *http.Server
9495

@@ -115,33 +116,29 @@ func New(
115116
}
116117

117118
router := newRouter()
118-
allowedHostsHandler := filterInvalidHosts(router, allowedHosts)
119-
corsHandler := cors.New(cors.Options{
120-
AllowedOrigins: allowedOrigins,
121-
AllowCredentials: true,
122-
}).Handler(allowedHostsHandler)
123-
gzipHandler := gziphandler.GzipHandler(corsHandler)
124-
var handler http.Handler = http.HandlerFunc(
125-
func(w http.ResponseWriter, r *http.Request) {
126-
// Attach this node's ID as a header
127-
w.Header().Set("node-id", nodeID.String())
128-
gzipHandler.ServeHTTP(w, r)
129-
},
130-
)
119+
handler := wrapHandler(router, nodeID, allowedOrigins, allowedHosts)
120+
121+
http2Router := newHTTP2Router()
122+
http2Handler := wrapHandler(http2Router, nodeID, allowedOrigins, allowedHosts)
131123

132124
httpServer := &http.Server{
133-
Handler: handler,
125+
Handler: h2c.NewHandler(
126+
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
127+
if r.ProtoMajor == 2 {
128+
http2Handler.ServeHTTP(w, r)
129+
return
130+
}
131+
132+
handler.ServeHTTP(w, r)
133+
}),
134+
&http2.Server{
135+
MaxConcurrentStreams: maxConcurrentStreams,
136+
}),
134137
ReadTimeout: httpConfig.ReadTimeout,
135138
ReadHeaderTimeout: httpConfig.ReadHeaderTimeout,
136139
WriteTimeout: httpConfig.WriteTimeout,
137140
IdleTimeout: httpConfig.IdleTimeout,
138141
}
139-
err = http2.ConfigureServer(httpServer, &http2.Server{
140-
MaxConcurrentStreams: maxConcurrentStreams,
141-
})
142-
if err != nil {
143-
return nil, err
144-
}
145142

146143
log.Info("API created",
147144
zap.Strings("allowedOrigins", allowedOrigins),
@@ -154,6 +151,7 @@ func New(
154151
tracer: tracer,
155152
metrics: m,
156153
router: router,
154+
http2Router: http2Router,
157155
srv: httpServer,
158156
listener: listener,
159157
}, nil
@@ -199,6 +197,30 @@ func (s *server) RegisterChain(chainName string, ctx *snow.ConsensusContext, vm
199197
)
200198
}
201199
}
200+
201+
ctx.Lock.Lock()
202+
http2Handler, err := vm.CreateHTTP2Handler(context.TODO())
203+
ctx.Lock.Unlock()
204+
if err != nil {
205+
s.log.Error("failed to create http2 handler",
206+
zap.String("chainName", chainName),
207+
zap.Error(err),
208+
)
209+
return
210+
}
211+
212+
if http2Handler == nil {
213+
return
214+
}
215+
216+
http2Handler = s.wrapMiddleware(chainName, http2Handler, ctx)
217+
if !s.http2Router.Add(ctx.ChainID, http2Handler) {
218+
s.log.Error(
219+
"failed to add route to http2 handler",
220+
zap.String("chainName", chainName),
221+
zap.Error(err),
222+
)
223+
}
202224
}
203225

204226
func (s *server) addChainRoute(chainName string, handler http.Handler, ctx *snow.ConsensusContext, base, endpoint string) error {
@@ -207,13 +229,17 @@ func (s *server) addChainRoute(chainName string, handler http.Handler, ctx *snow
207229
zap.String("url", url),
208230
zap.String("endpoint", endpoint),
209231
)
232+
handler = s.wrapMiddleware(chainName, handler, ctx)
233+
return s.router.AddRouter(url, endpoint, handler)
234+
}
235+
236+
func (s *server) wrapMiddleware(chainName string, handler http.Handler, ctx *snow.ConsensusContext) http.Handler {
210237
if s.tracingEnabled {
211238
handler = api.TraceHandler(handler, chainName, s.tracer)
212239
}
213240
// Apply middleware to reject calls to the handler before the chain finishes bootstrapping
214241
handler = rejectMiddleware(handler, ctx)
215-
handler = s.metrics.wrapHandler(chainName, handler)
216-
return s.router.AddRouter(url, endpoint, handler)
242+
return s.metrics.wrapHandler(chainName, handler)
217243
}
218244

219245
func (s *server) AddRoute(handler http.Handler, base, endpoint string) error {
@@ -299,3 +325,23 @@ func (a readPathAdder) AddRoute(handler http.Handler, base, endpoint string) err
299325
func (a readPathAdder) AddAliases(endpoint string, aliases ...string) error {
300326
return a.pather.AddAliasesWithReadLock(endpoint, aliases...)
301327
}
328+
329+
func wrapHandler(
330+
handler http.Handler,
331+
nodeID ids.NodeID,
332+
allowedOrigins []string,
333+
allowedHosts []string,
334+
) http.Handler {
335+
h := filterInvalidHosts(handler, allowedHosts)
336+
h = cors.New(cors.Options{
337+
AllowedOrigins: allowedOrigins,
338+
AllowCredentials: true,
339+
}).Handler(h)
340+
return http.HandlerFunc(
341+
func(w http.ResponseWriter, r *http.Request) {
342+
// Attach this node's ID as a header
343+
w.Header().Set("node-id", nodeID.String())
344+
h.ServeHTTP(w, r)
345+
},
346+
)
347+
}

go.mod

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@ go 1.23.9
1111

1212
require (
1313
github.com/DataDog/zstd v1.5.2
14-
github.com/NYTimes/gziphandler v1.1.1
1514
github.com/StephenButtolph/canoto v0.15.0
1615
github.com/antithesishq/antithesis-sdk-go v0.3.8
17-
github.com/ava-labs/coreth v0.15.1-rc.0.0.20250530184801-28421010abae
16+
github.com/ava-labs/coreth v0.15.2-rc.0.0.20250610170140-2fcf45f828a2
1817
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60
19-
github.com/ava-labs/libevm v1.13.14-0.2.0.release
18+
github.com/ava-labs/libevm v0.0.0-20250610142802-2672fbd7cdfc
2019
github.com/btcsuite/btcd/btcutil v1.1.3
2120
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
2221
github.com/compose-spec/compose-go v1.20.2

0 commit comments

Comments
 (0)