Skip to content

Commit 18f3fe7

Browse files
feat: Optimized Socket Connection and Too many socket connection prev… (#1003)
…ention
2 parents 46239a1 + 66ec9e9 commit 18f3fe7

File tree

7 files changed

+15610
-11396
lines changed

7 files changed

+15610
-11396
lines changed

apps/api/src/app/shared/services/websocket-service/websocket.service.ts

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,6 @@ export class WebSocketService implements OnGatewayConnection, OnGatewayDisconnec
110110
});
111111
}
112112

113-
// Get abort signal for a session
114-
getSessionAbortSignal(sessionId: string): AbortSignal | undefined {
115-
return this.sessionAbortControllers.get(sessionId)?.signal;
116-
}
117-
118113
// Use arrow functions to automatically bind 'this'
119114
sendProgress = (sessionId: string, progressData: IProgressData) => {
120115
if (!this.server) {
@@ -145,7 +140,7 @@ export class WebSocketService implements OnGatewayConnection, OnGatewayDisconnec
145140
error,
146141
timestamp: new Date().toISOString(),
147142
});
148-
this.logger.error(` Error sent to session ${sessionId}: ${error}`);
143+
this.logger.error(` Error sent to session ${sessionId}: ${error}`);
149144
} catch (err) {
150145
this.logger.error(`Failed to send error to session ${sessionId}:`, err);
151146
} finally {
@@ -172,38 +167,4 @@ export class WebSocketService implements OnGatewayConnection, OnGatewayDisconnec
172167
this.sessionAbortControllers.delete(sessionId);
173168
}
174169
};
175-
176-
// Helper method to check if a session has connected clients
177-
hasClientsInSession = (sessionId: string): boolean => {
178-
const room = this.server.sockets.adapter.rooms.get(sessionId);
179-
180-
return room && room.size > 0;
181-
};
182-
183-
// Method to get session info for debugging
184-
getSessionInfo = (sessionId: string) => {
185-
const room = this.server.sockets.adapter.rooms.get(sessionId);
186-
const hasAbortController = this.sessionAbortControllers.has(sessionId);
187-
const isAborted = this.sessionAbortControllers.get(sessionId)?.signal.aborted;
188-
189-
return {
190-
sessionId,
191-
clientCount: room ? room.size : 0,
192-
hasClients: room && room.size > 0,
193-
hasAbortController,
194-
isAborted,
195-
};
196-
};
197-
198-
// Method to manually abort all sessions (useful for cleanup)
199-
abortAllSessions() {
200-
this.logger.log('Aborting all active sessions');
201-
this.sessionAbortControllers.forEach((controller, sessionId) => {
202-
if (!controller.signal.aborted) {
203-
controller.abort();
204-
this.logger.log(`Aborted session ${sessionId}`);
205-
}
206-
});
207-
this.sessionAbortControllers.clear();
208-
}
209170
}

apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase1/AutoImportPhase1.tsx

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,35 @@
1-
/* eslint-disable @typescript-eslint/no-unused-vars */
2-
/* eslint-disable no-unused-vars */
31
import { Stack, TextInput, Text } from '@mantine/core';
4-
import { PhasesEnum } from '@types';
52
import { validateRssUrl } from '@util';
3+
import { PhasesEnum } from '@types';
64
import { Footer } from 'components/Common/Footer';
75
import { useAutoImportPhase1 } from '@hooks/AutoImportPhase1/useAutoImportPhase1';
86
import { ProgressRing } from './ProgressRing';
9-
import { ConfirmModal } from 'components/widget/modals/ConfirmModal';
10-
import { WIDGET_TEXTS } from '@impler/client';
11-
import { useState, useCallback, useEffect } from 'react';
7+
import { colors } from '@config';
128

139
interface IAutoImportPhase1Props {
1410
onNextClick: () => void;
1511
onCloseClick: () => void;
1612
onRssParsingStart?: () => void;
1713
onRssParsingEnd?: () => void;
1814
onRegisterAbortFunction?: (abortFn: () => void) => void;
15+
onRegisterDisconnectFunction?: (disconnectFn: () => void) => void;
1916
}
2017

