@@ -163,31 +163,36 @@ def _declare_queue(
163
163
body = {}
164
164
args : dict [str , Any ] = {}
165
165
166
- body ["auto_delete" ] = queue_specification .is_auto_delete
167
- body ["durable" ] = queue_specification .is_durable
168
-
169
166
if queue_specification .dead_letter_exchange is not None :
170
167
args ["x-dead-letter-exchange" ] = queue_specification .dead_letter_exchange
171
168
if queue_specification .dead_letter_routing_key is not None :
172
169
args ["x-dead-letter-routing-key" ] = (
173
170
queue_specification .dead_letter_routing_key
174
171
)
175
- if queue_specification .overflow is not None :
176
- args ["x-overflow" ] = queue_specification .overflow
172
+ if queue_specification .overflow_behaviour is not None :
173
+ args ["x-overflow" ] = queue_specification .overflow_behaviour
177
174
if queue_specification .max_len is not None :
178
175
args ["x-max-length" ] = queue_specification .max_len
179
176
if queue_specification .max_len_bytes is not None :
180
177
args ["x-max-length-bytes" ] = queue_specification .max_len_bytes
181
178
if queue_specification .message_ttl is not None :
182
- args ["x-message-ttl" ] = queue_specification .message_ttl
183
- if queue_specification .expires is not None :
184
- args ["x-expires" ] = queue_specification .expires
179
+ args ["x-message-ttl" ] = int (
180
+ queue_specification .message_ttl .total_seconds () * 1000
181
+ )
182
+ if queue_specification .auto_expires is not None :
183
+ args ["x-expires" ] = int (
184
+ queue_specification .auto_expires .total_seconds () * 1000
185
+ )
185
186
if queue_specification .single_active_consumer is not None :
186
187
args ["x-single-active-consumer" ] = (
187
188
queue_specification .single_active_consumer
188
189
)
189
190
190
191
if isinstance (queue_specification , ClassicQueueSpecification ):
192
+ body ["auto_delete" ] = queue_specification .is_auto_delete
193
+ body ["durable" ] = queue_specification .is_durable
194
+ body ["exclusive" ] = queue_specification .is_exclusive
195
+
191
196
args ["x-queue-type" ] = QueueType .classic .value
192
197
if queue_specification .maximum_priority is not None :
193
198
args ["x-maximum-priority" ] = queue_specification .maximum_priority
@@ -203,12 +208,17 @@ def _declare_queue(
203
208
)
204
209
205
210
if queue_specification .quorum_initial_group_size is not None :
206
- args ["x-initial- quorum-group-size" ] = (
211
+ args ["x-quorum-initial -group-size" ] = (
207
212
queue_specification .quorum_initial_group_size
208
213
)
209
214
210
- if queue_specification .cluster_target_size is not None :
211
- args ["cluster_target_size" ] = queue_specification .cluster_target_size
215
+ if queue_specification .cluster_target_group_size is not None :
216
+ args ["x-quorum-target-group-size" ] = (
217
+ queue_specification .cluster_target_group_size
218
+ )
219
+
220
+ if queue_specification .leader_locator is not None :
221
+ args ["x-queue-leader-locator" ] = queue_specification .leader_locator
212
222
213
223
body ["arguments" ] = args # type: ignore
214
224
@@ -226,22 +236,26 @@ def _declare_stream(
226
236
if stream_specification .max_len_bytes is not None :
227
237
args ["x-max-length-bytes" ] = stream_specification .max_len_bytes
228
238
229
- if stream_specification .max_time_retention is not None :
230
- args ["x-max-time-retention" ] = stream_specification .max_time_retention
239
+ if stream_specification .max_age is not None :
240
+ args ["x-max-age" ] = (
241
+ str (int (stream_specification .max_age .total_seconds ())) + "s"
242
+ )
231
243
232
- if stream_specification .max_segment_size_in_bytes is not None :
233
- args ["x-max-segment-size-in -bytes" ] = (
234
- stream_specification .max_segment_size_in_bytes
244
+ if stream_specification .stream_max_segment_size_bytes is not None :
245
+ args ["x-stream- max-segment-size-bytes" ] = (
246
+ stream_specification .stream_max_segment_size_bytes
235
247
)
236
248
237
- if stream_specification .filter_size is not None :
238
- args ["x-filter-size" ] = stream_specification .filter_size
249
+ if stream_specification .stream_filter_size_bytes is not None :
250
+ args ["x-stream-filter-size-bytes" ] = (
251
+ stream_specification .stream_filter_size_bytes
252
+ )
239
253
240
254
if stream_specification .initial_group_size is not None :
241
255
args ["x-initial-group-size" ] = stream_specification .initial_group_size
242
256
243
257
if stream_specification .leader_locator is not None :
244
- args ["x-leader-locator" ] = stream_specification .leader_locator
258
+ args ["x-queue- leader-locator" ] = stream_specification .leader_locator
245
259
246
260
body ["arguments" ] = args
247
261
@@ -276,7 +290,6 @@ def delete_queue(self, name: str) -> None:
276
290
def _validate_reponse_code (
277
291
self , response_code : int , expected_response_codes : list [int ]
278
292
) -> None :
279
- logger .debug ("response_code received: " + str (response_code ))
280
293
if response_code == CommonValues .response_code_409 .value :
281
294
raise ValidationCodeException ("ErrPreconditionFailed" )
282
295
0 commit comments