Skip to content

[BUG] WebSocketException leaks while receiving messages using Azure Service Bus client SDK #50903

Open
@Veslo5

Description

@Veslo5

Library name and version

Azure.Messaging.ServiceBus 7.18.4

Describe the bug

We are using ServiceBusProcessor to receive messages from Azure Service Bus, and encountering problem where we get WebSocketException with SocketException as inner exception instead of ServiceBusException as documented in official documentation. The communication runs via AMQP protocol and we found that only SocketException is handled in code.

Expected behavior

Instead of WebSocketException we get ServiceBusException.

Actual behavior

WebSocketException is thrown after several retries via default retry policy.

Stacktrace:

ReceiveBatchAsync Exception: System.Net.WebSockets.WebSocketException (0x80004005): Unable to connect to the remote server
---> System.Net.Http.HttpRequestException: An error occurred while sending the request.
---> System.IO.IOException: Unable to read data from the transport connection: Connection reset by peer.
---> System.Net.Sockets.SocketException (104): Connection reset by peer
at System.Net.Sockets.NetworkStream.ReadAsync(Memory`1 buffer, CancellationToken cancellationToken)
at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](CancellationToken cancellationToken, Int32 estimatedSize)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Net.Sockets.SocketAsyncEventArgs.TransferCompletionCallbackCore(Int32 bytesTransferred, Memory`1 socketAddress, SocketFlags receivedFlags, SocketError socketError)
at System.Net.Sockets.SocketAsyncEngine.System.Threading.IThreadPoolWorkItem.Execute()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
--- End of stack trace from previous location ---

--- End of inner exception stack trace ---
at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](CancellationToken cancellationToken, Int32 estimatedSize)
at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](Memory`1 buffer, CancellationToken cancellationToken)
at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
at System.Net.Http.HttpConnection.InitialFillAsync(Boolean async)
at System.Net.Http.HttpConnection.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at System.Net.Http.HttpConnection.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, Boolean async, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.DiagnosticsHandler.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.WebSockets.WebSocketHandle.ConnectAsync(Uri uri, HttpMessageInvoker invoker, CancellationToken cancellationToken, ClientWebSocketOptions options)
at System.Net.WebSockets.WebSocketHandle.ConnectAsync(Uri uri, HttpMessageInvoker invoker, CancellationToken cancellationToken, ClientWebSocketOptions options)
at System.Net.WebSockets.ClientWebSocket.ConnectAsyncCore(Uri uri, HttpMessageInvoker invoker, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.ConnectAsyncResult.End(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.<>c.<ConnectAsync>b__17_1(IAsyncResult r)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location ---
at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.CreateAndOpenConnectionAsync(Version amqpVersion, Uri serviceEndpoint, Uri connectionEndpoint, ServiceBusTransportType transportType, IWebProxy proxy, String scopeIdentifier, TimeSpan timeout)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenReceiverLinkAsync(String identifier, String entityPath, TimeSpan timeout, UInt32 prefetchCount, ServiceBusReceiveMode receiveMode, String sessionId, Boolean isSessionReceiver, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.OpenReceiverLinkAsync(TimeSpan timeout, UInt32 prefetchCount, ServiceBusReceiveMode receiveMode, String identifier, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.ServiceBusReceiver.ReceiveMessagesAsync(Int32 maxMessages, Nullable`1 maxWaitTime, Boolean isProcessor, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.ServiceBusReceiver.ReceiveMessagesAsync(Int32 maxMessages, Nullable`1 maxWaitTime, Boolean isProcessor, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.ReceiveMessagesAsyncInternal(Int32 maxMessages, Nullable`1 maxWaitTime, TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.ReceiveMessagesAsyncInternal(Int32 maxMessages, Nullable`1 maxWaitTime, TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<ReceiveMessagesAsync>b__44_0>d.MoveNext()
--- End of stack trace from previous location ---
at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.ReceiveMessagesAsync(Int32 maxMessages, Nullable`1 maxWaitTime, CancellationToken cancellationToken)
at Azure.Messaging.ServiceBus.ServiceBusReceiver.ReceiveMessagesAsync(Int32 maxMessages, Nullable`1 maxWaitTime, Boolean isProcessor, CancellationToken cancellationToken).

Reproduction Steps

This scenario cannot be reliably reproduced, as it occurs randomly.

Environment

Runtime .NET 8.0.411
Azure Service Bus Standard

Metadata

Metadata

Assignees

Labels

ClientThis issue is related to a non-management packageService Buscustomer-reportedIssues that are reported by GitHub users external to the Azure organization.issue-addressedWorkflow: The Azure SDK team believes it to be addressed and ready to close.questionThe issue doesn't require a change to the product in order to be resolved. Most issues start as that

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions