-
Notifications
You must be signed in to change notification settings - Fork 49
[SPIKE] Using Postgress as the audit instance persister #5106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
johnsimons
wants to merge
41
commits into
master
Choose a base branch
from
john/postgresql
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
41 commits
Select commit
Hold shift + click to select a range
b4f0277
Added a Postgres persister for Audit
johnsimons 22a5bcd
Renaming to uppercase SQL
johnsimons 5f091b1
Remove to readd
johnsimons 684d265
Adding it back
johnsimons 49b3dd1
Moving things around
johnsimons 6525de8
Using namespace file scoped
johnsimons bf64782
We have something working
johnsimons 7adb46d
Adding index and trigger
johnsimons 99e8c61
Returning results
johnsimons 7c2289c
A bit better
johnsimons 76e5c9a
Some refactoring
johnsimons 176fba3
Adding prepare for the inserts
johnsimons 04c887f
Adding batch
johnsimons 91d47e5
remove explicit serialization to json
johnsimons 8c7be1a
Fixed a few bugs
johnsimons 48c2fe4
Implemented more methods
johnsimons 5642bf3
Add url for message body
johnsimons 2e96879
Fix issue with body retrieval api
johnsimons 9fc490e
Fixed issue with pool of connections
johnsimons 772bbbd
Remove logging, it is a bit noisy
johnsimons 2d00684
Adding more granular indexes and adjusting autovacuum settings
johnsimons db13583
Adding a retention cleanup background service
johnsimons 7db5472
Enforce body storage max size
johnsimons ecf20cc
Implemented saga logic
johnsimons 315cf2a
Fixed known_endpoints insertions
johnsimons d7deac3
Updated retention cleanup to include saga snapshots and known endpoints
johnsimons acd267f
Remove warnings
johnsimons f71d354
Added missing index for audit counts
johnsimons 69769a0
Fix approval file
johnsimons d18baee
Fix casing name
johnsimons 9d8ee86
Disable persistence for the installer
johnsimons 75f593c
Using websearch_to_tsquery instead
johnsimons c297f3c
Fixed but with cleanup retention query
johnsimons 432194d
Small refactor
johnsimons ac80990
Add otel metrics for Npgsql
johnsimons 11c12b8
Rollback otel for Postgres
johnsimons f247ce3
Prevent deadlocks
johnsimons 249d732
Try this way
johnsimons 3334aa3
Ensure connection is disposed
johnsimons 892799a
Roll this back
johnsimons c502fab
Made known_endpoint table insert only
johnsimons File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4 changes: 4 additions & 0 deletions
4
src/ServiceControl.Audit.Persistence.PostgreSQL/.editorconfig
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
[*.cs] | ||
|
||
# Justification: ServiceControl app has no synchronization context | ||
dotnet_diagnostic.CA2007.severity = none |
11 changes: 11 additions & 0 deletions
11
...rviceControl.Audit.Persistence.PostgreSQL/BodyStorage/PostgreSQLAttachmentsBodyStorage.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
namespace ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage; | ||
|
||
using System.IO; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using ServiceControl.Audit.Auditing.BodyStorage; | ||
class PostgreSQLAttachmentsBodyStorage : IBodyStorage | ||
{ | ||
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) => throw new System.NotImplementedException(); | ||
public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken) => throw new System.NotImplementedException(); | ||
} |
19 changes: 19 additions & 0 deletions
19
src/ServiceControl.Audit.Persistence.PostgreSQL/DatabaseConfiguration.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
namespace ServiceControl.Audit.Persistence.PostgreSQL; | ||
|
||
using System; | ||
|
||
class DatabaseConfiguration( | ||
string databaseName, | ||
string adminDatabaseName, | ||
int expirationProcessTimerInSeconds, | ||
TimeSpan auditRetentionPeriod, | ||
int maxBodySizeToStore, | ||
string connectionString) | ||
{ | ||
public string Name { get; } = databaseName; | ||
public string AdminDatabaseName { get; } = adminDatabaseName; | ||
public int ExpirationProcessTimerInSeconds { get; } = expirationProcessTimerInSeconds; | ||
public TimeSpan AuditRetentionPeriod { get; } = auditRetentionPeriod; | ||
public int MaxBodySizeToStore { get; } = maxBodySizeToStore; | ||
public string ConnectionString { get; } = connectionString; | ||
} |
265 changes: 265 additions & 0 deletions
265
src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
|
||
namespace ServiceControl.Audit.Persistence.PostgreSQL; | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Text.Json; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.IO; | ||
using Npgsql; | ||
using NServiceBus; | ||
using ServiceControl.Audit.Auditing; | ||
using ServiceControl.Audit.Auditing.MessagesView; | ||
using ServiceControl.Audit.Infrastructure; | ||
using ServiceControl.Audit.Monitoring; | ||
using ServiceControl.Audit.Persistence; | ||
using ServiceControl.Audit.Persistence.Infrastructure; | ||
using ServiceControl.SagaAudit; | ||
|
||
|
||
class PostgreSQLAuditDataStore(PostgreSQLConnectionFactory connectionFactory) : IAuditDataStore | ||
{ | ||
static readonly RecyclableMemoryStreamManager manager = new RecyclableMemoryStreamManager(); | ||
|
||
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken) | ||
{ | ||
await using var conn = await connectionFactory.OpenConnection(cancellationToken); | ||
await using var cmd = new NpgsqlCommand(@" | ||
select headers, body from processed_messages | ||
where message_id = @message_id | ||
LIMIT 1;", conn); | ||
cmd.Parameters.AddWithValue("message_id", messageId); | ||
await using var reader = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken); | ||
if (await reader.ReadAsync(cancellationToken)) | ||
{ | ||
var contentType = reader.GetFieldValue<Dictionary<string, string>>(reader.GetOrdinal("headers")).GetValueOrDefault(Headers.ContentType, "text/xml"); | ||
using var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken); | ||
var responseStream = manager.GetStream(); | ||
await stream.CopyToAsync(responseStream, cancellationToken); | ||
responseStream.Position = 0; | ||
return MessageBodyView.FromStream(responseStream, contentType, (int)stream.Length, string.Empty); | ||
} | ||
return MessageBodyView.NotFound(); | ||
} | ||
|
||
public Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken) | ||
{ | ||
var builder = new PostgreSQLMessagesQueryBuilder() | ||
.WithSystemMessages(includeSystemMessages) | ||
.WithTimeSentRange(timeSentRange) | ||
.WithSorting(sortInfo) | ||
.WithPaging(pagingInfo); | ||
return ExecuteMessagesQuery(builder, cancellationToken); | ||
} | ||
|
||
public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) | ||
{ | ||
var startDate = DateTime.UtcNow.AddDays(-30); | ||
var endDate = DateTime.UtcNow; | ||
await using var connection = await connectionFactory.OpenConnection(cancellationToken); | ||
await using var cmd = new NpgsqlCommand(@" | ||
SELECT | ||
DATE_TRUNC('day', processed_at) AS day, | ||
COUNT(*) AS count | ||
FROM processed_messages | ||
WHERE receiving_endpoint_name = @endpoint_name | ||
AND processed_at BETWEEN @start_date AND @end_date | ||
GROUP BY day | ||
ORDER BY day;", connection); | ||
cmd.Parameters.AddWithValue("endpoint_name", endpointName); | ||
cmd.Parameters.AddWithValue("start_date", startDate); | ||
cmd.Parameters.AddWithValue("end_date", endDate); | ||
|
||
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); | ||
var results = new List<AuditCount>(); | ||
while (await reader.ReadAsync(cancellationToken)) | ||
{ | ||
results.Add(new AuditCount | ||
{ | ||
UtcDate = reader.GetDateTime(reader.GetOrdinal("day")), | ||
Count = reader.GetInt32(reader.GetOrdinal("count")) | ||
}); | ||
} | ||
|
||
return new QueryResult<IList<AuditCount>>(results, new QueryStatsInfo(string.Empty, results.Count)); | ||
} | ||
|
||
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken) | ||
{ | ||
// We need to return all the data from known_endpoints table in postgress | ||
await using var connection = await connectionFactory.OpenConnection(cancellationToken); | ||
await using var cmd = new NpgsqlCommand(@" | ||
SELECT | ||
id, | ||
name, | ||
host_id, | ||
host, | ||
last_seen | ||
FROM known_endpoints;", connection); | ||
|
||
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); | ||
var results = new List<KnownEndpointsView>(); | ||
while (await reader.ReadAsync(cancellationToken)) | ||
{ | ||
var name = reader.GetString(reader.GetOrdinal("name")); | ||
var hostId = reader.GetGuid(reader.GetOrdinal("host_id")); | ||
var host = reader.GetString(reader.GetOrdinal("host")); | ||
var lastSeen = reader.GetDateTime(reader.GetOrdinal("last_seen")); | ||
results.Add(new KnownEndpointsView | ||
{ | ||
Id = DeterministicGuid.MakeId(name, hostId.ToString()), | ||
EndpointDetails = new EndpointDetails | ||
{ | ||
Host = host, | ||
HostId = hostId, | ||
Name = name | ||
}, | ||
HostDisplayName = host | ||
}); | ||
} | ||
|
||
return new QueryResult<IList<KnownEndpointsView>>(results, new QueryStatsInfo(string.Empty, results.Count)); | ||
} | ||
|
||
public Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) | ||
{ | ||
var builder = new PostgreSQLMessagesQueryBuilder() | ||
.WithSearch(searchParam) | ||
.WithTimeSentRange(timeSentRange) | ||
.WithSorting(sortInfo) | ||
.WithPaging(pagingInfo); | ||
return ExecuteMessagesQuery(builder, cancellationToken); | ||
} | ||
|
||
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) | ||
{ | ||
var builder = new PostgreSQLMessagesQueryBuilder() | ||
.WithConversationId(conversationId) | ||
.WithSorting(sortInfo) | ||
.WithPaging(pagingInfo); | ||
return ExecuteMessagesQuery(builder, cancellationToken); | ||
} | ||
|
||
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) | ||
{ | ||
var builder = new PostgreSQLMessagesQueryBuilder() | ||
.WithSystemMessages(includeSystemMessages) | ||
.WithEndpointName(endpointName) | ||
.WithTimeSentRange(timeSentRange) | ||
.WithSorting(sortInfo) | ||
.WithPaging(pagingInfo); | ||
return ExecuteMessagesQuery(builder, cancellationToken); | ||
} | ||
|
||
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default) | ||
{ | ||
var builder = new PostgreSQLMessagesQueryBuilder() | ||
.WithSearch(keyword) | ||
.WithEndpointName(endpoint) | ||
.WithTimeSentRange(timeSentRange) | ||
.WithSorting(sortInfo) | ||
.WithPaging(pagingInfo); | ||
return ExecuteMessagesQuery(builder, cancellationToken); | ||
} | ||
|
||
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) | ||
{ | ||
await using var conn = await connectionFactory.OpenConnection(cancellationToken); | ||
await using var cmd = new NpgsqlCommand(@" | ||
SELECT | ||
id, | ||
saga_id, | ||
saga_type, | ||
changes | ||
FROM saga_snapshots | ||
WHERE saga_id = @saga_id | ||
LIMIT 1", conn); | ||
|
||
cmd.Parameters.AddWithValue("saga_id", input); | ||
|
||
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); | ||
|
||
if (await reader.ReadAsync(cancellationToken)) | ||
{ | ||
var changes = GetValue<List<SagaStateChange>>(reader, "changes") ?? []; | ||
var sagaHistory = new SagaHistory | ||
{ | ||
Id = GetValue<Guid>(reader, "id"), | ||
SagaId = GetValue<Guid>(reader, "saga_id"), | ||
SagaType = GetValue<string>(reader, "saga_type"), | ||
Changes = changes | ||
}; | ||
|
||
return new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, changes.Count)); | ||
} | ||
|
||
return QueryResult<SagaHistory>.Empty(); | ||
} | ||
|
||
async Task<QueryResult<IList<MessagesView>>> ExecuteMessagesQuery( | ||
PostgreSQLMessagesQueryBuilder builder, | ||
CancellationToken cancellationToken) | ||
{ | ||
await using var conn = await connectionFactory.OpenConnection(cancellationToken); | ||
var (query, parameters) = builder.Build(); | ||
await using var cmd = new NpgsqlCommand(query, conn); | ||
foreach (var param in parameters) | ||
{ | ||
cmd.Parameters.Add(param); | ||
} | ||
return await ReturnResults(cmd, cancellationToken); | ||
} | ||
|
||
static T? DeserializeOrDefault<T>(Dictionary<string, object> dict, string key, T? defaultValue = default) | ||
{ | ||
if (dict.TryGetValue(key, out var value) && value is JsonElement element && element.ValueKind != JsonValueKind.Null) | ||
{ | ||
try | ||
{ | ||
return JsonSerializer.Deserialize<T>(element); | ||
} | ||
catch { } | ||
} | ||
return defaultValue; | ||
} | ||
|
||
static T GetValue<T>(NpgsqlDataReader reader, string column) | ||
=> reader.GetFieldValue<T>(reader.GetOrdinal(column)); | ||
|
||
async Task<QueryResult<IList<MessagesView>>> ReturnResults(NpgsqlCommand cmd, CancellationToken cancellationToken = default) | ||
{ | ||
var results = new List<MessagesView>(); | ||
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); | ||
while (await reader.ReadAsync(cancellationToken)) | ||
{ | ||
var headers = GetValue<Dictionary<string, string>>(reader, "headers"); | ||
var messageMetadata = GetValue<Dictionary<string, object>>(reader, "message_metadata"); | ||
|
||
results.Add(new MessagesView | ||
{ | ||
Id = GetValue<string>(reader, "unique_message_id"), | ||
MessageId = GetValue<string>(reader, "message_id"), | ||
MessageType = GetValue<string>(reader, "message_type"), | ||
SendingEndpoint = DeserializeOrDefault<EndpointDetails>(messageMetadata, "SendingEndpoint"), | ||
ReceivingEndpoint = DeserializeOrDefault<EndpointDetails>(messageMetadata, "ReceivingEndpoint"), | ||
TimeSent = GetValue<DateTime>(reader, "time_sent"), | ||
ProcessedAt = GetValue<DateTime>(reader, "processed_at"), | ||
CriticalTime = GetValue<TimeSpan>(reader, "critical_time"), | ||
ProcessingTime = GetValue<TimeSpan>(reader, "processing_time"), | ||
DeliveryTime = GetValue<TimeSpan>(reader, "delivery_time"), | ||
IsSystemMessage = GetValue<bool>(reader, "is_system_message"), | ||
ConversationId = GetValue<string>(reader, "conversation_id"), | ||
Headers = [.. headers], | ||
Status = (MessageStatus)GetValue<int>(reader, "status"), | ||
MessageIntent = (MessageIntent)DeserializeOrDefault(messageMetadata, "MessageIntent", 1), | ||
BodyUrl = string.Format(BodyUrlFormatString, GetValue<string>(reader, "message_id")), | ||
BodySize = DeserializeOrDefault(messageMetadata, "ContentLength", 0), | ||
InvokedSagas = DeserializeOrDefault<List<SagaInfo>>(messageMetadata, "InvokedSagas", []), | ||
OriginatesFromSaga = DeserializeOrDefault<SagaInfo>(messageMetadata, "OriginatesFromSaga") | ||
}); | ||
} | ||
return new QueryResult<IList<MessagesView>>(results, new QueryStatsInfo(string.Empty, results.Count)); | ||
} | ||
public const string BodyUrlFormatString = "/messages/{0}/body"; | ||
} |
47 changes: 47 additions & 0 deletions
47
src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLConnectionFactory.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
namespace ServiceControl.Audit.Persistence.PostgreSQL; | ||
|
||
using Npgsql; | ||
using System.Threading.Tasks; | ||
using System.Threading; | ||
|
||
class PostgreSQLConnectionFactory | ||
{ | ||
readonly NpgsqlDataSource dataSource; | ||
readonly NpgsqlDataSource dataSourceAdmin; | ||
|
||
public PostgreSQLConnectionFactory(DatabaseConfiguration databaseConfiguration) | ||
{ | ||
var dataSourceBuilder = new NpgsqlDataSourceBuilder(databaseConfiguration.ConnectionString) | ||
{ | ||
Name = "ServiceControl.Audit" | ||
}; | ||
//dataSourceBuilder.UseLoggerFactory(loggerFactory); | ||
dataSourceBuilder.EnableDynamicJson(); | ||
dataSource = dataSourceBuilder.Build(); | ||
|
||
var builder = new NpgsqlConnectionStringBuilder(databaseConfiguration.ConnectionString) | ||
{ | ||
Database = databaseConfiguration.AdminDatabaseName | ||
}; | ||
var dataSourceBuilderAdmin = new NpgsqlDataSourceBuilder(builder.ConnectionString) | ||
{ | ||
Name = "ServiceControl.Audit-admin", | ||
}; | ||
//dataSourceBuilderAdmin.UseLoggerFactory(loggerFactory); | ||
dataSourceBuilderAdmin.EnableDynamicJson(); | ||
dataSourceAdmin = dataSourceBuilderAdmin.Build(); | ||
} | ||
|
||
public async Task<NpgsqlConnection> OpenConnection(CancellationToken cancellationToken) | ||
{ | ||
var conn = await dataSource.OpenConnectionAsync(cancellationToken); | ||
return conn; | ||
} | ||
|
||
public async Task<NpgsqlConnection> OpenAdminConnection(CancellationToken cancellationToken) | ||
{ | ||
var conn = dataSourceAdmin.CreateConnection(); | ||
await conn.OpenAsync(cancellationToken); | ||
return conn; | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLFailedAuditStorage.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
namespace ServiceControl.Audit.Persistence.PostgreSQL; | ||
|
||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using ServiceControl.Audit.Auditing; | ||
using ServiceControl.Audit.Persistence; | ||
class PostgreSQLFailedAuditStorage : IFailedAuditStorage | ||
{ | ||
public Task<int> GetFailedAuditsCount() => Task.FromResult(0); | ||
public Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken) => throw new NotImplementedException(); | ||
public Task SaveFailedAuditImport(FailedAuditImport message) => throw new NotImplementedException(); | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't figure out a way to pass the stream down the pipeline without allocating.
It's possible that our abstraction is preventing it, or perhaps it's me.
@SzymonPobiega @danielmarbach any thoughts?