@@ -25,25 +25,17 @@ const CACHE_DURATION = 30000; // in milliseconds
25
25
const COOLDOWN_DURATION = 300000 ; // 5 minutes in milliseconds
26
26
const MAX_PEERS_TO_FETCH = 5 ; // Maximum number of peers to fetch from DNS
27
27
const MAX_RETRIES = 3 ; // Maximum number of retry attempts
28
- const MAX_REQUESTS_PER_MINUTE = 100 ; // Throttle limit
28
+ const MAX_REQUESTS_PER_MINUTE = 100 ; // Per-peer rate limit
29
29
30
30
/**
31
- * Represents a peer with its reliability weight and address .
31
+ * Represents a peer with its reliability weight, address, and rate limiter .
32
32
*/
33
33
interface PeerInfo {
34
34
peer : Peer ;
35
35
weight : number ;
36
36
address : string ;
37
37
isConnected : boolean ; // Indicates if the peer is currently connected
38
- }
39
-
40
- /**
41
- * Represents a queued method call.
42
- */
43
- interface QueuedCall {
44
- execute : ( ) => Promise < any > ;
45
- resolve : ( value : any ) => void ;
46
- reject : ( reason ?: any ) => void ;
38
+ limiter : Bottleneck ; // Rate limiter for the peer
47
39
}
48
40
49
41
/**
@@ -71,11 +63,11 @@ export class FullNodePeer {
71
63
// Cache for fetched peer IPs
72
64
private static peerIPCache = new NodeCache ( { stdTTL : CACHE_DURATION / 1000 } ) ;
73
65
74
- // Bottleneck instance for global throttling
75
- private static limiter = new Bottleneck ( {
76
- maxConcurrent : 1 , // Ensures only one request is processed at a time
77
- minTime : 60000 / MAX_REQUESTS_PER_MINUTE , // Calculated delay between requests
78
- } ) ;
66
+ // List of available peers for round-robin
67
+ private static availablePeers : string [ ] = [ ] ;
68
+
69
+ // Current index for round-robin selection
70
+ private static currentPeerIndex : number = 0 ;
79
71
80
72
// Private constructor for singleton pattern
81
73
private constructor ( private peer : Peer ) { }
@@ -302,32 +294,16 @@ export class FullNodePeer {
302
294
}
303
295
304
296
/**
305
- * Selects a peer based on weighted random selection.
306
- * Prioritized peers have higher weights.
297
+ * Selects the next peer based on round-robin selection.
307
298
* @returns {string } The selected peer IP.
308
299
*/
309
- private static selectPeerByWeight ( ) : string {
310
- const peers = Array . from ( FullNodePeer . peerWeights . entries ( ) )
311
- . filter ( ( [ ip , _ ] ) => ! FullNodePeer . cooldownCache . has ( ip ) )
312
- . map ( ( [ ip , weight ] ) => ( { ip, weight } ) ) ;
313
-
314
- const totalWeight = peers . reduce ( ( sum , peer ) => sum + peer . weight , 0 ) ;
315
- if ( totalWeight === 0 ) {
316
- throw new Error ( "All peers are in cooldown." ) ;
300
+ private static getNextPeerIP ( ) : string {
301
+ if ( FullNodePeer . availablePeers . length === 0 ) {
302
+ throw new Error ( "No available peers to select." ) ;
317
303
}
318
-
319
- const random = Math . random ( ) * totalWeight ;
320
- let cumulative = 0 ;
321
-
322
- for ( const peer of peers ) {
323
- cumulative += peer . weight ;
324
- if ( random < cumulative ) {
325
- return peer . ip ;
326
- }
327
- }
328
-
329
- // Fallback
330
- return peers [ peers . length - 1 ] . ip ;
304
+ const peerIP = FullNodePeer . availablePeers [ FullNodePeer . currentPeerIndex ] ;
305
+ FullNodePeer . currentPeerIndex = ( FullNodePeer . currentPeerIndex + 1 ) % FullNodePeer . availablePeers . length ;
306
+ return peerIP ;
331
307
}
332
308
333
309
/**
@@ -348,7 +324,7 @@ export class FullNodePeer {
348
324
}
349
325
350
326
/**
351
- * Connects to the best available peer based on weighted selection and reliability.
327
+ * Connects to the best available peer based on round-robin selection and reliability.
352
328
* @returns {Promise<Peer> } The connected Peer instance.
353
329
*/
354
330
private static async getBestPeer ( ) : Promise < Peer > {
@@ -369,65 +345,119 @@ export class FullNodePeer {
369
345
// Setup peer weights with prioritization
370
346
FullNodePeer . setupPeers ( peerIPs ) ;
371
347
372
- // Weighted random selection
373
- let selectedPeerIP : string ;
374
- try {
375
- selectedPeerIP = FullNodePeer . selectPeerByWeight ( ) ;
376
- } catch ( error : any ) {
377
- throw new Error ( `Failed to select a peer: ${ error . message } ` ) ;
378
- }
348
+ // Initialize or update peerInfos and availablePeers
349
+ for ( const ip of peerIPs ) {
350
+ if ( ! FullNodePeer . peerInfos . has ( ip ) ) {
351
+ // Create a new Bottleneck limiter for the peer
352
+ const limiter = new Bottleneck ( {
353
+ maxConcurrent : 1 , // One request at a time per peer
354
+ minTime : 60000 / MAX_REQUESTS_PER_MINUTE , // 600 ms between requests for 100 requests/min
355
+ } ) ;
379
356
380
- // Attempt to create a peer connection
381
- const sslFolder = path . resolve ( os . homedir ( ) , ".dig" , "ssl" ) ;
382
- const certFile = path . join ( sslFolder , "public_dig.crt" ) ;
383
- const keyFile = path . join ( sslFolder , "public_dig.key" ) ;
357
+ // Attempt to create a peer connection
358
+ const sslFolder = path . resolve ( os . homedir ( ) , ".dig" , "ssl" ) ;
359
+ const certFile = path . join ( sslFolder , "public_dig.crt" ) ;
360
+ const keyFile = path . join ( sslFolder , "public_dig.key" ) ;
384
361
385
- if ( ! fs . existsSync ( sslFolder ) ) {
386
- fs . mkdirSync ( sslFolder , { recursive : true } ) ;
387
- }
362
+ if ( ! fs . existsSync ( sslFolder ) ) {
363
+ fs . mkdirSync ( sslFolder , { recursive : true } ) ;
364
+ }
388
365
389
- const tls = new Tls ( certFile , keyFile ) ;
366
+ const tls = new Tls ( certFile , keyFile ) ;
367
+
368
+ let peer : Peer ;
369
+ try {
370
+ peer = await Peer . new ( `${ ip } :${ FULLNODE_PORT } ` , false , tls ) ;
371
+ } catch ( error : any ) {
372
+ console . error ( `Failed to create peer for IP ${ ip } : ${ error . message } ` ) ;
373
+ // Add to cooldown
374
+ FullNodePeer . cooldownCache . set ( ip , true ) ;
375
+ // Decrease weight or remove peer
376
+ const currentWeight = FullNodePeer . peerWeights . get ( ip ) || 1 ;
377
+ if ( currentWeight > 1 ) {
378
+ FullNodePeer . peerWeights . set ( ip , currentWeight - 1 ) ;
379
+ } else {
380
+ FullNodePeer . peerWeights . delete ( ip ) ;
381
+ }
382
+ continue ; // Skip adding this peer
383
+ }
390
384
391
- let peer : Peer ;
392
- try {
393
- peer = await Peer . new ( `${ selectedPeerIP } :${ FULLNODE_PORT } ` , false , tls ) ;
394
- } catch ( error : any ) {
395
- console . error (
396
- `Failed to create peer for IP ${ selectedPeerIP } : ${ error . message } `
397
- ) ;
398
- // Add to cooldown
399
- FullNodePeer . cooldownCache . set ( selectedPeerIP , true ) ;
400
- // Decrease weight or remove peer
401
- const currentWeight = FullNodePeer . peerWeights . get ( selectedPeerIP ) || 1 ;
402
- if ( currentWeight > 1 ) {
403
- FullNodePeer . peerWeights . set ( selectedPeerIP , currentWeight - 1 ) ;
385
+ // Wrap the peer with proxy to handle errors and retries
386
+ const proxiedPeer = FullNodePeer . createPeerProxy ( peer , ip ) ;
387
+
388
+ // Store PeerInfo
389
+ FullNodePeer . peerInfos . set ( ip , {
390
+ peer : proxiedPeer ,
391
+ weight : FullNodePeer . peerWeights . get ( ip ) || 1 ,
392
+ address : ip ,
393
+ isConnected : true , // Mark as connected
394
+ limiter, // Assign the limiter
395
+ } ) ;
396
+
397
+ // Add to availablePeers
398
+ FullNodePeer . availablePeers . push ( ip ) ;
404
399
} else {
405
- FullNodePeer . peerWeights . delete ( selectedPeerIP ) ;
400
+ const peerInfo = FullNodePeer . peerInfos . get ( ip ) ! ;
401
+ if ( ! peerInfo . isConnected ) {
402
+ // Peer is back from cooldown, re-establish connection
403
+ const sslFolder = path . resolve ( os . homedir ( ) , ".dig" , "ssl" ) ;
404
+ const certFile = path . join ( sslFolder , "public_dig.crt" ) ;
405
+ const keyFile = path . join ( sslFolder , "public_dig.key" ) ;
406
+
407
+ if ( ! fs . existsSync ( sslFolder ) ) {
408
+ fs . mkdirSync ( sslFolder , { recursive : true } ) ;
409
+ }
410
+
411
+ const tls = new Tls ( certFile , keyFile ) ;
412
+
413
+ let peer : Peer ;
414
+ try {
415
+ peer = await Peer . new ( `${ ip } :${ FULLNODE_PORT } ` , false , tls ) ;
416
+ } catch ( error : any ) {
417
+ console . error ( `Failed to reconnect peer for IP ${ ip } : ${ error . message } ` ) ;
418
+ // Re-add to cooldown
419
+ FullNodePeer . cooldownCache . set ( ip , true ) ;
420
+ // Decrease weight or remove peer
421
+ const currentWeight = FullNodePeer . peerWeights . get ( ip ) || 1 ;
422
+ if ( currentWeight > 1 ) {
423
+ FullNodePeer . peerWeights . set ( ip , currentWeight - 1 ) ;
424
+ } else {
425
+ FullNodePeer . peerWeights . delete ( ip ) ;
426
+ }
427
+ continue ; // Skip adding this peer
428
+ }
429
+
430
+ // Wrap the peer with proxy to handle errors and retries
431
+ const proxiedPeer = FullNodePeer . createPeerProxy ( peer , ip ) ;
432
+
433
+ // Update PeerInfo
434
+ peerInfo . peer = proxiedPeer ;
435
+ peerInfo . isConnected = true ;
436
+
437
+ // Add back to availablePeers
438
+ FullNodePeer . availablePeers . push ( ip ) ;
439
+ }
406
440
}
407
- throw new Error ( `Unable to connect to peer ${ selectedPeerIP } ` ) ;
408
441
}
409
442
410
- // Wrap the peer with proxy to handle errors and retries
411
- const proxiedPeer = FullNodePeer . createPeerProxy ( peer , selectedPeerIP ) ;
443
+ if ( FullNodePeer . availablePeers . length === 0 ) {
444
+ throw new Error ( "No available peers to connect." ) ;
445
+ }
412
446
413
- // Store PeerInfo
414
- FullNodePeer . peerInfos . set ( selectedPeerIP , {
415
- peer : proxiedPeer ,
416
- weight : FullNodePeer . peerWeights . get ( selectedPeerIP ) || 1 ,
417
- address : selectedPeerIP ,
418
- isConnected : true , // Mark as connected
419
- } ) ;
447
+ // Select the next peer in round-robin
448
+ const selectedPeerIP = FullNodePeer . getNextPeerIP ( ) ;
449
+ const selectedPeerInfo = FullNodePeer . peerInfos . get ( selectedPeerIP ) ! ;
420
450
421
451
// Cache the peer
422
- FullNodePeer . cachedPeer = { peer : proxiedPeer , timestamp : now } ;
452
+ FullNodePeer . cachedPeer = { peer : selectedPeerInfo . peer , timestamp : now } ;
423
453
424
454
console . log ( `Using Fullnode Peer: ${ selectedPeerIP } ` ) ;
425
455
426
- return proxiedPeer ;
456
+ return selectedPeerInfo . peer ;
427
457
}
428
458
429
459
/**
430
- * Creates a proxy for the peer to handle errors, implement retries, and enforce throttling.
460
+ * Creates a proxy for the peer to handle errors, implement retries, and enforce per-peer throttling.
431
461
* @param {Peer } peer - The Peer instance.
432
462
* @param {string } peerIP - The IP address of the peer.
433
463
* @param {number } [retryCount=0] - The current retry attempt.
@@ -455,33 +485,43 @@ export class FullNodePeer {
455
485
456
486
if ( typeof originalMethod === "function" ) {
457
487
return ( ...args : any [ ] ) => {
458
- // Wrap the method call with Bottleneck's scheduling
459
- return FullNodePeer . limiter . schedule ( async ( ) => {
460
- const peerInfo = FullNodePeer . peerInfos . get ( peerIP ) ;
488
+ // Select the next peer in round-robin
489
+ let selectedPeerIP : string ;
490
+ try {
491
+ selectedPeerIP = FullNodePeer . getNextPeerIP ( ) ;
492
+ } catch ( error : any ) {
493
+ return Promise . reject ( error ) ;
494
+ }
495
+
496
+ const selectedPeerInfo = FullNodePeer . peerInfos . get ( selectedPeerIP ) ! ;
497
+
498
+ // Schedule the method call via the selected peer's limiter
499
+ return selectedPeerInfo . limiter . schedule ( async ( ) => {
500
+ const peerInfo = FullNodePeer . peerInfos . get ( selectedPeerIP ) ;
461
501
if ( ! peerInfo || ! peerInfo . isConnected ) {
462
- throw new Error ( `Cannot perform operation: Peer ${ peerIP } is disconnected.` ) ;
502
+ throw new Error ( `Cannot perform operation: Peer ${ selectedPeerIP } is disconnected.` ) ;
463
503
}
464
504
465
505
try {
466
- const result = await originalMethod . apply ( target , args ) ;
506
+ const result = await originalMethod . apply ( peerInfo . peer , args ) ;
467
507
// On successful operation, increase the weight slightly
468
- const currentWeight = FullNodePeer . peerWeights . get ( peerIP ) || 1 ;
469
- FullNodePeer . peerWeights . set ( peerIP , currentWeight + 0.1 ) ; // Increment weight
508
+ const currentWeight = FullNodePeer . peerWeights . get ( selectedPeerIP ) || 1 ;
509
+ FullNodePeer . peerWeights . set ( selectedPeerIP , currentWeight + 0.1 ) ; // Increment weight
470
510
return result ;
471
511
} catch ( error : any ) {
472
- console . error ( `Peer ${ peerIP } encountered an error: ${ error . message } ` ) ;
512
+ console . error ( `Peer ${ selectedPeerIP } encountered an error: ${ error . message } ` ) ;
473
513
474
514
// Check if the error is related to WebSocket or Operation timed out
475
515
if (
476
516
error . message . includes ( "WebSocket" ) ||
477
517
error . message . includes ( "Operation timed out" )
478
518
) {
479
519
// Handle the disconnection and mark the peer accordingly
480
- FullNodePeer . handlePeerDisconnection ( peerIP ) ;
520
+ FullNodePeer . handlePeerDisconnection ( selectedPeerIP ) ;
481
521
482
522
// If maximum retries reached, throw the error
483
523
if ( retryCount >= MAX_RETRIES ) {
484
- console . error ( `Max retries reached for method ${ String ( prop ) } on peer ${ peerIP } .` ) ;
524
+ console . error ( `Max retries reached for method ${ String ( prop ) } on peer ${ selectedPeerIP } .` ) ;
485
525
throw error ;
486
526
}
487
527
@@ -542,13 +582,25 @@ export class FullNodePeer {
542
582
FullNodePeer . peerInfos . set ( peerIP , peerInfo ) ;
543
583
}
544
584
585
+ // Remove from availablePeers if present
586
+ const index = FullNodePeer . availablePeers . indexOf ( peerIP ) ;
587
+ if ( index !== - 1 ) {
588
+ FullNodePeer . availablePeers . splice ( index , 1 ) ;
589
+ // Adjust currentPeerIndex if necessary
590
+ if ( FullNodePeer . currentPeerIndex >= FullNodePeer . availablePeers . length ) {
591
+ FullNodePeer . currentPeerIndex = 0 ;
592
+ }
593
+ }
594
+
545
595
// If the disconnected peer was the cached peer, invalidate the cache
546
596
if (
547
597
FullNodePeer . cachedPeer &&
548
598
FullNodePeer . extractPeerIP ( FullNodePeer . cachedPeer . peer ) === peerIP
549
599
) {
550
600
FullNodePeer . cachedPeer = null ;
551
601
}
602
+
603
+ console . warn ( `Peer ${ peerIP } has been marked as disconnected and is in cooldown.` ) ;
552
604
}
553
605
554
606
/**
0 commit comments