-
Notifications
You must be signed in to change notification settings - Fork 1
feat: initial implementation of the indexer #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
c53055f
2ec7716
fe0e9cc
9bbe3bc
ef48905
df599e1
7482b0b
8c612a1
3e0d552
459a14c
36174d8
17a05e8
9a93803
72da9eb
39e257a
a22bc7a
c86f645
a6cb5e2
778f3e9
d14a7f0
89ec9b2
a0a153a
1e31450
cafe142
455b1ae
d89d33a
0755f2a
fa18f10
052ed7a
cc70be0
faaa285
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import assert from 'assert' | ||
import { Redis } from 'ioredis' | ||
import { walkChain } from '../lib/advertisement-walker.js' | ||
import { runIpniSync } from '../lib/ipni-watcher.js' | ||
import { RedisRepository } from '../lib/redis-repository.js' | ||
|
||
/** @import { ProviderToInfoMap } from '../lib/typings.d.ts' */ | ||
|
||
const { | ||
REDIS_URL: redisUrl = 'redis://localhost:6379' | ||
} = process.env | ||
|
||
// TODO: setup Sentry | ||
|
||
const redisUrlParsed = new URL(redisUrl) | ||
const redis = new Redis({ | ||
host: redisUrlParsed.hostname, | ||
port: Number(redisUrlParsed.port), | ||
username: redisUrlParsed.username, | ||
password: redisUrlParsed.password, | ||
lazyConnect: true, // call connect() explicitly so that we can exit on connection error | ||
family: 6 // required for upstash | ||
}) | ||
|
||
await redis.connect() | ||
const repository = new RedisRepository(redis) | ||
|
||
/** @type {Set<string>} */ | ||
const providerIdsActivelyWalked = new Set() | ||
bajtos marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
/** @type {ProviderToInfoMap} */ | ||
const recentProvidersInfo = new Map() | ||
|
||
/** | ||
* @param {string} providerId | ||
*/ | ||
const getProviderInfo = async (providerId) => { | ||
const info = recentProvidersInfo.get(providerId) | ||
assert(!!info, `Unknown providerId ${providerId}`) | ||
bajtos marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return info | ||
} | ||
|
||
for await (const providerInfos of runIpniSync({ minSyncIntervalInMs: 60_000 })) { | ||
for (const [providerId, providerInfo] of providerInfos.entries()) { | ||
recentProvidersInfo.set(providerId, providerInfo) | ||
if (providerIdsActivelyWalked.has(providerId)) continue | ||
|
||
providerIdsActivelyWalked.add(providerId) | ||
walkChain({ | ||
repository, | ||
providerId, | ||
getProviderInfo, | ||
minStepIntervalInMs: 100 | ||
}).finally( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you worried about the maximum concurrency this can run in? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great question! My current thinking is that there are several thousand storage providers now, plus maybe a few hundred non-Filecoin index providers. We should not be running more than 10k concurrent walkers. Walkers spend most of their time waiting for I/O (Redis, HTTP requests to index providers) or sleeping between steps. I think that Node.js can easily handle such load. WDYT? But! Maybe this is a sign that we need more visibility into this aspect. What I can do - but preferably in a follow-up pull request, is add an InfluxDB client and periodically write a data point with the number of concurrent walkers. WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm happy to let it fail, debug and then see that maximum concurrency is the issue. It would be something else if we knew yes 100% it is, but like this I think it's fine. What we need monitoring for is whether the indexer is delayed, I believe as long as it's not, no other metric is important. |
||
() => providerIdsActivelyWalked.delete(providerId) | ||
) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.