2118
export function AutoImportPhase1({
2219
onNextClick,
2320
onRssParsingStart,
2421
onRssParsingEnd,
2522
onRegisterAbortFunction,
23+
onRegisterDisconnectFunction,
2624
}: IAutoImportPhase1Props) {
27-
const { isGetRssXmlHeadingsLoading, progressPercentage, register, errors, onSubmit, abortOperation, canAbort } =
28-
useAutoImportPhase1({
29-
goNext: onNextClick,
30-
});
31-
32-
// Register the abort function with the parent component
33-
useEffect(() => {
34-
if (canAbort && abortOperation && onRegisterAbortFunction) {
35-
onRegisterAbortFunction(abortOperation);
36-
}
37-
}, [canAbort, abortOperation, onRegisterAbortFunction]);
38-
39-
// Notify parent component about RSS parsing status
40-
useEffect(() => {
41-
if (isGetRssXmlHeadingsLoading) {
42-
onRssParsingStart?.();
43-
} else {
44-
onRssParsingEnd?.();
45-
}
46-
}, [isGetRssXmlHeadingsLoading, onRssParsingStart, onRssParsingEnd]);
25+
const { isGetRssXmlHeadingsLoading, progressPercentage, register, errors, onSubmit, texts } = useAutoImportPhase1({
26+
goNext: onNextClick,
27+
onRegisterAbortFunction,
28+
onRegisterDisconnectFunction,
29+
onRssParsingStart,
30+
onRssParsingEnd,
31+
});
4732

48-
// Show form view when not processing
4933
return (
5034
<Stack spacing="xs" style={{ height: '100%', justifyContent: 'space-between' }}>
5135
<form onSubmit={onSubmit}>
@@ -70,6 +54,9 @@ export function AutoImportPhase1({
7054
{isGetRssXmlHeadingsLoading && (
7155
<Stack align="center" justify="center" spacing="xl">
7256
<ProgressRing percentage={progressPercentage} />
57+
<Text fw="bolder" color={colors.StrokeLight}>
58+
{texts.AUTOIMPORT_PHASE1.CLOSE_CONFIRMATION.TITLE}
59+
</Text>
7360
</Stack>
7461
)}
7562

apps/widget/src/components/widget/Widget.tsx

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export function Widget() {
3737

3838
// Add ref to store abort function from AutoImportPhase1
3939
const abortRssOperationRef = useRef<(() => void) | null>(null);
40+
const disconnectSocketRef = useRef<(() => void) | null>(null);
4041

4142
const {
4243
flow,
@@ -63,25 +64,40 @@ export function Widget() {
6364
abortRssOperationRef.current();
6465
}
6566

67+
if (promptContinueAction === PromptModalTypesEnum.CLOSE && disconnectSocketRef.current) {
68+
disconnectSocketRef.current();
69+
}
70+
6671
terminateUpload();
72+
6773
setPromptContinueAction(undefined);
6874
if (uploadInfo._id) ParentWindow.UploadTerminated({ uploadId: uploadInfo._id });
6975
if (promptContinueAction === PromptModalTypesEnum.CLOSE) closeWidget();
7076
};
7177

78+
// Add callback to register disconnect function from AutoImportPhase1
79+
const handleRegisterDisconnectFunction = useCallback((disconnectFn: () => void) => {
80+
if (promptContinueAction === PromptModalTypesEnum.CLOSE) {
81+
console.log('disconnectFn');
82+
}
83+
disconnectSocketRef.current = disconnectFn;
84+
}, []);
85+
7286
const onPromptCancel = () => {
7387
setPromptContinueAction(undefined);
7488
};
7589

7690
const onClose = () => {
7791
let isImportNotOnProgress = false;
7892

93+
// If RSS is actively parsing, show confirmation dialog but DON'T disconnect socket yet
7994
if (flow === FlowsEnum.AUTO_IMPORT && phase === PhasesEnum.CONFIGURE && isRssParsing) {
8095
setPromptContinueAction(PromptModalTypesEnum.CLOSE);
8196

8297
return;
8398
}
8499

100+
// For other AUTO_IMPORT phases
85101
if (flow === FlowsEnum.AUTO_IMPORT)
86102
isImportNotOnProgress = [PhasesEnum.CONFIGURE, PhasesEnum.CONFIRM].includes(phase);
87103
else if (flow == FlowsEnum.MANUAL_ENTRY)
@@ -95,10 +111,13 @@ export function Widget() {
95111
].includes(phase);
96112

97113
if (isImportNotOnProgress) {
114+
if (disconnectSocketRef.current) disconnectSocketRef.current();
98115
setPhase(PhasesEnum.VALIDATE);
99116
resetAppState();
100117
closeWidget();
101-
} else setPromptContinueAction(PromptModalTypesEnum.CLOSE);
118+
} else {
119+
setPromptContinueAction(PromptModalTypesEnum.CLOSE);
120+
}
102121
};
103122

104123
const closeWidget = () => {
@@ -108,6 +127,8 @@ export function Widget() {
108127
setIsRssParsing(false);
109128
// Clear the abort function reference
110129
abortRssOperationRef.current = null;
130+
// Clear the disconnect function reference
131+
disconnectSocketRef.current = null;
111132
setTimeout(() => {
112133
ParentWindow.Close();
113134
}, variables.closeDelayInMS);
@@ -119,6 +140,7 @@ export function Widget() {
119140
setPhase(PhasesEnum.VALIDATE);
120141
setIsRssParsing(false); // Reset RSS parsing state
121142
abortRssOperationRef.current = null; // Clear abort function reference
143+
disconnectSocketRef.current = null; // Clear disconnect function reference
122144
};
123145

124146
const onImportJobCreated = (jobInfo: IUserJob) => {
@@ -164,6 +186,7 @@ export function Widget() {
164186
onRssParsingStart={() => setIsRssParsing(true)}
165187
onRssParsingEnd={() => setIsRssParsing(false)}
166188
onRegisterAbortFunction={handleRegisterAbortFunction}
189+
onRegisterDisconnectFunction={handleRegisterDisconnectFunction}
167190
/>
168191
),
169192
[PhasesEnum.MAPCOLUMNS]: <AutoImportPhase2 texts={texts} onNextClick={() => setPhase(PhasesEnum.SCHEDULE)} />,
@@ -210,6 +233,7 @@ export function Widget() {
210233
setPhase(PhasesEnum.VALIDATE);
211234
setIsRssParsing(false); // Reset RSS parsing state when widget is hidden
212235
abortRssOperationRef.current = null; // Clear abort function reference
236+
disconnectSocketRef.current = null; // Clear disconnect function reference
213237
}
214238
}, [showWidget]);
215239

apps/widget/src/hooks/AutoImportPhase1/useAutoImportPhase1.tsx

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { useMutation } from '@tanstack/react-query';
2-
import { useForm, SubmitHandler } from 'react-hook-form';
31
import { useCallback, useEffect, useMemo, useRef } from 'react';
2+
import { useForm, SubmitHandler } from 'react-hook-form';
3+
import { useMutation } from '@tanstack/react-query';
44

55
import { generateSessionId, notifier } from '@util';
66
import { IAutoImportValues, IErrorData } from '@types';
@@ -13,16 +13,26 @@ import { useWebSocketProgress } from '@hooks/AutoImportPhase1/useWebSocketProgre
1313

1414
interface IUseAutoImportPhase1Props {
1515
goNext: () => void;
16+
onRegisterAbortFunction?: (abortFn: () => void) => void;
17+
onRegisterDisconnectFunction?: (disconnectFn: () => void) => void;
18+
onRssParsingStart?: () => void;
19+
onRssParsingEnd?: () => void;
1620
}
1721

1822
interface FormValues {
1923
rssUrl: string;
2024
}
2125

22-
export function useAutoImportPhase1({ goNext }: IUseAutoImportPhase1Props) {
26+
export function useAutoImportPhase1({
27+
goNext,
28+
onRegisterAbortFunction,
29+
onRegisterDisconnectFunction,
30+
onRssParsingStart,
31+
onRssParsingEnd,
32+
}: IUseAutoImportPhase1Props) {
2333
const { api } = useAPIState();
2434
const { setJobsInfo } = useJobsInfo();
25-
const { output, schema } = useAppState();
35+
const { output, schema, texts } = useAppState();
2636
const { templateId, extra, authHeaderValue } = useImplerState();
2737

2838
const webSocketSessionIdRef = useRef<string | null>(null);
@@ -62,13 +72,22 @@ export function useAutoImportPhase1({ goNext }: IUseAutoImportPhase1Props) {
6272
}, []);
6373

6474
// Initialize WebSocket connection
65-
const { isConnected, progressData, completionData, errorData, joinSession, leaveSession, socket, clearProgress } =
66-
useWebSocketProgress({
67-
onCompletion: handleCompletion,
68-
onError: handleError,
69-
onConnectionChange: handleConnectionChange,
70-
autoConnect: true,
71-
});
75+
const {
76+
isConnected,
77+
progressData,
78+
completionData,
79+
errorData,
80+
joinSession,
81+
leaveSession,
82+
socket,
83+
disconnect,
84+
clearProgress,
85+
} = useWebSocketProgress({
86+
onCompletion: handleCompletion,
87+
onError: handleError,
88+
onConnectionChange: handleConnectionChange,
89+
autoConnect: true,
90+
});
7291

7392
// Mutation for RSS XML processing
7493
const { isLoading: isGetRssXmlHeadingsLoading, mutate: getRssXmlHeading } = useMutation<
@@ -189,21 +208,24 @@ export function useAutoImportPhase1({ goNext }: IUseAutoImportPhase1Props) {
189208
return 0;
190209
}, [progressData?.percentage]);
191210

192-
const handleCleanup = useCallback(() => {
193-
// First abort the session if it's running
194-
if (canAbort && webSocketSessionIdRef.current) {
195-
abortOperation();
211+
useEffect(() => {
212+
if (canAbort && abortOperation && onRegisterAbortFunction) {
213+
onRegisterAbortFunction(abortOperation);
196214
}
215+
}, [canAbort, abortOperation, onRegisterAbortFunction]);
197216

198-
// Then do additional cleanup
199-
if (socket && socket.connected) {
200-
socket.disconnect();
217+
// Register the disconnect function with the parent component
218+
useEffect(() => {
219+
if (isGetRssXmlHeadingsLoading) {
220+
onRssParsingStart?.();
221+
} else {
222+
onRssParsingEnd?.();
201223
}
202-
if (webSocketSessionIdRef.current) {
203-
leaveSession(webSocketSessionIdRef.current);
204-
webSocketSessionIdRef.current = null;
224+
225+
if (disconnect && onRegisterDisconnectFunction) {
226+
onRegisterDisconnectFunction(disconnect);
205227
}
206-
}, [socket, webSocketSessionIdRef, leaveSession, abortOperation, canAbort]);
228+
}, [isGetRssXmlHeadingsLoading, onRssParsingStart, onRssParsingEnd, disconnect, onRegisterDisconnectFunction]);
207229

208230
useEffect(() => {
209231
return () => {
@@ -225,6 +247,7 @@ export function useAutoImportPhase1({ goNext }: IUseAutoImportPhase1Props) {
225247

226248
// WebSocket connection state
227249
isConnected,
250+
disconnect,
228251
socket,
229252
leaveSession,
230253
webSocketSessionIdRef,
@@ -240,7 +263,8 @@ export function useAutoImportPhase1({ goNext }: IUseAutoImportPhase1Props) {
240263
// Abort functionality
241264
abortOperation,
242265
canAbort,
243-
handleCleanup,
244266
isAborted: isAbortedRef.current,
267+
268+
texts,
245269
};
246270
}

0 commit comments

Comments
 (0)