[SPIKE] Using Postgres as the audit instance persister #5106
This doesn't really need to be implemented, given that we store the message body in the same row as the message.
```csharp
// Resolve the content type from the stored headers, defaulting to text/xml.
var contentType = reader.GetFieldValue<Dictionary<string, string>>(reader.GetOrdinal("headers"))
    .GetValueOrDefault(Headers.ContentType, "text/xml");

// Copy the body column into a pooled stream that can be handed to the response.
using var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken);
var responseStream = manager.GetStream();
await stream.CopyToAsync(responseStream, cancellationToken);
responseStream.Position = 0;

return MessageBodyView.FromStream(responseStream, contentType, (int)stream.Length, string.Empty);
```
I couldn't figure out a way to pass the stream down the pipeline without allocating.
It's possible that our abstraction is preventing it, or perhaps it's me.
@SzymonPobiega @danielmarbach any thoughts?
```csharp
cmd.CommandText = @"
    INSERT INTO known_endpoints (
        id, name, host_id, host, last_seen
    ) VALUES (
        @id, @name, @host_id, @host, @last_seen
    )
    ON CONFLICT (id) DO UPDATE SET
        last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);";
```
After running a test that scaled out the audit instance, we found this query caused too many deadlocks.
It would have to be redesigned to prevent that.
This can be fixed by taking a transaction-scoped advisory lock on the id before the upsert:

```csharp
cmd.CommandText = @"
    SELECT pg_advisory_xact_lock(hashtext(@id));
    INSERT INTO known_endpoints (
        id, name, host_id, host, last_seen
    ) VALUES (
        @id, @name, @host_id, @host, @last_seen
    )
    ON CONFLICT (id) DO UPDATE SET
        last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);";
```
But this may impact performance depending on the number of collisions: `hashtext` maps the id to a 32-bit integer, so two distinct ids can hash to the same lock key and serialize against each other unnecessarily.
So I'm not sure whether this is the right option.
So the `pg_advisory_xact_lock` approach did not work 😢
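If this gets revisited, one way to confirm whether the advisory locks are actually being acquired during the test (a diagnostic sketch, not something the spike ran) is to inspect `pg_locks`:

```sql
-- Advisory locks held or awaited by any session; classid/objid together
-- encode the 64-bit key passed to pg_advisory_xact_lock.
SELECT pid, classid, objid, granted
FROM pg_locks
WHERE locktype = 'advisory';
```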
```sql
INSERT INTO saga_snapshots (id, saga_id, saga_type, changes)
VALUES (@saga_id, @saga_id, @saga_type, @new_change)
ON CONFLICT (id) DO UPDATE SET
    changes = COALESCE(saga_snapshots.changes, '[]'::jsonb) || @new_change::jsonb;";
```
```sql
CREATE TABLE IF NOT EXISTS saga_snapshots (
    id UUID PRIMARY KEY,
    saga_id UUID,
    saga_type TEXT,
    changes JSONB,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);", connection))
```
The design of this table is incorrect. It needs to be updated to the same design as known_endpoints, where we use an insert-only table for the hot path and a background job that reconciles the results into the more permanent table (a sketch of that pattern follows below).
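A minimal sketch of that pattern, assuming a hypothetical `saga_snapshots_inbox` table for the hot path (none of these names or statements are in the PR):

```sql
-- Hot path: append-only inserts, so concurrent ingesters never contend on a row.
CREATE TABLE IF NOT EXISTS saga_snapshots_inbox (
    seq       BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id        UUID,
    saga_id   UUID,
    saga_type TEXT,
    change    JSONB
);

-- Background job: fold the accumulated rows into the permanent table and
-- remove them in one statement (no batching or size limits shown here).
WITH batch AS (
    DELETE FROM saga_snapshots_inbox
    RETURNING seq, id, saga_id, saga_type, change
)
INSERT INTO saga_snapshots (id, saga_id, saga_type, changes)
SELECT id, saga_id, saga_type, jsonb_agg(change ORDER BY seq)
FROM batch
GROUP BY id, saga_id, saga_type
ON CONFLICT (id) DO UPDATE SET
    changes = COALESCE(saga_snapshots.changes, '[]'::jsonb) || EXCLUDED.changes,
    updated_at = now();
```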
```sql
id UUID PRIMARY KEY,
saga_id UUID,
saga_type TEXT,
changes JSONB,
```
Also, this design is based on the Raven design, where we append changes to the changes column.
But for never-ending sagas the column could grow without bound and, in essence, stop working.
I think we need to keep rotating changes, keeping a maximum of x changes and no more. Maybe a bit smarter, so that we always keep the first change, ... (see the sketch below).
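A rough sketch of that rotation, assuming an arbitrary cap of 10 changes and a separate trim statement (neither is in the spike):

```sql
-- Append the new change, creating the snapshot row if needed.
INSERT INTO saga_snapshots (id, saga_id, saga_type, changes)
VALUES (@saga_id, @saga_id, @saga_type, jsonb_build_array(@new_change::jsonb))
ON CONFLICT (id) DO UPDATE SET
    changes = COALESCE(saga_snapshots.changes, '[]'::jsonb) || @new_change::jsonb;

-- Then trim: always keep the first change plus the latest nine, dropping
-- the middle of the array once it exceeds the cap.
UPDATE saga_snapshots
SET changes = jsonb_path_query_array(changes, '$[0, last - 8 to last]')
WHERE id = @saga_id
  AND jsonb_array_length(changes) > 10;
```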
Summary of the PostgreSQL spike

- `IFailedAuditStorage` is not implemented. I assume we need to create another table where we store messages that could not be imported (a hypothetical sketch follows below).
- For the `last_seen` column, what we found is that on a busy system there would be too many deadlocks because of collisions on the `id`. To mitigate this we ended up creating an insert-only table and a background job that updates the "real" known_endpoints table. This design would also need to be considered for the saga snapshots if there are lots of concurrent updates to the same saga_id.
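For the missing `IFailedAuditStorage` piece, something along these lines could be a starting point (the table and column names are hypothetical, not part of the spike):

```sql
-- Hypothetical table for audit messages that failed to import, kept so they
-- can be inspected and retried later.
CREATE TABLE IF NOT EXISTS failed_audit_imports (
    id        UUID PRIMARY KEY,
    headers   JSONB,
    body      BYTEA,
    exception TEXT,
    failed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```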
Testing

We used a MacBook Pro with an Apple M3 Pro chip and 36 GB of memory, running RabbitMQ and the latest PostgreSQL in Docker.
A single audit instance ingested on average 1,400 msg/s; that is the ingestion rate alone, with no other queries or cleanup running.
When scaled out to 4 instances, we could get the ingestion rate to 5,000 msg/s.