From 2683a497ba3955e233ced07729ec7b8201d98be4 Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Mon, 7 Jul 2025 13:06:21 +0300 Subject: [PATCH 1/8] stop listening changes in storage and flist Changes --- pkg/environment/config.go | 8 ++++---- pkg/environment/environment.go | 29 +++++++++++++++-------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/pkg/environment/config.go b/pkg/environment/config.go index 7ac5281b..c26c7a4f 100644 --- a/pkg/environment/config.go +++ b/pkg/environment/config.go @@ -39,14 +39,14 @@ type Config struct { BinRepo string `json:"bin_repo"` GeoipURLs []string `json:"geoip_urls"` - FlistURL string `json:"flist_url"` - V4FlistURL string `json:"v4_flist_url"` + // FlistURL string `json:"flist_url"` + // V4FlistURL string `json:"v4_flist_url"` HubURL string `json:"hub_url"` V4HubURL string `json:"v4_hub_url"` - HubStorage string `json:"hub_storage"` - V4HubStorage string `json:"v4_hub_storage"` + // HubStorage string `json:"hub_storage"` + // V4HubStorage string `json:"v4_hub_storage"` } // Merge, updates current config with cfg merging and override config diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index e2dff0c5..e9a6e4f1 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -368,14 +368,15 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { if geoip := config.GeoipURLs; len(geoip) > 0 { env.GeoipURLs = geoip } - - if flist := config.FlistURL; len(flist) > 0 { - env.FlistURL = flist - } - - if storage := config.HubStorage; len(storage) > 0 { - env.HubStorage = storage - } + // flist url and hub urls shouldn't listen to changes in config as long as we can't change it at run time. + // it would cause breakage in vmd that needs a reboot to be recovered. + // if flist := config.FlistURL; len(flist) > 0 { + // env.FlistURL = flist + // } + // + // if storage := config.HubStorage; len(storage) > 0 { + // env.HubStorage = storage + // } if hub := config.HubURL; len(hub) > 0 { env.HubURL = hub @@ -389,14 +390,14 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { // if the node running v4 chage urls to use v4 hub if params.IsV4() { env.FlistURL = defaultV4FlistURL - if flist := config.V4FlistURL; len(flist) > 0 { - env.FlistURL = flist - } + // if flist := config.V4FlistURL; len(flist) > 0 { + // env.FlistURL = flist + // } env.HubStorage = defaultV4HubStorage - if storage := config.V4HubStorage; len(storage) > 0 { - env.HubStorage = storage - } + // if storage := config.V4HubStorage; len(storage) > 0 { + // env.HubStorage = storage + // } } From 55268faf4c6c5cf668de206d5b7af97b907363fa Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Wed, 2 Jul 2025 17:17:09 +0300 Subject: [PATCH 2/8] update get substrate to update manager if sub_url changed --- pkg/environment/environment.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index e9a6e4f1..abebfc82 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -127,6 +127,7 @@ const ( var ( pool substrate.Manager + subURLs []string poolOnce sync.Once envDev = Environment{ @@ -288,9 +289,16 @@ func GetSubstrate() (substrate.Manager, error) { return nil, errors.Wrap(err, "failed to get boot environment") } - poolOnce.Do(func() { + // if substrate url changed then update subURLs and update pool with new manager + if !slices.Equal(subURLs, env.SubstrateURL) { + log.Debug().Strs("substrate_urls", env.SubstrateURL).Msg("updating to sub manager with url") + subURLs = env.SubstrateURL pool = substrate.NewManager(env.SubstrateURL...) - }) + } + + // poolOnce.Do(func() { + // pool = substrate.NewManager(env.SubstrateURL...) + // }) return pool, nil } @@ -323,6 +331,7 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { env = envProd } + // update it to read local config file instead of trying to download config over and over again config, err := getConfig(env.RunningMode, baseExtendedURL, http.DefaultClient) if err != nil { // maybe the node can't reach the internet right now From 96ccbc2499f4136d61ac9940f464b3842fd6371d Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Wed, 2 Jul 2025 17:17:24 +0300 Subject: [PATCH 3/8] update events packeage to allow updating sub connection --- pkg/events/events.go | 28 ++++++++++++++++++++++++++-- pkg/events/redis.go | 20 ++++++++++++-------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/events/events.go b/pkg/events/events.go index 5c675073..53f1fb0a 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -66,7 +66,8 @@ type Callback func(events *substrate.EventRecords) // Events processor receives all events starting from the given state // and for each set of events calls callback cb type Processor struct { - sub substrate.Manager + sub substrate.Manager + updated bool cb Callback state State @@ -97,6 +98,7 @@ func (e *Processor) process(changes []types.StorageChangeSet, meta *types.Metada } } } + func (e *Processor) eventsTo(cl *gsrpc.SubstrateAPI, meta *types.Metadata, block types.Header) error { // last, err := e.state.Get(cl) @@ -120,7 +122,7 @@ func (e *Processor) eventsTo(cl *gsrpc.SubstrateAPI, meta *types.Metadata, block return errors.Wrapf(err, "failed to get block hash '%d'", start) } - //state.ErrUnknownBlock + // state.ErrUnknownBlock changes, err := cl.RPC.State.QueryStorageAt([]types.StorageKey{key}, hash) if err, ok := err.(rpc.Error); ok { if err.ErrorCode() == -32000 { // block is too old not in archive anymore @@ -169,6 +171,23 @@ func (e *Processor) subscribe(ctx context.Context) error { if err := e.state.Set(block.Number); err != nil { return errors.Wrap(err, "failed to commit last block number") } + + } + + if e.updated { + e.updated = false + newCL, NewMeta, err := e.sub.Raw() + if err != nil { + log.Debug().Err(err).Msg("failed to update substrate connection") + } + + // only update cl and mata after creating the new connection successfully + cl.Client.Close() + cl = newCL + meta = NewMeta + + log.Debug().Msg("done updating sub connection for substrate events listener") + } } } @@ -184,3 +203,8 @@ func (e *Processor) Start(ctx context.Context) { return } } + +func (e *Processor) updateSubstrateConn(sub substrate.Manager) { + e.sub = sub + e.updated = true +} diff --git a/pkg/events/redis.go b/pkg/events/redis.go index 87eb46d4..ea3ed44c 100644 --- a/pkg/events/redis.go +++ b/pkg/events/redis.go @@ -29,11 +29,12 @@ const ( ) type RedisStream struct { - sub substrate.Manager - state string - farm pkg.FarmID - node uint32 - pool *redis.Pool + sub substrate.Manager + state string + farm pkg.FarmID + node uint32 + pool *redis.Pool + processor *Processor } func NewRedisStream(sub substrate.Manager, address string, farm pkg.FarmID, node uint32, state string) (*RedisStream, error) { @@ -51,6 +52,11 @@ func NewRedisStream(sub substrate.Manager, address string, farm pkg.FarmID, node }, nil } +func (r *RedisStream) UpdateSubstrateManager(sub substrate.Manager) { + r.sub = sub + r.processor.updateSubstrateConn(sub) +} + func (r *RedisStream) push(con redis.Conn, queue string, event interface{}) error { var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) @@ -139,11 +145,11 @@ func (r *RedisStream) process(events *substrate.EventRecords) { log.Error().Err(err).Msg("failed to push event") } } - } func (r *RedisStream) Start(ctx context.Context) { ps := NewProcessor(r.sub, r.process, NewFileState(r.state)) + r.processor = ps ps.Start(ctx) } @@ -184,7 +190,6 @@ func (r *RedisConsumer) pop(con redis.Conn, group, stream string) ([]payload, er "BLOCK", 0, "STREAMS", stream, 0)) - if err != nil { return nil, err } @@ -202,7 +207,6 @@ func (r *RedisConsumer) pop(con redis.Conn, group, stream string) ([]payload, er "BLOCK", 3000, "STREAMS", stream, ">")) - if err != nil { return nil, err } From 8cafa8925a355819365c396fbaa2fb09f5cb4c8f Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Wed, 2 Jul 2025 17:19:52 +0300 Subject: [PATCH 4/8] update substrate gateway to allow updating sub connection --- pkg/api_gateway.go | 1 + pkg/stubs/api_gateway_stub.go | 15 +++++++++++++++ pkg/substrate_gateway/substrate_gateway.go | 15 +++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/pkg/api_gateway.go b/pkg/api_gateway.go index 730736aa..7bed48c4 100644 --- a/pkg/api_gateway.go +++ b/pkg/api_gateway.go @@ -10,6 +10,7 @@ import ( //go:generate zbusc -module api-gateway -version 0.0.1 -name api-gateway -package stubs github.com/threefoldtech/zosbase/pkg+SubstrateGateway stubs/api_gateway_stub.go type SubstrateGateway interface { + UpdateSubstrateGatewayConnection(manager substrate.Manager) (err error) CreateNode(node substrate.Node) (uint32, error) CreateTwin(relay string, pk []byte) (uint32, error) EnsureAccount(activationURL []string, termsAndConditionsLink string, termsAndConditionsHash string) (info substrate.AccountInfo, err error) diff --git a/pkg/stubs/api_gateway_stub.go b/pkg/stubs/api_gateway_stub.go index 6d8a4056..5a29485f 100644 --- a/pkg/stubs/api_gateway_stub.go +++ b/pkg/stubs/api_gateway_stub.go @@ -387,3 +387,18 @@ func (s *SubstrateGatewayStub) UpdateNodeUptimeV2(ctx context.Context, arg0 uint } return } + +func (s *SubstrateGatewayStub) UpdateSubstrateGatewayConnection(ctx context.Context, arg0 tfchainclientgo.Manager) (ret0 error) { + args := []interface{}{arg0} + result, err := s.client.RequestContext(ctx, s.module, s.object, "UpdateSubstrateGatewayConnection", args...) + if err != nil { + panic(err) + } + result.PanicOnError() + ret0 = result.CallError() + loader := zbus.Loader{} + if err := result.Unmarshal(&loader); err != nil { + panic(err) + } + return +} diff --git a/pkg/substrate_gateway/substrate_gateway.go b/pkg/substrate_gateway/substrate_gateway.go index 8d175f4f..b2ffce59 100644 --- a/pkg/substrate_gateway/substrate_gateway.go +++ b/pkg/substrate_gateway/substrate_gateway.go @@ -31,6 +31,20 @@ func NewSubstrateGateway(manager substrate.Manager, identity substrate.Identity) return gw, nil } +// UpdateSubstrateGatewayConnection allow modules to update substrate manager so that the node can recover chain outage +func (g *substrateGateway) UpdateSubstrateGatewayConnection(manager substrate.Manager) error { + sub, err := manager.Substrate() + if err != nil { + return err + } + + // close the old connection + g.sub.Close() + + g.sub = sub + return nil +} + func (g *substrateGateway) GetZosVersion() (string, error) { log.Debug().Str("method", "GetZosVersion").Msg("method called") @@ -207,6 +221,7 @@ func (g *substrateGateway) UpdateNodeUptimeV2(uptime uint64, timestampHint uint6 defer g.mu.Unlock() return g.sub.UpdateNodeUptimeV2(g.identity, uptime, timestampHint) } + func (g *substrateGateway) GetTime() (time.Time, error) { log.Trace().Str("method", "Time").Msg("method called") From 5e9fb91b093c65e65270e03884ccc4e03322cf08 Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Wed, 2 Jul 2025 17:23:40 +0300 Subject: [PATCH 5/8] remove unused poolOnce --- pkg/environment/environment.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index abebfc82..191f283b 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -5,7 +5,6 @@ import ( "os" "slices" "strconv" - "sync" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -126,9 +125,9 @@ const ( ) var ( - pool substrate.Manager - subURLs []string - poolOnce sync.Once + pool substrate.Manager + subURLs []string + // poolOnce sync.Once envDev = Environment{ RunningMode: RunningDev, From 7af2de70d6cd6858f7d914d0182d9e683583095d Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Mon, 7 Jul 2025 12:59:41 +0300 Subject: [PATCH 6/8] stop listening changes in storage and flist Changes --- pkg/environment/environment.go | 2 -- pkg/events/events.go | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index 191f283b..d8d6e8a4 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -127,7 +127,6 @@ const ( var ( pool substrate.Manager subURLs []string - // poolOnce sync.Once envDev = Environment{ RunningMode: RunningDev, @@ -330,7 +329,6 @@ func getEnvironmentFromParams(params kernel.Params) (Environment, error) { env = envProd } - // update it to read local config file instead of trying to download config over and over again config, err := getConfig(env.RunningMode, baseExtendedURL, http.DefaultClient) if err != nil { // maybe the node can't reach the internet right now diff --git a/pkg/events/events.go b/pkg/events/events.go index 53f1fb0a..42dd8756 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -179,6 +179,7 @@ func (e *Processor) subscribe(ctx context.Context) error { newCL, NewMeta, err := e.sub.Raw() if err != nil { log.Debug().Err(err).Msg("failed to update substrate connection") + continue } // only update cl and mata after creating the new connection successfully From 88609be4a78aa4e73b892906d0f82a9dc28c7c2e Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Wed, 9 Jul 2025 17:16:06 +0300 Subject: [PATCH 7/8] update rmb to latest --- go.mod | 14 ++++++-------- go.sum | 34 +++++++++++++++------------------- pkg/events/events.go | 30 +++++++++++++++--------------- 3 files changed, 36 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index 1350fe4e..dea49ae9 100644 --- a/go.mod +++ b/go.mod @@ -41,12 +41,12 @@ require ( github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 - github.com/rs/zerolog v1.33.0 + github.com/rs/zerolog v1.34.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.10.0 github.com/threefoldtech/0-fs v1.3.1-0.20240424140157-b488dfedcc56 github.com/threefoldtech/tfchain/clients/tfchain-client-go v0.0.0-20241127100051-77e684bcb1b2 - github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go v0.16.1-0.20241229121208-76ac3fea5e67 + github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go v0.16.8 github.com/threefoldtech/zbus v1.0.1 github.com/tyler-smith/go-bip39 v1.1.0 github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852 @@ -67,9 +67,7 @@ require ( github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/rogpeppe/go-internal v1.14.1 // indirect go.opentelemetry.io/otel v1.34.0 // indirect - golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/tools v0.30.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect ) @@ -158,13 +156,13 @@ require ( go.opencensus.io v0.24.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.35.0 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/text v0.22.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/text v0.23.0 // indirect golang.zx2c4.com/wireguard v0.0.20200320 // indirect - gonum.org/v1/gonum v0.15.0 // indirect + gonum.org/v1/gonum v0.16.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/grpc v1.70.0 // indirect - google.golang.org/protobuf v1.36.4 // indirect + google.golang.org/protobuf v1.36.6 // indirect gopkg.in/djherbis/times.v1 v1.2.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 4b53387f..8330bf46 100644 --- a/go.sum +++ b/go.sum @@ -509,15 +509,15 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= -github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= -github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.14.3/go.mod h1:3WXPzbXEEliJ+a6UFE4vhIxV8qR1EML6ngzP9ug4eYg= -github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= -github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= github.com/safchain/ethtool v0.0.0-20201023143004-874930cb3ce0 h1:eskphjc5kRCykOJyX7HHVbJCs25/8knprttvrVvEd8o= @@ -564,8 +564,8 @@ github.com/threefoldtech/0-fs v1.3.1-0.20240424140157-b488dfedcc56 h1:uWd8JfE8N3 github.com/threefoldtech/0-fs v1.3.1-0.20240424140157-b488dfedcc56/go.mod h1:lZjR32SiNo3dP70inVFxaLMyZjmKX1ucS+5O31dbPNM= github.com/threefoldtech/tfchain/clients/tfchain-client-go v0.0.0-20241127100051-77e684bcb1b2 h1:VW2J36F8g/kJn4IkY0JiRFmb1gFcdjiOyltfJLJ0mYU= github.com/threefoldtech/tfchain/clients/tfchain-client-go v0.0.0-20241127100051-77e684bcb1b2/go.mod h1:cOL5YgHUmDG5SAXrsZxFjUECRQQuAqOoqvXhZG5sEUw= -github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go v0.16.1-0.20241229121208-76ac3fea5e67 h1:Ii9TmXPBC1GYxRirReSygRZvEGXfAsQRaIipMEzGik0= -github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go v0.16.1-0.20241229121208-76ac3fea5e67/go.mod h1:93SROfr+QjgaJ5/jIWtIpLkhaD8Pv8WbdfwvwMNG2p4= +github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go v0.16.8 h1:BDuus/zqEBDsmPQA0h3inJyPnM83O6l4oMe6hqn00xA= +github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go v0.16.8/go.mod h1:93SROfr+QjgaJ5/jIWtIpLkhaD8Pv8WbdfwvwMNG2p4= github.com/threefoldtech/zbus v1.0.1 h1:3KaEpyOiDYAw+lrAyoQUGIvY9BcjVRXlQ1beBRqhRNk= github.com/threefoldtech/zbus v1.0.1/go.mod h1:E/v/xEvG/l6z/Oj0aDkuSUXFm/1RVJkhKBwDTAIdsHo= github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= @@ -635,8 +635,6 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= -go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko= go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -657,8 +655,6 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -714,8 +710,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -786,8 +782,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -822,8 +818,8 @@ golang.zx2c4.com/wireguard v0.0.20200320/go.mod h1:lDian4Sw4poJ04SgHh35nzMVwGSYl golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b h1:l4mBVCYinjzZuR5DtxHuBD6wyd4348TGiavJ5vLrhEc= golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b/go.mod h1:UdS9frhv65KTfwxME1xE8+rHYoFpbm36gOud1GhBe9c= golang.zx2c4.com/wireguard/windows v0.3.14/go.mod h1:3P4IEAsb+BjlKZmpUXgy74c0iX9AVwwr3WcVJ8nPgME= -gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ= -gonum.org/v1/gonum v0.15.0/go.mod h1:xzZVBJBtS+Mz4q0Yl2LJTk+OxOg4jiXZ7qBoM0uISGo= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -857,8 +853,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= -google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/events/events.go b/pkg/events/events.go index 42dd8756..9fcaa93f 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -171,24 +171,24 @@ func (e *Processor) subscribe(ctx context.Context) error { if err := e.state.Set(block.Number); err != nil { return errors.Wrap(err, "failed to commit last block number") } + default: + if e.updated { + e.updated = false + newCL, NewMeta, err := e.sub.Raw() + if err != nil { + log.Debug().Err(err).Msg("failed to update substrate connection") + continue + } + + // only update cl and mata after creating the new connection successfully + cl.Client.Close() + cl = newCL + meta = NewMeta + + log.Debug().Msg("done updating sub connection for substrate events listener") - } - - if e.updated { - e.updated = false - newCL, NewMeta, err := e.sub.Raw() - if err != nil { - log.Debug().Err(err).Msg("failed to update substrate connection") - continue } - // only update cl and mata after creating the new connection successfully - cl.Client.Close() - cl = newCL - meta = NewMeta - - log.Debug().Msg("done updating sub connection for substrate events listener") - } } } From 51aa984399e48a1d65b557f32a9928ca4b40d246 Mon Sep 17 00:00:00 2001 From: Eslam-Nawara Date: Wed, 9 Jul 2025 17:40:32 +0300 Subject: [PATCH 8/8] use atomic bool for update in events.go --- pkg/environment/environment.go | 3 +++ pkg/events/events.go | 17 +++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index d8d6e8a4..2447de1a 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -287,6 +287,9 @@ func GetSubstrate() (substrate.Manager, error) { return nil, errors.Wrap(err, "failed to get boot environment") } + slices.Sort(subURLs) + slices.Sort(env.SubstrateURL) + // if substrate url changed then update subURLs and update pool with new manager if !slices.Equal(subURLs, env.SubstrateURL) { log.Debug().Strs("substrate_urls", env.SubstrateURL).Msg("updating to sub manager with url") diff --git a/pkg/events/events.go b/pkg/events/events.go index 9fcaa93f..248fdf4d 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "os" + "sync/atomic" "time" gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" @@ -67,7 +68,7 @@ type Callback func(events *substrate.EventRecords) // and for each set of events calls callback cb type Processor struct { sub substrate.Manager - updated bool + updated atomic.Bool cb Callback state State @@ -172,23 +173,23 @@ func (e *Processor) subscribe(ctx context.Context) error { return errors.Wrap(err, "failed to commit last block number") } default: - if e.updated { - e.updated = false - newCL, NewMeta, err := e.sub.Raw() + if e.updated.Load() { + e.updated.Swap(false) + + newCL, newMeta, err := e.sub.Raw() if err != nil { log.Debug().Err(err).Msg("failed to update substrate connection") - continue + break } // only update cl and mata after creating the new connection successfully cl.Client.Close() cl = newCL - meta = NewMeta + meta = newMeta log.Debug().Msg("done updating sub connection for substrate events listener") } - } } } @@ -207,5 +208,5 @@ func (e *Processor) Start(ctx context.Context) { func (e *Processor) updateSubstrateConn(sub substrate.Manager) { e.sub = sub - e.updated = true + e.updated.Swap(true) }