Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b4f0277
Added a Postgres persister for Audit
johnsimons Aug 26, 2025
22a5bcd
Renaming to uppercase SQL
johnsimons Aug 26, 2025
5f091b1
Remove to readd
johnsimons Aug 26, 2025
684d265
Adding it back
johnsimons Aug 26, 2025
49b3dd1
Moving things around
johnsimons Aug 27, 2025
6525de8
Using namespace file scoped
johnsimons Aug 27, 2025
bf64782
We have something working
johnsimons Aug 27, 2025
7adb46d
Adding index and trigger
johnsimons Aug 27, 2025
99e8c61
Returning results
johnsimons Aug 28, 2025
7c2289c
A bit better
johnsimons Aug 28, 2025
76e5c9a
Some refactoring
johnsimons Aug 28, 2025
176fba3
Adding prepare for the inserts
johnsimons Aug 28, 2025
04c887f
Adding batch
johnsimons Aug 28, 2025
91d47e5
remove explicit serialization to json
johnsimons Aug 28, 2025
8c7be1a
Fixed a few bugs
johnsimons Aug 28, 2025
48c2fe4
Implemented more methods
johnsimons Aug 28, 2025
5642bf3
Add url for message body
johnsimons Aug 28, 2025
2e96879
Fix issue with body retrieval api
johnsimons Aug 28, 2025
9fc490e
Fixed issue with pool of connections
johnsimons Aug 29, 2025
772bbbd
Remove logging, it is a bit noisy
johnsimons Aug 29, 2025
2d00684
Adding more granular indexes and adjusting autovacuum settings
johnsimons Aug 29, 2025
db13583
Adding a retention cleanup background service
johnsimons Aug 29, 2025
7db5472
Enforce body storage max size
johnsimons Aug 29, 2025
ecf20cc
Implemented saga logic
johnsimons Sep 1, 2025
315cf2a
Fixed known_endpoints insertions
johnsimons Sep 1, 2025
d7deac3
Updated retention cleanup to include saga snapshots and known endpoints
johnsimons Sep 1, 2025
acd267f
Remove warnings
johnsimons Sep 1, 2025
f71d354
Added missing index for audit counts
johnsimons Sep 1, 2025
69769a0
Fix approval file
johnsimons Sep 1, 2025
d18baee
Fix casing name
johnsimons Sep 1, 2025
9d8ee86
Disable persistence for the installer
johnsimons Sep 1, 2025
75f593c
Using websearch_to_tsquery instead
johnsimons Sep 3, 2025
c297f3c
Fixed but with cleanup retention query
johnsimons Sep 4, 2025
432194d
Small refactor
johnsimons Sep 4, 2025
ac80990
Add otel metrics for Npgsql
johnsimons Sep 4, 2025
11c12b8
Rollback otel for Postgres
johnsimons Sep 5, 2025
f247ce3
Prevent deadlocks
johnsimons Sep 5, 2025
249d732
Try this way
johnsimons Sep 5, 2025
3334aa3
Ensure connection is disposed
johnsimons Sep 5, 2025
892799a
Roll this back
johnsimons Sep 5, 2025
c502fab
Made known_endpoint table insert only
johnsimons Sep 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Configuration" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.3" />
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.10.0" />
<PackageVersion Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageVersion Include="Microsoft-WindowsAPICodePack-Shell" Version="1.1.5" />
<PackageVersion Include="Mindscape.Raygun4Net.NetCore" Version="11.2.1" />
<PackageVersion Include="NLog.Extensions.Logging" Version="5.4.0" />
<PackageVersion Include="Npgsql" Version="9.0.3" />
<PackageVersion Include="NServiceBus" Version="9.2.7" />
<PackageVersion Include="NServiceBus.AcceptanceTesting" Version="9.2.7" />
<PackageVersion Include="NServiceBus.AmazonSQS" Version="8.0.0" />
Expand Down
1 change: 1 addition & 0 deletions src/ProjectReferences.Persisters.Audit.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<ItemGroup Label="Persisters">
<ProjectReference Include="..\ServiceControl.Audit.Persistence.InMemory\ServiceControl.Audit.Persistence.InMemory.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Audit.Persistence.RavenDB\ServiceControl.Audit.Persistence.RavenDB.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Audit.Persistence.PostgreSQL\ServiceControl.Audit.Persistence.PostgreSQL.csproj" ReferenceOutputAssembly="false" Private="false" />
</ItemGroup>

