diff --git a/CHANGELOG.md b/CHANGELOG.md index 5733c0639e..a4878fef05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,13 @@ Changes before Tatum release are not documented in this file. ### @streamr/sdk #### Added + - Add support for using the plumtree optimization in stream partitions (https://github.com/streamr-dev/network/pull/3147) #### Changed +- Optimize `StreamrClient#searchStreams()` (https://github.com/streamr-dev/network/pull/3132) + #### Deprecated #### Removed diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 28b08da72f..0259e5fe44 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -47,17 +47,6 @@ import { ContractFactory } from './ContractFactory' import { ObservableContract, initContractEventGateway, waitForTx } from './contract' import { InternalSearchStreamsPermissionFilter, searchStreams as _searchStreams } from './searchStreams' -/* - * On-chain registry of stream metadata and permissions. - * - * Does not support system streams (the key exchange stream) - */ - -export interface StreamQueryResult { - id: string - metadata: string -} - interface StreamPublisherOrSubscriberItem { id: string userId: string @@ -303,8 +292,8 @@ export class StreamRegistry { permissionFilter, this.theGraphClient) for await (const item of queryResult) { - const id = toStreamID(item.stream.id) - this.populateMetadataCache(id, parseMetadata(item.stream.metadata)) + const id = toStreamID(item.id) + this.populateMetadataCache(id, parseMetadata(item.metadata)) yield id } } diff --git a/packages/sdk/src/contracts/searchStreams.ts b/packages/sdk/src/contracts/searchStreams.ts index edd74ed726..cf3da23615 100644 --- a/packages/sdk/src/contracts/searchStreams.ts +++ b/packages/sdk/src/contracts/searchStreams.ts @@ -1,7 +1,5 @@ import { ChangeFieldType, GraphQLQuery, HexString, TheGraphClient, toUserId, UserID } from '@streamr/utils' -import { ChainPermissions, convertChainPermissionsToStreamPermissions, PUBLIC_PERMISSION_USER_ID, StreamPermission } from '../permission' -import { filter, unique } from '../utils/GeneratorUtils' -import { StreamQueryResult } from './StreamRegistry' +import { PUBLIC_PERMISSION_USER_ID, StreamPermission } from '../permission' export interface SearchStreamsPermissionFilter { userId: HexString @@ -15,10 +13,10 @@ export interface SearchStreamsPermissionFilter { export type InternalSearchStreamsPermissionFilter = ChangeFieldType -export type SearchStreamsResultItem = { +export interface SearchStreamsResultItem { id: string - stream: StreamQueryResult -} & ChainPermissions + metadata: string +} export const toInternalSearchStreamsPermissionFilter = (filter: SearchStreamsPermissionFilter): InternalSearchStreamsPermissionFilter => { return { @@ -32,101 +30,87 @@ export async function* searchStreams( permissionFilter: InternalSearchStreamsPermissionFilter | undefined, theGraphClient: TheGraphClient, ): AsyncGenerator { - const backendResults = theGraphClient.queryEntities( + yield* theGraphClient.queryEntities( (lastId: string, pageSize: number) => buildQuery(term, permissionFilter, lastId, pageSize) ) - /* - * There can be orphaned permission entities if a stream is deleted (currently - * we don't remove the assigned permissions, see ETH-222) - * TODO remove the filtering when ETH-222 has been implemented - */ - const withoutOrphaned = filter(backendResults, (p) => p.stream !== null) - /* - * As we query via permissions entity, any stream can appear multiple times (once per - * permission user) if we don't do have exactly one userId in the GraphQL query. - * That is the case if no permission filter is defined at all, or if permission.allowPublic - * is true (then it appears twice: once for the user, and once for the public address). - */ - const withoutDuplicates = unique(withoutOrphaned, (p) => p.stream.id) +} - if (permissionFilter !== undefined) { - /* - * There are situations where the The Graph may contain empty assignments (all boolean flags false, - * and all expirations in the past). E.g.: - * - if we granted some permissions to a user, but then removed all those permissions - * - if we granted an expirable permission (subscribe or publish), and it has now expired - * We don't want to return empty assignments to the user, because from user's perspective those are - * non-existing assignments. - * -> Here we filter out the empty assignments by defining a fallback value for anyOf filter - */ - const anyOf = permissionFilter.anyOf ?? Object.values(StreamPermission) as StreamPermission[] - yield* filter(withoutDuplicates, (item: SearchStreamsResultItem) => { - const actual = convertChainPermissionsToStreamPermissions(item) - return anyOf.some((p) => actual.includes(p)) - }) - } else { - yield* withoutDuplicates +const escapeStringValue = (s: string) => s.replace(/\\/g, '\\\\').replace(/"/g, '\\"') // escape backslashes and double quotes +const wrapWithQuotes = (s: string) => `"${s}"` +const wrapSubExpression = (s: string) => `{ ${s} }` + +const createPermissionFilterExpression = (permissions: StreamPermission[], operator: 'and' | 'or', nowTimestampInSeconds: number) => { + const subExpressions: string[] = [] + if (permissions.includes(StreamPermission.EDIT)) { + subExpressions.push('canEdit: true') } + if (permissions.includes(StreamPermission.DELETE)) { + subExpressions.push('canDelete: true') + } + if (permissions.includes(StreamPermission.PUBLISH)) { + subExpressions.push(`publishExpiration_gt: "${nowTimestampInSeconds}"`) + } + if (permissions.includes(StreamPermission.SUBSCRIBE)) { + subExpressions.push(`subscribeExpiration_gt: "${nowTimestampInSeconds}"`) + } + if (permissions.includes(StreamPermission.GRANT)) { + subExpressions.push('canGrant: true') + } + return `${operator}: [${subExpressions.map(wrapSubExpression).join(', ')}]` } -/* - * Note that we query the results via permissions entity even if there is no permission filter - * defined. It is maybe possible to optimize the non-permission related queries by searching over - * the Stream entity. To support that we'd need to add a new field to The Graph (e.g. "idAsString"), - * as we can't do substring filtering by Stream id field (there is no "id_contains" because - * ID type is not a string) - */ const buildQuery = ( term: string | undefined, permissionFilter: InternalSearchStreamsPermissionFilter | undefined, lastId: string, pageSize: number ): GraphQLQuery => { - const variables: Record = { - stream_contains: term, - id_gt: lastId + const whereExpressions: string[] = [] + whereExpressions.push(`id_gt: "${escapeStringValue(lastId)}"`) + if (term !== undefined) { + whereExpressions.push(`idAsString_contains: "${escapeStringValue(term)}"`) } if (permissionFilter !== undefined) { - variables.userId_in = [permissionFilter.userId] + const permissionExpressions: string[] = [] + const userId: string[] = [permissionFilter.userId] if (permissionFilter.allowPublic) { - variables.userId_in.push(PUBLIC_PERMISSION_USER_ID) + userId.push(PUBLIC_PERMISSION_USER_ID) } + permissionExpressions.push(`userId_in: [${userId.map(wrapWithQuotes).join(',')}]`) + const nowTimestampInSeconds = Math.round(Date.now() / 1000) if (permissionFilter.allOf !== undefined) { - const now = String(Math.round(Date.now() / 1000)) - variables.canEdit = permissionFilter.allOf.includes(StreamPermission.EDIT) ? true : undefined - variables.canDelete = permissionFilter.allOf.includes(StreamPermission.DELETE) ? true : undefined - variables.publishExpiration_gt = permissionFilter.allOf.includes(StreamPermission.PUBLISH) ? now : undefined - variables.subscribeExpiration_gt = permissionFilter.allOf.includes(StreamPermission.SUBSCRIBE) ? now : undefined - variables.canGrant = permissionFilter.allOf.includes(StreamPermission.GRANT) ? true : undefined + permissionExpressions.push(createPermissionFilterExpression(permissionFilter.allOf, 'and', nowTimestampInSeconds)) + } + /* + * There are situations where the The Graph may contain empty assignments (all boolean flags false, + * and all expirations in the past). E.g.: + * - if we granted some permissions to a user, but then removed all those permissions + * - if we granted an expirable permission (subscribe or publish), and it has now expired + * We don't want to return empty assignments to the user, because from user's perspective those are + * non-existing assignments. That's why we apply this extra virtual anyOf filter if none of the user-given + * permission filters limit the result set in any way. + */ + const anyOfFilter = permissionFilter.anyOf + ?? (((permissionFilter.allOf === undefined) || (permissionFilter.allOf.length === 0)) + ? Object.values(StreamPermission) as StreamPermission[] + : undefined) + if (anyOfFilter !== undefined) { + permissionExpressions.push(createPermissionFilterExpression(anyOfFilter, 'or', nowTimestampInSeconds)) } + whereExpressions.push(`permissions_: { and: [${permissionExpressions.map(wrapSubExpression).join(', ')}] }`) } const query = ` - query ( - $stream_contains: String, - $userId_in: [Bytes!] - $canEdit: Boolean - $canDelete: Boolean - $publishExpiration_gt: BigInt - $subscribeExpiration_gt: BigInt - $canGrant: Boolean - $id_gt: String - ) { - streamPermissions ( - first: ${pageSize}, - orderBy: "stream__id", - ${TheGraphClient.createWhereClause(variables)} + query { + streams ( + first: ${pageSize} + orderBy: "id" + where: { + ${whereExpressions.join(', ')} + } ) { id - stream { - id - metadata - } - canEdit - canDelete - publishExpiration - subscribeExpiration - canGrant + metadata } }` - return { query, variables } + return { query } } diff --git a/packages/utils/src/TheGraphClient.ts b/packages/utils/src/TheGraphClient.ts index 96174ff31d..90bdc4a5e8 100644 --- a/packages/utils/src/TheGraphClient.ts +++ b/packages/utils/src/TheGraphClient.ts @@ -113,14 +113,6 @@ export class TheGraphClient { // eslint-disable-next-line no-underscore-dangle return response._meta.block.number } - - static createWhereClause(variables: Record): string { - const parameterList = Object.keys(variables) - .filter((k) => variables[k] !== undefined) - .map((k) => k + ': $' + k) - .join(' ') - return `where: { ${parameterList} }` - } } class BlockNumberGate extends Gate {