diff --git a/src/crawler/Crawler.ts b/src/crawler/Crawler.ts
index 2592caf..560a247 100644
--- a/src/crawler/Crawler.ts
+++ b/src/crawler/Crawler.ts
@@ -135,20 +135,7 @@ export class Crawler {
         this.client.on('streamCreated', this.onStreamCreated)
         let iterationIndex = 0
         while (true) {
-            try {
-                const topology = await crawlTopology(
-                    networkNodeFacade,
-                    this.client.getEntryPoints(),
-                    (nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
-                    `full-${Date.now()}`
-                )
-                await this.nodeRepository.replaceNetworkTopology(topology)
-                await this.analyzeContractStreams(topology, this.subscribeGate)
-            } catch (e) {
-                logger.error('Error', { err: e })
-                await wait(RECOVERY_DELAY)
-            }
-            logger.info('Crawl iteration completed')
+            await this.runCrawlIteration(networkNodeFacade)
             if ((iterationCount === undefined) || (iterationIndex < iterationCount - 1)) {
                 await wait(this.config.crawler.iterationDelay)
                 iterationIndex++
@@ -158,9 +145,25 @@ export class Crawler {
         }
     }
 
+    private async runCrawlIteration(networkNodeFacade: NetworkNodeFacade): Promise<void> {
+        try {
+            const topology = await crawlTopology(
+                networkNodeFacade,
+                this.client.getEntryPoints(),
+                (nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer.neighbors,
+                `full-${Date.now()}`
+            )
+            await this.nodeRepository.replaceNetworkTopology(topology)
+            await this.analyzeContractStreams(topology)
+        } catch (e) {
+            logger.error('Error', { err: e })
+            await wait(RECOVERY_DELAY)
+        }
+        logger.info('Crawl iteration completed')
+    }
+
     private async analyzeContractStreams(
-        topology: Topology,
-        subscribeGate: SubscribeGate
+        topology: Topology
     ): Promise<void> {
         // wrap this.client.getAllStreams() with retry because in streamr-docker-dev environment
         // the graph-node dependency may not be available immediately after the service has
@@ -176,7 +179,7 @@ export class Crawler {
         const workedThreadLimit = pLimit(MAX_SUBSCRIPTION_COUNT)
         await Promise.all(sortedContractStreams.map((stream: Stream) => {
             return workedThreadLimit(async () => {
-                await this.analyzeStream(stream.id, await stream.getMetadata(), topology, subscribeGate)
+                await this.analyzeStream(stream.id, await stream.getMetadata(), topology)
             })
         }))
 
@@ -186,8 +189,7 @@ export class Crawler {
     private async analyzeStream(
         id: StreamID,
         metadata: StreamMetadata,
-        topology: Topology,
-        subscribeGate: SubscribeGate
+        topology: Topology
     ): Promise<void> {
         logger.info(`Analyze ${id}`)
         const peersByPartition = new Map<number, Set<DhtAddress>>()
@@ -204,7 +206,7 @@ export class Crawler {
                 [...peersByPartition.keys()],
                 isPublicStream(subscriberCount),
                 await this.client.getNetworkNodeFacade(),
-                subscribeGate,
+                this.subscribeGate!,
                 this.config
             )
             : { messagesPerSecond: 0, bytesPerSecond: 0 }
@@ -275,7 +277,7 @@ export class Crawler {
                 return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor))).flat()
             }, `stream-${payload.streamId}-${Date.now()}`)
             // TODO could add new nodes and neighbors to NodeRepository?
-            await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
+            await this.analyzeStream(payload.streamId, payload.metadata, topology)
         } catch (e: any) {
             logger.error(`Failed to handle new stream ${payload.streamId}`, e)
         }
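
The change above does two things: it extracts the body of the crawl loop into runCrawlIteration(), and it stops threading subscribeGate through analyzeContractStreams() and analyzeStream() as a parameter, reading the instance field this.subscribeGate! at its single point of use instead. A minimal sketch of that parameter-to-field pattern follows; the names (Gate, Analyzer) are illustrative and not from the Streamr codebase.

// Sketch: replacing "parameter drilling" with an instance field, as in the
// subscribeGate change above. All names here are hypothetical.
class Gate {
    private open = false

    setOpen(open: boolean): void {
        this.open = open
    }

    // resolves once the gate is open; polling stands in for real signaling
    async waitUntilOpen(): Promise<void> {
        while (!this.open) {
            await new Promise((resolve) => setTimeout(resolve, 100))
        }
    }
}

class Analyzer {
    // assigned in start(), hence the non-null assertion at the use site below
    private gate?: Gate

    async start(): Promise<void> {
        this.gate = new Gate()
        this.gate.setOpen(true)
        // before the refactoring, the gate would have been passed down explicitly:
        //   await this.analyze(this.gate)
        await this.analyze()
    }

    private async analyze(): Promise<void> {
        // the field is read where it is needed instead of being threaded
        // through every intermediate call as a parameter
        await this.gate!.waitUntilOpen()
        console.log('analysis ran after the gate opened')
    }
}

new Analyzer().start().catch(console.error)

The trade-off is the non-null assertion: the field is only set once the instance has started, so callers that could run earlier would need a guard rather than `!`.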