Skip to content

Commit cc1d276

Browse files
require only one relay connection: (#1360)
modify the peer init relay connection to require only one from the provided urls and igonre the failed ones without break
1 parent aeab47f commit cc1d276

File tree

2 files changed

+53
-17
lines changed

2 files changed

+53
-17
lines changed

rmb-sdk-go/peer/connection.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (c *InnerConnection) Start(ctx context.Context, output chan []byte) {
174174
if err == context.Canceled {
175175
break
176176
} else if err != nil {
177-
log.Error().Err(err).Send()
177+
log.Error().Err(err).Str("url", c.url).Msg("relay connection error")
178178
}
179179

180180
<-time.After(2 * time.Second)
@@ -192,6 +192,18 @@ func (c *InnerConnection) listenAndServe(ctx context.Context, output chan []byte
192192
return c.loop(ctx, con, output)
193193
}
194194

195+
// TryConnect attempts to establish a connection and returns true if successful, false otherwise
196+
func (c *InnerConnection) TryConnect() bool {
197+
con, err := c.connect()
198+
if err != nil {
199+
log.Debug().Err(err).Str("url", c.url).Msg("failed to connect to relay")
200+
return false
201+
}
202+
203+
con.Close()
204+
return true
205+
}
206+
195207
func (c *InnerConnection) connect() (*websocket.Conn, error) {
196208
token, err := NewJWT(c.identity, c.twinID, c.session, 60)
197209
if err != nil {

rmb-sdk-go/peer/peer.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,41 @@ func generateSecureKey(identity substrate.Identity) (*secp256k1.PrivateKey, erro
129129
return priv, nil
130130
}
131131

132+
// getRelayConnections tries to connect to all relays and returns only the successful ones
133+
func getRelayConnections(relayURLs []string, identity substrate.Identity, session string, twinID uint32) ([]string, []InnerConnection, error) {
134+
var successfulRelayURLs []string
135+
var successfulConnections []InnerConnection
136+
137+
for _, relayURL := range relayURLs {
138+
parsedURL, err := url.Parse(relayURL)
139+
if err != nil {
140+
log.Warn().Err(err).Str("url", relayURL).Msg("failed to parse relay URL")
141+
continue
142+
}
143+
144+
conn := NewConnection(identity, relayURL, session, twinID)
145+
if conn.TryConnect() {
146+
log.Info().Str("url", relayURL).Msg("connected")
147+
successfulRelayURLs = append(successfulRelayURLs, parsedURL.Host)
148+
successfulConnections = append(successfulConnections, conn)
149+
continue
150+
}
151+
152+
log.Warn().Str("url", relayURL).Msg("failed to connect")
153+
}
154+
155+
if len(successfulRelayURLs) == 0 {
156+
return nil, nil, errors.New("failed to connect to any relay")
157+
}
158+
159+
sort.Slice(successfulRelayURLs, func(i, j int) bool {
160+
return strings.ToLower(successfulRelayURLs[i]) < strings.ToLower(successfulRelayURLs[j])
161+
})
162+
successfulRelayURLs = slices.Compact(successfulRelayURLs)
163+
164+
return successfulRelayURLs, successfulConnections, nil
165+
}
166+
132167
func getIdentity(keytype string, mnemonics string) (substrate.Identity, error) {
133168
var identity substrate.Identity
134169
var err error
@@ -220,21 +255,12 @@ func NewPeer(
220255
publicKey = privKey.PubKey().SerializeCompressed()
221256
}
222257

223-
var relayURLs []string
224-
for _, relayURL := range cfg.relayURLs {
225-
url, err := url.Parse(relayURL)
226-
if err != nil {
227-
return nil, errors.Wrapf(err, "failed to parse url: %s", relayURL)
228-
}
229-
relayURLs = append(relayURLs, url.Host)
258+
relayURLs, conns, err := getRelayConnections(cfg.relayURLs, identity, cfg.session, twin.ID)
259+
if err != nil {
260+
return nil, err
230261
}
231262

232-
// sort and remove duplicates
233-
sort.Slice(relayURLs, func(i, j int) bool { return strings.ToLower(relayURLs[i]) < strings.ToLower(relayURLs[j]) })
234-
relayURLs = slices.Compact(relayURLs)
235-
236263
joinURLs := strings.Join(relayURLs, "_")
237-
238264
if !bytes.Equal(twin.E2EKey, publicKey) || twin.Relay == nil || joinURLs != *twin.Relay {
239265
log.Info().Str("Relay url/s", joinURLs).Msg("twin relay/public key didn't match, updating on chain ...")
240266
if _, err = subConn.UpdateTwin(identity, joinURLs, publicKey); err != nil {
@@ -243,10 +269,8 @@ func NewPeer(
243269
}
244270

245271
reader := make(chan []byte)
246-
247-
weightCons := make([]WeightItem[InnerConnection], len(cfg.relayURLs))
248-
for _, url := range cfg.relayURLs {
249-
conn := NewConnection(identity, url, cfg.session, twin.ID)
272+
weightCons := make([]WeightItem[InnerConnection], 0, len(conns))
273+
for _, conn := range conns {
250274
conn.Start(ctx, reader)
251275
weightCons = append(weightCons, WeightItem[InnerConnection]{Item: conn, Weight: 1})
252276
}

0 commit comments

Comments
 (0)