diff --git a/initialize-database.sql b/initialize-database.sql index fdb2175..a3e9a86 100755 --- a/initialize-database.sql +++ b/initialize-database.sql @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS neighbors ( streamPartId VARCHAR(500) NOT NULL, nodeId1 CHAR(40) NOT NULL, nodeId2 CHAR(40) NOT NULL, + rtt INTEGER UNSIGNED, PRIMARY KEY (streamPartId, nodeId1, nodeId2), FOREIGN KEY (nodeId1) REFERENCES nodes(id), FOREIGN KEY (nodeId2) REFERENCES nodes(id), diff --git a/src/crawler/Crawler.ts b/src/crawler/Crawler.ts index 0fc695c..2592caf 100644 --- a/src/crawler/Crawler.ts +++ b/src/crawler/Crawler.ts @@ -53,10 +53,10 @@ const createNodeInfoLogOutput = (nodeInfo: NormalizedNodeInfo) => { neighbors: nodeInfo.controlLayer.neighbors.map(toNodeId), connections: nodeInfo.controlLayer.connections.map(toNodeId) }, - streamPartitions: nodeInfo.streamPartitions.map((sp: any) => ({ + streamPartitions: nodeInfo.streamPartitions.map((sp) => ({ id: sp.id, controlLayerNeighbors: sp.controlLayerNeighbors.map(toNodeId), - contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n: any) => toNodeId(n.peerDescriptor)) // TODO better type + contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n) => ({ nodeId: toNodeId(n.peerDescriptor), rtt: n.rtt })) })), applicationVersion: nodeInfo.applicationVersion } @@ -192,7 +192,7 @@ export class Crawler { logger.info(`Analyze ${id}`) const peersByPartition = new Map> for (const partition of range(getStreamPartitionCount(metadata))) { - peersByPartition.set(partition, topology.getPeers(toStreamPartID(id, partition))) + peersByPartition.set(partition, topology.getPeerNodeIds(toStreamPartID(id, partition))) } try { const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH) diff --git a/src/crawler/Topology.ts b/src/crawler/Topology.ts index 7510751..cbf4ed7 100644 --- a/src/crawler/Topology.ts +++ b/src/crawler/Topology.ts @@ -3,9 +3,14 @@ import { DhtAddress, StreamPartID } from '@streamr/sdk' import { Multimap, numberToIpv4, StreamPartIDUtils } from '@streamr/utils' import { NormalizedNodeInfo } from './NetworkNodeFacade' +export interface Neighbor { + nodeId: DhtAddress + rtt?: number +} + export interface Node { id: DhtAddress - streamPartNeighbors: Multimap + streamPartNeighbors: Multimap ipAddress?: string } @@ -13,14 +18,14 @@ export class Topology { private nodes: Map = new Map() - constructor(infos: NormalizedNodeInfo[]) { + constructor(infos: Pick[]) { const nodeIds = new Set(...[infos.map((info) => toNodeId(info.peerDescriptor))]) for (const info of infos) { - const streamPartNeighbors: Multimap = new Multimap() + const streamPartNeighbors: Multimap = new Multimap() for (const streamPartitionInfo of info.streamPartitions) { const neighbors = streamPartitionInfo.contentDeliveryLayerNeighbors - .map((n) => toNodeId(n.peerDescriptor)) - .filter((id) => nodeIds.has(id)) + .map((n) => ({ nodeId: toNodeId(n.peerDescriptor), rtt: n.rtt })) + .filter((n) => nodeIds.has(n.nodeId)) streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors) } const nodeId = toNodeId(info.peerDescriptor) @@ -36,18 +41,24 @@ export class Topology { return [...this.nodes.values()] } - getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): DhtAddress[] { + getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): Neighbor[] { return this.nodes.get(nodeId)?.streamPartNeighbors.get(streamPartId) ?? [] } - getPeers(streamPartId: StreamPartID): Set { + // gets the neighbor if nodeId2 is a neighbor of nodeId1 (but doesn't check the neighborhood other way around) + getNeighbor(nodeId1: DhtAddress, nodeId2: DhtAddress, streamPartId: StreamPartID): Neighbor | undefined { + const neighbors = this.nodes.get(nodeId1)?.streamPartNeighbors.get(streamPartId) ?? [] + return neighbors.find((n) => n.nodeId === nodeId2) + } + + getPeerNodeIds(streamPartId: StreamPartID): Set { const nodeIds: Set = new Set() for (const node of this.nodes.values()) { const neighbors = node.streamPartNeighbors.get(streamPartId) if (neighbors.length > 0) { nodeIds.add(node.id) for (const neighbor of neighbors) { - nodeIds.add(neighbor) + nodeIds.add(neighbor.nodeId) } } } diff --git a/src/entities/Node.ts b/src/entities/Node.ts index 6a8cef1..cb1ad46 100644 --- a/src/entities/Node.ts +++ b/src/entities/Node.ts @@ -1,4 +1,4 @@ -import { Field, Float, ObjectType } from 'type-graphql' +import { Field, Float, Int, ObjectType } from 'type-graphql' /* eslint-disable indent */ @ObjectType() @@ -42,6 +42,8 @@ export class Neighbor { nodeId1!: string @Field() nodeId2!: string + @Field(() => Int, { nullable: true }) + rtt!: number | null } /* eslint-disable indent */ diff --git a/src/repository/NodeRepository.ts b/src/repository/NodeRepository.ts index 9dbcde8..b6632c7 100644 --- a/src/repository/NodeRepository.ts +++ b/src/repository/NodeRepository.ts @@ -2,9 +2,10 @@ import { DhtAddress } from '@streamr/dht' import { StreamID, StreamPartID } from '@streamr/sdk' import { Logger } from '@streamr/utils' import { Inject, Service } from 'typedi' -import { Topology } from '../crawler/Topology' +import { Topology, Neighbor } from '../crawler/Topology' import { createSqlQuery } from '../utils' import { ConnectionPool, PaginatedListFragment } from './ConnectionPool' +import { mean, without } from 'lodash' export interface NodeRow { id: string @@ -15,10 +16,18 @@ interface NeighborRow { streamPartId: string nodeId1: string nodeId2: string + rtt: number | null } const logger = new Logger(module) +const getRtt = (neighbor1: Neighbor, neighbor2: DhtAddress, streamPartId: StreamPartID, topology: Topology): number | undefined => { + const rtt1 = neighbor1.rtt + const rtt2 = topology.getNeighbor(neighbor1.nodeId, neighbor2, streamPartId)?.rtt + const rtts = without([rtt1, rtt2], undefined) + return (rtts.length > 0) ? mean(rtts) : undefined +} + @Service() export class NodeRepository { @@ -90,7 +99,7 @@ export class NodeRepository { params.push(`${streamId}#%`) } const sql = createSqlQuery( - 'SELECT streamPartId, nodeId1, nodeId2 FROM neighbors', + 'SELECT streamPartId, nodeId1, nodeId2, rtt FROM neighbors', whereClauses ) return this.connectionPool.queryPaginated( @@ -105,17 +114,18 @@ export class NodeRepository { const nodes = topology.getNodes().map((node) => { return [node.id, node.ipAddress] }) - const neighbors: [StreamPartID, DhtAddress, DhtAddress][] = [] + const neighbors: [StreamPartID, DhtAddress, DhtAddress, number?][] = [] for (const node of topology.getNodes()) { for (const streamPartId of node.streamPartNeighbors.keys()) { const streamPartNeighbors = node.streamPartNeighbors.get(streamPartId) for (const neighbor of streamPartNeighbors) { // If node A and B are neighbors, we assume that there are two associations in the topology: - // A->B and B-A. We don't need to store both associations to the DB. The following comparison + // A->B and B->A. We don't need to store both associations to the DB. The following comparison // filters out the duplication. Note that if there is only one side of the association // in the topology, that association is maybe not stored at all. - if (node.id < neighbor) { - neighbors.push([streamPartId, node.id, neighbor]) + if (node.id < neighbor.nodeId) { + const rtt = getRtt(neighbor, node.id, streamPartId, topology) + neighbors.push([streamPartId, node.id, neighbor.nodeId, rtt]) } } } @@ -126,7 +136,7 @@ export class NodeRepository { await connection.query('DELETE FROM neighbors') await connection.query('DELETE FROM nodes') await connection.query('INSERT INTO nodes (id, ipAddress) VALUES ?', [nodes]) - await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2) VALUES ?', [neighbors]) + await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2, rtt) VALUES ?', [neighbors]) await connection.commit() } catch (e) { connection.rollback() diff --git a/test/APIServer.test.ts b/test/APIServer.test.ts index 28d201b..0305ac2 100644 --- a/test/APIServer.test.ts +++ b/test/APIServer.test.ts @@ -1,11 +1,12 @@ import 'reflect-metadata' -import { randomDhtAddress, DhtAddress } from '@streamr/dht' -import { Multimap, StreamID, StreamPartID, StreamPartIDUtils, utf8ToBinary } from '@streamr/utils' +import { DhtAddress, NodeType, randomDhtAddress, toDhtAddressRaw } from '@streamr/dht' +import { ipv4ToNumber, Multimap, StreamID, StreamPartID, StreamPartIDUtils, utf8ToBinary } from '@streamr/utils' import { range, without } from 'lodash' import Container from 'typedi' import { APIServer } from '../src/api/APIServer' import { CONFIG_TOKEN } from '../src/Config' +import { Topology } from '../src/crawler/Topology' import { ContentType } from '../src/entities/Message' import { MessageRepository } from '../src/repository/MessageRepository' import { NodeRepository } from '../src/repository/NodeRepository' @@ -14,6 +15,14 @@ import { StreamrClientFacade } from '../src/StreamrClientFacade' import { createDatabase, queryAPI } from '../src/utils' import { dropTestDatabaseIfExists, TEST_DATABASE_NAME } from './utils' +const toMockPeerDescriptor = (nodeId: DhtAddress) => { + return { + nodeId: toDhtAddressRaw(nodeId), + type: NodeType.NODEJS, + ipAddress: ipv4ToNumber('123.1.2.3') + } +} + const storeTestTopology = async ( streamParts: { id: StreamPartID @@ -22,22 +31,27 @@ const storeTestTopology = async ( ) => { const nodeRepository = Container.get(NodeRepository) const nodeIds: Set = new Set(streamParts.map((sp) => sp.nodeIds).flat()) - const getNodes = () => { - return [...nodeIds].map((nodeId: DhtAddress) => { - const streamPartNeighbors = new Multimap() - for (const streamPart of streamParts) { - if (streamPart.nodeIds.includes(nodeId)) { - streamPartNeighbors.addAll(streamPart.id, without(streamPart.nodeIds, nodeId)) - } - } - return { - id: nodeId, - streamPartNeighbors, - ipAddress: '123.1.2.3' + const nodes = [...nodeIds].map((nodeId: DhtAddress) => { + const streamPartNeighbors = new Multimap() + for (const streamPart of streamParts) { + if (streamPart.nodeIds.includes(nodeId)) { + streamPartNeighbors.addAll(streamPart.id, without(streamPart.nodeIds, nodeId)) } - }) - } - await nodeRepository.replaceNetworkTopology({ getNodes } as any) + } + return { + peerDescriptor: toMockPeerDescriptor(nodeId), + streamPartitions: [...streamPartNeighbors.keys()].map((streamPartId) => ({ + id: streamPartId, + contentDeliveryLayerNeighbors: streamPartNeighbors.get(streamPartId).map((n) => ({ + peerDescriptor: toMockPeerDescriptor(n), + rtt: 123 + })), + controlLayerNeighbors: undefined as any + })) + } + }) + const topology = new Topology(nodes) + await nodeRepository.replaceNetworkTopology(topology) } describe('APIServer', () => { @@ -423,18 +437,21 @@ describe('APIServer', () => { }) it('filter by node', async () => { - const response1 = await queryAPI(`{ + const response = await queryAPI(`{ neighbors(node: "${node1}") { items { streamPartId nodeId1 nodeId2 + rtt } } }`, apiPort) - const neighbors = response1.items + const neighbors = response.items + expect(neighbors).toHaveLength(1) const actualNodes = neighbors.map((n: any) => [n.nodeId1, n.nodeId2]).flat() expect(actualNodes).toIncludeSameMembers([node1, node2]) + expect(neighbors[0].rtt).toBe(123) }) it('filter by stream part', async () => { diff --git a/test/Crawler.test.ts b/test/Crawler.test.ts index 04024be..17fb961 100644 --- a/test/Crawler.test.ts +++ b/test/Crawler.test.ts @@ -64,6 +64,6 @@ describe('Crawler', () => { '' ) expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length) - expect([...topology.getPeers(STREAM_PART_ID)]).toIncludeSameMembers(nodes.map(toNodeId)) + expect([...topology.getPeerNodeIds(STREAM_PART_ID)]).toIncludeSameMembers(nodes.map(toNodeId)) }) }) diff --git a/test/NodeRepository.test.ts b/test/NodeRepository.test.ts new file mode 100644 index 0000000..4072a79 --- /dev/null +++ b/test/NodeRepository.test.ts @@ -0,0 +1,68 @@ +import 'reflect-metadata' + +import { PeerDescriptor, randomDhtAddress, toDhtAddress, toDhtAddressRaw } from '@streamr/dht' +import { StreamPartIDUtils } from '@streamr/utils' +import { range } from 'lodash' +import Container from 'typedi' +import { CONFIG_TOKEN } from '../src/Config' +import { Topology } from '../src/crawler/Topology' +import { NodeRepository } from '../src/repository/NodeRepository' +import { createDatabase } from '../src/utils' +import { TEST_DATABASE_NAME, dropTestDatabaseIfExists } from './utils' + +const STREAM_PART_ID = StreamPartIDUtils.parse('stream#1') + +describe('NodeRepository', () => { + + beforeEach(async () => { + const config = { + database: { + host: '10.200.10.1', + name: TEST_DATABASE_NAME, + user: 'root', + password: 'password' + } + } + await dropTestDatabaseIfExists(config.database) + await createDatabase(config.database) + Container.set(CONFIG_TOKEN, config) + }) + + afterEach(() => { + Container.reset() + }) + + it('replace topology', async () => { + const repository = Container.get(NodeRepository) + + const peerDescriptors: PeerDescriptor[] = range(2).map(() => ({ + nodeId: toDhtAddressRaw(randomDhtAddress()) + } as any)) + const nodes = [{ + peerDescriptor: peerDescriptors[0], + streamPartitions: [{ + id: STREAM_PART_ID, + contentDeliveryLayerNeighbors: [{ peerDescriptor: peerDescriptors[1], rtt: 100 }], + controlLayerNeighbors: undefined as any + }] + }, { + peerDescriptor: peerDescriptors[1], + streamPartitions: [{ + id: STREAM_PART_ID, + contentDeliveryLayerNeighbors: [{ peerDescriptor: peerDescriptors[0], rtt: 200 }], + controlLayerNeighbors: undefined as any + }] + }] + const topology = new Topology(nodes) + await repository.replaceNetworkTopology(topology) + + const nodeIds = peerDescriptors.map((p) => toDhtAddress(p.nodeId)) + const actualNodes = await repository.getNodes() + expect(actualNodes.items.map((item) => item.id)).toIncludeSameMembers(nodeIds) + const actualNeighbors = await repository.getNeighbors() + expect(actualNeighbors.items).toHaveLength(1) + expect(actualNeighbors.items[0].streamPartId).toBe(STREAM_PART_ID) + expect(actualNeighbors.items[0].rtt).toBe(150) + expect([actualNeighbors.items[0].nodeId1, actualNeighbors.items[0].nodeId2]).toIncludeSameMembers(nodeIds) + }) +}) diff --git a/test/Topology.test.ts b/test/Topology.test.ts index 217a8a7..de04aae 100644 --- a/test/Topology.test.ts +++ b/test/Topology.test.ts @@ -19,7 +19,8 @@ describe('Topology', () => { contentDeliveryLayerNeighbors: [ { peerDescriptor: nodes[1] }, { peerDescriptor: nodes[2] } - ] + ], + controlLayerNeighbors: undefined as any }] }, { peerDescriptor: nodes[2], @@ -29,13 +30,14 @@ describe('Topology', () => { { peerDescriptor: nodes[0] }, { peerDescriptor: nodes[1] }, { peerDescriptor: nodes[2] } - ] + ], + controlLayerNeighbors: undefined as any }] - }] as any) - expect([...topology.getNeighbors(toNodeId(nodes[0]), STREAM_PART_ID_1)]).toIncludeSameMembers([ + }]) + expect(topology.getNeighbors(toNodeId(nodes[0]), STREAM_PART_ID_1).map((n) => n.nodeId)).toIncludeSameMembers([ toNodeId(nodes[2]) ]) - expect([...topology.getNeighbors(toNodeId(nodes[2]), STREAM_PART_ID_2)]).toIncludeSameMembers([ + expect(topology.getNeighbors(toNodeId(nodes[2]), STREAM_PART_ID_2).map((n) => n.nodeId)).toIncludeSameMembers([ toNodeId(nodes[0]), toNodeId(nodes[2]) ]) })