@@ -3,9 +3,10 @@ import WebSocket from 'ws'
33import { BaseWebsocketClient , Args , Options , BaseConnectionHandler } from '@shapeshiftoss/websocket'
44import { MarketDataClient , MarketDataConnectionHandler , MarketDataMessage } from './marketData'
55
6- // TODO: track assets and subscriptionId for clients
76export class CoincapWebsocketClient extends BaseWebsocketClient implements MarketDataClient {
87 clients = new Map < string , BaseConnectionHandler > ( )
8+ // clientId -> subscriptionId -> assets
9+ subscriptions = new Map < string , Map < string , string [ ] > > ( )
910
1011 constructor ( url : string , args : Args , opts ?: Options ) {
1112 super ( url , { logger : args . logger } , opts )
@@ -27,29 +28,71 @@ export class CoincapWebsocketClient extends BaseWebsocketClient implements Marke
2728 }
2829
2930 subscribe ( clientId : string , subscriptionId : string , connection : MarketDataConnectionHandler , assets : Array < string > ) {
30- console . log ( this . clients . size , { subscriptionId, assets } )
3131 if ( ! this . clients . size ) this . connect ( )
3232 this . clients . set ( clientId , connection )
33+
34+ // Get or create the client's subscription map
35+ let clientSubscriptions = this . subscriptions . get ( clientId )
36+ if ( ! clientSubscriptions ) {
37+ clientSubscriptions = new Map < string , string [ ] > ( )
38+ this . subscriptions . set ( clientId , clientSubscriptions )
39+ }
40+
41+ // Add this subscription
42+ clientSubscriptions . set ( subscriptionId , assets )
3343 }
3444
35- unsubscribe ( clientId : string , subscriptionId : string , assets : Array < string > ) {
36- console . log ( { subscriptionId, assets } )
37- if ( ! this . clients . has ( clientId ) ) return
38- this . clients . delete ( clientId )
45+ unsubscribe ( clientId : string , subscriptionId ?: string ) {
46+ const clientSubscriptions = this . subscriptions . get ( clientId )
47+ if ( ! clientSubscriptions ) return
48+
49+ if ( subscriptionId ) {
50+ // Remove specific subscription
51+ clientSubscriptions . delete ( subscriptionId )
52+
53+ // If client has no more subscriptions, remove them entirely
54+ if ( clientSubscriptions . size === 0 ) {
55+ this . clients . delete ( clientId )
56+ this . subscriptions . delete ( clientId )
57+ }
58+ } else {
59+ // Remove all subscriptions for this client
60+ this . clients . delete ( clientId )
61+ this . subscriptions . delete ( clientId )
62+ }
63+
64+ // Close connection if no more clients
3965 if ( ! this . clients . size ) this . socket ?. close ( 1000 )
4066 }
4167
4268 private handleMessage ( message : Record < string , string > ) : void {
4369 for ( const [ clientId , client ] of this . clients ) {
4470 try {
45- const payload : MarketDataMessage = {
46- type : 'price_update' ,
47- source : 'coincap' ,
48- data : message ,
49- timestamp : Date . now ( ) ,
50- }
71+ const clientSubscriptions = this . subscriptions . get ( clientId )
72+ if ( ! clientSubscriptions ) continue
5173
52- client . publish ( clientId , payload )
74+ // Send updates for each subscription
75+ for ( const [ subscriptionId , assets ] of clientSubscriptions ) {
76+ // Filter data to only include assets this subscription requested
77+ const filteredData : Record < string , string > = { }
78+ for ( const asset of assets ) {
79+ if ( message [ asset ] !== undefined ) {
80+ filteredData [ asset ] = message [ asset ]
81+ }
82+ }
83+
84+ // Only send if there's relevant data for this subscription
85+ if ( Object . keys ( filteredData ) . length > 0 ) {
86+ const payload : MarketDataMessage = {
87+ type : 'price_update' ,
88+ source : 'coincap' ,
89+ data : filteredData ,
90+ timestamp : Date . now ( ) ,
91+ }
92+
93+ client . publish ( subscriptionId , payload )
94+ }
95+ }
5396 } catch ( error ) {
5497 this . logger . error ( { clientId, error } , 'failed to handle message' )
5598 }
0 commit comments