Skip to content

Commit 1e04d10

Browse files
RabbitMQQueueDetails parsing doesn't account for duplicated elements (#4490)
* Fix the RabbitMQQueueDetails parsing to account for duplicated elements * Streamline the stats logic slightly have the element parsing in one place * Align file name with class * Add tests for rabbitmq queue parsing * remove test depency on TransportTestFixture * Remove extra field * Remove default parameter on ctor since that can cause troubles with DI * Link to issue --------- Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com> Co-authored-by: Jo Palac <jo.palac@particular.net>
1 parent a33e4e5 commit 1e04d10

File tree

7 files changed

+264
-65
lines changed

7 files changed

+264
-65
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#nullable enable
2+
namespace ServiceControl.Transports.RabbitMQ;
3+
4+
using System.Collections.Generic;
5+
using System.Text.Json;
6+
using ServiceControl.Transports.BrokerThroughput;
7+
8+
public class RabbitMQBrokerQueueDetails(JsonElement token) : IBrokerQueue
9+
{
10+
public string QueueName { get; } = token.GetProperty("name").GetString()!;
11+
public string SanitizedName => QueueName;
12+
public string Scope => VHost;
13+
public string VHost { get; } = token.GetProperty("vhost").GetString()!;
14+
public List<string> EndpointIndicators { get; } = [];
15+
long? AckedMessages { get; set; } = FromToken(token);
16+
long Baseline { get; set; } = FromToken(token) ?? 0;
17+
18+
public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading)
19+
{
20+
var newlyAckedMessages = 0L;
21+
if (newReading.AckedMessages is null)
22+
{
23+
return newlyAckedMessages;
24+
}
25+
26+
if (newReading.AckedMessages.Value >= Baseline)
27+
{
28+
newlyAckedMessages = newReading.AckedMessages.Value - Baseline;
29+
AckedMessages += newlyAckedMessages;
30+
}
31+
Baseline = newReading.AckedMessages.Value;
32+
33+
return newlyAckedMessages;
34+
}
35+
36+
static long? FromToken(JsonElement jsonElement) =>
37+
jsonElement.TryGetProperty("message_stats", out var stats) && stats.TryGetProperty("ack", out var val)
38+
? val.GetInt64()
39+
: null;
40+
}

src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs

Lines changed: 33 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace ServiceControl.Transports.RabbitMQ;
99
using System.Net.Http;
1010
using System.Net.Http.Json;
1111
using System.Runtime.CompilerServices;
12+
using System.Text.Json;
1213
using System.Text.Json.Nodes;
1314
using System.Threading;
1415
using System.Threading.Tasks;
@@ -88,15 +89,21 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
8889

8990
if (InitialiseErrors.Count == 0)
9091
{
91-
httpClient = new HttpClient(new SocketsHttpHandler
92-
{
93-
Credentials = defaultCredential,
94-
PooledConnectionLifetime = TimeSpan.FromMinutes(2)
95-
})
96-
{ BaseAddress = new Uri(apiUrl) };
92+
// ideally we would use the HttpClientFactory, but it would be a bit more involved to set that up
93+
// so for now we are using a virtual method that can be overriden in tests
94+
// https://github.com/Particular/ServiceControl/issues/4493
95+
httpClient = CreateHttpClient(defaultCredential, apiUrl);
9796
}
9897
}
9998

99+
protected virtual HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) =>
100+
new(new SocketsHttpHandler
101+
{
102+
Credentials = defaultCredential,
103+
PooledConnectionLifetime = TimeSpan.FromMinutes(2)
104+
})
105+
{ BaseAddress = new Uri(apiUrl) };
106+
100107
public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQueue brokerQueue,
101108
DateOnly startDate,
102109
[EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -105,49 +112,22 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
105112
var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}";
106113

107114
logger.LogDebug($"Querying {url}");
108-
var node = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonNode>(url, token), cancellationToken);
109-
queue.AckedMessages = GetAck();
115+
var newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync<JsonElement>(url, token)), cancellationToken);
116+
_ = queue.CalculateThroughputFrom(newReading);
110117

