Skip to content

Commit 005ad9e

Browse files
authored
deps: Bump @streamr network packages to v102.0.0-beta.3 (#27)
Applied these API changes: - `nodeInfo.version` field has been renamed to `applicationVersion` - some type assertions removed as `NodeInfo` has stricter type - `user` field has been renamed to `userId` in permission API - stream partition count is queried via `getStreamPartitionCount()` function - `environment` config option is used instead of `CONFIG_TEST` - `waitForCondition()` test utility has been renamed `until()` - use `fetchPrivateKeyWithGas()` instead of hardcoded private key for end-to-end test The Docker build requires update to Node 20/22. This is done in a separate PR: #28
1 parent 63c1374 commit 005ad9e

File tree

9 files changed

+1907
-1604
lines changed

9 files changed

+1907
-1604
lines changed

package-lock.json

Lines changed: 1878 additions & 1573 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
"author": "Streamr Network AG <contact@streamr.network>",
1414
"dependencies": {
1515
"@streamr/config": "^5.3.7",
16-
"@streamr/dht": "102.0.0-beta.0",
17-
"@streamr/sdk": "102.0.0-beta.0",
18-
"@streamr/trackerless-network": "102.0.0-beta.0",
19-
"@streamr/utils": "102.0.0-beta.0",
16+
"@streamr/dht": "102.0.0-beta.3",
17+
"@streamr/sdk": "102.0.0-beta.3",
18+
"@streamr/trackerless-network": "102.0.0-beta.3",
19+
"@streamr/utils": "102.0.0-beta.3",
2020
"@types/node-fetch": "^2.6.3",
2121
"class-validator": "^0.14.1",
2222
"cors": "^2.8.5",
@@ -36,6 +36,7 @@
3636
"typedi": "^0.10.0"
3737
},
3838
"devDependencies": {
39+
"@streamr/test-utils": "^102.0.0-beta.3",
3940
"@tsconfig/node20": "^20.1.2",
4041
"@types/cors": "^2.8.17",
4142
"@types/express": "^4.17.21",

src/StreamrClientFacade.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export class StreamrClientFacade {
4444

4545
searchStreams(owner: string): AsyncIterable<Stream> {
4646
return this.client.searchStreams(undefined, {
47-
user: owner,
47+
userId: owner,
4848
allowPublic: false,
4949
allOf: [StreamPermission.GRANT]
5050
})

src/crawler/Crawler.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { PeerDescriptor, toNodeId } from '@streamr/dht'
2-
import { DhtAddress, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from '@streamr/sdk'
2+
import { DhtAddress, getStreamPartitionCount, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from '@streamr/sdk'
33
import { Logger, StreamID, StreamPartID, StreamPartIDUtils, binaryToHex, toStreamPartID, wait } from '@streamr/utils'
44
import { difference, range, sortBy } from 'lodash'
55
import pLimit from 'p-limit'
@@ -58,7 +58,7 @@ const createNodeInfoLogOutput = (nodeInfo: NormalizedNodeInfo) => {
5858
controlLayerNeighbors: sp.controlLayerNeighbors.map(toNodeId),
5959
contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n: any) => toNodeId(n.peerDescriptor)) // TODO better type
6060
})),
61-
version: nodeInfo.version
61+
applicationVersion: nodeInfo.applicationVersion
6262
}
6363
}
6464

