@@ -6,34 +6,79 @@ import {
66 useGetEventStream ,
77} from "@squonk/account-server-client/event-stream" ;
88
9+ import dayjs from "dayjs" ;
10+ import utc from "dayjs/plugin/utc" ;
911import { useAtom } from "jotai" ;
1012import { useSnackbar } from "notistack" ;
1113
1214import { useASAuthorizationStatus } from "../../hooks/useIsAuthorized" ;
13- import { getMessageFromEvent , protoBlobToText } from "../../protobuf/protobuf" ;
14- import { eventStreamEnabledAtom } from "../../state/eventStream" ;
15+ import { getMessageFromEvent } from "../../protobuf/protobuf" ;
16+ import {
17+ eventStreamEnabledAtom ,
18+ useEventStream ,
19+ webSocketStatusAtom ,
20+ } from "../../state/eventStream" ;
21+ import { useUnreadEventCount } from "../../state/notifications" ;
1522import { EventMessage } from "../eventMessages/EventMessage" ;
1623import { useIsEventStreamInstalled } from "./useIsEventStreamInstalled" ;
1724
25+ dayjs . extend ( utc ) ;
26+
27+ /**
28+ * Builds WebSocket URL
29+ */
30+ const buildWebSocketUrl = ( location : string ) : string => {
31+ const url = new URL ( location ) ;
32+ url . protocol = "wss:" ;
33+
34+ // Add ordinal parameter to get all historical messages
35+ url . searchParams . set ( "stream_from_ordinal" , "1" ) ;
36+
37+ return url . toString ( ) ;
38+ } ;
39+
40+ /**
41+ * Manages WebSocket connection for event stream and displays toast notifications
42+ */
1843export const EventStream = ( ) => {
1944 const isEventStreamInstalled = useIsEventStreamInstalled ( ) ;
2045 const [ location , setLocation ] = useState < string | null > ( null ) ;
2146 const { enqueueSnackbar } = useSnackbar ( ) ;
47+ const { incrementCount } = useUnreadEventCount ( ) ;
2248 const asRole = useASAuthorizationStatus ( ) ;
49+ const { addEvent, isEventNewerThanSession, initializeSession } = useEventStream ( ) ;
2350
2451 const { data, error : streamError } = useGetEventStream ( {
2552 query : { select : ( data ) => data . location , enabled : ! ! asRole && isEventStreamInstalled } ,
2653 } ) ;
54+
2755 const { mutate : createEventStream } = useCreateEventStream ( {
28- mutation : {
29- onSuccess : ( eventStreamResponse ) => {
30- setLocation ( eventStreamResponse . location ) ;
31- } ,
32- } ,
56+ mutation : { onSuccess : ( eventStreamResponse ) => setLocation ( eventStreamResponse . location ) } ,
3357 } ) ;
58+
3459 const [ eventStreamEnabled ] = useAtom ( eventStreamEnabledAtom ) ;
60+ const [ , setWebSocketStatus ] = useAtom ( webSocketStatusAtom ) ;
61+
62+ const handleWebSocketMessage = useCallback (
63+ ( event : MessageEvent ) => {
64+ const message = getMessageFromEvent ( JSON . parse ( event . data ) ) ;
65+
66+ if (
67+ message &&
68+ addEvent ( message ) && // Only show toast for events newer than session start
69+ isEventNewerThanSession ( message )
70+ ) {
71+ enqueueSnackbar ( < EventMessage message = { message } /> , {
72+ variant : "default" ,
73+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
74+ autoHideDuration : 10_000 ,
75+ } ) ;
76+ incrementCount ( ) ;
77+ }
78+ } ,
79+ [ enqueueSnackbar , incrementCount , addEvent , isEventNewerThanSession ] ,
80+ ) ;
3581
36- // Define callbacks *before* useWebSocket hook
3782 const handleWebSocketOpen = useCallback ( ( ) => {
3883 enqueueSnackbar ( "Connected to event stream" , {
3984 variant : "success" ,
@@ -43,19 +88,14 @@ export const EventStream = () => {
4388
4489 const handleWebSocketClose = useCallback (
4590 ( event : CloseEvent ) => {
46- console . log ( event ) ;
47- if ( event . wasClean ) {
48- enqueueSnackbar ( "Disconnected from event stream" , {
49- variant : "info" ,
50- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
51- } ) ;
52- } else {
53- console . warn ( "EventStream: WebSocket closed unexpectedly." ) ;
54- enqueueSnackbar ( "Event stream disconnected unexpectedly. Attempting to reconnect..." , {
55- variant : "warning" ,
56- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
57- } ) ;
58- }
91+ const message = event . wasClean
92+ ? "Disconnected from event stream"
93+ : "Event stream disconnected unexpectedly. Attempting to reconnect..." ;
94+
95+ enqueueSnackbar ( message , {
96+ variant : event . wasClean ? "info" : "warning" ,
97+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
98+ } ) ;
5999 } ,
60100 [ enqueueSnackbar ] ,
61101 ) ;
@@ -67,42 +107,10 @@ export const EventStream = () => {
67107 } ) ;
68108 } , [ enqueueSnackbar ] ) ;
69109
70- const handleWebSocketMessage = useCallback (
71- ( event : MessageEvent ) => {
72- if ( event . data instanceof Blob ) {
73- protoBlobToText ( event . data )
74- . then ( ( textData ) => {
75- const message = getMessageFromEvent ( textData ) ;
76- if ( message ) {
77- enqueueSnackbar ( < EventMessage message = { message } /> , {
78- variant : "default" ,
79- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
80- autoHideDuration : 10_000 ,
81- } ) ;
82- } else {
83- console . warn (
84- "Received event data could not be parsed into a known message type:" ,
85- textData ,
86- ) ;
87- }
88- } )
89- . catch ( ( error ) => {
90- console . error ( "Error processing protobuf message:" , error ) ;
91- enqueueSnackbar ( "Error processing incoming event" , {
92- variant : "error" ,
93- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
94- } ) ;
95- } ) ;
96- } else {
97- console . warn ( "Received non-Blob WebSocket message:" , event . data ) ;
98- }
99- } ,
100- [ enqueueSnackbar ] ,
101- ) ;
102-
103- const wsUrl = eventStreamEnabled && asRole ? ( location ?. replace ( "ws" , "wss" ) ?? null ) : null ;
110+ // Build WebSocket URL
111+ const wsUrl = eventStreamEnabled && asRole && location ? buildWebSocketUrl ( location ) : null ;
104112
105- useWebSocket ( wsUrl , {
113+ const { readyState } = useWebSocket ( wsUrl , {
106114 onOpen : handleWebSocketOpen ,
107115 onClose : handleWebSocketClose ,
108116 onError : handleWebSocketError ,
@@ -113,7 +121,11 @@ export const EventStream = () => {
113121 reconnectInterval : 3000 ,
114122 } ) ;
115123
116- // Effects can now safely use the hook results or return early based on auth
124+ // Expose connection status for status indicator
125+ useEffect ( ( ) => {
126+ setWebSocketStatus ( readyState ) ;
127+ } , [ readyState , setWebSocketStatus ] ) ;
128+
117129 useEffect ( ( ) => {
118130 if ( asRole && data ) {
119131 setLocation ( data ) ;
@@ -126,5 +138,10 @@ export const EventStream = () => {
126138 }
127139 } , [ asRole , streamError , createEventStream ] ) ;
128140
141+ // Initialize session on client side only
142+ useEffect ( ( ) => {
143+ initializeSession ( ) ;
144+ } , [ initializeSession ] ) ;
145+
129146 return null ;
130147} ;
0 commit comments