Skip to content

Commit 7f35f59

Browse files
committed
feat: Dynamic subscription filters
1 parent 6221b2a commit 7f35f59

File tree

6 files changed

+606
-109
lines changed

6 files changed

+606
-109
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import * as algokit from '@algorandfoundation/algokit-utils'
2+
import { TransactionResult } from '@algorandfoundation/algokit-utils/types/indexer'
3+
import algosdk from 'algosdk'
4+
import fs from 'fs'
5+
import path from 'path'
6+
import { DynamicAlgorandSubscriber } from '../../src'
7+
import TransactionType = algosdk.TransactionType
8+
9+
if (!fs.existsSync(path.join(__dirname, '..', '..', '.env')) && !process.env.ALGOD_SERVER) {
10+
// eslint-disable-next-line no-console
11+
console.error('Copy /.env.sample to /.env before starting the application.')
12+
process.exit(1)
13+
}
14+
15+
interface DHMAsset {
16+
id: number
17+
name: string
18+
unit: string
19+
mediaUrl: string
20+
metadata: Record<string, unknown>
21+
created: string
22+
lastModified: string
23+
owner: string
24+
ownerModified: string
25+
}
26+
27+
interface DHMFilterState {
28+
assetIds: number[]
29+
}
30+
31+
async function getDHMSubscriber() {
32+
const algod = await algokit.getAlgoClient()
33+
const indexer = await algokit.getAlgoIndexerClient()
34+
const subscriber = new DynamicAlgorandSubscriber<DHMFilterState>(
35+
{
36+
maxIndexerRoundsToSync: 10_000_000,
37+
dynamicFilters: async (filterState, pollLevel) => [
38+
...(pollLevel === 0
39+
? [
40+
{
41+
name: 'dhm-asset',
42+
filter: {
43+
type: TransactionType.acfg,
44+
// Data History Museum creator accounts
45+
sender: (await algokit.isTestNet(algod))
46+
? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU'
47+
: 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU',
48+
},
49+
},
50+
]
51+
: []),
52+
...(filterState.assetIds.length > 0
53+
? [
54+
{
55+
name: 'dhm-ownership-change',
56+
filter: {
57+
type: TransactionType.axfer,
58+
assetId: filterState.assetIds,
59+
minAmount: 1,
60+
},
61+
},
62+
]
63+
: []),
64+
],
65+
filterStatePersistence: {
66+
get: getFilterState,
67+
set: saveFilterState,
68+
},
69+
frequencyInSeconds: 1,
70+
maxRoundsToSync: 500,
71+
syncBehaviour: 'catchup-with-indexer',
72+
watermarkPersistence: {
73+
get: getLastWatermark,
74+
set: saveWatermark,
75+
},
76+
},
77+
algod,
78+
indexer,
79+
)
80+
subscriber.onBatch('dhm-asset', async (events) => {
81+
// eslint-disable-next-line no-console
82+
console.log(`Received ${events.length} asset changes (${events.filter((t) => t['created-asset-index']).length} new assets)`)
83+
84+
// Append any new asset ids to the filter state so ownership is picked up of them
85+
subscriber.appendFilterState({ assetIds: events.filter((e) => e['created-asset-index']).map((e) => e['created-asset-index']!) })
86+
})
87+
subscriber.onBatch('dhm-ownership-change', async (events) => {
88+
// eslint-disable-next-line no-console
89+
console.log(`Received ${events.length} ownership changes`)
90+
})
91+
subscriber.onPoll(async (pollMetadata) => {
92+
// Save all of the Data History Museum Verifiably Authentic Digital Historical Artifacts
93+
await saveDHMTransactions(pollMetadata.subscribedTransactions)
94+
})
95+
return subscriber
96+
}
97+
98+
function getArc69Metadata(t: TransactionResult) {
99+
let metadata = {}
100+
try {
101+
if (t.note && t.note.startsWith('ey')) metadata = JSON.parse(Buffer.from(t.note, 'base64').toString('utf-8'))
102+
// eslint-disable-next-line no-empty
103+
} catch (e) {}
104+
return metadata
105+
}
106+
107+
async function saveDHMTransactions(transactions: TransactionResult[]) {
108+
const assets = await getSavedTransactions<DHMAsset>('dhm-assets.json')
109+
110+
for (const t of transactions) {
111+
if (t['created-asset-index']) {
112+
assets.push({
113+
id: t['created-asset-index'],
114+
name: t['asset-config-transaction']!.params!.name!,
115+
unit: t['asset-config-transaction']!.params!['unit-name']!,
116+
mediaUrl: t['asset-config-transaction']!.params!.url!,
117+
metadata: getArc69Metadata(t),
118+
created: new Date(t['round-time']! * 1000).toISOString(),
119+
lastModified: new Date(t['round-time']! * 1000).toISOString(),
120+
owner: t.sender,
121+
ownerModified: new Date(t['round-time']! * 1000).toISOString(),
122+
})
123+
} else if (t['asset-config-transaction']) {
124+
const asset = assets.find((a) => a.id === t['asset-config-transaction']!['asset-id'])
125+
if (!asset) {
126+
// eslint-disable-next-line no-console
127+
console.error(t)
128+
throw new Error(`Unable to find existing asset data for ${t['asset-config-transaction']!['asset-id']}`)
129+
}
130+
if (!t['asset-config-transaction']!.params) {
131+
// Asset was deleted, remove it
132+
assets.splice(assets.indexOf(asset), 1)
133+
} else {
134+
asset!.metadata = getArc69Metadata(t)
135+
asset!.lastModified = new Date(t['round-time']! * 1000).toISOString()
136+
}
137+
} else if (t['asset-transfer-transaction']) {
138+
const asset = assets.find((a) => a.id === t['asset-transfer-transaction']!['asset-id'])
139+
if (!asset) {
140+
// eslint-disable-next-line no-console
141+
console.error(t)
142+
throw new Error(`Unable to find existing asset data for ${t['asset-transfer-transaction']!['asset-id']}`)
143+
}
144+
if (t['asset-transfer-transaction'].amount > 0) {
145+
asset.owner = t['asset-transfer-transaction']!.receiver
146+
asset.ownerModified = new Date(t['round-time']! * 1000).toISOString()
147+
}
148+
}
149+
}
150+
151+
await saveTransactions(assets, 'dhm-assets.json')
152+
}
153+
154+
// Basic methods that persist using filesystem - for illustrative purposes only
155+
156+
async function saveFilterState(state: DHMFilterState) {
157+
fs.writeFileSync(path.join(__dirname, 'filters.json'), JSON.stringify(state), { encoding: 'utf-8' })
158+
}
159+
160+
async function getFilterState(): Promise<DHMFilterState> {
161+
if (!fs.existsSync(path.join(__dirname, 'filters.json'))) return { assetIds: [] }
162+
const existing = fs.readFileSync(path.join(__dirname, 'filters.json'), 'utf-8')
163+
const existingData = JSON.parse(existing) as DHMFilterState
164+
// eslint-disable-next-line no-console
165+
console.log(`Found existing filter state in filters.json; syncing with ${existingData.assetIds.length} assets`)
166+
return existingData
167+
}
168+
169+
async function saveWatermark(watermark: number) {
170+
fs.writeFileSync(path.join(__dirname, 'watermark.txt'), watermark.toString(), { encoding: 'utf-8' })
171+
}
172+
173+
async function getLastWatermark(): Promise<number> {
174+
if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 15_000_000
175+
const existing = fs.readFileSync(path.join(__dirname, 'watermark.txt'), 'utf-8')
176+
// eslint-disable-next-line no-console
177+
console.log(`Found existing sync watermark in watermark.txt; syncing from ${existing}`)
178+
return Number(existing)
179+
}
180+
181+
async function getSavedTransactions<T>(fileName: string): Promise<T[]> {
182+
const existing = fs.existsSync(path.join(__dirname, fileName))
183+
? (JSON.parse(fs.readFileSync(path.join(__dirname, fileName), 'utf-8')) as T[])
184+
: []
185+
return existing
186+
}
187+
188+
async function saveTransactions(transactions: unknown[], fileName: string) {
189+
fs.writeFileSync(path.join(__dirname, fileName), JSON.stringify(transactions, undefined, 2), { encoding: 'utf-8' })
190+
// eslint-disable-next-line no-console
191+
console.log(`Saved ${transactions.length} transactions to ${fileName}`)
192+
}
193+
194+
// eslint-disable-next-line no-console
195+
process.on('uncaughtException', (e) => console.error(e))
196+
;(async () => {
197+
const subscriber = await getDHMSubscriber()
198+
199+
if (process.env.RUN_LOOP === 'true') {
200+
subscriber.start()
201+
;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) =>
202+
process.on(signal, () => {
203+
// eslint-disable-next-line no-console
204+
console.log(`Received ${signal}; stopping subscriber...`)
205+
subscriber.stop(signal)
206+
}),
207+
)
208+
} else {
209+
await subscriber.pollOnce()
210+
}
211+
})().catch((e) => {
212+
// eslint-disable-next-line no-console
213+
console.error(e)
214+
})

