Skip to content

Commit bcc845a

Browse files
author
bhavik
committed
feat: merged next
2 parents 137e2e9 + da70f6e commit bcc845a

File tree

34 files changed

+1633
-270
lines changed

34 files changed

+1633
-270
lines changed

apps/api/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"passport-jwt": "^4.0.1",
6868
"passport-oauth2": "^1.6.1",
6969
"rimraf": "^3.0.2",
70+
"sax": "^1.4.1",
7071
"socket.io": "^4.7.2",
7172
"source-map-support": "^0.5.21",
7273
"uuid": "^9.0.0",
@@ -92,6 +93,7 @@
9293
"@types/passport-github2": "^1.2.5",
9394
"@types/passport-jwt": "^3.0.8",
9495
"@types/passport-oauth2": "^1.4.11",
96+
"@types/sax": "^1.2.7",
9597
"@types/uuid": "^9.0.2",
9698
"chai": "^4.3.7",
9799
"mocha": "^10.1.0",

apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ import { UserJobEntity, UserJobRepository, WebhookDestinationRepository } from '
22
import { Injectable } from '@nestjs/common';
33
import * as dayjs from 'dayjs';
44
import * as parser from 'cron-parser';
5-
import { Cron } from '@nestjs/schedule';
5+
import { Cron, CronExpression } from '@nestjs/schedule';
66
import { ScheduleUserJob, UpdateUserJob } from 'app/import-jobs/usecase';
77
import { UserJobImportStatusEnum } from '@impler/shared';
8-
import { CRON_SCHEDULE } from '@shared/constants';
8+
// import { CRON_SCHEDULE } from '@shared/constants';
99
const parseCronExpression = require('@impler/shared/src/utils/cronstrue');
1010

1111
@Injectable()
@@ -17,7 +17,7 @@ export class AutoImportJobsSchedular {
1717
private readonly scheduleUserJob: ScheduleUserJob
1818
) {}
1919

20-
@Cron(CRON_SCHEDULE.DEFAULT_CRON_TIME)
20+
@Cron(CronExpression.EVERY_10_SECONDS)
2121
async handleCronSchedular() {
2222
await this.fetchAndExecuteScheduledJobs();
2323
}

apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { IsNotEmpty, IsOptional, IsString } from 'class-validator';
22

33
export class CreateUserJobDto {
4+
@IsString()
5+
@IsNotEmpty()
6+
webSocketSessionId: string;
7+
48
@IsString()
59
@IsNotEmpty()
610
url: string;

apps/api/src/app/import-jobs/import-jobs.controller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export class ImportJobsController {
3737
@ApiSecurity(ACCESS_KEY_NAME)
3838
async createUserJobRoute(@Param('templateId') templateId: string, @Body() jobData: CreateUserJobDto) {
3939
return this.createUserJob.execute({
40+
webSocketSessionId: jobData.webSocketSessionId,
4041
_templateId: templateId,
4142
url: jobData.url,
4243
extra: jobData.extra,

apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.command.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export class CreateUserJobCommand {
2+
webSocketSessionId: string;
23
url: string;
34
extra?: string;
45
_templateId: string;

apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,60 @@
1-
import { BadRequestException, Injectable } from '@nestjs/common';
2-
import { isValidXMLMimeType } from '@shared/helpers/common.helper';
3-
import { APIMessages } from '@shared/constants';
1+
import { Injectable } from '@nestjs/common';
42
import { UserJobEntity, UserJobRepository } from '@impler/dal';
5-
import { RSSService } from '@shared/services';
63
import { CreateUserJobCommand } from './create-userjob.command';
4+
import { APIMessages } from '@shared/constants';
5+
import { BadRequestException } from '@nestjs/common';
6+
import { RSSXMLService } from '@impler/services';
7+
import { isValidXMLMimeType } from '@shared/helpers/common.helper';
8+
import { WebSocketService } from '@shared/services';
79

810
@Injectable()
911
export class CreateUserJob {
1012
constructor(
11-
private readonly rssService: RSSService,
13+
private readonly webSocketService: WebSocketService,
1214
private readonly userJobRepository: UserJobRepository
1315
) {}
1416

1517
async execute({
18+
webSocketSessionId,
1619
url,
1720
extra,
1821
_templateId,
1922
externalUserId,
2023
authHeaderValue,
2124
}: CreateUserJobCommand): Promise<UserJobEntity> {
22-
const mimeType = await this.rssService.getMimeType(url);
25+
const rssService = new RSSXMLService(url);
26+
27+
const mimeType = await rssService.getMimeType(url);
28+
2329
if (isValidXMLMimeType(mimeType)) {
24-
const { rssKeyHeading } = await this.rssService.parseRssFeed(url);
25-
let formattedExtra = extra || '{}';
2630
try {
27-
formattedExtra = JSON.parse(extra);
28-
} catch (_) {}
29-
30-
return await this.userJobRepository.create({
31-
url,
32-
extra,
33-
authHeaderValue,
34-
headings: rssKeyHeading,
35-
_templateId: _templateId,
36-
externalUserId: externalUserId || (formattedExtra as unknown as Record<string, any>)?.externalUserId,
37-
});
31+
const abortController = new AbortController();
32+
33+
this.webSocketService.registerSessionAbort(webSocketSessionId, abortController);
34+
35+
const rssXmlParsedDataKeys = await rssService.parseXMLAndExtractData({
36+
xmlUrl: url,
37+
sessionId: webSocketSessionId,
38+
sendProgress: this.webSocketService.sendProgress,
39+
sendError: this.webSocketService.sendError,
40+
sendCompletion: this.webSocketService.sendCompletion,
41+
abortSignal: abortController.signal,
42+
});
43+
44+
let formattedExtra = extra || '{}';
45+
try {
46+
formattedExtra = JSON.parse(extra);
47+
} catch (_) {}
48+
49+
return await this.userJobRepository.create({
50+
url,
51+
extra,
52+
authHeaderValue,
53+
headings: rssXmlParsedDataKeys?.keys || [],
54+
_templateId: _templateId,
55+
externalUserId: externalUserId || (formattedExtra as unknown as Record<string, any>)?.externalUserId,
56+
});
57+
} catch (error) {}
3858
} else {
3959
throw new BadRequestException(APIMessages.INVALID_RSS_URL);
4060
}

apps/api/src/app/import-jobs/usecase/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { GetColumnSchemaMapping } from './get-columnschema-mapping/get-columnsch
33
import { CreateJobMapping } from './create-jobmapping/create-jobmapping.usecase';
44
import { UpdateUserJob } from './update-userjob/update-userjob.usecase';
55
import { GetUserJob } from './get-userjob/get-userjob.usecase';
6-
import { RSSService } from '@shared/services';
6+
import { RSSXMLService } from '@impler/services';
77
import { QueueService } from '@shared/services/queue.service';
88
import { UserJobPause } from './userjob-usecase/userjob-pause.usecase';
99
import { UserJobDelete } from './userjob-usecase/userjob-delete.usecase';
@@ -18,7 +18,7 @@ export const USECASES = [
1818
CreateJobMapping,
1919
UpdateUserJob,
2020
GetUserJob,
21-
RSSService,
21+
RSSXMLService,
2222
QueueService,
2323
UserJobResume,
2424
UserJobPause,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
export * from './file';
2-
export * from './rss.service';
2+
export * from './websocket-service';

apps/api/src/app/shared/services/rss.service.ts

Lines changed: 0 additions & 103 deletions
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './websocket.service';

0 commit comments

Comments
 (0)