Skip to content

Commit 67adb88

Browse files
committed
feat: Dynamic subscription filters
1 parent bdb54b7 commit 67adb88

File tree

6 files changed

+465
-129
lines changed

6 files changed

+465
-129
lines changed

examples/data-history-museum/index.ts

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { TransactionResult } from '@algorandfoundation/algokit-utils/types/index
33
import algosdk from 'algosdk'
44
import fs from 'fs'
55
import path from 'path'
6-
import { AlgorandSubscriber } from '../../src/subscriber'
6+
import { DynamicAlgorandSubscriber } from '../../src'
77
import TransactionType = algosdk.TransactionType
88

99
if (!fs.existsSync(path.join(__dirname, '..', '..', '.env')) && !process.env.ALGOD_SERVER) {
@@ -20,27 +20,54 @@ interface DHMAsset {
2020
metadata: Record<string, unknown>
2121
created: string
2222
lastModified: string
23+
owner: string
24+
ownerModified: string
25+
}
26+
27+
interface DHMFilterState {
28+
assetIds: number[]
2329
}
2430

2531
async function getDHMSubscriber() {
2632
const algod = await algokit.getAlgoClient()
2733
const indexer = await algokit.getAlgoIndexerClient()
28-
const subscriber = new AlgorandSubscriber(
34+
const subscriber = new DynamicAlgorandSubscriber<DHMFilterState>(
2935
{
30-
filters: [
31-
{
32-
name: 'dhm-asset',
33-
filter: {
34-
type: TransactionType.acfg,
35-
// Data History Museum creator accounts
36-
sender: (await algokit.isTestNet(algod))
37-
? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU'
38-
: 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU',
39-
},
40-
},
36+
maxIndexerRoundsToSync: 5_000_000,
37+
dynamicFilters: async (filterState, pollLevel) => [
38+
...(pollLevel === 0
39+
? [
40+
{
41+
name: 'dhm-asset',
42+
filter: {
43+
type: TransactionType.acfg,
44+
// Data History Museum creator accounts
45+
sender: (await algokit.isTestNet(algod))
46+
? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU'
47+
: 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU',
48+
},
49+
},
50+
]
51+
: []),
52+
...(filterState.assetIds.length > 0
53+
? [
54+
{
55+
name: 'dhm-ownership-change',
56+
filter: {
57+
type: TransactionType.axfer,
58+
assetId: filterState.assetIds,
59+
minAmount: 1,
60+
},
61+
},
62+
]
63+
: []),
4164
],
42-
frequencyInSeconds: 5,
43-
maxRoundsToSync: 100,
65+
filterStatePersistence: {
66+
get: getFilterState,
67+
set: saveFilterState,
68+
},
69+
frequencyInSeconds: 1,
70+
maxRoundsToSync: 500,
4471
syncBehaviour: 'catchup-with-indexer',
4572
watermarkPersistence: {
4673
get: getLastWatermark,
@@ -52,9 +79,18 @@ async function getDHMSubscriber() {
5279
)
5380
subscriber.onBatch('dhm-asset', async (events) => {
5481
// eslint-disable-next-line no-console
55-
console.log(`Received ${events.length} asset changes`)
82+
console.log(`Received ${events.length} asset changes (${events.filter((t) => t['created-asset-index']).length} new assets)`)
83+
84+
// Append any new asset ids to the filter state so ownership is picked up of them
85+
subscriber.appendFilterState({ assetIds: events.filter((e) => e['created-asset-index']).map((e) => e['created-asset-index']!) })
86+
})
87+
subscriber.onBatch('dhm-ownership-change', async (events) => {
88+
// eslint-disable-next-line no-console
89+
console.log(`Received ${events.length} ownership changes`)
90+
})
91+
subscriber.onPoll(async (pollMetadata) => {
5692
// Save all of the Data History Museum Verifiably Authentic Digital Historical Artifacts
57-
await saveDHMTransactions(events)
93+
await saveDHMTransactions(pollMetadata.subscribedTransactions)
5894
})
5995
return subscriber
6096
}
@@ -81,8 +117,10 @@ async function saveDHMTransactions(transactions: TransactionResult[]) {
81117
metadata: getArc69Metadata(t),
82118
created: new Date(t['round-time']! * 1000).toISOString(),
83119
lastModified: new Date(t['round-time']! * 1000).toISOString(),
120+
owner: t.sender,
121+
ownerModified: new Date(t['round-time']! * 1000).toISOString(),
84122
})
85-
} else {
123+
} else if (t['asset-config-transaction']) {
86124
const asset = assets.find((a) => a.id === t['asset-config-transaction']!['asset-id'])
87125
if (!asset) {
88126
// eslint-disable-next-line no-console
@@ -96,6 +134,17 @@ async function saveDHMTransactions(transactions: TransactionResult[]) {
96134
asset!.metadata = getArc69Metadata(t)
97135
asset!.lastModified = new Date(t['round-time']! * 1000).toISOString()
98136
}
137+
} else if (t['asset-transfer-transaction']) {
138+
const asset = assets.find((a) => a.id === t['asset-transfer-transaction']!['asset-id'])
139+
if (!asset) {
140+
// eslint-disable-next-line no-console
141+
console.error(t)
142+
throw new Error(`Unable to find existing asset data for ${t['asset-transfer-transaction']!['asset-id']}`)
143+
}
144+
if (t['asset-transfer-transaction'].amount > 0) {
145+
asset.owner = t['asset-transfer-transaction']!.receiver
146+
asset.ownerModified = new Date(t['round-time']! * 1000).toISOString()
147+
}
99148
}
100149
}
101150

@@ -104,12 +153,25 @@ async function saveDHMTransactions(transactions: TransactionResult[]) {
104153

105154
// Basic methods that persist using filesystem - for illustrative purposes only
106155

156+
async function saveFilterState(state: DHMFilterState) {
157+
fs.writeFileSync(path.join(__dirname, 'filters.json'), JSON.stringify(state), { encoding: 'utf-8' })
158+
}
159+
160+
async function getFilterState(): Promise<DHMFilterState> {
161+
if (!fs.existsSync(path.join(__dirname, 'filters.json'))) return { assetIds: [] }
162+
const existing = fs.readFileSync(path.join(__dirname, 'filters.json'), 'utf-8')
163+
const existingData = JSON.parse(existing) as DHMFilterState
164+
// eslint-disable-next-line no-console
165+
console.log(`Found existing filter state in filters.json; syncing with ${existingData.assetIds.length} assets`)
166+
return existingData
167+
}
168+
107169
async function saveWatermark(watermark: number) {
108170
fs.writeFileSync(path.join(__dirname, 'watermark.txt'), watermark.toString(), { encoding: 'utf-8' })
109171
}
110172

111173
async function getLastWatermark(): Promise<number> {
112-
if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 0
174+
if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 15_000_000
113175
const existing = fs.readFileSync(path.join(__dirname, 'watermark.txt'), 'utf-8')
114176
// eslint-disable-next-line no-console
115177
console.log(`Found existing sync watermark in watermark.txt; syncing from ${existing}`)

src/dynamic-subscriber.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import algosdk from 'algosdk'
2+
import { AlgorandSubscriber } from './subscriber'
3+
import {
4+
getAlgodSubscribedTransactions,
5+
getArc28EventsToProcess,
6+
getIndexerCatchupTransactions,
7+
prepareSubscriptionPoll,
8+
processExtraSubscriptionTransactionFields,
9+
} from './subscriptions'
10+
import type {
11+
DynamicAlgorandSubscriberConfig,
12+
NamedTransactionFilter,
13+
SubscribedTransaction,
14+
TransactionSubscriptionResult,
15+
} from './types/subscription'
16+
import Algodv2 = algosdk.Algodv2
17+
import Indexer = algosdk.Indexer
18+
19+
export class DynamicAlgorandSubscriber<T> extends AlgorandSubscriber {
20+
private pendingStateChanges: { action: 'append' | 'delete' | 'set'; stateChange: Partial<T> }[] = []
21+
private dynamicConfig: DynamicAlgorandSubscriberConfig<T>
22+
23+
constructor(config: DynamicAlgorandSubscriberConfig<T>, algod: Algodv2, indexer?: Indexer) {
24+
super(
25+
{
26+
filters: [],
27+
...config,
28+
},
29+
algod,
30+
indexer,
31+
)
32+
this.dynamicConfig = config
33+
}
34+
35+
protected override async _pollOnce(watermark: number): Promise<TransactionSubscriptionResult> {
36+
let subscribedTransactions: SubscribedTransaction[] = []
37+
let filterState: T = await this.dynamicConfig.filterStatePersistence.get()
38+
39+
const subscribe = async (filters: NamedTransactionFilter[]) => {
40+
const catchupTransactions = await getIndexerCatchupTransactions(filters, pollMetadata, arc28EventsToProcess, this.indexer)
41+
const algodTransactions = await getAlgodSubscribedTransactions(filters, pollMetadata, arc28EventsToProcess)
42+
const subscribedTransactions = catchupTransactions
43+
.concat(algodTransactions)
44+
.map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, this.config.arc28Events ?? []))
45+
this._processFilters({ subscribedTransactions, ...pollMetadata })
46+
return subscribedTransactions
47+
}
48+
49+
const filters = await this.dynamicConfig.dynamicFilters(filterState, 0)
50+
this.filterNames = filters
51+
.map((f) => f.name)
52+
.filter((value, index, self) => {
53+
// Remove duplicates
54+
return self.findIndex((x) => x === value) === index
55+
})
56+
const pollMetadata = await prepareSubscriptionPoll({ ...this.config, watermark, filters }, this.algod)
57+
const arc28EventsToProcess = getArc28EventsToProcess(this.config.arc28Events ?? [])
58+
59+
subscribedTransactions = await subscribe(filters)
60+
61+
let pollLevel = 0
62+
while (this.pendingStateChanges.length > 0) {
63+
let filterStateToProcess = { ...filterState }
64+
for (const change of this.pendingStateChanges) {
65+
switch (change.action) {
66+
case 'append':
67+
for (const key of Object.keys(change.stateChange)) {
68+
const k = key as keyof T
69+
if (!filterState[k] || !Array.isArray(filterState[k])) {
70+
filterState[k] = change.stateChange[k]!
71+
} else {
72+
filterState[k] = (filterState[k] as unknown[]).concat(change.stateChange[k]) as T[keyof T]
73+
}
74+
}
75+
filterStateToProcess = { ...filterStateToProcess, ...change.stateChange }
76+
break
77+
case 'delete':
78+
for (const key of Object.keys(change.stateChange)) {
79+
const k = key as keyof T
80+
delete filterState[k]
81+
delete filterStateToProcess[k]
82+
}
83+
break
84+
case 'set':
85+
filterState = { ...filterState, ...change.stateChange }
86+
filterStateToProcess = { ...filterState, ...change.stateChange }
87+
break
88+
}
89+
}
90+
this.pendingStateChanges = []
91+
const newFilters = await this.dynamicConfig.dynamicFilters(filterStateToProcess, ++pollLevel)
92+
this.filterNames = newFilters
93+
.map((f) => f.name)
94+
.filter((value, index, self) => {
95+
// Remove duplicates
96+
return self.findIndex((x) => x === value) === index
97+
})
98+
subscribedTransactions = subscribedTransactions.concat(await subscribe(newFilters))
99+
}
100+
101+
await this.dynamicConfig.filterStatePersistence.set(filterState)
102+
103+
return {
104+
syncedRoundRange: pollMetadata.syncedRoundRange,
105+
newWatermark: pollMetadata.newWatermark,
106+
currentRound: pollMetadata.currentRound,
107+
subscribedTransactions: subscribedTransactions.sort(
108+
(a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!,
109+
),
110+
}
111+
}
112+
113+
appendFilterState(stateChange: Partial<T>) {
114+
this.pendingStateChanges.push({ action: 'append', stateChange })
115+
}
116+
117+
deleteFilterState(stateChange: (keyof T)[]) {
118+
this.pendingStateChanges.push({
119+
action: 'delete',
120+
stateChange: stateChange.reduce((acc, key) => ({ ...acc, [key]: true }), {} as Partial<T>),
121+
})
122+
}
123+
124+
setFilterState(stateChange: Partial<T>) {
125+
this.pendingStateChanges.push({ action: 'set', stateChange })
126+
}
127+
}

src/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
export * from './subscriber'
2-
export * from './subscriptions'
1+
export { DynamicAlgorandSubscriber } from './dynamic-subscriber'
2+
export { AlgorandSubscriber } from './subscriber'
3+
export { getSubscribedTransactions } from './subscriptions'

0 commit comments

Comments
 (0)