Skip to content
68 changes: 48 additions & 20 deletions indexer/lib/advertisement-walker.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export async function processNextAdvertisement ({
assert(state.tail)

try {
const { previousAdvertisementCid, ...entry } = await fetchAdvertisedPayload(
const { previousAdvertisementCid, entriesFetchError, ...entry } = await fetchAdvertisedPayload(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is entriesFetchError plural and entry singular?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me explain how this works:

  • First, we fetch the advertisement object. This object contains a CID linking to another object containing (payload block) entries.
  • Second, we fetch the object with the entries.
  • Finally, we take the first entry and discard the rest.

So, the error happens when we are fetching entries, but then we return only a single entry.

providerInfo.providerAddress,
state.tail,
{ fetchTimeout }
Expand All @@ -169,7 +169,10 @@ export async function processNextAdvertisement ({
state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}`
}

const indexEntry = entry.pieceCid ? entry : undefined
if (entriesFetchError) {
state.entriesNotRetrievable = (state.entriesNotRetrievable ?? 0) + 1
}
const indexEntry = entry.pieceCid && entry.payloadCid ? entry : undefined
const finished = !state.tail
return {
newState: state,
Expand All @@ -179,21 +182,26 @@ export async function processNextAdvertisement ({
} catch (err) {
let reason
if (err instanceof Error) {
const url = 'url' in err ? err.url : undefined
const url = 'url' in err ? err.url : providerInfo.providerAddress
if ('serverMessage' in err && err.serverMessage) {
reason = err.serverMessage
if ('statusCode' in err && err.statusCode) {
reason = `${err.statusCode} ${reason}`
}
} else if ('statusCode' in err && err.statusCode) {
reason = `HTTP request to ${url ?? providerInfo.providerAddress} failed: ${err.statusCode}`
reason = err.statusCode
} else if (err.name === 'TimeoutError') {
reason = `HTTP request to ${url ?? providerInfo.providerAddress} timed out`
reason = 'operation timed out'
} else if (
err.name === 'TypeError' &&
err.message === 'fetch failed' &&
err.cause &&
err.cause instanceof Error
) {
reason = `HTTP request to ${url ?? providerInfo.providerAddress} failed: ${err.cause.message}`
reason = err.cause.message
}

reason = `HTTP request to ${url} failed: ${reason}`
}

debug(
Expand Down Expand Up @@ -252,23 +260,43 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid,
return { previousAdvertisementCid }
}

const entriesChunk =
const meta = parseMetadata(advertisement.Metadata['/'].bytes)
const pieceCid = meta.deal?.PieceCID.toString()

try {
const entriesChunk =
/** @type {{
Entries: { '/' : { bytes: string } }[]
}} */(
await fetchCid(providerAddress, entriesCid, { fetchTimeout })
)
debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5))
const entryHash = entriesChunk.Entries[0]['/'].bytes
const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString()
await fetchCid(providerAddress, entriesCid, { fetchTimeout })
)
debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5))
const entryHash = entriesChunk.Entries[0]['/'].bytes
const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString()

const meta = parseMetadata(advertisement.Metadata['/'].bytes)
const pieceCid = meta.deal?.PieceCID.toString()

return {
previousAdvertisementCid,
pieceCid,
payloadCid
return {
previousAdvertisementCid,
pieceCid,
payloadCid
}
} catch (err) {
if (err && typeof err === 'object' && 'statusCode' in err && err.statusCode === 404) {
// The index provider cannot find the advertised entries. We cannot do much about that,
// it's unlikely that further request will succeed. Let's skip this advertisement.
debug(
'Cannot fetch ad %s entries %s: %s %s',
advertisementCid,
entriesCid,
err.statusCode,
/** @type {any} */(err).serverMessage ?? '<not found>'
)
return {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we returning an object with { error } instead of throwing it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this can be confusing, and I am thinking about redesigning my implementation.

What I wanted to accomplish in this PR:

  • If the request for entries returns 404, we want to skip the advertisement and walk to the next advertisement in the chain. This is handled by returning { error }
  • If the request for entries fails with a different error, we want to retry it. This is handled by (re)throwing the error.

After thinking about this some more, I am leaning towards the following design:

  • If the request for entries returns 404, we skip the advertisement. (No change from the current proposal.)
  • If the request for entries fails differently, we retry it several times. If all attempts fail, then we skip the advertisement, too.

That way, we optimise for finishing the walk of all advertisements so that we can process new advertisements and avoid repeating the same failing request forever.

entriesFetchError: true,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the best property name, can you suggest a better one? Or may I should set the value to a string instead of an error?

Suggested change
entriesFetchError: true,
entriesFetchError: String(err.statusCode),

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just looking up this property, as was assuming it's one of these

  • an Error
  • an array of entries for which there was a fetch error
  • the count of entries for which there was a fetch error

What about entriesFetchError: err? Or otherwise hasEntriesFetchError: true

previousAdvertisementCid,
pieceCid
}
}
throw err
}
}

Expand All @@ -291,7 +319,7 @@ async function fetchCid (providerBaseUrl, cid, { fetchTimeout } = {}) {
if (err && typeof err === 'object') {
Object.assign(err, { url })
}
debug('Error from %s -> %o', url, err)
debug('Error from %s ->', url, err)
throw err
}
}
Expand Down
38 changes: 0 additions & 38 deletions indexer/lib/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,3 @@ export interface ProviderInfo {

export type ProviderToInfoMap = Map<string, ProviderInfo>;

/**
lastAdCID --> [ ] -\
↓ |
... | entries announced after we started the current walk
↓ |
[ ] -/
head --> [ ] -\
↓ |
... | entries visited in this walk
↓ |
[ ] -/
tail --> [ ] -\
↓ |
... | entries NOT visited yet
↓ |
[ ] -/
lastHead --> [ ] -\
↓ |
... | entries visited in the previous walks
↓ |
[ ] -/
(null)
*/
export interface WalkerState {
head?: string;
tail?: string;
lastHead?: string;
status: string;
}

export type ProviderToWalkerStateMap = Map<string, WalkerState>

export type PiecePayloadCIDs = string[];

58 changes: 48 additions & 10 deletions indexer/test/advertisement-walker.test.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { RedisRepository } from '@filecoin-station/spark-piece-indexer-repository'
import { Redis } from 'ioredis'
import assert from 'node:assert'
import { once } from 'node:events'
import http from 'node:http'
import { after, before, beforeEach, describe, it } from 'node:test'
import { setTimeout } from 'node:timers/promises'
import {
fetchAdvertisedPayload,
processNextAdvertisement,
walkOneStep
} from '../lib/advertisement-walker.js'
import { givenHttpServer } from './helpers/http-server.js'
import { FRISBII_ADDRESS, FRISBII_AD_CID } from './helpers/test-data.js'
import { assertOkResponse } from '../lib/http-assertions.js'

/** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */

Expand Down Expand Up @@ -253,19 +253,15 @@ describe('processNextAdvertisement', () => {
})

it('handles timeout errors and explains the problem in the status', async () => {
const server = http.createServer(async (_req, res) => {
await setTimeout(500)
const { serverUrl } = await givenHttpServer(async (_req, res) => {
await setTimeout(100)
res.statusCode = 501
res.end()
})
server.listen(0, '127.0.0.1')
server.unref()
await once(server, 'listening')
const serverPort = /** @type {import('node:net').AddressInfo} */(server.address()).port

/** @type {ProviderInfo} */
const providerInfo = {
providerAddress: `http://127.0.0.1:${serverPort}/`,
providerAddress: serverUrl,
lastAdvertisementCID: 'baguqeeraTEST'
}

Expand All @@ -282,7 +278,49 @@ describe('processNextAdvertisement', () => {
head: 'baguqeeraTEST',
tail: 'baguqeeraTEST',
lastHead: undefined,
status: `Error processing baguqeeraTEST: HTTP request to http://127.0.0.1:${serverPort}/ipni/v1/ad/baguqeeraTEST timed out`
status: `Error processing baguqeeraTEST: HTTP request to ${serverUrl}ipni/v1/ad/baguqeeraTEST failed: operation timed out`
}
})
})

it('skips entries where the server responds with 404 cid not found', async () => {
const { serverUrl } = await givenHttpServer(async (req, res) => {
if (req.url === `/ipni/v1/ad/${FRISBII_AD_CID}`) {
const frisbeeRes = await fetch(FRISBII_ADDRESS + req.url)
await assertOkResponse(frisbeeRes)
// FIXME: can we pipe `frisbeeRes.body` directly to `res`?
// `frisbeeRes.body` is a Web API ReadableStream, `res` is a Node.js WritableStream
const body = await frisbeeRes.arrayBuffer()
res.write(new Uint8Array(body))
res.end()
} else {
res.statusCode = 404
res.write('cid not found')
res.end()
}
})

/** @type {ProviderInfo} */
const providerInfo = {
providerAddress: serverUrl,
lastAdvertisementCID: FRISBII_AD_CID
}

const result = await processNextAdvertisement({
providerId,
providerInfo,
walkerState: undefined
})

assert.deepStrictEqual(result, {
finished: true,
indexEntry: undefined,
newState: {
entriesNotRetrievable: 1,
head: undefined,
tail: undefined,
lastHead: FRISBII_AD_CID,
status: `All advertisements from ${FRISBII_AD_CID} to the end of the chain were processed.`
}
})
})
Expand Down
25 changes: 25 additions & 0 deletions indexer/test/helpers/http-server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { once } from 'node:events'
import http from 'node:http'

/**
* @param {http.RequestListener} handler
*/
export async function givenHttpServer (handler) {
const server = http.createServer((req, res) => {
;(async () => {
await handler(req, res)
})().catch(err => {
console.log('Unhandled server error:', err)
res.statusCode = 500
res.write(err.message || err.toString())
res.end()
})
})

server.listen(0, '127.0.0.1')
server.unref()
await once(server, 'listening')
const serverPort = /** @type {import('node:net').AddressInfo} */(server.address()).port
const serverUrl = `http://127.0.0.1:${serverPort}/`
return { server, serverPort, serverUrl }
}
1 change: 1 addition & 0 deletions repository/lib/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface WalkerState {
tail?: string;
lastHead?: string;
status: string;
entriesNotRetrievable?: number;
}

export type ProviderToWalkerStateMap = Map<string, WalkerState>
Expand Down