From 36252658cb5c365c9b7b3c114b8a7c3d3888b529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 24 Sep 2024 14:32:41 +0200 Subject: [PATCH 01/10] feat: skip ads where entries cannot be retrieved MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While running the indexer in production, I noticed that many index providers return advertisemnt but then they return "404 cid not found" when we ask for the (payload) entries. Let's recover from that error by ignoring the advertisement and continuing the walk. Signed-off-by: Miroslav Bajtoš --- indexer/lib/advertisement-walker.js | 68 ++++++++++++++++------- indexer/lib/typings.d.ts | 38 ------------- indexer/test/advertisement-walker.test.js | 58 +++++++++++++++---- indexer/test/helpers/http-server.js | 25 +++++++++ repository/lib/typings.d.ts | 1 + 5 files changed, 122 insertions(+), 68 deletions(-) create mode 100644 indexer/test/helpers/http-server.js diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index b57e60c..921d9f6 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -151,7 +151,7 @@ export async function processNextAdvertisement ({ assert(state.tail) try { - const { previousAdvertisementCid, ...entry } = await fetchAdvertisedPayload( + const { previousAdvertisementCid, entriesFetchError, ...entry } = await fetchAdvertisedPayload( providerInfo.providerAddress, state.tail, { fetchTimeout } @@ -169,7 +169,10 @@ export async function processNextAdvertisement ({ state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}` } - const indexEntry = entry.pieceCid ? entry : undefined + if (entriesFetchError) { + state.entriesNotRetrievable = (state.entriesNotRetrievable ?? 0) + 1 + } + const indexEntry = entry.pieceCid && entry.payloadCid ? entry : undefined const finished = !state.tail return { newState: state, @@ -179,21 +182,26 @@ export async function processNextAdvertisement ({ } catch (err) { let reason if (err instanceof Error) { - const url = 'url' in err ? err.url : undefined + const url = 'url' in err ? err.url : providerInfo.providerAddress if ('serverMessage' in err && err.serverMessage) { reason = err.serverMessage + if ('statusCode' in err && err.statusCode) { + reason = `${err.statusCode} ${reason}` + } } else if ('statusCode' in err && err.statusCode) { - reason = `HTTP request to ${url ?? providerInfo.providerAddress} failed: ${err.statusCode}` + reason = err.statusCode } else if (err.name === 'TimeoutError') { - reason = `HTTP request to ${url ?? providerInfo.providerAddress} timed out` + reason = 'operation timed out' } else if ( err.name === 'TypeError' && err.message === 'fetch failed' && err.cause && err.cause instanceof Error ) { - reason = `HTTP request to ${url ?? providerInfo.providerAddress} failed: ${err.cause.message}` + reason = err.cause.message } + + reason = `HTTP request to ${url} failed: ${reason}` } debug( @@ -252,23 +260,43 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, return { previousAdvertisementCid } } - const entriesChunk = + const meta = parseMetadata(advertisement.Metadata['/'].bytes) + const pieceCid = meta.deal?.PieceCID.toString() + + try { + const entriesChunk = /** @type {{ Entries: { '/' : { bytes: string } }[] }} */( - await fetchCid(providerAddress, entriesCid, { fetchTimeout }) - ) - debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5)) - const entryHash = entriesChunk.Entries[0]['/'].bytes - const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString() + await fetchCid(providerAddress, entriesCid, { fetchTimeout }) + ) + debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5)) + const entryHash = entriesChunk.Entries[0]['/'].bytes + const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString() - const meta = parseMetadata(advertisement.Metadata['/'].bytes) - const pieceCid = meta.deal?.PieceCID.toString() - - return { - previousAdvertisementCid, - pieceCid, - payloadCid + return { + previousAdvertisementCid, + pieceCid, + payloadCid + } + } catch (err) { + if (err && typeof err === 'object' && 'statusCode' in err && err.statusCode === 404) { + // The index provider cannot find the advertised entries. We cannot do much about that, + // it's unlikely that further request will succeed. Let's skip this advertisement. + debug( + 'Cannot fetch ad %s entries %s: %s %s', + advertisementCid, + entriesCid, + err.statusCode, + /** @type {any} */(err).serverMessage ?? '' + ) + return { + entriesFetchError: true, + previousAdvertisementCid, + pieceCid + } + } + throw err } } @@ -291,7 +319,7 @@ async function fetchCid (providerBaseUrl, cid, { fetchTimeout } = {}) { if (err && typeof err === 'object') { Object.assign(err, { url }) } - debug('Error from %s -> %o', url, err) + debug('Error from %s ->', url, err) throw err } } diff --git a/indexer/lib/typings.d.ts b/indexer/lib/typings.d.ts index 8c84c5b..f492702 100644 --- a/indexer/lib/typings.d.ts +++ b/indexer/lib/typings.d.ts @@ -14,41 +14,3 @@ export interface ProviderInfo { export type ProviderToInfoMap = Map; -/** -lastAdCID --> [ ] -\ - ↓ | - ... | entries announced after we started the current walk - ↓ | - [ ] -/ - ↓ - head --> [ ] -\ - ↓ | - ... | entries visited in this walk - ↓ | - [ ] -/ - ↓ - tail --> [ ] -\ - ↓ | - ... | entries NOT visited yet - ↓ | - [ ] -/ - ↓ - lastHead --> [ ] -\ - ↓ | - ... | entries visited in the previous walks - ↓ | - [ ] -/ - ↓ - (null) - */ -export interface WalkerState { - head?: string; - tail?: string; - lastHead?: string; - status: string; -} - -export type ProviderToWalkerStateMap = Map - -export type PiecePayloadCIDs = string[]; - diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js index 0184b86..5e4150e 100644 --- a/indexer/test/advertisement-walker.test.js +++ b/indexer/test/advertisement-walker.test.js @@ -1,8 +1,6 @@ import { RedisRepository } from '@filecoin-station/spark-piece-indexer-repository' import { Redis } from 'ioredis' import assert from 'node:assert' -import { once } from 'node:events' -import http from 'node:http' import { after, before, beforeEach, describe, it } from 'node:test' import { setTimeout } from 'node:timers/promises' import { @@ -10,7 +8,9 @@ import { processNextAdvertisement, walkOneStep } from '../lib/advertisement-walker.js' +import { givenHttpServer } from './helpers/http-server.js' import { FRISBII_ADDRESS, FRISBII_AD_CID } from './helpers/test-data.js' +import { assertOkResponse } from '../lib/http-assertions.js' /** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */ @@ -253,19 +253,15 @@ describe('processNextAdvertisement', () => { }) it('handles timeout errors and explains the problem in the status', async () => { - const server = http.createServer(async (_req, res) => { - await setTimeout(500) + const { serverUrl } = await givenHttpServer(async (_req, res) => { + await setTimeout(100) res.statusCode = 501 res.end() }) - server.listen(0, '127.0.0.1') - server.unref() - await once(server, 'listening') - const serverPort = /** @type {import('node:net').AddressInfo} */(server.address()).port /** @type {ProviderInfo} */ const providerInfo = { - providerAddress: `http://127.0.0.1:${serverPort}/`, + providerAddress: serverUrl, lastAdvertisementCID: 'baguqeeraTEST' } @@ -282,7 +278,49 @@ describe('processNextAdvertisement', () => { head: 'baguqeeraTEST', tail: 'baguqeeraTEST', lastHead: undefined, - status: `Error processing baguqeeraTEST: HTTP request to http://127.0.0.1:${serverPort}/ipni/v1/ad/baguqeeraTEST timed out` + status: `Error processing baguqeeraTEST: HTTP request to ${serverUrl}ipni/v1/ad/baguqeeraTEST failed: operation timed out` + } + }) + }) + + it('skips entries where the server responds with 404 cid not found', async () => { + const { serverUrl } = await givenHttpServer(async (req, res) => { + if (req.url === `/ipni/v1/ad/${FRISBII_AD_CID}`) { + const frisbeeRes = await fetch(FRISBII_ADDRESS + req.url) + await assertOkResponse(frisbeeRes) + // FIXME: can we pipe `frisbeeRes.body` directly to `res`? + // `frisbeeRes.body` is a Web API ReadableStream, `res` is a Node.js WritableStream + const body = await frisbeeRes.arrayBuffer() + res.write(new Uint8Array(body)) + res.end() + } else { + res.statusCode = 404 + res.write('cid not found') + res.end() + } + }) + + /** @type {ProviderInfo} */ + const providerInfo = { + providerAddress: serverUrl, + lastAdvertisementCID: FRISBII_AD_CID + } + + const result = await processNextAdvertisement({ + providerId, + providerInfo, + walkerState: undefined + }) + + assert.deepStrictEqual(result, { + finished: true, + indexEntry: undefined, + newState: { + entriesNotRetrievable: 1, + head: undefined, + tail: undefined, + lastHead: FRISBII_AD_CID, + status: `All advertisements from ${FRISBII_AD_CID} to the end of the chain were processed.` } }) }) diff --git a/indexer/test/helpers/http-server.js b/indexer/test/helpers/http-server.js new file mode 100644 index 0000000..de8faf3 --- /dev/null +++ b/indexer/test/helpers/http-server.js @@ -0,0 +1,25 @@ +import { once } from 'node:events' +import http from 'node:http' + +/** + * @param {http.RequestListener} handler + */ +export async function givenHttpServer (handler) { + const server = http.createServer((req, res) => { + ;(async () => { + await handler(req, res) + })().catch(err => { + console.log('Unhandled server error:', err) + res.statusCode = 500 + res.write(err.message || err.toString()) + res.end() + }) + }) + + server.listen(0, '127.0.0.1') + server.unref() + await once(server, 'listening') + const serverPort = /** @type {import('node:net').AddressInfo} */(server.address()).port + const serverUrl = `http://127.0.0.1:${serverPort}/` + return { server, serverPort, serverUrl } +} diff --git a/repository/lib/typings.d.ts b/repository/lib/typings.d.ts index a51776a..520dd6f 100644 --- a/repository/lib/typings.d.ts +++ b/repository/lib/typings.d.ts @@ -30,6 +30,7 @@ export interface WalkerState { tail?: string; lastHead?: string; status: string; + entriesNotRetrievable?: number; } export type ProviderToWalkerStateMap = Map From bc75a247b65b3ff4b4e8cdc75411e42ab5fa4d38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Mon, 30 Sep 2024 15:25:27 +0200 Subject: [PATCH 02/10] Update indexer/test/helpers/http-server.js Co-authored-by: Julian Gruber --- indexer/test/helpers/http-server.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/indexer/test/helpers/http-server.js b/indexer/test/helpers/http-server.js index de8faf3..d519537 100644 --- a/indexer/test/helpers/http-server.js +++ b/indexer/test/helpers/http-server.js @@ -6,9 +6,7 @@ import http from 'node:http' */ export async function givenHttpServer (handler) { const server = http.createServer((req, res) => { - ;(async () => { - await handler(req, res) - })().catch(err => { + handler(req, res).catch(err => { console.log('Unhandled server error:', err) res.statusCode = 500 res.write(err.message || err.toString()) From 4311a1051050b3f8c38a3dc0efe043692a43eb7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Mon, 30 Sep 2024 15:27:40 +0200 Subject: [PATCH 03/10] Update indexer/lib/advertisement-walker.js Co-authored-by: Julian Gruber --- indexer/lib/advertisement-walker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index 921d9f6..257435e 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -172,7 +172,7 @@ export async function processNextAdvertisement ({ if (entriesFetchError) { state.entriesNotRetrievable = (state.entriesNotRetrievable ?? 0) + 1 } - const indexEntry = entry.pieceCid && entry.payloadCid ? entry : undefined + const indexEntry = (entry.pieceCid && entry.payloadCid) ? entry : undefined const finished = !state.tail return { newState: state, From a80f971dd396e5f679147fe454c7dd403d725295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Mon, 30 Sep 2024 16:42:33 +0200 Subject: [PATCH 04/10] revert bc75a247b and add comment to explain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/test/helpers/http-server.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/indexer/test/helpers/http-server.js b/indexer/test/helpers/http-server.js index d519537..3239c85 100644 --- a/indexer/test/helpers/http-server.js +++ b/indexer/test/helpers/http-server.js @@ -6,7 +6,11 @@ import http from 'node:http' */ export async function givenHttpServer (handler) { const server = http.createServer((req, res) => { - handler(req, res).catch(err => { + // Wrap the handler() call in an async function block to ensure synchronously thrown errors + // are converted to rejected promises. + ;(async () => { + await handler(req, res) + })().catch(err => { console.log('Unhandled server error:', err) res.statusCode = 500 res.write(err.message || err.toString()) From 737f2847a59370262595e474a113ca36f3d09cab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Mon, 30 Sep 2024 16:44:33 +0200 Subject: [PATCH 05/10] pipe web stream to node response stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/test/advertisement-walker.test.js | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js index 5e4150e..bd8d642 100644 --- a/indexer/test/advertisement-walker.test.js +++ b/indexer/test/advertisement-walker.test.js @@ -11,6 +11,8 @@ import { import { givenHttpServer } from './helpers/http-server.js' import { FRISBII_ADDRESS, FRISBII_AD_CID } from './helpers/test-data.js' import { assertOkResponse } from '../lib/http-assertions.js' +import * as stream from 'node:stream' +import { pipeline } from 'node:stream/promises' /** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */ @@ -286,13 +288,10 @@ describe('processNextAdvertisement', () => { it('skips entries where the server responds with 404 cid not found', async () => { const { serverUrl } = await givenHttpServer(async (req, res) => { if (req.url === `/ipni/v1/ad/${FRISBII_AD_CID}`) { - const frisbeeRes = await fetch(FRISBII_ADDRESS + req.url) - await assertOkResponse(frisbeeRes) - // FIXME: can we pipe `frisbeeRes.body` directly to `res`? - // `frisbeeRes.body` is a Web API ReadableStream, `res` is a Node.js WritableStream - const body = await frisbeeRes.arrayBuffer() - res.write(new Uint8Array(body)) - res.end() + const frisbiiRes = await fetch(FRISBII_ADDRESS + req.url) + await assertOkResponse(frisbiiRes) + assert(frisbiiRes.body, 'frisbii response does not have any body') + await pipeline(stream.Readable.fromWeb(frisbiiRes.body), res) } else { res.statusCode = 404 res.write('cid not found') From b83d77425b80305954c34b22432062ed26149727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 1 Oct 2024 13:57:45 +0200 Subject: [PATCH 06/10] refactor: introduce pRetry and error codes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/lib/advertisement-walker.js | 145 ++++++++++++---------- indexer/package.json | 3 +- indexer/test/advertisement-walker.test.js | 19 ++- package-lock.json | 43 ++++++- repository/lib/typings.d.ts | 1 + 5 files changed, 140 insertions(+), 71 deletions(-) diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index 257435e..fb5de31 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -7,6 +7,7 @@ import * as multihash from 'multiformats/hashes/digest' import assert from 'node:assert' import timers from 'node:timers/promises' import { assertOkResponse } from './http-assertions.js' +import pRetry from 'p-retry' /** @import { ProviderInfo, WalkerState } from './typings.js' */ /** @import { RedisRepository as Repository } from '@filecoin-station/spark-piece-indexer-repository' */ @@ -151,7 +152,7 @@ export async function processNextAdvertisement ({ assert(state.tail) try { - const { previousAdvertisementCid, entriesFetchError, ...entry } = await fetchAdvertisedPayload( + const { previousAdvertisementCid, entry, error } = await fetchAdvertisedPayload( providerInfo.providerAddress, state.tail, { fetchTimeout } @@ -169,10 +170,13 @@ export async function processNextAdvertisement ({ state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}` } - if (entriesFetchError) { + if (error === 'CANNOT_FETCH_ENTRIES') { state.entriesNotRetrievable = (state.entriesNotRetrievable ?? 0) + 1 + } else if (error === 'CANNOT_DETERMINE_PIECE_CID') { + state.adsMissingPieceCID = (state.adsMissingPieceCID ?? 0) + 1 } - const indexEntry = (entry.pieceCid && entry.payloadCid) ? entry : undefined + + const indexEntry = entry?.pieceCid ? entry : undefined const finished = !state.tail return { newState: state, @@ -180,38 +184,16 @@ export async function processNextAdvertisement ({ finished } } catch (err) { - let reason - if (err instanceof Error) { - const url = 'url' in err ? err.url : providerInfo.providerAddress - if ('serverMessage' in err && err.serverMessage) { - reason = err.serverMessage - if ('statusCode' in err && err.statusCode) { - reason = `${err.statusCode} ${reason}` - } - } else if ('statusCode' in err && err.statusCode) { - reason = err.statusCode - } else if (err.name === 'TimeoutError') { - reason = 'operation timed out' - } else if ( - err.name === 'TypeError' && - err.message === 'fetch failed' && - err.cause && - err.cause instanceof Error - ) { - reason = err.cause.message - } - - reason = `HTTP request to ${url} failed: ${reason}` - } + const errorDescription = describeFetchError(err, providerInfo.providerAddress) debug( 'Cannot process provider %s (%s) advertisement %s: %s', providerId, providerInfo.providerAddress, state.tail, - reason ?? err + errorDescription ?? err ) - state.status = `Error processing ${state.tail}: ${reason ?? 'internal error'}` + state.status = `Error processing ${state.tail}: ${errorDescription ?? 'internal error'}` return { newState: state, failed: true @@ -219,10 +201,37 @@ export async function processNextAdvertisement ({ } } -/** @typedef {{ - pieceCid: string | undefined; - payloadCid: string; -}} AdvertisedPayload */ +/** + * @param {unknown} err + * @param {string} providerAddress + */ +function describeFetchError (err, providerAddress) { + if (!(err instanceof Error)) return undefined + + let reason + if ('serverMessage' in err && err.serverMessage) { + reason = err.serverMessage + if ('statusCode' in err && err.statusCode) { + reason = `${err.statusCode} ${reason}` + } + } else if ('statusCode' in err && err.statusCode) { + reason = err.statusCode + } else if (err.name === 'TimeoutError') { + reason = 'operation timed out' + } else if ( + err.name === 'TypeError' && + err.message === 'fetch failed' && + err.cause && + err.cause instanceof Error + ) { + reason = err.cause.message + } + if (!reason) return undefined + + const url = 'url' in err ? err.url : providerAddress + reason = `HTTP request to ${url} failed: ${reason}` + return reason +} /** * @param {string} providerAddress @@ -262,41 +271,51 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, const meta = parseMetadata(advertisement.Metadata['/'].bytes) const pieceCid = meta.deal?.PieceCID.toString() - - try { - const entriesChunk = - /** @type {{ - Entries: { '/' : { bytes: string } }[] - }} */( - await fetchCid(providerAddress, entriesCid, { fetchTimeout }) - ) - debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5)) - const entryHash = entriesChunk.Entries[0]['/'].bytes - const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString() - + if (!pieceCid) { + debug('advertisement %s has no PieceCID in metadata: %j', advertisementCid, meta.deal) return { - previousAdvertisementCid, - pieceCid, - payloadCid + error: /** @type {const} */('CANNOT_DETERMINE_PIECE_CID'), + previousAdvertisementCid } - } catch (err) { - if (err && typeof err === 'object' && 'statusCode' in err && err.statusCode === 404) { - // The index provider cannot find the advertised entries. We cannot do much about that, - // it's unlikely that further request will succeed. Let's skip this advertisement. - debug( - 'Cannot fetch ad %s entries %s: %s %s', - advertisementCid, - entriesCid, - err.statusCode, - /** @type {any} */(err).serverMessage ?? '' - ) - return { - entriesFetchError: true, - previousAdvertisementCid, - pieceCid + } + + let entriesChunk + try { + entriesChunk = await pRetry( + async () => + /** @type {{ + Entries: { '/' : { bytes: string } }[] + }} */( + await fetchCid(providerAddress, entriesCid, { fetchTimeout }) + ), + { + shouldRetry: (err) => + err && 'statusCode' in err && typeof err.statusCode === 'number' && err.statusCode >= 500 } + ) + } catch (err) { + // We are not able to fetch the advertised entries. Skip this advertisement so that we can + // continue the ingestion of other advertisements. + const errorDescription = describeFetchError(err, providerAddress) + console.warn( + 'Cannot fetch ad %s entries %s: %s', + advertisementCid, + entriesCid, + errorDescription ?? err + ) + return { + error: /** @type {const} */('CANNOT_FETCH_ENTRIES'), + previousAdvertisementCid } - throw err + } + + debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5)) + const entryHash = entriesChunk.Entries[0]['/'].bytes + const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString() + + return { + previousAdvertisementCid, + entry: { pieceCid, payloadCid } } } diff --git a/indexer/package.json b/indexer/package.json index 915e352..b3051fd 100644 --- a/indexer/package.json +++ b/indexer/package.json @@ -16,6 +16,7 @@ "@sentry/node": "^8.31.0", "debug": "^4.3.7", "ioredis": "^5.4.1", - "multiformats": "^13.3.0" + "multiformats": "^13.3.0", + "p-retry": "^6.2.0" } } diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js index bd8d642..1e04403 100644 --- a/indexer/test/advertisement-walker.test.js +++ b/indexer/test/advertisement-walker.test.js @@ -173,6 +173,7 @@ describe('processNextAdvertisement', () => { head: undefined, // we finished the walk, there is no head tail: undefined, // we finished the walk, there is no next step lastHead: FRISBII_AD_CID, // lastHead was updated to head of the walk we finished + adsMissingPieceCID: 1, status: `All advertisements from ${newState?.lastHead} to the end of the chain were processed.` })) @@ -199,6 +200,7 @@ describe('processNextAdvertisement', () => { head: undefined, // we finished the walk, there is no head tail: undefined, // we finished the walk, there is no next step lastHead: FRISBII_AD_CID, // lastHead was updated to head of the walk we finished + adsMissingPieceCID: 1, status: `All advertisements from ${newState?.lastHead} to the end of the chain were processed.` })) @@ -315,7 +317,7 @@ describe('processNextAdvertisement', () => { finished: true, indexEntry: undefined, newState: { - entriesNotRetrievable: 1, + adsMissingPieceCID: 1, head: undefined, tail: undefined, lastHead: FRISBII_AD_CID, @@ -331,17 +333,18 @@ describe('fetchAdvertisedPayload', () => { it('returns previousAdvertisementCid, pieceCid and payloadCid for Graphsync retrievals', async () => { const result = await fetchAdvertisedPayload(providerAddress, knownAdvertisement.adCid) assert.deepStrictEqual(result, /** @type {AdvertisedPayload} */({ - payloadCid: knownAdvertisement.payloadCid, - pieceCid: knownAdvertisement.pieceCid, + entry: { + payloadCid: knownAdvertisement.payloadCid, + pieceCid: knownAdvertisement.pieceCid + }, previousAdvertisementCid: knownAdvertisement.previousAdCid })) }) - it('returns undefined pieceCid for HTTP retrievals', async () => { + it('returns CANNOT_DETERMINE_PIECE_CID error for HTTP retrievals', async () => { const result = await fetchAdvertisedPayload(FRISBII_ADDRESS, FRISBII_AD_CID) assert.deepStrictEqual(result, /** @type {AdvertisedPayload} */({ - payloadCid: 'bafkreih5zasorm4tlfga4ztwvm2dlnw6jxwwuvgnokyt3mjamfn3svvpyy', - pieceCid: undefined, + error: 'CANNOT_DETERMINE_PIECE_CID', // Our Frisbii instance announced only one advertisement // That's unrelated to HTTP vs Graphsync retrievals previousAdvertisementCid: undefined @@ -452,6 +455,8 @@ describe('data schema for REST API', () => { // Is it a problem if our observability API does not tell the provider address? ingestionStatus: walkerState.status, lastHeadWalkedFrom: walkerState.lastHead ?? walkerState.head, + adsMissingPieceCID: walkerState.adsMissingPieceCID ?? 0, + entriesNotRetrievable: walkerState.entriesNotRetrievable ?? 0, piecesIndexed: await repository.countPiecesIndexed(providerId) } @@ -459,6 +464,8 @@ describe('data schema for REST API', () => { providerId, ingestionStatus: `Walking the advertisements from ${knownAdvertisement.adCid}, next step: ${knownAdvertisement.previousAdCid}`, lastHeadWalkedFrom: knownAdvertisement.adCid, + adsMissingPieceCID: 0, + entriesNotRetrievable: 0, piecesIndexed: 1 }) }) diff --git a/package-lock.json b/package-lock.json index ec47908..4113a86 100644 --- a/package-lock.json +++ b/package-lock.json @@ -41,7 +41,8 @@ "@sentry/node": "^8.31.0", "debug": "^4.3.7", "ioredis": "^5.4.1", - "multiformats": "^13.3.0" + "multiformats": "^13.3.0", + "p-retry": "^6.2.0" }, "devDependencies": { "standard": "^17.1.2" @@ -892,6 +893,11 @@ "@types/pg": "*" } }, + "node_modules/@types/retry": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz", + "integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==" + }, "node_modules/@types/shimmer": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@types/shimmer/-/shimmer-1.2.0.tgz", @@ -2703,6 +2709,17 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/is-network-error": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/is-network-error/-/is-network-error-1.1.0.tgz", + "integrity": "sha512-tUdRRAnhT+OtCZR/LxZelH/C7QtjtFrTu5tXCA8pl55eTUElUHT+GPYV8MBMBvea/j+NxQqVt3LbWMRir7Gx9g==", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-number-object": { "version": "1.0.7", "resolved": "https://registry.npmjs.org/is-number-object/-/is-number-object-1.0.7.tgz", @@ -3248,6 +3265,22 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-retry": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-6.2.0.tgz", + "integrity": "sha512-JA6nkq6hKyWLLasXQXUrO4z8BUZGUt/LjlJxx8Gb2+2ntodU/SS63YZ8b0LUTbQ8ZB9iwOfhEPhg4ykKnn2KsA==", + "dependencies": { + "@types/retry": "0.12.2", + "is-network-error": "^1.0.0", + "retry": "^0.13.1" + }, + "engines": { + "node": ">=16.17" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", @@ -3647,6 +3680,14 @@ "node": ">=4" } }, + "node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "engines": { + "node": ">= 4" + } + }, "node_modules/reusify": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", diff --git a/repository/lib/typings.d.ts b/repository/lib/typings.d.ts index 520dd6f..3732e02 100644 --- a/repository/lib/typings.d.ts +++ b/repository/lib/typings.d.ts @@ -31,6 +31,7 @@ export interface WalkerState { lastHead?: string; status: string; entriesNotRetrievable?: number; + adsMissingPieceCID?: number; } export type ProviderToWalkerStateMap = Map From d9707c48c54280f24821e55ae853998bb74ddefe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 1 Oct 2024 14:09:36 +0200 Subject: [PATCH 07/10] fixup! tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/test/advertisement-walker.test.js | 40 +++++++++++++++++++---- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js index 1e04403..8e643e4 100644 --- a/indexer/test/advertisement-walker.test.js +++ b/indexer/test/advertisement-walker.test.js @@ -287,13 +287,15 @@ describe('processNextAdvertisement', () => { }) }) - it('skips entries where the server responds with 404 cid not found', async () => { + it('skips entries where the server responds with 404 cid not found and updates the counter', async () => { + const { adCid, previousAdCid } = knownAdvertisement + const { serverUrl } = await givenHttpServer(async (req, res) => { - if (req.url === `/ipni/v1/ad/${FRISBII_AD_CID}`) { - const frisbiiRes = await fetch(FRISBII_ADDRESS + req.url) - await assertOkResponse(frisbiiRes) - assert(frisbiiRes.body, 'frisbii response does not have any body') - await pipeline(stream.Readable.fromWeb(frisbiiRes.body), res) + if (req.url === `/ipni/v1/ad/${adCid}`) { + const providerRes = await fetch(providerAddress + req.url) + await assertOkResponse(providerRes) + assert(providerRes.body, 'provider response does not have any body') + await pipeline(stream.Readable.fromWeb(providerRes.body), res) } else { res.statusCode = 404 res.write('cid not found') @@ -304,6 +306,32 @@ describe('processNextAdvertisement', () => { /** @type {ProviderInfo} */ const providerInfo = { providerAddress: serverUrl, + lastAdvertisementCID: adCid + } + + const result = await processNextAdvertisement({ + providerId, + providerInfo, + walkerState: undefined + }) + + assert.deepStrictEqual(result, { + finished: false, + indexEntry: undefined, + newState: { + entriesNotRetrievable: 1, + head: adCid, + tail: previousAdCid, + lastHead: undefined, + status: `Walking the advertisements from ${adCid}, next step: ${previousAdCid}` + } + }) + }) + + it('skips entries when PieceCID is not in advertisement metadata and updates the counter', async () => { + /** @type {ProviderInfo} */ + const providerInfo = { + providerAddress: FRISBII_ADDRESS, lastAdvertisementCID: FRISBII_AD_CID } From 1f06f37862798bc703adf002d3df92700f84811c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 2 Oct 2024 09:20:52 +0200 Subject: [PATCH 08/10] fixup! improve code formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/lib/advertisement-walker.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index fb5de31..0f56666 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -282,11 +282,11 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, let entriesChunk try { entriesChunk = await pRetry( - async () => - /** @type {{ - Entries: { '/' : { bytes: string } }[] - }} */( - await fetchCid(providerAddress, entriesCid, { fetchTimeout }) + () => + /** @type {Promise<{ + Entries: { '/' : { bytes: string } }[] + }>} */( + fetchCid(providerAddress, entriesCid, { fetchTimeout }) ), { shouldRetry: (err) => From a4d7f3a601d8786b4e2e1b44a7fc2598ef26982c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 2 Oct 2024 13:33:28 +0200 Subject: [PATCH 09/10] fixup! rename error codes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/lib/advertisement-walker.js | 8 ++++---- indexer/test/advertisement-walker.test.js | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index 0f56666..e1248b8 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -170,9 +170,9 @@ export async function processNextAdvertisement ({ state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}` } - if (error === 'CANNOT_FETCH_ENTRIES') { + if (error === 'ENTRIES_NOT_RETRIEVABLE') { state.entriesNotRetrievable = (state.entriesNotRetrievable ?? 0) + 1 - } else if (error === 'CANNOT_DETERMINE_PIECE_CID') { + } else if (error === 'MISSING_PIECE_CID') { state.adsMissingPieceCID = (state.adsMissingPieceCID ?? 0) + 1 } @@ -274,7 +274,7 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, if (!pieceCid) { debug('advertisement %s has no PieceCID in metadata: %j', advertisementCid, meta.deal) return { - error: /** @type {const} */('CANNOT_DETERMINE_PIECE_CID'), + error: /** @type {const} */('MISSING_PIECE_CID'), previousAdvertisementCid } } @@ -304,7 +304,7 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, errorDescription ?? err ) return { - error: /** @type {const} */('CANNOT_FETCH_ENTRIES'), + error: /** @type {const} */('ENTRIES_NOT_RETRIEVABLE'), previousAdvertisementCid } } diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js index 8e643e4..1b9f7b8 100644 --- a/indexer/test/advertisement-walker.test.js +++ b/indexer/test/advertisement-walker.test.js @@ -369,10 +369,10 @@ describe('fetchAdvertisedPayload', () => { })) }) - it('returns CANNOT_DETERMINE_PIECE_CID error for HTTP retrievals', async () => { + it('returns MISSING_PIECE_CID error for HTTP retrievals', async () => { const result = await fetchAdvertisedPayload(FRISBII_ADDRESS, FRISBII_AD_CID) assert.deepStrictEqual(result, /** @type {AdvertisedPayload} */({ - error: 'CANNOT_DETERMINE_PIECE_CID', + error: 'MISSING_PIECE_CID', // Our Frisbii instance announced only one advertisement // That's unrelated to HTTP vs Graphsync retrievals previousAdvertisementCid: undefined From efd9da6eb33dea736104adcc5ff64fe31ecbd053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 2 Oct 2024 13:35:18 +0200 Subject: [PATCH 10/10] fixup! jsdoc for return type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/lib/advertisement-walker.js | 1 + 1 file changed, 1 insertion(+) diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index e1248b8..527e5f0 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -204,6 +204,7 @@ export async function processNextAdvertisement ({ /** * @param {unknown} err * @param {string} providerAddress + * @returns {string | undefined} */ function describeFetchError (err, providerAddress) { if (!(err instanceof Error)) return undefined