Skip to content

Commit 8bd980f

Browse files
authored
feat: skip dlq for timestamp guards (#381)
1 parent e3aa59f commit 8bd980f

File tree

8 files changed

+24
-19
lines changed

8 files changed

+24
-19
lines changed

event-stream-processing/src/dead-letter-queue.service.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ export class DeadLetterQueueService {
2020
});
2121

2222
public async send({ failures }: OsDocumentPipeline) {
23-
if (failures.length === 0) {
23+
const dlqFailures = failures.filter((failure) => !failure.options?.skipDlq);
24+
if (dlqFailures.length === 0) {
2425
return;
2526
}
2627
const chunkSize = 50;
27-
for (let i = 0; i < failures.length; i += chunkSize) {
28-
const chunk = failures.slice(i, i + chunkSize);
28+
for (let i = 0; i < dlqFailures.length; i += chunkSize) {
29+
const chunk = dlqFailures.slice(i, i + chunkSize);
2930
const input: PutRecordBatchCommandInput = {
3031
DeliveryStreamName: process.env.DLQ_STREAM_NAME,
3132
Records: chunk.map((pipelineObject) => {

event-stream-processing/src/ecs-transform.service.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
/* eslint-disable @typescript-eslint/no-unsafe-call */
21
import { KinesisStreamEvent } from 'aws-lambda';
32
import { injectable, inject, multiInject } from 'inversify';
43
import { Parser } from './types/parser';
54
import { TYPES } from './inversify.types';
65
import { LoggerService } from './util/logger.service';
76
import { GenericError } from './util/generic.error';
8-
// eslint-disable-next-line max-len
7+
98
import {
109
OsDocument,
1110
KinesisStreamRecordDecodeFailure,
@@ -81,6 +80,10 @@ export class EcsTransformService {
8180
} catch (error: unknown) {
8281
const parser: string =
8382
error instanceof ParserError ? error.parser : 'unknown';
83+
const skipDlq: boolean =
84+
error instanceof ParserError && error.options
85+
? error.options.skipDlq
86+
: false;
8487
const message: string =
8588
error instanceof ParserError || error instanceof GenericError
8689
? error.message
@@ -100,14 +103,13 @@ export class EcsTransformService {
100103
const path: string = document.data.log?.file?.path
101104
? document.data.log?.file?.path
102105
: '';
103-
// eslint-disable-next-line max-len
104106
this.logger.debug(
105107
`PARSE_ERROR:${parser} ${team} ${hostName} ${serviceName} ${path}:${sequence} ${document.fingerprint.name} : ${message}`,
106108
);
107109
return new OsDocumentProcessingFailure(
108110
document,
109-
// eslint-disable-next-line max-len
110111
`PARSE_ERROR:${parser} ${team} ${hostName} ${serviceName} ${path}:${sequence} ${document.fingerprint.name} : ${message}`,
112+
{ skipDlq },
111113
);
112114
}
113115
})

event-stream-processing/src/parsers/document-index.parser.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export class DocumentIndexParser implements Parser {
3232
if (!indexName) {
3333
throw new ParserError(
3434
'Could not map event to an index',
35-
this.constructor.name,
35+
'DocumentIndexParser',
3636
);
3737
}
3838
indexName = this.applyTimestampSubstitution(document, indexName);
@@ -50,7 +50,7 @@ export class DocumentIndexParser implements Parser {
5050
if (lodash.isNil(timestamp)) {
5151
throw new ParserError(
5252
'@timestamp field value has not been defined',
53-
this.constructor.name,
53+
'DocumentIndexParser',
5454
);
5555
}
5656
const tsMomement = moment(timestamp);
@@ -60,7 +60,7 @@ export class DocumentIndexParser implements Parser {
6060
}
6161
throw new ParserError(
6262
`Unexpected formatting: ${match}`,
63-
this.constructor.name,
63+
'DocumentIndexParser',
6464
);
6565
});
6666
}
@@ -75,14 +75,14 @@ export class DocumentIndexParser implements Parser {
7575
if (lodash.isNil(substitution)) {
7676
throw new ParserError(
7777
`${fieldName} field value not in document`,
78-
this.constructor.name,
78+
'DocumentIndexParser',
7979
);
8080
}
8181
return substitution;
8282
}
8383
throw new ParserError(
8484
`Unexpected formatting: ${match}`,
85-
this.constructor.name,
85+
'DocumentIndexParser',
8686
);
8787
});
8888
}

event-stream-processing/src/parsers/timestamp-field.parser.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ export class TimestampFieldParser implements Parser {
4848
} else {
4949
throw new ParserError(
5050
`Invalid date: '${value}' invalid for format '${tsFormat}'`,
51-
this.constructor.name,
51+
'TimestampFieldParser',
5252
);
5353
}
5454
} else {
5555
throw new ParserError(
5656
`No value set for timestamp: ${fieldName}`,
57-
this.constructor.name,
57+
'TimestampFieldParser',
5858
);
5959
}
6060
}

event-stream-processing/src/parsers/timestamp-guard.parser.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ export class TimestampGuardParser implements Parser {
3838
)
3939
) {
4040
throw new ParserError(
41-
`Timestamp is outside guard`,
42-
this.constructor.name,
41+
`Timestamp [${timestamp.toISOString()}] is outside guard [${startStr}, ${endStr}]`,
42+
'TimestampGuardParser',
43+
undefined,
44+
{ skipDlq: true },
4345
);
4446
}
4547
}

event-stream-processing/src/parsers/url-explode.parser.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export class UrlExplodeParser implements Parser {
6969
} catch (e: unknown) {
7070
throw new ParserError(
7171
`Could not parse [${urlOriginal}]`,
72-
this.constructor.name,
72+
'UrlExplodeParser',
7373
);
7474
}
7575
}

event-stream-processing/src/types/os-document.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export enum FingerprintCategory {
1818
}
1919

2020
export interface OsDocumentData {
21-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
2221
[key: string]: any;
2322
}
2423

@@ -44,13 +43,13 @@ export class PipelineProcessingFailure<T> {
4443
constructor(
4544
public source: T,
4645
public message: string,
46+
public options?: { skipDlq: boolean },
4747
) {}
4848
}
4949

5050
export class KinesisStreamRecordDecodeFailure extends PipelineProcessingFailure<KinesisStreamRecord> {}
5151
export class OsDocumentProcessingFailure extends PipelineProcessingFailure<OsDocument> {}
5252
export class OsDocumentCommitFailure extends PipelineProcessingFailure<OsDocument> {}
53-
// eslint-disable-next-line max-len
5453
export type PipelineObject =
5554
| OsDocument
5655
| KinesisStreamRecordDecodeFailure

event-stream-processing/src/util/parser.error.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export class ParserError extends GenericError {
55
message: string,
66
public parser: string,
77
source?: Error | undefined,
8+
public options?: { skipDlq: boolean },
89
) {
910
super(message, source);
1011
Object.setPrototypeOf(this, new.target.prototype);

0 commit comments

Comments
 (0)