@@ -3,7 +3,7 @@ import * as path from "path";
3
3
import { DigPeer } from "./DigPeer" ;
4
4
import { DataStore , ServerCoin } from "../blockchain" ;
5
5
import { DIG_FOLDER_PATH } from "../utils/config" ;
6
- import { withTimeout } from "../utils" ;
6
+ import { withTimeout , PeerRanker , PeerMetrics } from "../utils" ;
7
7
8
8
export class DigNetwork {
9
9
private dataStore : DataStore ;
@@ -27,99 +27,110 @@ export class DigNetwork {
27
27
await digNetwork . syncStoreFromPeers ( ) ;
28
28
}
29
29
30
+ public static getAllNetworkDataStoreIds ( ) : string [ ] {
31
+ throw new Error ( "Method not implemented." ) ;
32
+ }
33
+
30
34
public static async getUdiContent ( udi : string ) {
31
- // TODO: Implement this method
35
+ throw new Error ( "Method not implemented." ) ;
32
36
}
33
37
34
38
/**
35
- * Find a peer that has the store key and root hash.
39
+ * Find a peer that has the store key and root hash, using ranked peers first and searching in groups of 5 .
36
40
*
37
41
* @param {string } storeId - The ID of the store.
38
42
* @param {string } rootHash - The root hash of the store.
39
43
* @param {string } [key] - Optional key to check for in the store.
40
- * @param {string[] } [initialBlackList] - Initial list of blacklisted peer IPs.
41
44
* @returns {Promise<DigPeer | null> } - A valid peer or null if none found.
42
45
*/
43
46
public static async findPeerWithStoreKey (
44
47
storeId : string ,
45
48
rootHash : string ,
46
- key ?: string ,
47
- initialBlackList : string [ ] = [ ]
49
+ key ?: string
48
50
) : Promise < DigPeer | null > {
49
- const peerBlackList = new Set ( initialBlackList ) ;
50
51
const serverCoin = new ServerCoin ( storeId ) ;
51
52
52
- while ( true ) {
53
- try {
54
- // Sample 10 peers from the current epoch excluding blacklisted peers
55
- const digPeers = await serverCoin . sampleCurrentEpoch (
56
- 10 ,
57
- Array . from ( peerBlackList )
58
- ) ;
53
+ try {
54
+ // Fetch all active peers for the current epoch
55
+ const digPeers = await serverCoin . getActiveEpochPeers ( ) ;
59
56
60
- // If no peers are returned, break out of the loop
61
- if ( digPeers . length === 0 ) {
62
- console . log ( "No more peers found." ) ;
63
- break ;
64
- }
57
+ // If no peers are returned, exit early
58
+ if ( digPeers . length === 0 ) {
59
+ console . log ( "No peers found." ) ;
60
+ return null ;
61
+ }
65
62
66
- // Create a race of promises for all peers
67
- const peerPromises = digPeers . map ( ( peerIp ) => {
68
- return new Promise < DigPeer | null > ( async ( resolve ) => {
69
- try {
70
- const digPeer = new DigPeer ( peerIp , storeId ) ;
71
- const { storeExists, rootHashExists } =
72
- await digPeer . propagationServer . checkStoreExists ( rootHash ) ;
73
-
74
- // Check if the store and root hash exist on the peer
75
- if ( storeExists && rootHashExists ) {
76
- console . log (
77
- `Found Peer at ${ peerIp } for storeId: ${ storeId } , root hash ${ rootHash } `
78
- ) ;
79
-
80
- // If no key is provided, resolve the peer
81
- if ( ! key ) {
82
- return resolve ( digPeer ) ;
83
- }
84
-
85
- // If key is provided, check if the peer has it
86
- const keyResponse = await digPeer . contentServer . headKey (
87
- key ,
88
- rootHash
89
- ) ;
90
- if ( keyResponse . headers ?. [ "x-key-exists" ] === "true" ) {
91
- return resolve ( digPeer ) ;
92
- }
93
- }
94
- } catch ( error ) {
95
- console . error ( `Error connecting to DIG Peer ${ peerIp } .` ) ;
96
- }
63
+ // Initialize PeerRanker with the list of digPeers (IP addresses)
64
+ const peerRanker = new PeerRanker ( digPeers ) ;
97
65
98
- // If the peer does not meet the criteria, resolve with null
99
- resolve ( null ) ;
100
- } ) ;
101
- } ) ;
66
+ // Rank the peers based on latency and bandwidth
67
+ const rankedPeers = await peerRanker . rankPeers ( ) ;
102
68
103
- // Wait for the first valid peer that resolves
104
- const firstValidPeer = await Promise . race ( peerPromises ) ;
69
+ // If no peers are returned after ranking, exit early
70
+ if ( rankedPeers . length === 0 ) {
71
+ console . log ( "No valid peers found after ranking." ) ;
72
+ return null ;
73
+ }
74
+
75
+ // Define the iterator function to process each peer
76
+ const iteratorFn = async (
77
+ peerMetrics : PeerMetrics
78
+ ) : Promise < DigPeer | null > => {
79
+ const peerIp = peerMetrics . ip ;
80
+ try {
81
+ const digPeer = new DigPeer ( peerIp , storeId ) ;
82
+
83
+ // Wrap the store check with a 10-second timeout
84
+ const { storeExists, rootHashExists } = await withTimeout (
85
+ digPeer . propagationServer . checkStoreExists ( rootHash ) ,
86
+ 10000 ,
87
+ `Timeout while checking store on peer ${ peerIp } `
88
+ ) ;
89
+
90
+ // Check if the store and root hash exist on the peer
91
+ if ( storeExists && rootHashExists ) {
92
+ console . log (
93
+ `Found Peer at ${ peerIp } for storeId: ${ storeId } , root hash ${ rootHash } `
94
+ ) ;
95
+
96
+ // If no key is provided, return the peer
97
+ if ( ! key ) {
98
+ return digPeer ;
99
+ }
105
100
106
- // If a valid peer is found, return it
107
- if ( firstValidPeer ) {
108
- return firstValidPeer ;
101
+ // If key is provided, wrap key check with a 10-second timeout
102
+ const keyResponse = await withTimeout (
103
+ digPeer . contentServer . headKey ( key , rootHash ) ,
104
+ 10000 ,
105
+ `Timeout while checking key on peer ${ peerIp } `
106
+ ) ;
107
+
108
+ if ( keyResponse . headers ?. [ "x-key-exists" ] === "true" ) {
109
+ return digPeer ;
110
+ }
111
+ }
112
+ } catch ( error : any ) {
113
+ console . error (
114
+ `Error connecting to DIG Peer ${ peerIp } :` ,
115
+ error . message
116
+ ) ;
109
117
}
110
118
111
- // If none of the peers were valid, add them to the blacklist
112
- digPeers . forEach ( ( peerIp ) => peerBlackList . add ( peerIp ) ) ;
119
+ // If the peer does not meet the criteria, return null
120
+ return null ;
121
+ } ;
113
122
114
- // Retry with the next set of peers
115
- console . log ( "No valid peers found, retrying with new peers..." ) ;
116
- } catch ( error ) {
117
- console . error ( "Error sampling peers. Resampling..." ) ;
118
- }
119
- }
123
+ // Use Promise.race to return the first valid peer found
124
+ const validPeer = await Promise . race (
125
+ rankedPeers . map ( ( peer ) => iteratorFn ( peer ) )
126
+ ) ;
120
127
121
- // Return null if no valid peer was found after all attempts
122
- return null ;
128
+ // Return the first valid peer or null if none is found
129
+ return validPeer || null ;
130
+ } catch ( error ) {
131
+ console . error ( "Error sampling peers:" , error ) ;
132
+ return null ;
133
+ }
123
134
}
124
135
125
136
public static unsubscribeFromStore ( storeId : string ) : void {
@@ -156,7 +167,6 @@ export class DigNetwork {
156
167
}
157
168
console . log ( "Starting network sync for store:" , this . dataStore . StoreId ) ;
158
169
DigNetwork . networkSyncMap . set ( this . dataStore . StoreId , true ) ;
159
- let peerBlackList : string [ ] = [ ] ;
160
170
161
171
try {
162
172
const rootHistory = await this . dataStore . getRootHistory ( ) ;
@@ -189,61 +199,35 @@ export class DigNetwork {
189
199
190
200
// Process the root hashes sequentially
191
201
for ( const rootInfo of rootsToProcess ) {
192
- let selectedPeer : DigPeer | null = null ;
193
-
194
- while ( true ) {
195
- try {
196
- // Find a peer with the store and root hash
197
- if ( prioritizedPeer ) {
198
- selectedPeer = prioritizedPeer ;
199
- } else {
200
- selectedPeer = await DigNetwork . findPeerWithStoreKey (
201
- this . dataStore . StoreId ,
202
- rootInfo . root_hash ,
203
- undefined ,
204
- peerBlackList
205
- ) ;
206
- }
207
-
208
- if ( ! selectedPeer ) {
209
- console . error (
210
- `No peer found with root hash ${ rootInfo . root_hash } . Moving to next root.`
211
- ) ;
212
- break ; // Exit the while loop to proceed to the next rootInfo
213
- }
214
-
215
- // Check if the selected peer has the store and root hash
216
- const { storeExists, rootHashExists } =
217
- await selectedPeer . propagationServer . checkStoreExists (
218
- rootInfo . root_hash
219
- ) ;
220
-
221
- if ( ! storeExists || ! rootHashExists ) {
222
- console . warn (
223
- `Peer ${ selectedPeer . IpAddress } does not have the required store or root hash. Trying another peer...`
224
- ) ;
225
- peerBlackList . push ( selectedPeer . IpAddress ) ; // Blacklist and retry
226
- continue ;
227
- }
202
+ try {
203
+ let selectedPeer : DigPeer | null = prioritizedPeer || null ;
204
+
205
+ if ( ! selectedPeer ) {
206
+ // Use the `findPeerWithStoreKey` method to find a peer with the store and root hash
207
+ selectedPeer = await DigNetwork . findPeerWithStoreKey (
208
+ this . dataStore . StoreId ,
209
+ rootInfo . root_hash
210
+ ) ;
211
+ }
228
212
229
- // Download the store root and associated data
230
- await selectedPeer . downloadStoreRoot ( rootInfo . root_hash ) ;
231
-
232
- // Clear the blacklist upon successful download
233
- peerBlackList = [ ] ;
234
-
235
- // Break after successful download to proceed to next root hash
236
- break ;
237
- } catch ( error : any ) {
238
- if ( error . message )
239
- console . error (
240
- `Error downloading from peer ${ selectedPeer ?. IpAddress } . Retrying with another peer.` ,
241
- error
242
- ) ;
243
- if ( selectedPeer ) {
244
- peerBlackList . push ( selectedPeer . IpAddress ) ; // Blacklist and retry
245
- }
213
+ if ( ! selectedPeer ) {
214
+ console . error (
215
+ `No peer found with root hash ${ rootInfo . root_hash } . Moving to next root.`
216
+ ) ;
217
+ continue ; // Move to the next rootInfo
246
218
}
219
+
220
+ // Download the store root and associated data
221
+ await selectedPeer . downloadStoreRoot ( rootInfo . root_hash ) ;
222
+
223
+ // Break after successful download to proceed to next root hash
224
+ } catch ( error : any ) {
225
+ if ( error . message )
226
+ console . error (
227
+ `Error downloading from peer ${ prioritizedPeer ?. IpAddress } . Retrying with another peer.` ,
228
+ error
229
+ ) ;
230
+ // Continue to next rootInfo in case of error
247
231
}
248
232
}
249
233
0 commit comments