Description
Describe the bug
Hi again! I’ve noticed that AsyncEventingBasicConsumer provides a CancellationToken in BasicDeliverEventArgs during the ReceivedAsync event. However, that token never actually gets cancelled. If I close the channel while awaiting inside ReceivedAsync, I’d expect the token to trigger so I can bail out gracefully. Instead, the token stays active, and CloseAsync ends up waiting forever for the task to complete.
Reproduction steps
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest",
};
const string exchangeName = "TestExchange";
const string queueName = "TestQueue";
// Setup
await using var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic);
await channel.QueueDeclareAsync(queueName, false, true, true);
await channel.QueueBindAsync(queueName, exchangeName, queueName);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, args) =>
{
    // Wait until cancellation token is cancelled
    await Task.Delay(-1, args.CancellationToken);
};
await channel.BasicConsumeAsync(queueName, true, consumer);
// Publish
await channel.BasicPublishAsync(exchangeName, queueName, "Hello"u8.ToArray());
// Give some time for the message to be processed
await Task.Delay(100);
// Close
await channel.CloseAsync();
await connection.CloseAsync();
Console.WriteLine("All good!");
Expected behavior
The CancellationToken provided in BasicDeliverEventArgs is cancelled when the channel closes (or is aborted), allowing the awaiting task to end quickly.
Additional context
I poked around in the code and noticed a token is never actually passed to IAsyncBasicConsumer.HandleBasicDeliverAsync. I tried creating a CancellationTokenSource in the Channel class that I cancel in CloseAsync, then using its token when calling HandleBasicDeliverAsync. That change made things work as expected.
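To illustrate the idea without touching the library, here is a minimal, self-contained sketch of the pattern. It does not use RabbitMQ.Client at all: closeCts, HandleDeliveryAsync, and CloseChannelAsync are hypothetical stand-ins for the channel's token source, a consumer handler, and the channel's close path. The actual fix inside the Channel class may look different.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a token source owned by the channel
// (not the real RabbitMQ.Client Channel internals).
using var closeCts = new CancellationTokenSource();

// Hypothetical stand-in for a consumer handler: it waits until the
// token it was handed is cancelled, like the repro handler above.
async Task<string> HandleDeliveryAsync(CancellationToken token)
{
    try
    {
        await Task.Delay(Timeout.Infinite, token);
        return "completed";
    }
    catch (OperationCanceledException)
    {
        return "cancelled";
    }
}

// Dispatch a "delivery", passing the channel-owned token to the handler.
Task<string> inFlight = HandleDeliveryAsync(closeCts.Token);

// Hypothetical stand-in for the close path: cancel first, then wait
// for the in-flight handler, so closing cannot hang on it.
async Task<string> CloseChannelAsync()
{
    closeCts.Cancel();
    return await inFlight;
}

string outcome = await CloseChannelAsync();
Console.WriteLine(outcome); // prints "cancelled"
```

The ordering is what matters here: the token is cancelled before the close path awaits outstanding handlers, so a handler that honours the token can exit instead of blocking the close.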
If you want, I can open a pull request containing this fix.