@@ -176,7 +176,7 @@ export class Crawler {
176176
const workedThreadLimit = pLimit(MAX_SUBSCRIPTION_COUNT)
177177
await Promise.all(sortedContractStreams.map((stream: Stream) => {
178178
return workedThreadLimit(async () => {
179-
await this.analyzeStream(stream.id, stream.getMetadata(), topology, subscribeGate)
179+
await this.analyzeStream(stream.id, await stream.getMetadata(), topology, subscribeGate)
180180
})
181181
}))
182182

@@ -191,7 +191,7 @@ export class Crawler {
191191
): Promise<void> {
192192
logger.info(`Analyze ${id}`)
193193
const peersByPartition = new Map<number, Set<DhtAddress>>
194-
for (const partition of range(metadata.partitions)) {
194+
for (const partition of range(getStreamPartitionCount(metadata))) {
195195
peersByPartition.set(partition, topology.getPeers(toStreamPartID(id, partition)))
196196
}
197197
try {
@@ -212,7 +212,7 @@ export class Crawler {
212212
logger.info(`Replace ${id}`)
213213
await this.streamRepository.replaceStream({
214214
id,
215-
description: metadata.description ?? null,
215+
description: metadata.description as string ?? null,
216216
peerCount: peerIds.size,
217217
messagesPerSecond: messageRate.messagesPerSecond,
218218
bytesPerSecond: messageRate.bytesPerSecond,
@@ -253,7 +253,7 @@ export class Crawler {
253253
// is the only publisher and subscriber
254254
await this.streamRepository.replaceStream({
255255
id: payload.streamId,
256-
description: payload.metadata.description ?? null,
256+
description: payload.metadata.description as string ?? null,
257257
peerCount: 0,
258258
messagesPerSecond: 0,
259259
bytesPerSecond: 0,
@@ -265,14 +265,14 @@ export class Crawler {
265265
await wait(this.config.crawler.newStreamAnalysisDelay)
266266
// the entryPoints may contain duplicates (i.e. same node is an entry point for
267267
// multiple partitions), but crawlTopology can ignore those
268-
const entryPoints = (await Promise.all(range(payload.metadata.partitions)
268+
const entryPoints = (await Promise.all(range(getStreamPartitionCount(payload.metadata))
269269
.map((p) => toStreamPartID(payload.streamId, p))
270270
.map((sp) => localNode.fetchStreamPartEntryPoints(sp)))).flat()
271271
const topology = await crawlTopology(localNode, entryPoints, (nodeInfo: NormalizedNodeInfo) => {
272272
const streamPartitions = nodeInfo.streamPartitions.filter(
273273
(sp) => StreamPartIDUtils.getStreamID(sp.id as StreamPartID) === payload.streamId
274274
)
275-
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor!))).flat()
275+
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor))).flat()
276276
}, `stream-${payload.streamId}-${Date.now()}`)
277277
// TODO could add new nodes and neighbors to NodeRepository?
278278
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)

src/crawler/NetworkNodeFacade.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export type NormalizedNodeInfo = Omit<NodeInfo, 'streamPartitions'>
1717
& { streamPartitions: Omit<ArrayElement<NodeInfo['streamPartitions']>, 'deprecatedContentDeliveryLayerNeighbors'>[] }
1818

1919
const toNormalizeNodeInfo = (info: NodeInfo): NormalizedNodeInfo => {
20-
const isLegacyFormat = semver.satisfies(semver.coerce(info.version)!, '< 102.0.0')
20+
const isLegacyFormat = semver.satisfies(semver.coerce(info.applicationVersion)!, '< 102.0.0')
2121
return {
2222
...info,
2323
streamPartitions: info.streamPartitions.map((sp) => ({

src/crawler/Topology.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export class Topology {
1919
const streamPartNeighbors: Multimap<StreamPartID, DhtAddress> = new Multimap()
2020
for (const streamPartitionInfo of info.streamPartitions) {
2121
const neighbors = streamPartitionInfo.contentDeliveryLayerNeighbors
22-
.map((n) => toNodeId(n.peerDescriptor!))
22+
.map((n) => toNodeId(n.peerDescriptor))
2323
.filter((id) => nodeIds.has(id))
2424
streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors)
2525
}

test/Crawler.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ describe('Crawler', () => {
2525
peerDescriptor: n
2626
})) ?? []
2727
}],
28-
version: '102.0.0'
28+
applicationVersion: '102.0.0'
2929
}
3030
}
3131

@@ -60,7 +60,7 @@ describe('Crawler', () => {
6060
const topology = await crawlTopology(
6161
localNode as any,
6262
[nodes[0], nodes[5]],
63-
(response: NormalizedNodeInfo) => response.streamPartitions[0].contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor!),
63+
(response: NormalizedNodeInfo) => response.streamPartitions[0].contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor),
6464
''
6565
)
6666
expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length)

test/NetworkNodeFacade.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const NORMAL_INFO = {
2929
rtt: 123
3030
}]
3131
}],
32-
version: '102.0.0-beta.0'
32+
applicationVersion: '102.0.0-beta.2'
3333
}
3434
const LEGACY_INFO = {
3535
peerDescriptor: createTestPeerDescriptor(),
@@ -43,7 +43,7 @@ const LEGACY_INFO = {
4343
deprecatedContentDeliveryLayerNeighbors: [createTestPeerDescriptor()],
4444
contentDeliveryLayerNeighbors: []
4545
}],
46-
version: '101.1.1-beta.1'
46+
applicationVersion: '101.1.1-beta.1'
4747
}
4848

4949
const createMockNode = (rawNodeInfo: NodeInfo): Partial<NetworkNode> => {

test/end-to-end.test.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import 'reflect-metadata'
22

33
import { DhtAddress, NodeType, randomDhtAddress, toDhtAddress, toDhtAddressRaw } from '@streamr/dht'
4-
import StreamrClient, { CONFIG_TEST, NetworkNodeType, PeerDescriptor, StreamID, StreamPermission, StreamrClientConfig } from '@streamr/sdk'
4+
import StreamrClient, { NetworkNodeType, PeerDescriptor, StreamID, StreamPermission, StreamrClientConfig } from '@streamr/sdk'
55
import { NetworkNode, createNetworkNode } from '@streamr/trackerless-network'
6-
import { StreamPartID, collect, setAbortableInterval, toStreamPartID, waitForCondition } from '@streamr/utils'
6+
import { StreamPartID, collect, setAbortableInterval, toStreamPartID, until } from '@streamr/utils'
7+
import { fetchPrivateKeyWithGas } from '@streamr/test-utils'
78
import { sample, uniq, without } from 'lodash'
89
import Container from 'typedi'
910
import { CONFIG_TOKEN } from '../src/Config'
@@ -15,8 +16,6 @@ import { Stream } from '../src/entities/Stream'
1516
import { createDatabase, queryAPI } from '../src/utils'
1617
import { TEST_DATABASE_NAME, dropTestDatabaseIfExists } from './utils'
1718

18-
const PUBLISHER_PRIVATE_KEY = '0x0000000000000000000000000000000000000000000000000000000000000001'
19-
const SUBSCRIBER_PRIVATE_KEY = '0x0000000000000000000000000000000000000000000000000000000000000002'
2019
const ENTRY_POINT_PORT = 40501
2120
const PARTITION_COUNT = 3
2221
const ACTIVE_PARTITIONS = [1, 2]
@@ -50,11 +49,9 @@ const startEntryPoint = async (): Promise<NetworkNode> => {
5049

5150
const createClientConfig = (entryPointPeerDescriptor: PeerDescriptor): StreamrClientConfig => {
5251
return {
53-
...CONFIG_TEST,
52+
environment: 'dev2',
5453
network: {
55-
...CONFIG_TEST.network,
5654
controlLayer: {
57-
...CONFIG_TEST.network!.controlLayer,
5855
entryPoints: [{
5956
nodeId: toDhtAddress(entryPointPeerDescriptor.nodeId),
6057
type: NetworkNodeType.NODEJS,
@@ -156,7 +153,7 @@ describe('end-to-end', () => {
156153
if (isPublic) {
157154
await stream.grantPermissions({ public: true, permissions })
158155
} else {
159-
await stream.grantPermissions({ user: await subscriber.getAddress(), permissions })
156+
await stream.grantPermissions({ userId: await subscriber.getAddress(), permissions })
160157
}
161158
return stream
162159
}
@@ -180,7 +177,7 @@ describe('end-to-end', () => {
180177
const waitForTheGraphToIndex = async (streamIds: StreamID[]): Promise<void> => {
181178
const client = createClient(undefined, entryPoint.getPeerDescriptor())
182179
for (const streamId of streamIds) {
183-
await waitForCondition(async () => {
180+
await until(async () => {
184181
const streams = await collect(client.searchStreams(streamId, undefined))
185182
return streams.length > 0
186183
}, 5000, 500)
@@ -210,8 +207,8 @@ describe('end-to-end', () => {
210207
await dropTestDatabaseIfExists(config.database)
211208
await createDatabase(config.database)
212209
Container.set(CONFIG_TOKEN, config)
213-
publisher = createClient(PUBLISHER_PRIVATE_KEY, entryPoint.getPeerDescriptor())
214-
subscriber = createClient(SUBSCRIBER_PRIVATE_KEY, entryPoint.getPeerDescriptor())
210+
publisher = createClient(await fetchPrivateKeyWithGas(), entryPoint.getPeerDescriptor())
211+
subscriber = createClient(await fetchPrivateKeyWithGas(), entryPoint.getPeerDescriptor())
215212
const server = Container.get(APIServer)
216213
await server.start()
217214
apiPort = Container.get(APIServer).getPort()
@@ -277,7 +274,7 @@ describe('end-to-end', () => {
277274
const newStream = await createTestStream(false)
278275
await startPublisherAndSubscriberForStream(newStream.id, publishingAbortControler.signal)
279276

280-
await waitForCondition(async () => {
277+
await until(async () => {
281278
const metrics = await queryStreamMetrics(newStream.id, apiPort)
282279
return (metrics !== undefined) && (metrics.peerCount >= 2)
283280
}, 20 * 1000, 1000)

0 commit comments

Comments
 (0)