Skip to content

feat: [NET-1393] Collect RTTs #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions initialize-database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS neighbors (
streamPartId VARCHAR(500) NOT NULL,
nodeId1 CHAR(40) NOT NULL,
nodeId2 CHAR(40) NOT NULL,
rtt INTEGER UNSIGNED,
PRIMARY KEY (streamPartId, nodeId1, nodeId2),
FOREIGN KEY (nodeId1) REFERENCES nodes(id),
FOREIGN KEY (nodeId2) REFERENCES nodes(id),
Expand Down
6 changes: 3 additions & 3 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ const createNodeInfoLogOutput = (nodeInfo: NormalizedNodeInfo) => {
neighbors: nodeInfo.controlLayer.neighbors.map(toNodeId),
connections: nodeInfo.controlLayer.connections.map(toNodeId)
},
streamPartitions: nodeInfo.streamPartitions.map((sp: any) => ({
streamPartitions: nodeInfo.streamPartitions.map((sp) => ({
id: sp.id,
controlLayerNeighbors: sp.controlLayerNeighbors.map(toNodeId),
contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n: any) => toNodeId(n.peerDescriptor)) // TODO better type
contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n) => ({ nodeId: toNodeId(n.peerDescriptor), rtt: n.rtt }))
})),
applicationVersion: nodeInfo.applicationVersion
}
Expand Down Expand Up @@ -192,7 +192,7 @@ export class Crawler {
logger.info(`Analyze ${id}`)
const peersByPartition = new Map<number, Set<DhtAddress>>
for (const partition of range(getStreamPartitionCount(metadata))) {
peersByPartition.set(partition, topology.getPeers(toStreamPartID(id, partition)))
peersByPartition.set(partition, topology.getPeerNodeIds(toStreamPartID(id, partition)))
}
try {
const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH)
Expand Down
27 changes: 19 additions & 8 deletions src/crawler/Topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@ import { DhtAddress, StreamPartID } from '@streamr/sdk'
import { Multimap, numberToIpv4, StreamPartIDUtils } from '@streamr/utils'
import { NormalizedNodeInfo } from './NetworkNodeFacade'

export interface Neighbor {
nodeId: DhtAddress
rtt?: number
}

export interface Node {
id: DhtAddress
streamPartNeighbors: Multimap<StreamPartID, DhtAddress>
streamPartNeighbors: Multimap<StreamPartID, Neighbor>
ipAddress?: string
}

export class Topology {

private nodes: Map<DhtAddress, Node> = new Map()

constructor(infos: NormalizedNodeInfo[]) {
constructor(infos: Pick<NormalizedNodeInfo, 'peerDescriptor' | 'streamPartitions'>[]) {
const nodeIds = new Set(...[infos.map((info) => toNodeId(info.peerDescriptor))])
for (const info of infos) {
const streamPartNeighbors: Multimap<StreamPartID, DhtAddress> = new Multimap()
const streamPartNeighbors: Multimap<StreamPartID, Neighbor> = new Multimap()
for (const streamPartitionInfo of info.streamPartitions) {
const neighbors = streamPartitionInfo.contentDeliveryLayerNeighbors
.map((n) => toNodeId(n.peerDescriptor))
.filter((id) => nodeIds.has(id))
.map((n) => ({ nodeId: toNodeId(n.peerDescriptor), rtt: n.rtt }))
.filter((n) => nodeIds.has(n.nodeId))
streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors)
}
const nodeId = toNodeId(info.peerDescriptor)
Expand All @@ -36,18 +41,24 @@ export class Topology {
return [...this.nodes.values()]
}

getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): DhtAddress[] {
getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): Neighbor[] {
return this.nodes.get(nodeId)?.streamPartNeighbors.get(streamPartId) ?? []
}

getPeers(streamPartId: StreamPartID): Set<DhtAddress> {
// gets the neighbor if nodeId2 is a neighbor of nodeId1 (but doesn't check the neighborhood other way around)
getNeighbor(nodeId1: DhtAddress, nodeId2: DhtAddress, streamPartId: StreamPartID): Neighbor | undefined {
const neighbors = this.nodes.get(nodeId1)?.streamPartNeighbors.get(streamPartId) ?? []
return neighbors.find((n) => n.nodeId == nodeId2)
}

getPeerNodeIds(streamPartId: StreamPartID): Set<DhtAddress> {
const nodeIds: Set<DhtAddress> = new Set()
for (const node of this.nodes.values()) {
const neighbors = node.streamPartNeighbors.get(streamPartId)
if (neighbors.length > 0) {
nodeIds.add(node.id)
for (const neighbor of neighbors) {
nodeIds.add(neighbor)
nodeIds.add(neighbor.nodeId)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/entities/Node.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Field, Float, ObjectType } from 'type-graphql'
import { Field, Float, Int, ObjectType } from 'type-graphql'

/* eslint-disable indent */
@ObjectType()
Expand Down Expand Up @@ -42,6 +42,8 @@ export class Neighbor {
nodeId1!: string
@Field()
nodeId2!: string
@Field(() => Int, { nullable: true })
rtt!: number | null
}

/* eslint-disable indent */
Expand Down
24 changes: 17 additions & 7 deletions src/repository/NodeRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { DhtAddress } from '@streamr/dht'
import { StreamID, StreamPartID } from '@streamr/sdk'
import { Logger } from '@streamr/utils'
import { Inject, Service } from 'typedi'
import { Topology } from '../crawler/Topology'
import { Topology, Neighbor } from '../crawler/Topology'
import { createSqlQuery } from '../utils'
import { ConnectionPool, PaginatedListFragment } from './ConnectionPool'
import { mean, without } from 'lodash'

export interface NodeRow {
id: string
Expand All @@ -15,10 +16,18 @@ interface NeighborRow {
streamPartId: string
nodeId1: string
nodeId2: string
rtt: number | null
}

const logger = new Logger(module)

const getRtt = (neighbor1: Neighbor, neighbor2: DhtAddress, streamPartId: StreamPartID, topology: Topology): number | undefined => {
const rtt1 = neighbor1.rtt
const rtt2 = topology.getNeighbor(neighbor1.nodeId, neighbor2, streamPartId)?.rtt
const rtts = without([rtt1, rtt2], undefined)
return (rtts.length > 0) ? mean(rtts) : undefined
}

@Service()
export class NodeRepository {

Expand Down Expand Up @@ -90,7 +99,7 @@ export class NodeRepository {
params.push(`${streamId}#%`)
}
const sql = createSqlQuery(
'SELECT streamPartId, nodeId1, nodeId2 FROM neighbors',
'SELECT streamPartId, nodeId1, nodeId2, rtt FROM neighbors',
whereClauses
)
return this.connectionPool.queryPaginated<NeighborRow>(
Expand All @@ -105,17 +114,18 @@ export class NodeRepository {
const nodes = topology.getNodes().map((node) => {
return [node.id, node.ipAddress]
})
const neighbors: [StreamPartID, DhtAddress, DhtAddress][] = []
const neighbors: [StreamPartID, DhtAddress, DhtAddress, number?][] = []
for (const node of topology.getNodes()) {
for (const streamPartId of node.streamPartNeighbors.keys()) {
const streamPartNeighbors = node.streamPartNeighbors.get(streamPartId)
for (const neighbor of streamPartNeighbors) {
// If node A and B are neighbors, we assume that there are two associations in the topology:
// A->B and B-A. We don't need to store both associations to the DB. The following comparison
// A->B and B->A. We don't need to store both associations to the DB. The following comparison
// filters out the duplication. Note that if there is only one side of the association
// in the topology, that association is maybe not stored at all.
if (node.id < neighbor) {
neighbors.push([streamPartId, node.id, neighbor])
if (node.id < neighbor.nodeId) {
const rtt = getRtt(neighbor, node.id, streamPartId, topology)
neighbors.push([streamPartId, node.id, neighbor.nodeId, rtt])
}
}
}
Expand All @@ -126,7 +136,7 @@ export class NodeRepository {
await connection.query('DELETE FROM neighbors')
await connection.query('DELETE FROM nodes')
await connection.query('INSERT INTO nodes (id, ipAddress) VALUES ?', [nodes])
await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2) VALUES ?', [neighbors])
await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2, rtt) VALUES ?', [neighbors])
await connection.commit()
} catch (e) {
connection.rollback()
Expand Down
55 changes: 36 additions & 19 deletions test/APIServer.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import 'reflect-metadata'

import { randomDhtAddress, DhtAddress } from '@streamr/dht'
import { Multimap, StreamID, StreamPartID, StreamPartIDUtils, utf8ToBinary } from '@streamr/utils'
import { DhtAddress, NodeType, randomDhtAddress, toDhtAddressRaw } from '@streamr/dht'
import { ipv4ToNumber, Multimap, StreamID, StreamPartID, StreamPartIDUtils, utf8ToBinary } from '@streamr/utils'
import { range, without } from 'lodash'
import Container from 'typedi'
import { APIServer } from '../src/api/APIServer'
import { CONFIG_TOKEN } from '../src/Config'
import { Topology } from '../src/crawler/Topology'
import { ContentType } from '../src/entities/Message'
import { MessageRepository } from '../src/repository/MessageRepository'
import { NodeRepository } from '../src/repository/NodeRepository'
Expand All @@ -14,6 +15,14 @@ import { StreamrClientFacade } from '../src/StreamrClientFacade'
import { createDatabase, queryAPI } from '../src/utils'
import { dropTestDatabaseIfExists, TEST_DATABASE_NAME } from './utils'

const toMockPeerDescriptor = (nodeId: DhtAddress) => {
return {
nodeId: toDhtAddressRaw(nodeId),
type: NodeType.NODEJS,
ipAddress: ipv4ToNumber('123.1.2.3')
}
}

const storeTestTopology = async (
streamParts: {
id: StreamPartID
Expand All @@ -22,22 +31,27 @@ const storeTestTopology = async (
) => {
const nodeRepository = Container.get(NodeRepository)
const nodeIds: Set<DhtAddress> = new Set(streamParts.map((sp) => sp.nodeIds).flat())
const getNodes = () => {
return [...nodeIds].map((nodeId: DhtAddress) => {
const streamPartNeighbors = new Multimap()
for (const streamPart of streamParts) {
if (streamPart.nodeIds.includes(nodeId)) {
streamPartNeighbors.addAll(streamPart.id, without(streamPart.nodeIds, nodeId))
}
}
return {
id: nodeId,
streamPartNeighbors,
ipAddress: '123.1.2.3'
const nodes = [...nodeIds].map((nodeId: DhtAddress) => {
const streamPartNeighbors = new Multimap<string, DhtAddress>()
for (const streamPart of streamParts) {
if (streamPart.nodeIds.includes(nodeId)) {
streamPartNeighbors.addAll(streamPart.id, without(streamPart.nodeIds, nodeId))
}
})
}
await nodeRepository.replaceNetworkTopology({ getNodes } as any)
}
return {
peerDescriptor: toMockPeerDescriptor(nodeId),
streamPartitions: [...streamPartNeighbors.keys()].map((streamPartId) => ({
id: streamPartId,
contentDeliveryLayerNeighbors: streamPartNeighbors.get(streamPartId).map((n) => ({
peerDescriptor: toMockPeerDescriptor(n),
rtt: 123
})),
controlLayerNeighbors: undefined as any
}))
}
})
const topology = new Topology(nodes)
await nodeRepository.replaceNetworkTopology(topology)
}

describe('APIServer', () => {
Expand Down Expand Up @@ -423,18 +437,21 @@ describe('APIServer', () => {
})

it('filter by node', async () => {
const response1 = await queryAPI(`{
const response = await queryAPI(`{
neighbors(node: "${node1}") {
items {
streamPartId
nodeId1
nodeId2
rtt
}
}
}`, apiPort)
const neighbors = response1.items
const neighbors = response.items
expect(neighbors).toHaveLength(1)
const actualNodes = neighbors.map((n: any) => [n.nodeId1, n.nodeId2]).flat()
expect(actualNodes).toIncludeSameMembers([node1, node2])
expect(neighbors[0].rtt).toBe(123)
})

it('filter by stream part', async () => {
Expand Down
2 changes: 1 addition & 1 deletion test/Crawler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ describe('Crawler', () => {
''
)
expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length)
expect([...topology.getPeers(STREAM_PART_ID)]).toIncludeSameMembers(nodes.map(toNodeId))
expect([...topology.getPeerNodeIds(STREAM_PART_ID)]).toIncludeSameMembers(nodes.map(toNodeId))
})
})
68 changes: 68 additions & 0 deletions test/NodeRepository.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import 'reflect-metadata'

import { PeerDescriptor, randomDhtAddress, toDhtAddress, toDhtAddressRaw } from '@streamr/dht'
import { StreamPartIDUtils } from '@streamr/utils'
import { range } from 'lodash'
import Container from 'typedi'
import { CONFIG_TOKEN } from '../src/Config'
import { Topology } from '../src/crawler/Topology'
import { NodeRepository } from '../src/repository/NodeRepository'
import { createDatabase } from '../src/utils'
import { TEST_DATABASE_NAME, dropTestDatabaseIfExists } from './utils'

const STREAM_PART_ID = StreamPartIDUtils.parse('stream#1')

describe('NodeRepository', () => {

beforeEach(async () => {
const config = {
database: {
host: '10.200.10.1',
name: TEST_DATABASE_NAME,
user: 'root',
password: 'password'
}
}
await dropTestDatabaseIfExists(config.database)
await createDatabase(config.database)
Container.set(CONFIG_TOKEN, config)
})

afterEach(() => {
Container.reset()
})

it('replace topology', async () => {
const repository = Container.get(NodeRepository)

const peerDescriptors: PeerDescriptor[] = range(2).map(() => ({
nodeId: toDhtAddressRaw(randomDhtAddress())
} as any))
const nodes = [{
peerDescriptor: peerDescriptors[0],
streamPartitions: [{
id: STREAM_PART_ID,
contentDeliveryLayerNeighbors: [{ peerDescriptor: peerDescriptors[1], rtt: 100 }],
controlLayerNeighbors: undefined as any
}]
}, {
peerDescriptor: peerDescriptors[1],
streamPartitions: [{
id: STREAM_PART_ID,
contentDeliveryLayerNeighbors: [{ peerDescriptor: peerDescriptors[0], rtt: 200 }],
controlLayerNeighbors: undefined as any
}]
}]
const topology = new Topology(nodes)
await repository.replaceNetworkTopology(topology)

const nodeIds = peerDescriptors.map((p) => toDhtAddress(p.nodeId))
const actualNodes = await repository.getNodes()
expect(actualNodes.items.map((item) => item.id)).toIncludeSameMembers(nodeIds)
const actualNeighbors = await repository.getNeighbors()
expect(actualNeighbors.items).toHaveLength(1)
expect(actualNeighbors.items[0].streamPartId).toBe(STREAM_PART_ID)
expect(actualNeighbors.items[0].rtt).toBe(150)
expect([actualNeighbors.items[0].nodeId1, actualNeighbors.items[0].nodeId2]).toIncludeSameMembers(nodeIds)
})
})
12 changes: 7 additions & 5 deletions test/Topology.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ describe('Topology', () => {
contentDeliveryLayerNeighbors: [
{ peerDescriptor: nodes[1] },
{ peerDescriptor: nodes[2] }
]
],
controlLayerNeighbors: undefined as any
}]
}, {
peerDescriptor: nodes[2],
Expand All @@ -29,13 +30,14 @@ describe('Topology', () => {
{ peerDescriptor: nodes[0] },
{ peerDescriptor: nodes[1] },
{ peerDescriptor: nodes[2] }
]
],
controlLayerNeighbors: undefined as any
}]
}] as any)
expect([...topology.getNeighbors(toNodeId(nodes[0]), STREAM_PART_ID_1)]).toIncludeSameMembers([
}])
expect(topology.getNeighbors(toNodeId(nodes[0]), STREAM_PART_ID_1).map((n) => n.nodeId)).toIncludeSameMembers([
toNodeId(nodes[2])
])
expect([...topology.getNeighbors(toNodeId(nodes[2]), STREAM_PART_ID_2)]).toIncludeSameMembers([
expect(topology.getNeighbors(toNodeId(nodes[2]), STREAM_PART_ID_2).map((n) => n.nodeId)).toIncludeSameMembers([
toNodeId(nodes[0]), toNodeId(nodes[2])
])
})
Expand Down
Loading