Skip to content

Commit 9d0c4c8

Browse files
committed
Standardised communication ion the coin indexer to be simulat to the other workers
Signed-off-by: Robert Gogete <gogeterobert@yahoo.com>
1 parent 050baab commit 9d0c4c8

File tree

5 files changed

+83
-100
lines changed

5 files changed

+83
-100
lines changed

src/application/workers/CoinIndexer/CoinIndexer.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { spawn, Worker, Thread } from 'threads';
33
import { IWorker } from '../IWorker';
44
import {
55
CoinIndexerEventNames,
6+
CoinIndexerEvents,
67
CoinStateUpdatedEvent,
78
} from './CoinIndexerEvents';
89

@@ -16,7 +17,7 @@ interface ICoinIndexer extends IWorker {
1617
}
1718

1819
export class CoinIndexer
19-
extends EventEmitter
20+
extends (EventEmitter as { new (): CoinIndexerEvents })
2021
implements ICoinIndexer
2122
{
2223
private worker: import('threads').ModuleThread<CoinIndexerWorkerApi> | null = null;
@@ -54,8 +55,10 @@ export class CoinIndexer
5455
)) as import('threads').ModuleThread<CoinIndexerWorkerApi>;
5556
}
5657

57-
this.worker.onCoinStateUpdated().subscribe((coinState: CoinStateUpdatedEvent) => {
58-
this.emit(CoinIndexerEventNames.CoinStateUpdated, coinState);
58+
this.worker.onCoinStateUpdated().subscribe({
59+
next: (coinState: CoinStateUpdatedEvent) => {
60+
this.emit(CoinIndexerEventNames.CoinStateUpdated, coinState);
61+
},
5962
});
6063

6164
try {

src/application/workers/CoinIndexer/CoinIndexer.worker.logic.ts

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import Database from 'better-sqlite3';
2-
import { CoinIndexerEvents, CoinStateUpdatedEvent } from './CoinIndexerEvents';
2+
import { Observable } from 'observable-fns';
3+
import { CoinStateUpdatedEvent } from './CoinIndexerEvents';
34
import { CoinRepository, CoinRow } from '../../repositories/CoinRepository';
45
import { WalletRepository, WalletRow } from '../../repositories/WalletRepository';
56
import { IBlockchainService } from '../../interfaces/IBlockChainService';
@@ -11,11 +12,13 @@ import { ChiaBlockchainService } from '../../../infrastructure/BlockchainService
1112
let db: Database.Database | null = null;
1213
let coinRepo: CoinRepository | null = null;
1314
let walletRepo: WalletRepository | null = null;
14-
let events: CoinIndexerEvents | null = null;
1515
let started = false;
1616
let intervalId: NodeJS.Timeout | null = null;
1717
let blockchainService: IBlockchainService | null = null;
1818

19+
let coinStateObservable: Observable<CoinStateUpdatedEvent> | null = null;
20+
let coinStateObserver: ((event: CoinStateUpdatedEvent) => void) | null = null;
21+
1922
function mapUnspentCoinToDbFields(
2023
coin: Coin,
2124
wallet_id: string,
@@ -34,7 +37,7 @@ function mapUnspentCoinToDbFields(
3437
}
3538

3639
async function sync() {
37-
if (!coinRepo || !walletRepo || !events || !blockchainService) return;
40+
if (!coinRepo || !walletRepo || !blockchainService) return;
3841
const wallets: WalletRow[] = walletRepo.getWallets();
3942

4043
let peer: Peer | null = null; // TODO Update this by getting a value from PeerCluster from datalayer driver
@@ -54,12 +57,14 @@ async function sync() {
5457
wallet.synced_to_height || 0,
5558
);
5659
coinRepo.upsertCoin(wallet.address, mapped);
57-
events.emitCoinStateUpdated({
58-
wallet_id: wallet.address,
59-
coinId: mapped.coinId,
60-
status: 'unspent',
61-
synced_height: wallet.synced_to_height || 0,
62-
});
60+
if (coinStateObserver) {
61+
coinStateObserver({
62+
wallet_id: wallet.address,
63+
coinId: mapped.coinId,
64+
status: 'unspent',
65+
synced_height: wallet.synced_to_height || 0,
66+
});
67+
}
6368
}
6469
}
6570
// Check pending coins
@@ -73,23 +78,24 @@ async function sync() {
7378
);
7479
if (!spendable) {
7580
coinRepo.updateCoinStatus(coin.wallet_id, coin.coinId, 'spent', coin.synced_height);
76-
events.emitCoinStateUpdated({
77-
wallet_id: coin.wallet_id,
78-
coinId: coin.coinId,
79-
status: 'spent',
80-
synced_height: coin.synced_height,
81-
});
81+
if (coinStateObserver) {
82+
coinStateObserver({
83+
wallet_id: coin.wallet_id,
84+
coinId: coin.coinId,
85+
status: 'spent',
86+
synced_height: coin.synced_height,
87+
});
88+
}
8289
}
8390
}
8491
}
8592

8693
export const api = {
87-
async start(_blockchainType: string, dbPath: string = './coin_indexer.sqlite') {
94+
async start(_blockchainType: BlockChainType, dbPath: string = './coin_indexer.sqlite') {
8895
if (started) return;
8996
db = new Database(dbPath);
9097
coinRepo = new CoinRepository(db);
9198
walletRepo = new WalletRepository(db);
92-
events = new CoinIndexerEvents();
9399

94100
switch (_blockchainType) {
95101
case BlockChainType.Test:
@@ -101,7 +107,6 @@ export const api = {
101107
break;
102108
}
103109

104-
// peer = ... (should be set externally or passed in)
105110
started = true;
106111
intervalId = setInterval(sync, 1000);
107112
},
@@ -111,20 +116,31 @@ export const api = {
111116
db = null;
112117
coinRepo = null;
113118
walletRepo = null;
114-
events = null;
115119
blockchainService = null;
120+
coinStateObservable = null;
121+
coinStateObserver = null;
116122
},
117-
onCoinStateUpdated(listener: (event: CoinStateUpdatedEvent) => void) {
118-
if (!events) throw new Error('CoinIndexerEvents not initialized');
119-
events.onCoinStateUpdated(listener);
123+
onCoinStateUpdated() {
124+
if (!coinStateObservable) {
125+
coinStateObservable = new Observable<CoinStateUpdatedEvent>((observer) => {
126+
coinStateObserver = (event: CoinStateUpdatedEvent) => {
127+
observer.next(event);
128+
};
129+
return () => {
130+
coinStateObserver = null;
131+
};
132+
});
133+
}
134+
return coinStateObservable;
120135
},
121136
__reset() {
122137
if (intervalId) clearInterval(intervalId);
123138
db = null;
124139
coinRepo = null;
125140
walletRepo = null;
126-
events = null;
127141
blockchainService = null;
128142
started = false;
143+
coinStateObservable = null;
144+
coinStateObserver = null;
129145
},
130146
};
Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import { EventEmitter } from 'events';
2-
31
export enum CoinIndexerEventNames {
4-
CoinStateUpdated = 'CoinStateUpdated',
2+
CoinStateUpdated = 'coinStateUpdated',
53
}
64

75
export interface CoinStateUpdatedEvent {
@@ -11,11 +9,7 @@ export interface CoinStateUpdatedEvent {
119
synced_height: number;
1210
}
1311

14-
export class CoinIndexerEvents extends EventEmitter {
15-
emitCoinStateUpdated(event: CoinStateUpdatedEvent) {
16-
this.emit(CoinIndexerEventNames.CoinStateUpdated, event);
17-
}
18-
onCoinStateUpdated(listener: (event: CoinStateUpdatedEvent) => void) {
19-
this.on(CoinIndexerEventNames.CoinStateUpdated, listener);
20-
}
12+
export interface CoinIndexerEvents {
13+
on(event: CoinIndexerEventNames.CoinStateUpdated, listener: (event: CoinStateUpdatedEvent) => void): this;
14+
emit(event: CoinIndexerEventNames.CoinStateUpdated, eventData: CoinStateUpdatedEvent): boolean;
2115
}

test/application/workers/BlockIndexer/BlockIndexer.worker.logic.test.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,6 @@ import Database from 'better-sqlite3';
33
import { BlockChainType } from '../../../../src/application/types/BlockChain';
44
import fs from 'fs';
55

6-
// Mock BlockchainService for unit tests
7-
class MockBlockchainService {
8-
private blocks: any[] = [];
9-
constructor(blocks: any[] = []) {
10-
this.blocks = blocks;
11-
}
12-
async getCurrentBlockchainHeight() {
13-
return this.blocks.length;
14-
}
15-
async getBlockchainBlockByHeight(h: number) {
16-
return this.blocks[h - 1] || null;
17-
}
18-
}
19-
206
describe('BlockIndexer.worker.logic api', () => {
217
const dbPath = 'test_blockindexer_worker_logic.sqlite';
228

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,46 @@
11
import Database from 'better-sqlite3';
22
import { api as coinIndexerApi } from '../../../../src/application/workers/CoinIndexer/CoinIndexer.worker.logic';
3-
import { CoinStateUpdatedEvent } from '../../../../src/application/workers/CoinIndexer/CoinIndexerEvents';
4-
import { CoinRepository } from '../../../../src/application/repositories/CoinRepository';
5-
import { WalletRepository } from '../../../../src/application/repositories/WalletRepository';
6-
import type { IBlockchainService } from '../../../../src/application/interfaces/IBlockChainService';
7-
import type { Peer, Coin } from '@dignetwork/datalayer-driver';
3+
import { BlockChainType } from '../../../../src/application/types/BlockChain';
4+
import { existsSync, unlinkSync } from 'fs';
85

9-
describe('CoinIndexer.worker.logic', () => {
10-
const dbPath = ':memory:';
11-
let db: Database.Database;
12-
let coinRepo: CoinRepository;
13-
let walletRepo: WalletRepository;
14-
let mockService: IBlockchainService;
15-
let mockPeer: Peer;
6+
const dbPath = 'test_coinindexer_worker_logic.sqlite';
167

17-
beforeEach(async () => {
18-
coinIndexerApi.__reset();
19-
db = new Database(dbPath);
20-
coinRepo = new CoinRepository(db);
21-
walletRepo = new WalletRepository(db);
22-
// Add a wallet for tests
23-
walletRepo.addWallet('xch1234');
24-
// Mock blockchain service and peer
25-
mockService = {
26-
listUnspentCoins: jest.fn().mockResolvedValue({ coins: [] }),
27-
isCoinSpendable: jest.fn().mockResolvedValue(true),
28-
// ...other methods can be no-ops
29-
} as any;
30-
mockPeer = {} as Peer;
31-
// If needed, inject mockService and mockPeer into the worker here
32-
await coinIndexerApi.start('Test', dbPath);
8+
describe('CoinIndexer.worker.logic api', () => {
9+
beforeAll(() => {
10+
if (existsSync(dbPath)) unlinkSync(dbPath);
3311
});
34-
12+
3513
afterEach(() => {
36-
coinIndexerApi.stop();
14+
try { new Database(dbPath).close(); } catch {}
15+
});
16+
17+
it('should create the database file after start', async () => {
3718
coinIndexerApi.__reset();
19+
if (existsSync(dbPath)) unlinkSync(dbPath);
20+
await coinIndexerApi.start(BlockChainType.Test, dbPath);
21+
expect(existsSync(dbPath)).toBe(true);
22+
// Check table exists (should be named 'coin' not 'coins')
23+
const db = new Database(dbPath);
24+
const tables = db.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='coin'").get();
25+
expect(tables).toBeDefined();
3826
db.close();
27+
coinIndexerApi.stop();
28+
});
29+
30+
it('should not start twice', async () => {
31+
coinIndexerApi.__reset();
32+
await coinIndexerApi.start(BlockChainType.Test, dbPath);
33+
await coinIndexerApi.start(BlockChainType.Test, dbPath); // should not throw
34+
coinIndexerApi.stop();
3935
});
4036

41-
it('should emit CoinStateUpdated event on sync', async () => {
42-
// Arrange: mock listUnspentCoins to return a coin
43-
const coin: Coin = {
44-
coin_id: Buffer.from('aabbcc', 'hex'),
45-
parentCoinInfo: Buffer.from('ddeeff', 'hex'),
46-
puzzleHash: Buffer.from('112233', 'hex'),
47-
amount: BigInt(1000),
48-
} as any;
49-
(mockService.listUnspentCoins as jest.Mock).mockResolvedValue({ coins: [coin] });
50-
walletRepo.updateWalletSync('xch1234', 10, 'abc');
51-
const eventPromise = new Promise<void>((resolve) => {
52-
coinIndexerApi.onCoinStateUpdated((event: CoinStateUpdatedEvent) => {
53-
expect(event.wallet_id).toBe('xch1234');
54-
expect(event.status).toBe('unspent');
55-
resolve();
56-
});
57-
});
58-
// Wait for the interval to trigger sync (simulate passage of time)
59-
await new Promise((r) => setTimeout(r, 1100));
60-
await eventPromise;
37+
it('should stop and reset', async () => {
38+
coinIndexerApi.__reset();
39+
await coinIndexerApi.start(BlockChainType.Test, dbPath);
40+
coinIndexerApi.stop();
41+
coinIndexerApi.__reset();
42+
// Should be able to start again
43+
await coinIndexerApi.start(BlockChainType.Test, dbPath);
44+
coinIndexerApi.stop();
6145
});
6246
});

0 commit comments

Comments
 (0)