</Project>
4 changes: 4 additions & 0 deletions src/ServiceControl.Audit.Persistence.PostgreSQL/.editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[*.cs]

# Justification: ServiceControl app has no synchronization context
dotnet_diagnostic.CA2007.severity = none
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage;

using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
class PostgreSQLAttachmentsBodyStorage : IBodyStorage
{
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) => throw new System.NotImplementedException();
public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken) => throw new System.NotImplementedException();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ServiceControl.Audit.Persistence.PostgreSQL;

using System;

class DatabaseConfiguration(
string databaseName,
string adminDatabaseName,
int expirationProcessTimerInSeconds,
TimeSpan auditRetentionPeriod,
int maxBodySizeToStore,
string connectionString)
{
public string Name { get; } = databaseName;
public string AdminDatabaseName { get; } = adminDatabaseName;
public int ExpirationProcessTimerInSeconds { get; } = expirationProcessTimerInSeconds;
public TimeSpan AuditRetentionPeriod { get; } = auditRetentionPeriod;
public int MaxBodySizeToStore { get; } = maxBodySizeToStore;
public string ConnectionString { get; } = connectionString;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@

namespace ServiceControl.Audit.Persistence.PostgreSQL;

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.IO;
using Npgsql;
using NServiceBus;
using ServiceControl.Audit.Auditing;
using ServiceControl.Audit.Auditing.MessagesView;
using ServiceControl.Audit.Infrastructure;
using ServiceControl.Audit.Monitoring;
using ServiceControl.Audit.Persistence;
using ServiceControl.Audit.Persistence.Infrastructure;
using ServiceControl.SagaAudit;


class PostgreSQLAuditDataStore(PostgreSQLConnectionFactory connectionFactory) : IAuditDataStore
{
static readonly RecyclableMemoryStreamManager manager = new RecyclableMemoryStreamManager();

public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
{
await using var conn = await connectionFactory.OpenConnection(cancellationToken);
await using var cmd = new NpgsqlCommand(@"
select headers, body from processed_messages
where message_id = @message_id
LIMIT 1;", conn);
cmd.Parameters.AddWithValue("message_id", messageId);
await using var reader = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken);
if (await reader.ReadAsync(cancellationToken))
{
var contentType = reader.GetFieldValue<Dictionary<string, string>>(reader.GetOrdinal("headers")).GetValueOrDefault(Headers.ContentType, "text/xml");
using var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken);
var responseStream = manager.GetStream();
await stream.CopyToAsync(responseStream, cancellationToken);
responseStream.Position = 0;
return MessageBodyView.FromStream(responseStream, contentType, (int)stream.Length, string.Empty);
Comment on lines +36 to +41
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't figure out a way to pass the stream down the pipeline without allocating.
It's possible that our abstraction is preventing it, or perhaps it's me.

@SzymonPobiega @danielmarbach any thoughts?

}
return MessageBodyView.NotFound();
}

public Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
var builder = new PostgreSQLMessagesQueryBuilder()
.WithSystemMessages(includeSystemMessages)
.WithTimeSentRange(timeSentRange)
.WithSorting(sortInfo)
.WithPaging(pagingInfo);
return ExecuteMessagesQuery(builder, cancellationToken);
}

public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
{
var startDate = DateTime.UtcNow.AddDays(-30);
var endDate = DateTime.UtcNow;
await using var connection = await connectionFactory.OpenConnection(cancellationToken);
await using var cmd = new NpgsqlCommand(@"
SELECT
DATE_TRUNC('day', processed_at) AS day,
COUNT(*) AS count
FROM processed_messages
WHERE receiving_endpoint_name = @endpoint_name
AND processed_at BETWEEN @start_date AND @end_date
GROUP BY day
ORDER BY day;", connection);
cmd.Parameters.AddWithValue("endpoint_name", endpointName);
cmd.Parameters.AddWithValue("start_date", startDate);
cmd.Parameters.AddWithValue("end_date", endDate);

await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
var results = new List<AuditCount>();
while (await reader.ReadAsync(cancellationToken))
{
results.Add(new AuditCount
{
UtcDate = reader.GetDateTime(reader.GetOrdinal("day")),
Count = reader.GetInt32(reader.GetOrdinal("count"))
});
}

return new QueryResult<IList<AuditCount>>(results, new QueryStatsInfo(string.Empty, results.Count));
}

public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken)
{
// We need to return all the data from known_endpoints table in postgress
await using var connection = await connectionFactory.OpenConnection(cancellationToken);
await using var cmd = new NpgsqlCommand(@"
SELECT
id,
name,
host_id,
host,
last_seen
FROM known_endpoints;", connection);

await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
var results = new List<KnownEndpointsView>();
while (await reader.ReadAsync(cancellationToken))
{
var name = reader.GetString(reader.GetOrdinal("name"));
var hostId = reader.GetGuid(reader.GetOrdinal("host_id"));
var host = reader.GetString(reader.GetOrdinal("host"));
var lastSeen = reader.GetDateTime(reader.GetOrdinal("last_seen"));
results.Add(new KnownEndpointsView
{
Id = DeterministicGuid.MakeId(name, hostId.ToString()),
EndpointDetails = new EndpointDetails
{
Host = host,
HostId = hostId,
Name = name
},
HostDisplayName = host
});
}

return new QueryResult<IList<KnownEndpointsView>>(results, new QueryStatsInfo(string.Empty, results.Count));
}

public Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
{
var builder = new PostgreSQLMessagesQueryBuilder()
.WithSearch(searchParam)
.WithTimeSentRange(timeSentRange)
.WithSorting(sortInfo)
.WithPaging(pagingInfo);
return ExecuteMessagesQuery(builder, cancellationToken);
}

public Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
{
var builder = new PostgreSQLMessagesQueryBuilder()
.WithConversationId(conversationId)
.WithSorting(sortInfo)
.WithPaging(pagingInfo);
return ExecuteMessagesQuery(builder, cancellationToken);
}

public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
{
var builder = new PostgreSQLMessagesQueryBuilder()
.WithSystemMessages(includeSystemMessages)
.WithEndpointName(endpointName)
.WithTimeSentRange(timeSentRange)
.WithSorting(sortInfo)
.WithPaging(pagingInfo);
return ExecuteMessagesQuery(builder, cancellationToken);
}

public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
{
var builder = new PostgreSQLMessagesQueryBuilder()
.WithSearch(keyword)
.WithEndpointName(endpoint)
.WithTimeSentRange(timeSentRange)
.WithSorting(sortInfo)
.WithPaging(pagingInfo);
return ExecuteMessagesQuery(builder, cancellationToken);
}

public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
{
await using var conn = await connectionFactory.OpenConnection(cancellationToken);
await using var cmd = new NpgsqlCommand(@"
SELECT
id,
saga_id,
saga_type,
changes
FROM saga_snapshots
WHERE saga_id = @saga_id
LIMIT 1", conn);

cmd.Parameters.AddWithValue("saga_id", input);

await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

if (await reader.ReadAsync(cancellationToken))
{
var changes = GetValue<List<SagaStateChange>>(reader, "changes") ?? [];
var sagaHistory = new SagaHistory
{
Id = GetValue<Guid>(reader, "id"),
SagaId = GetValue<Guid>(reader, "saga_id"),
SagaType = GetValue<string>(reader, "saga_type"),
Changes = changes
};

return new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, changes.Count));
}

return QueryResult<SagaHistory>.Empty();
}

async Task<QueryResult<IList<MessagesView>>> ExecuteMessagesQuery(
PostgreSQLMessagesQueryBuilder builder,
CancellationToken cancellationToken)
{
await using var conn = await connectionFactory.OpenConnection(cancellationToken);
var (query, parameters) = builder.Build();
await using var cmd = new NpgsqlCommand(query, conn);
foreach (var param in parameters)
{
cmd.Parameters.Add(param);
}
return await ReturnResults(cmd, cancellationToken);
}

static T? DeserializeOrDefault<T>(Dictionary<string, object> dict, string key, T? defaultValue = default)
{
if (dict.TryGetValue(key, out var value) && value is JsonElement element && element.ValueKind != JsonValueKind.Null)
{
try
{
return JsonSerializer.Deserialize<T>(element);
}
catch { }
}
return defaultValue;
}

static T GetValue<T>(NpgsqlDataReader reader, string column)
=> reader.GetFieldValue<T>(reader.GetOrdinal(column));

