Skip to content

fix(sdk): [NET-1462] StreamrClient#searchStreams() queries using Stream entity #3132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions packages/sdk/src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { OperatorRegistry } from './contracts/OperatorRegistry'
import { StorageNodeMetadata, StorageNodeRegistry } from './contracts/StorageNodeRegistry'
import { StreamRegistry } from './contracts/StreamRegistry'
import { StreamStorageRegistry } from './contracts/StreamStorageRegistry'
import { SearchStreamsOrderBy, SearchStreamsPermissionFilter, toInternalSearchStreamsPermissionFilter } from './contracts/searchStreams'
import { SearchStreamsPermissionFilter, toInternalSearchStreamsPermissionFilter } from './contracts/searchStreams'
import { GroupKey } from './encryption/GroupKey'
import { LocalGroupKeyStore, UpdateEncryptionKeyOptions } from './encryption/LocalGroupKeyStore'
import { PublisherKeyExchange } from './encryption/PublisherKeyExchange'
Expand Down Expand Up @@ -442,17 +442,15 @@ export class StreamrClient {
*/
searchStreams(
term: string | undefined,
permissionFilter: SearchStreamsPermissionFilter | undefined,
orderBy: SearchStreamsOrderBy = { field: 'id', direction: 'asc' }
permissionFilter: SearchStreamsPermissionFilter | undefined
): AsyncIterable<Stream> {
logger.debug('Search for streams', { term, permissionFilter })
if ((term === undefined) && (permissionFilter === undefined)) {
throw new Error('Requires a search term or a permission filter')
}
const streamIds = this.streamRegistry.searchStreams(
term,
(permissionFilter !== undefined) ? toInternalSearchStreamsPermissionFilter(permissionFilter) : undefined,
orderBy
(permissionFilter !== undefined) ? toInternalSearchStreamsPermissionFilter(permissionFilter) : undefined
)
return map(streamIds, (id) => new Stream(id, this))
}
Expand Down
19 changes: 3 additions & 16 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,7 @@ import { Mapping, createCacheMap } from '../utils/Mapping'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { ObservableContract, initContractEventGateway, waitForTx } from './contract'
import { InternalSearchStreamsPermissionFilter, SearchStreamsOrderBy, searchStreams as _searchStreams } from './searchStreams'

/*
* On-chain registry of stream metadata and permissions.
*
* Does not support system streams (the key exchange stream)
*/

export interface StreamQueryResult {
id: string
metadata: string
}
import { InternalSearchStreamsPermissionFilter, searchStreams as _searchStreams } from './searchStreams'

interface StreamPublisherOrSubscriberItem {
id: string
Expand Down Expand Up @@ -297,16 +286,14 @@ export class StreamRegistry {
async* searchStreams(
term: string | undefined,
permissionFilter: InternalSearchStreamsPermissionFilter | undefined,
orderBy: SearchStreamsOrderBy
): AsyncGenerator<StreamID> {
const queryResult = _searchStreams(
term,
permissionFilter,
orderBy,
this.theGraphClient)
for await (const item of queryResult) {
const id = toStreamID(item.stream.id)
this.populateMetadataCache(id, parseMetadata(item.stream.metadata))
const id = toStreamID(item.id)
this.populateMetadataCache(id, parseMetadata(item.metadata))
yield id
}
}
Expand Down
153 changes: 65 additions & 88 deletions packages/sdk/src/contracts/searchStreams.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { ChangeFieldType, GraphQLQuery, HexString, TheGraphClient, toUserId, UserID } from '@streamr/utils'
import { ChainPermissions, convertChainPermissionsToStreamPermissions, PUBLIC_PERMISSION_USER_ID, StreamPermission } from '../permission'
import { filter, unique } from '../utils/GeneratorUtils'
import { StreamQueryResult } from './StreamRegistry'
import { ChainPermissions, PUBLIC_PERMISSION_USER_ID, StreamPermission } from '../permission'

export interface SearchStreamsPermissionFilter {
userId: HexString
Expand All @@ -15,15 +13,11 @@ export interface SearchStreamsPermissionFilter {

export type InternalSearchStreamsPermissionFilter = ChangeFieldType<SearchStreamsPermissionFilter, 'userId', UserID>

export interface SearchStreamsOrderBy {
field: 'id' | 'createdAt' | 'updatedAt'
direction: 'asc' | 'desc'
}

export type SearchStreamsResultItem = {
export interface SearchStreamsResultItem {
id: string
stream: StreamQueryResult
} & ChainPermissions
metadata: string
permissions: ChainPermissions[]
}

export const toInternalSearchStreamsPermissionFilter = (filter: SearchStreamsPermissionFilter): InternalSearchStreamsPermissionFilter => {
return {
Expand All @@ -35,106 +29,89 @@ export const toInternalSearchStreamsPermissionFilter = (filter: SearchStreamsPer
export async function* searchStreams(
term: string | undefined,
permissionFilter: InternalSearchStreamsPermissionFilter | undefined,
orderBy: SearchStreamsOrderBy,
theGraphClient: TheGraphClient,
): AsyncGenerator<SearchStreamsResultItem> {
const backendResults = theGraphClient.queryEntities<SearchStreamsResultItem>(
(lastId: string, pageSize: number) => buildQuery(term, permissionFilter, orderBy, lastId, pageSize)
yield* theGraphClient.queryEntities<SearchStreamsResultItem>(
(lastId: string, pageSize: number) => buildQuery(term, permissionFilter, lastId, pageSize)
)
/*
* There can be orphaned permission entities if a stream is deleted (currently
* we don't remove the assigned permissions, see ETH-222)
* TODO remove the filtering when ETH-222 has been implemented
*/
const withoutOrphaned = filter(backendResults, (p) => p.stream !== null)
/*
* As we query via permissions entity, any stream can appear multiple times (once per
* permission user) if we don't do have exactly one userId in the GraphQL query.
* That is the case if no permission filter is defined at all, or if permission.allowPublic
* is true (then it appears twice: once for the user, and once for the public address).
*/
const withoutDuplicates = unique(withoutOrphaned, (p) => p.stream.id)
}

if (permissionFilter !== undefined) {
/*
* There are situations where the The Graph may contain empty assignments (all boolean flags false,
* and all expirations in the past). E.g.:
* - if we granted some permissions to a user, but then removed all those permissions
* - if we granted an expirable permission (subscribe or publish), and it has now expired
* We don't want to return empty assignments to the user, because from user's perspective those are
* non-existing assignments.
* -> Here we filter out the empty assignments by defining a fallback value for anyOf filter
*/
const anyOf = permissionFilter.anyOf ?? Object.values(StreamPermission) as StreamPermission[]
yield* filter(withoutDuplicates, (item: SearchStreamsResultItem) => {
const actual = convertChainPermissionsToStreamPermissions(item)
return anyOf.some((p) => actual.includes(p))
})
} else {
yield* withoutDuplicates
const escapeStringValue = (s: string) => s.replace(/\\/g, '\\\\').replace(/"/g, '\\"') // escape backslashes and double quotes
const wrapWithQuotes = (s: string) => `"${s}"`
const wrapSubExpression = (s: string) => `{ ${s} }`

const createPermissionFilterExpression = (permissions: StreamPermission[], operator: 'and' | 'or', nowTimestampInSeconds: number) => {
const subExpressions: string[] = []
if (permissions.includes(StreamPermission.EDIT)) {
subExpressions.push('canEdit: true')
}
if (permissions.includes(StreamPermission.DELETE)) {
subExpressions.push('canDelete: true')
}
if (permissions.includes(StreamPermission.PUBLISH)) {
subExpressions.push(`publishExpiration_gt: "${nowTimestampInSeconds}"`)
}
if (permissions.includes(StreamPermission.SUBSCRIBE)) {
subExpressions.push(`subscribeExpiration_gt: "${nowTimestampInSeconds}"`)
}
if (permissions.includes(StreamPermission.GRANT)) {
subExpressions.push('canGrant: true')
}
return `${operator}: [${subExpressions.map(wrapSubExpression).join(', ')}]`
}

/*
* Note that we query the results via permissions entity even if there is no permission filter
* defined. It is maybe possible to optimize the non-permission related queries by searching over
* the Stream entity. To support that we'd need to add a new field to The Graph (e.g. "idAsString"),
* as we can't do substring filtering by Stream id field (there is no "id_contains" because
* ID type is not a string)
*/
const buildQuery = (
term: string | undefined,
permissionFilter: InternalSearchStreamsPermissionFilter | undefined,
orderBy: SearchStreamsOrderBy,
lastId: string,
pageSize: number
): GraphQLQuery => {
const variables: Record<string, any> = {
stream_contains: term,
id_gt: lastId
const whereExpressions: string[] = []
whereExpressions.push(`id_gt: "${escapeStringValue(lastId)}"`)
if (term !== undefined) {
whereExpressions.push(`idAsString_contains: "${escapeStringValue(term)}"`)
}
if (permissionFilter !== undefined) {
variables.userId_in = [permissionFilter.userId]
const permissionExpressions: string[] = []
const userId: string[] = [permissionFilter.userId]
if (permissionFilter.allowPublic) {
variables.userId_in.push(PUBLIC_PERMISSION_USER_ID)
userId.push(PUBLIC_PERMISSION_USER_ID)
}
permissionExpressions.push(`userId_in: [${userId.map(wrapWithQuotes).join(',')}]`)
const nowTimestampInSeconds = Math.round(Date.now() / 1000)
if (permissionFilter.allOf !== undefined) {
const now = String(Math.round(Date.now() / 1000))
variables.canEdit = permissionFilter.allOf.includes(StreamPermission.EDIT) ? true : undefined
variables.canDelete = permissionFilter.allOf.includes(StreamPermission.DELETE) ? true : undefined
variables.publishExpiration_gt = permissionFilter.allOf.includes(StreamPermission.PUBLISH) ? now : undefined
variables.subscribeExpiration_gt = permissionFilter.allOf.includes(StreamPermission.SUBSCRIBE) ? now : undefined
variables.canGrant = permissionFilter.allOf.includes(StreamPermission.GRANT) ? true : undefined
permissionExpressions.push(createPermissionFilterExpression(permissionFilter.allOf, 'and', nowTimestampInSeconds))
}
/*
* There are situations where the The Graph may contain empty assignments (all boolean flags false,
* and all expirations in the past). E.g.:
* - if we granted some permissions to a user, but then removed all those permissions
* - if we granted an expirable permission (subscribe or publish), and it has now expired
* We don't want to return empty assignments to the user, because from user's perspective those are
* non-existing assignments. That's why we apply this extra virtual anyOf filter if none of the user-given
* permission filters limit the result set in any way.
*/
const anyOfFilter = permissionFilter.anyOf
?? (((permissionFilter.allOf === undefined) || (permissionFilter.allOf.length === 0))
? Object.values(StreamPermission) as StreamPermission[]
: undefined)
if (anyOfFilter !== undefined) {
permissionExpressions.push(createPermissionFilterExpression(anyOfFilter, 'or', nowTimestampInSeconds))
}
whereExpressions.push(`permissions_: { and: [${permissionExpressions.map(wrapSubExpression).join(', ')}] }`)
}
const query = `
query (
$stream_contains: String,
$userId_in: [Bytes!]
$canEdit: Boolean
$canDelete: Boolean
$publishExpiration_gt: BigInt
$subscribeExpiration_gt: BigInt
$canGrant: Boolean
$id_gt: String
) {
streamPermissions (
first: ${pageSize},
orderBy: "stream__${orderBy.field}",
orderDirection: "${orderBy.direction}",
${TheGraphClient.createWhereClause(variables)}
query {
streams (
first: ${pageSize}
orderBy: "id"
where: {
${whereExpressions.join(', ')}
}
) {
id
stream {
id
metadata
}
canEdit
canDelete
publishExpiration
subscribeExpiration
canGrant
metadata
}
}`
return { query, variables }
return { query }
}
2 changes: 1 addition & 1 deletion packages/sdk/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export {
export type { StreamCreationEvent } from './contracts/StreamRegistry'
export type { StorageNodeAssignmentEvent } from './contracts/StreamStorageRegistry'
export type { StorageNodeMetadata } from './contracts/StorageNodeRegistry'
export type { SearchStreamsPermissionFilter, SearchStreamsOrderBy } from './contracts/searchStreams'
export type { SearchStreamsPermissionFilter } from './contracts/searchStreams'
export {
type StreamrClientConfig,
type ConnectionInfo,
Expand Down
8 changes: 3 additions & 5 deletions packages/sdk/test/end-to-end/searchStreams.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createTestPrivateKey, randomUserId } from '@streamr/test-utils'
import { collect } from '@streamr/utils'
import { collect, StreamID } from '@streamr/utils'
import { Stream } from '../../src/Stream'
import { StreamrClient } from '../../src/StreamrClient'
import { SearchStreamsPermissionFilter } from '../../src/contracts/searchStreams'
Expand Down Expand Up @@ -31,11 +31,9 @@ describe('searchStreams', () => {
return streams
}

const searchStreamIds = async (searchTerm: string, permissionFilter?: SearchStreamsPermissionFilter) => {
const searchStreamIds = async (searchTerm: string, permissionFilter?: SearchStreamsPermissionFilter): Promise<StreamID[]> => {
const streams = client.searchStreams(searchTerm, permissionFilter)
const ids = (await collect(streams)).map((stream) => stream.id)
ids.sort()
return ids
return (await collect(streams)).map((stream) => stream.id)
}

beforeAll(async () => {
Expand Down
52 changes: 0 additions & 52 deletions packages/sdk/test/unit/searchStreams.test.ts

This file was deleted.

8 changes: 0 additions & 8 deletions packages/utils/src/TheGraphClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,6 @@ export class TheGraphClient {
// eslint-disable-next-line no-underscore-dangle
return response._meta.block.number
}

static createWhereClause(variables: Record<string, any>): string {
const parameterList = Object.keys(variables)
.filter((k) => variables[k] !== undefined)
.map((k) => k + ': $' + k)
.join(' ')
return `where: { ${parameterList} }`
}
}

class BlockNumberGate extends Gate {
Expand Down
Loading