src/dynamic-subscriber.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import * as algokit from '@algorandfoundation/algokit-utils'
2+
import algosdk from 'algosdk'
3+
import { AlgorandSubscriber } from './subscriber'
4+
import {
5+
getAlgodSubscribedTransactions,
6+
getArc28EventsToProcess,
7+
getIndexerCatchupTransactions,
8+
prepareSubscriptionPoll,
9+
processExtraSubscriptionTransactionFields,
10+
} from './subscriptions'
11+
import type {
12+
DynamicAlgorandSubscriberConfig,
13+
NamedTransactionFilter,
14+
SubscribedTransaction,
15+
TransactionSubscriptionResult,
16+
} from './types/subscription'
17+
import Algodv2 = algosdk.Algodv2
18+
import Indexer = algosdk.Indexer
19+
20+
export class DynamicAlgorandSubscriber<T> extends AlgorandSubscriber {
21+
private pendingStateChanges: { action: 'append' | 'delete' | 'set'; stateChange: Partial<T> }[] = []
22+
private dynamicConfig: DynamicAlgorandSubscriberConfig<T>
23+
24+
constructor(config: DynamicAlgorandSubscriberConfig<T>, algod: Algodv2, indexer?: Indexer) {
25+
super(
26+
{
27+
filters: [],
28+
...config,
29+
},
30+
algod,
31+
indexer,
32+
)
33+
this.dynamicConfig = config
34+
}
35+
36+
protected override async _pollOnce(watermark: number): Promise<TransactionSubscriptionResult> {
37+
let subscribedTransactions: SubscribedTransaction[] = []
38+
let filterState: T = await this.dynamicConfig.filterStatePersistence.get()
39+
40+
const subscribe = async (filters: NamedTransactionFilter[]) => {
41+
if (filters.length === 0) return []
42+
const catchupTransactions = await getIndexerCatchupTransactions(filters, pollMetadata, arc28EventsToProcess, this.indexer)
43+
const algodTransactions = await getAlgodSubscribedTransactions(filters, pollMetadata, arc28EventsToProcess)
44+
const subscribedTransactions = catchupTransactions
45+
.concat(algodTransactions)
46+
.map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, this.config.arc28Events ?? []))
47+
await this._processFilters({ subscribedTransactions, ...pollMetadata })
48+
return subscribedTransactions
49+
}
50+
51+
const filters = await this.dynamicConfig.dynamicFilters(filterState, 0, watermark)
52+
this.filterNames = filters
53+
.map((f) => f.name)
54+
.filter((value, index, self) => {
55+
// Remove duplicates
56+
return self.findIndex((x) => x === value) === index
57+
})
58+
const pollMetadata = await prepareSubscriptionPoll({ ...this.config, watermark, filters }, this.algod)
59+
const arc28EventsToProcess = getArc28EventsToProcess(this.config.arc28Events ?? [])
60+
61+
subscribedTransactions = await subscribe(filters)
62+
63+
let pollLevel = 0
64+
while (this.pendingStateChanges.length > 0) {
65+
const stateChangeCount = this.pendingStateChanges.length
66+
let filterStateToProcess = { ...filterState }
67+
for (const change of this.pendingStateChanges) {
68+
switch (change.action) {
69+
case 'append':
70+
for (const key of Object.keys(change.stateChange)) {
71+
const k = key as keyof T
72+
if (!filterState[k] || !Array.isArray(filterState[k])) {
73+
filterState[k] = change.stateChange[k]!
74+
} else {
75+
filterState[k] = (filterState[k] as unknown[]).concat(change.stateChange[k]) as T[keyof T]
76+
}
77+
}
78+
filterStateToProcess = { ...filterStateToProcess, ...change.stateChange }
79+
break
80+
case 'delete':
81+
for (const key of Object.keys(change.stateChange)) {
82+
const k = key as keyof T
83+
delete filterState[k]
84+
delete filterStateToProcess[k]
85+
}
86+
break
87+
case 'set':
88+
filterState = { ...filterState, ...change.stateChange }
89+
filterStateToProcess = { ...filterState, ...change.stateChange }
90+
break
91+
}
92+
}
93+
this.pendingStateChanges = []
94+
const newFilters = await this.dynamicConfig.dynamicFilters(filterStateToProcess, ++pollLevel, watermark)
95+
this.filterNames = newFilters
96+
.map((f) => f.name)
97+
.filter((value, index, self) => {
98+
// Remove duplicates
99+
return self.findIndex((x) => x === value) === index
100+
})
101+
102+
algokit.Config.logger.debug(
103+
`Poll level ${pollLevel}: Found ${stateChangeCount} pending state changes and applied them to get ${newFilters.length} filters; syncing...`,
104+
)
105+
106+
subscribedTransactions = subscribedTransactions.concat(await subscribe(newFilters))
107+
}
108+
109+
await this.dynamicConfig.filterStatePersistence.set(filterState)
110+
111+
return {
112+
syncedRoundRange: pollMetadata.syncedRoundRange,
113+
newWatermark: pollMetadata.newWatermark,
114+
currentRound: pollMetadata.currentRound,
115+
subscribedTransactions: subscribedTransactions.sort(
116+
(a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!,
117+
),
118+
}
119+
}
120+
121+
appendFilterState(stateChange: Partial<T>) {
122+
this.pendingStateChanges.push({ action: 'append', stateChange })
123+
}
124+
125+
deleteFilterState(stateChange: (keyof T)[]) {
126+
this.pendingStateChanges.push({
127+
action: 'delete',
128+
stateChange: stateChange.reduce((acc, key) => ({ ...acc, [key]: true }), {} as Partial<T>),
129+
})
130+
}
131+
132+
setFilterState(stateChange: Partial<T>) {
133+
this.pendingStateChanges.push({ action: 'set', stateChange })
134+
}
135+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
export * from './dynamic-subscriber'
12
export * from './subscriber'
23
export * from './subscriptions'

0 commit comments

Comments
 (0)