Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,7 @@ export class Crawler {
this.client.on('streamCreated', this.onStreamCreated)
let iterationIndex = 0
while (true) {
try {
const topology = await crawlTopology(
networkNodeFacade,
this.client.getEntryPoints(),
(nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
`full-${Date.now()}`
)
await this.nodeRepository.replaceNetworkTopology(topology)
await this.analyzeContractStreams(topology, this.subscribeGate)
} catch (e) {
logger.error('Error', { err: e })
await wait(RECOVERY_DELAY)
}
logger.info('Crawl iteration completed')
await this.runCrawlIteration(networkNodeFacade)
if ((iterationCount === undefined) || (iterationIndex < iterationCount - 1)) {
await wait(this.config.crawler.iterationDelay)
iterationIndex++
Expand All @@ -158,9 +145,25 @@ export class Crawler {
}
}

private async runCrawlIteration(networkNodeFacade: NetworkNodeFacade): Promise<void> {
try {
const topology = await crawlTopology(
networkNodeFacade,
this.client.getEntryPoints(),
(nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
`full-${Date.now()}`
)
await this.nodeRepository.replaceNetworkTopology(topology)
await this.analyzeContractStreams(topology)
} catch (e) {
logger.error('Error', { err: e })
await wait(RECOVERY_DELAY)
}
logger.info('Crawl iteration completed')
}

private async analyzeContractStreams(
topology: Topology,
subscribeGate: SubscribeGate
topology: Topology
): Promise<void> {
// wrap this.client.getAllStreams() with retry because in streamr-docker-dev environment
// the graph-node dependency may not be available immediately after the service has
Expand All @@ -176,7 +179,7 @@ export class Crawler {
const workedThreadLimit = pLimit(MAX_SUBSCRIPTION_COUNT)
await Promise.all(sortedContractStreams.map((stream: Stream) => {
return workedThreadLimit(async () => {
await this.analyzeStream(stream.id, await stream.getMetadata(), topology, subscribeGate)
await this.analyzeStream(stream.id, await stream.getMetadata(), topology)
})
}))

Expand All @@ -186,8 +189,7 @@ export class Crawler {
private async analyzeStream(
id: StreamID,
metadata: StreamMetadata,
topology: Topology,
subscribeGate: SubscribeGate
topology: Topology
): Promise<void> {
logger.info(`Analyze ${id}`)
const peersByPartition = new Map<number, Set<DhtAddress>>
Expand All @@ -204,7 +206,7 @@ export class Crawler {
[...peersByPartition.keys()],
isPublicStream(subscriberCount),
await this.client.getNetworkNodeFacade(),
subscribeGate,
this.subscribeGate!,
this.config
)
: { messagesPerSecond: 0, bytesPerSecond: 0 }
Expand Down Expand Up @@ -275,7 +277,7 @@ export class Crawler {
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor))).flat()
}, `stream-${payload.streamId}-${Date.now()}`)
// TODO could add new nodes and neighbors to NodeRepository?
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
await this.analyzeStream(payload.streamId, payload.metadata, topology)
} catch (e: any) {
logger.error(`Failed to handle new stream ${payload.streamId}`, e)
}
Expand Down