@@ -41,7 +41,7 @@ public QueueMigrateCommand(string queueName, BrokerConnection brokerConnection,
41
41
migrationState = new MigrationState ( ) ;
42
42
}
43
43
44
- public Task Run ( CancellationToken cancellationToken = default )
44
+ public async Task Run ( CancellationToken cancellationToken = default )
45
45
{
46
46
console . WriteLine ( $ "Starting migration of '{ queueName } '") ;
47
47
@@ -54,7 +54,7 @@ public Task Run(CancellationToken cancellationToken = default)
54
54
switch ( migrationState . CurrentStage )
55
55
{
56
56
case MigrationStage . Starting :
57
- migrationState . CurrentStage = MoveMessagesToHoldingQueue ( connection , cancellationToken ) ;
57
+ migrationState . CurrentStage = await MoveMessagesToHoldingQueue ( connection , cancellationToken ) . ConfigureAwait ( false ) ;
58
58
break ;
59
59
case MigrationStage . MessagesMovedToHoldingQueue :
60
60
migrationState . CurrentStage = DeleteMainQueue ( connection ) ;
@@ -73,18 +73,16 @@ public Task Run(CancellationToken cancellationToken = default)
73
73
break ;
74
74
}
75
75
}
76
-
77
- return Task . CompletedTask ;
78
76
}
79
77
80
- MigrationStage MoveMessagesToHoldingQueue ( IConnection connection , CancellationToken cancellationToken )
78
+ async Task < MigrationStage > MoveMessagesToHoldingQueue ( IConnection connection , CancellationToken cancellationToken )
81
79
{
82
- using var channel = connection . CreateModel ( ) ;
80
+ using var channel = await connection . CreateChannelAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
83
81
84
82
console . WriteLine ( $ "Migrating messages from '{ queueName } ' to '{ holdingQueueName } '") ;
85
83
86
84
// does the holding queue need to be quorum?
87
- channel . QueueDeclare ( holdingQueueName , true , false , false , quorumQueueArguments ) ;
85
+ await channel . QueueDeclareAsync ( holdingQueueName , true , false , false , quorumQueueArguments ) . ConfigureAwait ( false ) ;
88
86
console . WriteLine ( $ "Created queue '{ holdingQueueName } '") ;
89
87
90
88
// bind the holding queue to the exchange of the queue under migration
@@ -140,25 +138,25 @@ MigrationStage CreateMainQueueAsQuorum(IConnection connection)
140
138
return MigrationStage . QuorumQueueCreated ;
141
139
}
142
140
143
- MigrationStage RestoreMessages ( IConnection connection , CancellationToken cancellationToken )
141
+ async Task < MigrationStage > RestoreMessages ( IConnection connection , CancellationToken cancellationToken )
144
142
{
145
- using var channel = connection . CreateModel ( ) ;
143
+ using var channel = await connection . CreateChannelAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
146
144
147
- channel . QueueBind ( queueName , queueName , string . Empty ) ;
145
+ await channel . QueueBindAsync ( queueName , queueName , string . Empty , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
148
146
console . WriteLine ( $ "Re-bound '{ queueName } ' to exchange '{ queueName } '") ;
149
147
150
- channel . QueueUnbind ( holdingQueueName , queueName , string . Empty ) ;
148
+ await channel . QueueUnbindAsync ( holdingQueueName , queueName , string . Empty , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
151
149
console . WriteLine ( $ "Unbound '{ holdingQueueName } ' from exchange '{ queueName } '") ;
152
150
153
151
var messageIds = new Dictionary < string , string > ( ) ;
154
152
155
153
// move all messages in the holding queue back to the main queue
156
- channel . ConfirmSelect ( ) ;
154
+ await channel . ConfirmSelectAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
157
155
158
- var numMessageMovedBackToMain = ProcessMessages (
156
+ var numMessageMovedBackToMain = await ProcessMessages (
159
157
channel ,
160
158
holdingQueueName ,
161
- message =>
159
+ async ( message , token ) =>
162
160
{
163
161
string ? messageIdString = null ;
164
162
@@ -175,55 +173,55 @@ MigrationStage RestoreMessages(IConnection connection, CancellationToken cancell
175
173
}
176
174
}
177
175
178
- channel . BasicPublish ( string . Empty , queueName , message . BasicProperties , message . Body ) ;
179
- channel . WaitForConfirmsOrDie ( ) ;
176
+ await channel . BasicPublishAsync ( string . Empty , queueName , new BasicProperties ( message . BasicProperties ) , message . Body , cancellationToken : token ) . ConfigureAwait ( false ) ;
177
+ await channel . WaitForConfirmsOrDieAsync ( token ) . ConfigureAwait ( false ) ;
180
178
181
179
if ( messageIdString != null )
182
180
{
183
181
messageIds . Add ( messageIdString , string . Empty ) ;
184
182
}
185
183
} ,
186
- cancellationToken ) ;
184
+ cancellationToken ) . ConfigureAwait ( false ) ;
187
185
188
186
console . WriteLine ( $ "Moved { numMessageMovedBackToMain } messages from '{ holdingQueueName } ' to '{ queueName } '") ;
189
187
190
188
return MigrationStage . MessagesMovedToQuorumQueue ;
191
189
}
192
190
193
- MigrationStage CleanUpHoldingQueue ( IConnection connection )
191
+ async Task < MigrationStage > CleanUpHoldingQueue ( IConnection connection , CancellationToken cancellationToken )
194
192
{
195
- using var channel = connection . CreateModel ( ) ;
193
+ using var channel = await connection . CreateChannelAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
196
194
197
- if ( channel . MessageCount ( holdingQueueName ) != 0 )
195
+ if ( await channel . MessageCountAsync ( holdingQueueName , cancellationToken ) . ConfigureAwait ( false ) != 0 )
198
196
{
199
197
throw new Exception ( $ "'{ holdingQueueName } ' is not empty and was not deleted. Run the command again to retry message processing.") ;
200
198
}
201
199
202
- channel . QueueDelete ( holdingQueueName ) ;
200
+ await channel . QueueDeleteAsync ( holdingQueueName , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
203
201
console . WriteLine ( $ "Removed '{ holdingQueueName } '") ;
204
202
205
203
return MigrationStage . CleanUpCompleted ;
206
204
}
207
205
208
- uint ProcessMessages ( IModel channel , string sourceQueue , Action < BasicGetResult > onMoveMessage , CancellationToken cancellationToken )
206
+ async Task < uint > ProcessMessages ( IChannel channel , string sourceQueue , Func < BasicGetResult , CancellationToken , Task > onMoveMessage , CancellationToken cancellationToken )
209
207
{
210
- var messageCount = channel . MessageCount ( sourceQueue ) ;
208
+ var messageCount = await channel . MessageCountAsync ( sourceQueue , cancellationToken ) . ConfigureAwait ( false ) ;
211
209
212
210
for ( var i = 0 ; i < messageCount ; i ++ )
213
211
{
214
212
cancellationToken . ThrowIfCancellationRequested ( ) ;
215
213
216
- var message = channel . BasicGet ( sourceQueue , false ) ;
214
+ var message = await channel . BasicGetAsync ( sourceQueue , false , cancellationToken ) . ConfigureAwait ( false ) ;
217
215
218
216
if ( message == null )
219
217
{
220
218
// Queue is empty
221
219
break ;
222
220
}
223
221
224
- onMoveMessage ( message ) ;
222
+ await onMoveMessage ( message , cancellationToken ) . ConfigureAwait ( false ) ;
225
223
226
- channel . BasicAck ( message . DeliveryTag , false ) ;
224
+ await channel . BasicAckAsync ( message . DeliveryTag , false , cancellationToken ) . ConfigureAwait ( false ) ;
227
225
}
228
226
229
227
return messageCount ;
0 commit comments