@@ -7,6 +7,7 @@ import * as multihash from 'multiformats/hashes/digest'
7
7
import assert from 'node:assert'
8
8
import timers from 'node:timers/promises'
9
9
import { assertOkResponse } from './http-assertions.js'
10
+ import pRetry from 'p-retry'
10
11
11
12
/** @import { ProviderInfo, WalkerState } from './typings.js' */
12
13
/** @import { RedisRepository as Repository } from '@filecoin-station/spark-piece-indexer-repository' */
@@ -151,7 +152,7 @@ export async function processNextAdvertisement ({
151
152
assert ( state . tail )
152
153
153
154
try {
154
- const { previousAdvertisementCid, entriesFetchError , ... entry } = await fetchAdvertisedPayload (
155
+ const { previousAdvertisementCid, entry , error } = await fetchAdvertisedPayload (
155
156
providerInfo . providerAddress ,
156
157
state . tail ,
157
158
{ fetchTimeout }
@@ -169,60 +170,68 @@ export async function processNextAdvertisement ({
169
170
state . status = `Walking the advertisements from ${ state . head } , next step: ${ state . tail } `
170
171
}
171
172
172
- if ( entriesFetchError ) {
173
+ if ( error === 'CANNOT_FETCH_ENTRIES' ) {
173
174
state . entriesNotRetrievable = ( state . entriesNotRetrievable ?? 0 ) + 1
175
+ } else if ( error === 'CANNOT_DETERMINE_PIECE_CID' ) {
176
+ state . adsMissingPieceCID = ( state . adsMissingPieceCID ?? 0 ) + 1
174
177
}
175
- const indexEntry = ( entry . pieceCid && entry . payloadCid ) ? entry : undefined
178
+
179
+ const indexEntry = entry ?. pieceCid ? entry : undefined
176
180
const finished = ! state . tail
177
181
return {
178
182
newState : state ,
179
183
indexEntry,
180
184
finished
181
185
}
182
186
} catch ( err ) {
183
- let reason
184
- if ( err instanceof Error ) {
185
- const url = 'url' in err ? err . url : providerInfo . providerAddress
186
- if ( 'serverMessage' in err && err . serverMessage ) {
187
- reason = err . serverMessage
188
- if ( 'statusCode' in err && err . statusCode ) {
189
- reason = `${ err . statusCode } ${ reason } `
190
- }
191
- } else if ( 'statusCode' in err && err . statusCode ) {
192
- reason = err . statusCode
193
- } else if ( err . name === 'TimeoutError' ) {
194
- reason = 'operation timed out'
195
- } else if (
196
- err . name === 'TypeError' &&
197
- err . message === 'fetch failed' &&
198
- err . cause &&
199
- err . cause instanceof Error
200
- ) {
201
- reason = err . cause . message
202
- }
203
-
204
- reason = `HTTP request to ${ url } failed: ${ reason } `
205
- }
187
+ const errorDescription = describeFetchError ( err , providerInfo . providerAddress )
206
188
207
189
debug (
208
190
'Cannot process provider %s (%s) advertisement %s: %s' ,
209
191
providerId ,
210
192
providerInfo . providerAddress ,
211
193
state . tail ,
212
- reason ?? err
194
+ errorDescription ?? err
213
195
)
214
- state . status = `Error processing ${ state . tail } : ${ reason ?? 'internal error' } `
196
+ state . status = `Error processing ${ state . tail } : ${ errorDescription ?? 'internal error' } `
215
197
return {
216
198
newState : state ,
217
199
failed : true
218
200
}
219
201
}
220
202
}
221
203
222
- /** @typedef {{
223
- pieceCid: string | undefined;
224
- payloadCid: string;
225
- }} AdvertisedPayload */
204
/**
 * Build a human-readable description of a failed HTTP request.
 *
 * Recognised failures, in order of precedence:
 *  - errors carrying a `serverMessage` (optionally prefixed with `statusCode`),
 *  - errors carrying only a `statusCode`,
 *  - errors named `TimeoutError`,
 *  - `TypeError: fetch failed` whose `cause` is an `Error` (the cause message is reported).
 *
 * Anything else (including non-Error values) yields `undefined`, so that callers can fall back
 * to reporting the original error value via `description ?? err`.
 *
 * @param {unknown} err The value that was thrown.
 * @param {string} providerAddress Used as the request URL when the error does not carry a usable `url`.
 * @returns {string | undefined} `HTTP request to <url> failed: <reason>` or `undefined`
 * when the error is not a recognised fetch failure.
 */
function describeFetchError (err, providerAddress) {
  if (!(err instanceof Error)) return undefined

  let reason
  if ('serverMessage' in err && err.serverMessage) {
    reason = err.serverMessage
    if ('statusCode' in err && err.statusCode) {
      reason = `${err.statusCode} ${reason}`
    }
  } else if ('statusCode' in err && err.statusCode) {
    reason = err.statusCode
  } else if (err.name === 'TimeoutError') {
    reason = 'operation timed out'
  } else if (
    err.name === 'TypeError' &&
    err.message === 'fetch failed' &&
    err.cause instanceof Error
  ) {
    // `fetch()` wraps the low-level network error in a generic TypeError; the cause is the
    // informative part.
    reason = err.cause.message
  }
  if (!reason) return undefined

  // The `url` property may be present but unset; never print "undefined" as the request target.
  const url = ('url' in err && err.url) ? err.url : providerAddress
  return `HTTP request to ${url} failed: ${reason}`
}
226
235
227
236
/**
228
237
* @param {string } providerAddress
@@ -262,41 +271,51 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid,
262
271
263
272
const meta = parseMetadata ( advertisement . Metadata [ '/' ] . bytes )
264
273
const pieceCid = meta . deal ?. PieceCID . toString ( )
265
-
266
- try {
267
- const entriesChunk =
268
- /** @type {{
269
- Entries: { '/' : { bytes: string } }[]
270
- }} */ (
271
- await fetchCid ( providerAddress , entriesCid , { fetchTimeout } )
272
- )
273
- debug ( 'entriesChunk %s %j' , entriesCid , entriesChunk . Entries . slice ( 0 , 5 ) )
274
- const entryHash = entriesChunk . Entries [ 0 ] [ '/' ] . bytes
275
- const payloadCid = CID . create ( 1 , 0x55 /* raw */ , multihash . decode ( Buffer . from ( entryHash , 'base64' ) ) ) . toString ( )
276
-
274
+ if ( ! pieceCid ) {
275
+ debug ( 'advertisement %s has no PieceCID in metadata: %j' , advertisementCid , meta . deal )
277
276
return {
278
- previousAdvertisementCid,
279
- pieceCid,
280
- payloadCid
277
+ error : /** @type {const } */ ( 'CANNOT_DETERMINE_PIECE_CID' ) ,
278
+ previousAdvertisementCid
281
279
}
282
- } catch ( err ) {
283
- if ( err && typeof err === 'object' && 'statusCode' in err && err . statusCode === 404 ) {
284
- // The index provider cannot find the advertised entries. We cannot do much about that,
285
- // it's unlikely that further request will succeed. Let's skip this advertisement.
286
- debug (
287
- 'Cannot fetch ad %s entries %s: %s %s' ,
288
- advertisementCid ,
289
- entriesCid ,
290
- err . statusCode ,
291
- /** @type {any } */ ( err ) . serverMessage ?? '<not found>'
292
- )
293
- return {
294
- entriesFetchError : true ,
295
- previousAdvertisementCid,
296
- pieceCid
280
+ }
281
+
282
+ let entriesChunk
283
+ try {
284
+ entriesChunk = await pRetry (
285
+ async ( ) =>
286
+ /** @type {{
287
+ Entries: { '/' : { bytes: string } }[]
288
+ }} */ (
289
+ await fetchCid ( providerAddress , entriesCid , { fetchTimeout } )
290
+ ) ,
291
+ {
292
+ shouldRetry : ( err ) =>
293
+ err && 'statusCode' in err && typeof err . statusCode === 'number' && err . statusCode >= 500
297
294
}
295
+ )
296
+ } catch ( err ) {
297
+ // We are not able to fetch the advertised entries. Skip this advertisement so that we can
298
+ // continue the ingestion of other advertisements.
299
+ const errorDescription = describeFetchError ( err , providerAddress )
300
+ console . warn (
301
+ 'Cannot fetch ad %s entries %s: %s' ,
302
+ advertisementCid ,
303
+ entriesCid ,
304
+ errorDescription ?? err
305
+ )
306
+ return {
307
+ error : /** @type {const } */ ( 'CANNOT_FETCH_ENTRIES' ) ,
308
+ previousAdvertisementCid
298
309
}
299
- throw err
310
+ }
311
+
312
+ debug ( 'entriesChunk %s %j' , entriesCid , entriesChunk . Entries . slice ( 0 , 5 ) )
313
+ const entryHash = entriesChunk . Entries [ 0 ] [ '/' ] . bytes
314
+ const payloadCid = CID . create ( 1 , 0x55 /* raw */ , multihash . decode ( Buffer . from ( entryHash , 'base64' ) ) ) . toString ( )
315
+
316
+ return {
317
+ previousAdvertisementCid,
318
+ entry : { pieceCid, payloadCid }
300
319
}
301
320
}
302
321
0 commit comments