111118
// looping for 24hrs, in 4 increments of 15 minutes
112119
for (var i = 0; i < 24 * 4; i++)
113120
{
114-
bool throughputSent = false;
115121
await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken);
116122
logger.LogDebug($"Querying {url}");
117-
node = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonNode>(url, token), cancellationToken);
118-
var newReading = GetAck();
119-
if (newReading is not null)
120-
{
121-
if (newReading >= queue.AckedMessages)
122-
{
123-
yield return new QueueThroughput
124-
{
125-
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
126-
TotalThroughput = newReading.Value - queue.AckedMessages.Value
127-
};
128-
throughputSent = true;
129-
}
130-
queue.AckedMessages = newReading;
131-
}
132-
133-
if (!throughputSent)
134-
{
135-
yield return new QueueThroughput
136-
{
137-
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
138-
TotalThroughput = 0
139-
};
140-
}
141-
}
142-
yield break;
123+
newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync<JsonElement>(url, token)), cancellationToken);
143124

144-
long? GetAck()
145-
{
146-
if (node!["message_stats"] is JsonObject stats && stats["ack"] is JsonValue val)
125+
var newTotalThroughput = queue.CalculateThroughputFrom(newReading);
126+
yield return new QueueThroughput
147127
{
148-
return val.GetValue<long>();
149-
}
150-
return null;
128+
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
129+
TotalThroughput = newTotalThroughput
130+
};
151131
}
152132
}
153133

@@ -266,7 +246,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
266246
}
267247
}
268248

269-
async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
249+
public async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
270250
{
271251
var url = $"/api/queues/{HttpUtility.UrlEncode(connectionConfiguration.VirtualHost)}?page={page}&page_size=500&name=&use_regex=false&pagination=true";
272252

@@ -283,22 +263,27 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
283263
return (null, false);
284264
}
285265

286-
var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!)).ToArray();
287-
288-
return (queues, pageCount > pageReturned);
266+
return (MaterializeQueueDetails(items), pageCount > pageReturned);
289267
}
290268
// Older versions of RabbitMQ API did not have paging and returned the array of items directly
291269
case JsonArray arr:
292270
{
293-
var queues = arr.Select(item => new RabbitMQBrokerQueueDetails(item!)).ToArray();
294-
295-
return (queues, false);
271+
return (MaterializeQueueDetails(arr), false);
296272
}
297273
default:
298274
throw new Exception("Was not able to get list of queues from RabbitMQ broker.");
299275
}
300276
}
301277