async Task<QueryResult<IList<MessagesView>>> ReturnResults(NpgsqlCommand cmd, CancellationToken cancellationToken = default)
{
var results = new List<MessagesView>();
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
var headers = GetValue<Dictionary<string, string>>(reader, "headers");
var messageMetadata = GetValue<Dictionary<string, object>>(reader, "message_metadata");

results.Add(new MessagesView
{
Id = GetValue<string>(reader, "unique_message_id"),
MessageId = GetValue<string>(reader, "message_id"),
MessageType = GetValue<string>(reader, "message_type"),
SendingEndpoint = DeserializeOrDefault<EndpointDetails>(messageMetadata, "SendingEndpoint"),
ReceivingEndpoint = DeserializeOrDefault<EndpointDetails>(messageMetadata, "ReceivingEndpoint"),
TimeSent = GetValue<DateTime>(reader, "time_sent"),
ProcessedAt = GetValue<DateTime>(reader, "processed_at"),
CriticalTime = GetValue<TimeSpan>(reader, "critical_time"),
ProcessingTime = GetValue<TimeSpan>(reader, "processing_time"),
DeliveryTime = GetValue<TimeSpan>(reader, "delivery_time"),
IsSystemMessage = GetValue<bool>(reader, "is_system_message"),
ConversationId = GetValue<string>(reader, "conversation_id"),
Headers = [.. headers],
Status = (MessageStatus)GetValue<int>(reader, "status"),
MessageIntent = (MessageIntent)DeserializeOrDefault(messageMetadata, "MessageIntent", 1),
BodyUrl = string.Format(BodyUrlFormatString, GetValue<string>(reader, "message_id")),
BodySize = DeserializeOrDefault(messageMetadata, "ContentLength", 0),
InvokedSagas = DeserializeOrDefault<List<SagaInfo>>(messageMetadata, "InvokedSagas", []),
OriginatesFromSaga = DeserializeOrDefault<SagaInfo>(messageMetadata, "OriginatesFromSaga")
});
}
return new QueryResult<IList<MessagesView>>(results, new QueryStatsInfo(string.Empty, results.Count));
}
public const string BodyUrlFormatString = "/messages/{0}/body";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace ServiceControl.Audit.Persistence.PostgreSQL;

using Npgsql;
using System.Threading.Tasks;
using System.Threading;

class PostgreSQLConnectionFactory
{
readonly NpgsqlDataSource dataSource;
readonly NpgsqlDataSource dataSourceAdmin;

public PostgreSQLConnectionFactory(DatabaseConfiguration databaseConfiguration)
{
var dataSourceBuilder = new NpgsqlDataSourceBuilder(databaseConfiguration.ConnectionString)
{
Name = "ServiceControl.Audit"
};
//dataSourceBuilder.UseLoggerFactory(loggerFactory);
dataSourceBuilder.EnableDynamicJson();
dataSource = dataSourceBuilder.Build();

var builder = new NpgsqlConnectionStringBuilder(databaseConfiguration.ConnectionString)
{
Database = databaseConfiguration.AdminDatabaseName
};
var dataSourceBuilderAdmin = new NpgsqlDataSourceBuilder(builder.ConnectionString)
{
Name = "ServiceControl.Audit-admin",
};
//dataSourceBuilderAdmin.UseLoggerFactory(loggerFactory);
dataSourceBuilderAdmin.EnableDynamicJson();
dataSourceAdmin = dataSourceBuilderAdmin.Build();
}

public async Task<NpgsqlConnection> OpenConnection(CancellationToken cancellationToken)
{
var conn = await dataSource.OpenConnectionAsync(cancellationToken);
return conn;
}

public async Task<NpgsqlConnection> OpenAdminConnection(CancellationToken cancellationToken)
{
var conn = dataSourceAdmin.CreateConnection();
await conn.OpenAsync(cancellationToken);
return conn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace ServiceControl.Audit.Persistence.PostgreSQL;

using System;
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing;
using ServiceControl.Audit.Persistence;
class PostgreSQLFailedAuditStorage : IFailedAuditStorage
{
public Task<int> GetFailedAuditsCount() => Task.FromResult(0);
public Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken) => throw new NotImplementedException();
public Task SaveFailedAuditImport(FailedAuditImport message) => throw new NotImplementedException();
}
Loading
Loading