1
+ namespace ServiceControl . Monitoring . AcceptanceTests . Tests
2
+ {
3
+ using System ;
4
+ using System . Collections . Generic ;
5
+ using System . Threading . Tasks ;
6
+ using AcceptanceTesting ;
7
+ using AcceptanceTesting . EndpointTemplates ;
8
+ using Http . Diagrams ;
9
+ using NServiceBus ;
10
+ using NServiceBus . AcceptanceTesting ;
11
+ using NServiceBus . Pipeline ;
12
+ using NUnit . Framework ;
13
+
14
+ class When_ingesting_multiple_metrics_messages : AcceptanceTest
15
+ {
16
+ [ Test ]
17
+ public async Task Should_not_fail ( )
18
+ {
19
+ CustomConfiguration = endpointConfiguration =>
20
+ {
21
+ endpointConfiguration . Pipeline . Register ( typeof ( InterceptIngestionBehavior ) ,
22
+ "Intercepts ingestion exceptions" ) ;
23
+ } ;
24
+
25
+ var metricReported = false ;
26
+
27
+ var ctx = await Define < Context > ( )
28
+ . WithEndpoint < EndpointWithTimings > ( c => c . When ( async s =>
29
+ {
30
+ var tasks = new List < Task > ( ) ;
31
+ for ( int i = 0 ; i < 100 ; i ++ )
32
+ {
33
+ tasks . Add ( s . SendLocal ( new SampleMessage ( ) ) ) ;
34
+ tasks . Add ( s . SendLocal ( new AnotherSampleMessage ( ) ) ) ;
35
+ }
36
+
37
+ await Task . WhenAll ( tasks ) ;
38
+ } ) )
39
+ . Done ( async c =>
40
+ {
41
+ var result = await this . TryGetMany < MonitoredEndpoint > ( "/monitored-endpoints?history=1" ) ;
42
+
43
+ metricReported = result . HasResult && result . Items [ 0 ] . Metrics . TryGetValue ( "processingTime" , out var processingTime ) && processingTime ? . Average > 0 ;
44
+
45
+ return metricReported ;
46
+ } )
47
+ . Run ( ) ;
48
+
49
+ Assert . IsTrue ( metricReported ) ;
50
+ Assert . IsEmpty ( ctx . Errors ) ;
51
+ }
52
+
53
+ class EndpointWithTimings : EndpointConfigurationBuilder
54
+ {
55
+ public EndpointWithTimings ( ) =>
56
+ EndpointSetup < DefaultServerWithoutAudit > ( c =>
57
+ {
58
+ c . EnableMetrics ( ) . SendMetricDataToServiceControl ( Settings . DEFAULT_INSTANCE_NAME , TimeSpan . FromSeconds ( 5 ) ) ;
59
+ } ) ;
60
+
61
+ class Handler : IHandleMessages < SampleMessage >
62
+ {
63
+ public Task Handle ( SampleMessage message , IMessageHandlerContext context )
64
+ => Task . Delay ( TimeSpan . FromMilliseconds ( 10 ) , context . CancellationToken ) ;
65
+ }
66
+
67
+ class AnotherHandler : IHandleMessages < AnotherSampleMessage >
68
+ {
69
+ public Task Handle ( AnotherSampleMessage message , IMessageHandlerContext context )
70
+ => Task . Delay ( TimeSpan . FromMilliseconds ( 10 ) , context . CancellationToken ) ;
71
+ }
72
+ }
73
+
74
+ class InterceptIngestionBehavior ( ScenarioContext scenarioContext ) : Behavior < IIncomingPhysicalMessageContext >
75
+ {
76
+ public override async Task Invoke ( IIncomingPhysicalMessageContext context , Func < Task > next )
77
+ {
78
+ try
79
+ {
80
+ await next ( ) ;
81
+ }
82
+ catch ( Exception e )
83
+ {
84
+ ( ( Context ) scenarioContext ) . Errors . Add ( e ) ;
85
+ throw ;
86
+ }
87
+ }
88
+ }
89
+
90
+ class MonitoringEndpoint : EndpointConfigurationBuilder
91
+ {
92
+ public MonitoringEndpoint ( ) => EndpointSetup < DefaultServerWithoutAudit > ( ) ;
93
+ }
94
+
95
+ class Context : ScenarioContext
96
+ {
97
+ public List < Exception > Errors { get ; set ; } = [ ] ;
98
+ }
99
+
100
+ class SampleMessage : SampleBaseMessage ;
101
+
102
+ class AnotherSampleMessage : SampleBaseMessage ;
103
+
104
+ class SampleBaseMessage : IMessage ;
105
+ }
106
+ }
0 commit comments