278+
static RabbitMQBrokerQueueDetails[] MaterializeQueueDetails(JsonArray items)
279+
{
280+
// It is not possible to directly operated on the JsonNode. When the JsonNode is a JObject
281+
// and the indexer is access the internal dictionary is initialized which can cause key not found exceptions
282+
// when the payload contains the same key multiple times (which happened in the past).
283+
var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!.Deserialize<JsonElement>())).ToArray();
284+
return queues;
285+
}
286+
302287
public override KeyDescriptionPair[] Settings =>
303288
[
304289
new KeyDescriptionPair(RabbitMQSettings.API, RabbitMQSettings.APIDescription),

src/ServiceControl.Transports.RabbitMQ/RabbitMQQueueDetails.cs

Lines changed: 0 additions & 16 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[
2+
{
3+
"QueueName": "queue1",
4+
"SanitizedName": "queue1",
5+
"Scope": "vhost1",
6+
"VHost": "vhost1",
7+
"EndpointIndicators": []
8+
},
9+
{
10+
"QueueName": "queue2",
11+
"SanitizedName": "queue2",
12+
"Scope": "vhost2",
13+
"VHost": "vhost2",
14+
"EndpointIndicators": []
15+
},
16+
{
17+
"QueueName": "queue3",
18+
"SanitizedName": "queue3",
19+
"Scope": "vhost1",
20+
"VHost": "vhost1",
21+
"EndpointIndicators": []
22+
}
23+
]
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[
2+
{
3+
"QueueName": "queue1",
4+
"SanitizedName": "queue1",
5+
"Scope": "vhost1",
6+
"VHost": "vhost1",
7+
"EndpointIndicators": []
8+
},
9+
{
10+
"QueueName": "queue2",
11+
"SanitizedName": "queue2",
12+
"Scope": "vhost2",
13+
"VHost": "vhost2",
14+
"EndpointIndicators": []
15+
}
16+
]
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
namespace ServiceControl.Transport.Tests;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Extensions.Logging.Abstractions;
7+
using Microsoft.Extensions.Time.Testing;
8+
using NUnit.Framework;
9+
using Transports;
10+
using Transports.RabbitMQ;
11+
using System.Net.Http;
12+
using Particular.Approvals;
13+
using System.Collections.ObjectModel;
14+
using System.Net;
15+
16+
[TestFixture]
17+
class RabbitMQQuery_ResponseParsing_Tests
18+
{
19+
FakeTimeProvider provider;
20+
TransportSettings transportSettings;
21+
FakeHttpHandler httpHandler;
22+
RabbitMQQuery rabbitMQQuery;
23+
24+
[SetUp]
25+
public void SetUp()
26+
{
27+
provider = new();
28+
provider.SetUtcNow(DateTimeOffset.UtcNow);
29+
transportSettings = new TransportSettings
30+
{
31+
ConnectionString = "host=localhost;username=rabbitmq;password=rabbitmq",
32+
MaxConcurrency = 1,
33+
EndpointName = Guid.NewGuid().ToString("N")
34+
};
35+
httpHandler = new FakeHttpHandler();
36+
var httpClient = new HttpClient(httpHandler) { BaseAddress = new Uri("http://localhost:15672") };
37+
38+
rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient);
39+
rabbitMQQuery.Initialize(ReadOnlyDictionary<string, string>.Empty);
40+
}
41+
42+
[TearDown]
43+
public void TearDown() => httpHandler.Dispose();
44+
45+
public Func<HttpRequestMessage, HttpResponseMessage> SendCallback
46+
{
47+
get => httpHandler.SendCallback;
48+
set => httpHandler.SendCallback = value;
49+
}
50+
51+
[Test]
52+
public async Task Should_handle_duplicated_json_data()
53+
{
54+
SendCallback = _ =>
55+
{
56+
var response = new HttpResponseMessage
57+
{
58+
Content = new StringContent("""
59+
{
60+
"items": [
61+
{
62+
"name": "queue1",
63+
"vhost": "vhost1",
64+
"memory": 1024,
65+
"memory": 1024,
66+
"message_stats": {
67+
"ack": 1
68+
}
69+
},
70+
{
71+
"name": "queue2",
72+
"vhost": "vhost2",
73+
"vhost": "vhost2",
74+
"message_stats": {
75+
"ack": 2
76+
}
77+
}
78+
],
79+
"page": 1,
80+
"page_count": 1,
81+
"page_size": 500,
82+
"total_count": 2
83+
}
84+
""")
85+
};
86+
return response;
87+
};
88+
89+
var queues = (await rabbitMQQuery.GetPage(1, default)).Item1;
90+
Approver.Verify(queues);
91+
}
92+
93+
[Test]
94+
public async Task Should_fetch_queue_details_in_old_format()
95+
{
96+
SendCallback = _ =>
97+
{
98+
var response = new HttpResponseMessage
99+
{
100+
Content = new StringContent("""
101+
[
102+
{
103+
"name": "queue1",
104+
"vhost": "vhost1",
105+
"memory": 1024,
106+
"message_stats": {
107+
"ack": 1
108+
}
109+
},
110+
{
111+
"name": "queue2",
112+
"vhost": "vhost2",
113+
"message_stats": {
114+
"ack": 2
115+
}
116+
},
117+
{
118+
"name": "queue3",
119+
"vhost": "vhost1"
120+
}
121+
]
122+
""")
123+
};
124+
return response;
125+
};
126+
127+
var queues = (await rabbitMQQuery.GetPage(1, default)).Item1;
128+
Approver.Verify(queues);
129+
}
130+
131+
sealed class TestableRabbitMQQuery(
132+
TimeProvider timeProvider,
133+
TransportSettings transportSettings,
134+
HttpClient customHttpClient)
135+
: RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, timeProvider, transportSettings)
136+
{
137+
protected override HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => customHttpClient;
138+
}
139+
140+
sealed class FakeHttpHandler : HttpClientHandler
141+
{
142+
public Func<HttpRequestMessage, HttpResponseMessage> SendCallback { get; set; }
143+
144+
protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) => SendCallback(request);
145+
146+
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) => Task.FromResult(SendCallback(request));
147+
}
148+
}

0 commit comments

Comments
 (0)