Skip to content

Commit bbb9c6d

Browse files
committed
tests: Testing dynamic subscriber
1 parent 15c4a36 commit bbb9c6d

File tree

2 files changed

+370
-1
lines changed

2 files changed

+370
-1
lines changed

src/types/subscription.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ export interface TransactionSubscriptionParams extends CoreTransactionSubscripti
238238
watermark: number
239239
}
240240

241+
/** A function that returns a set of filters based on a given filter state and hierarchical poll level. */
242+
export type DynamicFilterLambda<T> = (state: T, pollLevel: number, watermark: number) => Promise<SubscriberConfigFilter<unknown>[]>
243+
241244
/** Configuration for a `DynamicAlgorandSubscriber` */
242245
export interface DynamicAlgorandSubscriberConfig<T> extends Omit<AlgorandSubscriberConfig, 'filters'> {
243246
/**
@@ -247,7 +250,7 @@ export interface DynamicAlgorandSubscriberConfig<T> extends Omit<AlgorandSubscri
247250
* @param watermark The current watermark being processed
248251
* @returns The set of filters to subscribe to / emit events for
249252
*/
250-
dynamicFilters: (state: T, pollLevel: number, watermark: number) => Promise<SubscriberConfigFilter<unknown>[]>
253+
dynamicFilters: DynamicFilterLambda<T>
251254

252255
/** Methods to retrieve and persist the current filter state so syncing is resilient */
253256
filterStatePersistence: {
Lines changed: 366 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,366 @@
1+
/* eslint-disable no-console */
2+
import * as algokit from '@algorandfoundation/algokit-utils'
3+
import { algorandFixture } from '@algorandfoundation/algokit-utils/testing'
4+
import { SendTransactionFrom } from '@algorandfoundation/algokit-utils/types/transaction'
5+
import { Algodv2, Indexer } from 'algosdk'
6+
import { afterEach, beforeEach, describe, expect, test, vitest } from 'vitest'
7+
import { DynamicAlgorandSubscriber } from '../../src'
8+
import { DynamicAlgorandSubscriberConfig, DynamicFilterLambda } from '../../src/types'
9+
import { SendXTransactions } from '../transactions'
10+
import { waitFor } from '../wait'
11+
import { InMemoryWatermark } from '../watermarks'
12+
13+
describe('DynamicAlgorandSubscriber', () => {
14+
const localnet = algorandFixture()
15+
16+
beforeEach(localnet.beforeEach, 10e6)
17+
afterEach(() => {
18+
vitest.clearAllMocks()
19+
})
20+
21+
const InMemoryFilterState = <T>(get: () => T, set: (s: T) => void) => ({
22+
set: async (s: T) => {
23+
set(s)
24+
},
25+
get: async () => get(),
26+
})
27+
28+
const getSubscriber = <T>(
29+
config: {
30+
testAccount: SendTransactionFrom
31+
initialFilterState: T
32+
filters: DynamicFilterLambda<T>
33+
configOverrides?: Partial<DynamicAlgorandSubscriberConfig<T>>
34+
initialWatermark?: number
35+
},
36+
algod: Algodv2,
37+
indexer?: Indexer,
38+
) => {
39+
let watermark = config.initialWatermark ?? 0
40+
let filterState = config.initialFilterState
41+
const subscribedTxns: string[] = []
42+
43+
const subscriber = new DynamicAlgorandSubscriber(
44+
{
45+
...config.configOverrides,
46+
dynamicFilters: config.filters,
47+
syncBehaviour: config.configOverrides?.syncBehaviour ?? 'sync-oldest',
48+
watermarkPersistence: InMemoryWatermark(
49+
() => watermark,
50+
(w) => (watermark = w),
51+
),
52+
filterStatePersistence: InMemoryFilterState(
53+
() => filterState,
54+
(s) => (filterState = s),
55+
),
56+
},
57+
algod,
58+
indexer,
59+
)
60+
return {
61+
subscriber,
62+
subscribedTestAccountTxns: subscribedTxns,
63+
getWatermark: () => watermark,
64+
}
65+
}
66+
67+
test('Subscribes to transactions correctly when controlling polling', async () => {
68+
const { algod, testAccount, generateAccount } = localnet.context
69+
const { lastTxnRound, txIds } = await SendXTransactions(1, testAccount, algod)
70+
const {
71+
subscriber,
72+
subscribedTestAccountTxns: subscribedTxns,
73+
getWatermark,
74+
} = getSubscriber({ testAccount, initialWatermark: lastTxnRound - 1 }, algod)
75+
76+
// Initial catch up with indexer
77+
const result = await subscriber.pollOnce()
78+
expect(subscribedTxns.length).toBe(1)
79+
expect(subscribedTxns[0]).toBe(txIds[0])
80+
expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound)
81+
expect(result.currentRound).toBeGreaterThanOrEqual(lastTxnRound)
82+
expect(result.newWatermark).toBe(result.currentRound)
83+
expect(result.syncedRoundRange).toEqual([lastTxnRound, result.currentRound])
84+
expect(result.subscribedTransactions.length).toBe(1)
85+
expect(result.subscribedTransactions.map((t) => t.id)).toEqual(txIds)
86+
87+
// Random transaction
88+
const { lastTxnRound: lastTxnRound2 } = await SendXTransactions(1, await generateAccount({ initialFunds: (3).algos() }), algod)
89+
await subscriber.pollOnce()
90+
expect(subscribedTxns.length).toBe(1)
91+
expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound2)
92+
93+
// Another subscribed transaction
94+
const { lastTxnRound: lastTxnRound3, txIds: txIds3 } = await SendXTransactions(1, testAccount, algod)
95+
await subscriber.pollOnce()
96+
expect(subscribedTxns.length).toBe(2)
97+
expect(subscribedTxns[1]).toBe(txIds3[0])
98+
expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound3)
99+
})
100+
101+
test('Subscribes to transactions with multiple filters correctly', async () => {
102+
const { algod, testAccount, generateAccount } = localnet.context
103+
const randomAccount = await generateAccount({ initialFunds: (3).algos() })
104+
const senders = [await generateAccount({ initialFunds: (5).algos() }), await generateAccount({ initialFunds: (5).algos() })]
105+
const sender1TxnIds: string[] = []
106+
let sender1TxnIdsfromBatch: string[] = []
107+
const sender2Rounds: number[] = []
108+
let sender2RoundsfromBatch: number[] = []
109+
const { lastTxnRound: firstTxnRound, txIds } = await SendXTransactions(1, testAccount, algod)
110+
const { txIds: txIds1 } = await SendXTransactions(2, senders[0], algod)
111+
const { lastTxnRound, txIds: txIds2, txns: txns2 } = await SendXTransactions(2, senders[1], algod)
112+
const { subscriber, getWatermark } = getSubscriber(
113+
{
114+
testAccount,
115+
initialWatermark: firstTxnRound - 1,
116+
configOverrides: {
117+
maxRoundsToSync: 100,
118+
filters: [
119+
{
120+
name: 'sender1',
121+
filter: {
122+
sender: algokit.getSenderAddress(senders[0]),
123+
},
124+
mapper: (txs) => Promise.resolve(txs.map((t) => t.id)),
125+
},
126+
{
127+
name: 'sender2',
128+
filter: {
129+
sender: algokit.getSenderAddress(senders[1]),
130+
},
131+
mapper: (txs) => Promise.resolve(txs.map((t) => t['confirmed-round']!)),
132+
},
133+
],
134+
},
135+
},
136+
algod,
137+
)
138+
subscriber.onBatch<string>('sender1', (r) => {
139+
sender1TxnIdsfromBatch = r
140+
})
141+
subscriber.on<string>('sender1', (r) => {
142+
sender1TxnIds.push(r)
143+
})
144+
subscriber.onBatch<number>('sender2', (r) => {
145+
sender2RoundsfromBatch = r
146+
})
147+
subscriber.on<number>('sender2', (r) => {
148+
sender2Rounds.push(r)
149+
})
150+
151+
// Initial catch up
152+
const result = await subscriber.pollOnce()
153+
console.log(
154+
`Synced ${result.subscribedTransactions.length} transactions from rounds ${result.syncedRoundRange[0]}-${result.syncedRoundRange[1]} when current round is ${result.currentRound}`,
155+
result.subscribedTransactions.map((t) => t.id),
156+
)
157+
const subscribedTxns = result.subscribedTransactions
158+
expect(subscribedTxns.length).toBe(5)
159+
expect(subscribedTxns[0].id).toBe(txIds[0])
160+
expect(subscribedTxns[1].id).toBe(txIds1[0])
161+
expect(subscribedTxns[2].id).toBe(txIds1[1])
162+
expect(subscribedTxns[3].id).toBe(txIds2[0])
163+
expect(subscribedTxns[4].id).toBe(txIds2[1])
164+
expect(result.currentRound).toBeGreaterThanOrEqual(lastTxnRound)
165+
expect(result.newWatermark).toBe(result.currentRound)
166+
expect(getWatermark()).toBeGreaterThanOrEqual(result.currentRound)
167+
expect(result.syncedRoundRange).toEqual([firstTxnRound, result.currentRound])
168+
expect(result.subscribedTransactions.length).toBe(5)
169+
expect(result.subscribedTransactions.map((t) => t.id)).toEqual(txIds.concat(txIds1, txIds2))
170+
expect(sender1TxnIds).toEqual(txIds1)
171+
expect(sender1TxnIdsfromBatch).toEqual(sender1TxnIds)
172+
expect(sender2Rounds).toEqual(txns2.map((t) => Number(t.confirmation!.confirmedRound!)))
173+
expect(sender2RoundsfromBatch).toEqual(sender2Rounds)
174+
175+
// Random transaction
176+
const { lastTxnRound: lastTxnRound2 } = await SendXTransactions(1, randomAccount, algod)
177+
const result2 = await subscriber.pollOnce()
178+
expect(result2.subscribedTransactions.length).toBe(0)
179+
expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound2)
180+
181+
// More subscribed transactions
182+
const { txIds: txIds3 } = await SendXTransactions(1, testAccount, algod)
183+
const { txIds: txIds13 } = await SendXTransactions(2, senders[0], algod)
184+
const { lastTxnRound: lastSubscribedRound3, txIds: txIds23, txns: txns23 } = await SendXTransactions(2, senders[1], algod)
185+
186+
const result3 = await subscriber.pollOnce()
187+
console.log(
188+
`Synced ${result3.subscribedTransactions.length} transactions from rounds ${result3.syncedRoundRange[0]}-${result3.syncedRoundRange[1]} when current round is ${result3.currentRound}`,
189+
result3.subscribedTransactions.map((t) => t.id),
190+
)
191+
const subscribedTxns3 = result3.subscribedTransactions
192+
expect(subscribedTxns3.length).toBe(5)
193+
expect(subscribedTxns3[0].id).toBe(txIds3[0])
194+
expect(subscribedTxns3[1].id).toBe(txIds13[0])
195+
expect(subscribedTxns3[2].id).toBe(txIds13[1])
196+
expect(subscribedTxns3[3].id).toBe(txIds23[0])
197+
expect(subscribedTxns3[4].id).toBe(txIds23[1])
198+
expect(result3.currentRound).toBeGreaterThanOrEqual(lastSubscribedRound3)
199+
expect(result3.newWatermark).toBe(result3.currentRound)
200+
expect(getWatermark()).toBeGreaterThanOrEqual(result3.currentRound)
201+
expect(result3.syncedRoundRange).toEqual([result2.newWatermark + 1, result3.currentRound])
202+
expect(result3.subscribedTransactions.length).toBe(5)
203+
expect(result3.subscribedTransactions.map((t) => t.id)).toEqual(txIds3.concat(txIds13, txIds23))
204+
expect(sender1TxnIds).toEqual(txIds1.concat(txIds13))
205+
expect(sender1TxnIdsfromBatch).toEqual(txIds13)
206+
expect(sender2Rounds).toEqual(
207+
txns2.map((t) => Number(t.confirmation!.confirmedRound!)).concat(txns23.map((t) => Number(t.confirmation!.confirmedRound!))),
208+
)
209+
expect(sender2RoundsfromBatch).toEqual(txns23.map((t) => Number(t.confirmation!.confirmedRound!)))
210+
})
211+
212+
test('Subscribes to transactions at regular intervals when started and can be stopped', async () => {
213+
const { algod, testAccount } = localnet.context
214+
const { lastTxnRound, txIds } = await SendXTransactions(1, testAccount, algod)
215+
const {
216+
subscriber,
217+
subscribedTestAccountTxns: subscribedTxns,
218+
getWatermark,
219+
} = getSubscriber(
220+
{ testAccount, configOverrides: { maxRoundsToSync: 1, frequencyInSeconds: 0.1 }, initialWatermark: lastTxnRound - 1 },
221+
algod,
222+
)
223+
const roundsSynced: number[] = []
224+
225+
console.log('Starting subscriber')
226+
subscriber.start((r) => roundsSynced.push(r.currentRound))
227+
228+
console.log('Waiting for ~0.5s')
229+
await new Promise((resolve) => setTimeout(resolve, 500))
230+
const pollCountBeforeStopping = roundsSynced.length
231+
232+
console.log('Stopping subscriber')
233+
await subscriber.stop('TEST')
234+
const pollCountAfterStopping = roundsSynced.length
235+
236+
// Assert
237+
expect(subscribedTxns.length).toBe(1)
238+
expect(subscribedTxns[0]).toBe(txIds[0])
239+
expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound)
240+
// Polling frequency is 0.1s and we waited ~0.5s, LocalNet latency is low so expect 3-7 polls
241+
expect(pollCountBeforeStopping).toBeGreaterThanOrEqual(3)
242+
expect(pollCountBeforeStopping).toBeLessThanOrEqual(7)
243+
// Expect no more than 1 extra poll after we called stop
244+
expect(pollCountAfterStopping - pollCountBeforeStopping).toBeLessThanOrEqual(1)
245+
})
246+
247+
test('Waits until transaction appears by default when started', async () => {
248+
const { algod, testAccount } = localnet.context
249+
const currentRound = (await algod.status().do())['last-round'] as number
250+
const {
251+
subscriber,
252+
subscribedTestAccountTxns: subscribedTxns,
253+
getWatermark,
254+
} = getSubscriber(
255+
{
256+
testAccount,
257+
// Polling for 10s means we are definitely testing the algod waiting works
258+
configOverrides: { frequencyInSeconds: 10, waitForBlockWhenAtTip: true, syncBehaviour: 'sync-oldest' },
259+
initialWatermark: currentRound - 1,
260+
},
261+
algod,
262+
)
263+
const roundsSynced: number[] = []
264+
265+
console.log('Starting subscriber')
266+
subscriber.start((r) => roundsSynced.push(r.currentRound))
267+
268+
console.log('Waiting for up to 2s until subscriber has caught up to tip of chain')
269+
await waitFor(() => roundsSynced.length > 0, 2000)
270+
271+
console.log('Issuing transaction')
272+
const pollCountBeforeIssuing = roundsSynced.length
273+
const { lastTxnRound, txIds } = await SendXTransactions(1, testAccount, algod)
274+
275+
console.log(`Waiting for up to 2s for round ${lastTxnRound} to get processed`)
276+
await waitFor(() => subscribedTxns.length === 1, 5000)
277+
const pollCountAfterIssuing = roundsSynced.length
278+
279+
console.log('Stopping subscriber')
280+
await subscriber.stop('TEST')
281+
282+
// Assert
283+
expect(subscribedTxns.length).toBe(1)
284+
expect(subscribedTxns[0]).toBe(txIds[0])
285+
expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound)
286+
// Expect at least 1 poll to have occurred
287+
expect(pollCountAfterIssuing - pollCountBeforeIssuing).toBeGreaterThanOrEqual(1)
288+
})
289+
290+
test('Correctly fires various on* methods', async () => {
291+
const { algod, testAccount, generateAccount } = localnet.context
292+
const randomAccount = await generateAccount({ initialFunds: (3).algos() })
293+
const { txns, txIds } = await SendXTransactions(2, testAccount, algod)
294+
const { txIds: txIds2 } = await SendXTransactions(2, randomAccount, algod)
295+
const initialWatermark = Number(txns[0].confirmation!.confirmedRound!) - 1
296+
const eventsEmitted: string[] = []
297+
let pollComplete = false
298+
const { subscriber } = getSubscriber(
299+
{
300+
testAccount: algokit.randomAccount(),
301+
initialWatermark,
302+
configOverrides: {
303+
maxRoundsToSync: 100,
304+
syncBehaviour: 'sync-oldest',
305+
frequencyInSeconds: 1000,
306+
filters: [
307+
{
308+
name: 'account1',
309+
filter: {
310+
sender: algokit.getSenderAddress(testAccount),
311+
},
312+
},
313+
{
314+
name: 'account2',
315+
filter: {
316+
sender: algokit.getSenderAddress(randomAccount),
317+
},
318+
},
319+
],
320+
},
321+
},
322+
algod,
323+
)
324+
subscriber
325+
.onBatch('account1', (b) => {
326+
eventsEmitted.push(`batch:account1:${b.map((b) => b.id).join(':')}`)
327+
})
328+
.on('account1', (t) => {
329+
eventsEmitted.push(`account1:${t.id}`)
330+
})
331+
.onBatch('account2', (b) => {
332+
eventsEmitted.push(`batch:account2:${b.map((b) => b.id).join(':')}`)
333+
})
334+
.on('account2', (t) => {
335+
eventsEmitted.push(`account2:${t.id}`)
336+
})
337+
.onBeforePoll((metadata) => {
338+
eventsEmitted.push(`before:poll:${metadata.watermark}`)
339+
})
340+
.onPoll((result) => {
341+
eventsEmitted.push(`poll:${result.subscribedTransactions.map((b) => b.id).join(':')}`)
342+
})
343+
344+
subscriber.start((result) => {
345+
eventsEmitted.push(`inspect:${result.subscribedTransactions.map((b) => b.id).join(':')}`)
346+
pollComplete = true
347+
})
348+
349+
console.log('Waiting for up to 2s until subscriber has polled')
350+
await waitFor(() => pollComplete, 2000)
351+
352+
const expectedBatchResult = `${txIds[0]}:${txIds[1]}:${txIds2[0]}:${txIds2[1]}`
353+
expect(eventsEmitted).toEqual([
354+
`before:poll:${initialWatermark}`,
355+
`batch:account1:${txIds[0]}:${txIds[1]}`,
356+
`account1:${txIds[0]}`,
357+
`account1:${txIds[1]}`,
358+
`batch:account2:${txIds2[0]}:${txIds2[1]}`,
359+
`account2:${txIds2[0]}`,
360+
`account2:${txIds2[1]}`,
361+
`inspect:${expectedBatchResult}`,
362+
`poll:${expectedBatchResult}`,
363+
])
364+
await subscriber.stop('TEST')
365+
})
366+
})

0 commit comments

Comments
 (0)