Skip to content

Commit 4e57402

Browse files
author
DanielePalaia
committed
review queue arguments
1 parent 22c8f32 commit 4e57402

File tree

3 files changed

+223
-74
lines changed

3 files changed

+223
-74
lines changed

rabbitmq_amqp_python_client/management.py

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -163,34 +163,39 @@ def _declare_queue(
163163
body = {}
164164
args: dict[str, Any] = {}
165165

166-
body["auto_delete"] = queue_specification.is_auto_delete
167-
body["durable"] = queue_specification.is_durable
168-
169166
if queue_specification.dead_letter_exchange is not None:
170167
args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
171168
if queue_specification.dead_letter_routing_key is not None:
172169
args["x-dead-letter-routing-key"] = (
173170
queue_specification.dead_letter_routing_key
174171
)
175-
if queue_specification.overflow is not None:
176-
args["x-overflow"] = queue_specification.overflow
172+
if queue_specification.overflow_behaviour is not None:
173+
args["x-overflow"] = queue_specification.overflow_behaviour
177174
if queue_specification.max_len is not None:
178175
args["x-max-length"] = queue_specification.max_len
179176
if queue_specification.max_len_bytes is not None:
180177
args["x-max-length-bytes"] = queue_specification.max_len_bytes
181178
if queue_specification.message_ttl is not None:
182-
args["x-message-ttl"] = queue_specification.message_ttl
183-
if queue_specification.expires is not None:
184-
args["x-expires"] = queue_specification.expires
179+
args["x-message-ttl"] = int(
180+
queue_specification.message_ttl.total_seconds() * 1000
181+
)
182+
if queue_specification.auto_expires is not None:
183+
args["x-expires"] = int(
184+
queue_specification.auto_expires.total_seconds() * 1000
185+
)
185186
if queue_specification.single_active_consumer is not None:
186187
args["x-single-active-consumer"] = (
187188
queue_specification.single_active_consumer
188189
)
189190

190191
if isinstance(queue_specification, ClassicQueueSpecification):
192+
body["auto_delete"] = queue_specification.is_auto_delete
193+
body["durable"] = queue_specification.is_durable
194+
body["exclusive"] = queue_specification.is_exclusive
195+
191196
args["x-queue-type"] = QueueType.classic.value
192-
if queue_specification.maximum_priority is not None:
193-
args["x-maximum-priority"] = queue_specification.maximum_priority
197+
if queue_specification.max_priority is not None:
198+
args["x-max-priority"] = queue_specification.max_priority
194199

195200
if isinstance(queue_specification, QuorumQueueSpecification):
196201
args["x-queue-type"] = QueueType.quorum.value
@@ -203,12 +208,17 @@ def _declare_queue(
203208
)
204209

205210
if queue_specification.quorum_initial_group_size is not None:
206-
args["x-initial-quorum-group-size"] = (
211+
args["x-quorum-initial-group-size"] = (
207212
queue_specification.quorum_initial_group_size
208213
)
209214

210-
if queue_specification.cluster_target_size is not None:
211-
args["cluster_target_size"] = queue_specification.cluster_target_size
215+
if queue_specification.cluster_target_group_size is not None:
216+
args["x-quorum-target-group-size"] = (
217+
queue_specification.cluster_target_group_size
218+
)
219+
220+
if queue_specification.leader_locator is not None:
221+
args["x-queue-leader-locator"] = queue_specification.leader_locator
212222

213223
body["arguments"] = args # type: ignore
214224

@@ -226,22 +236,26 @@ def _declare_stream(
226236
if stream_specification.max_len_bytes is not None:
227237
args["x-max-length-bytes"] = stream_specification.max_len_bytes
228238

229-
if stream_specification.max_time_retention is not None:
230-
args["x-max-time-retention"] = stream_specification.max_time_retention
239+
if stream_specification.max_age is not None:
240+
args["x-max-age"] = (
241+
str(int(stream_specification.max_age.total_seconds())) + "s"
242+
)
231243

232-
if stream_specification.max_segment_size_in_bytes is not None:
233-
args["x-max-segment-size-in-bytes"] = (
234-
stream_specification.max_segment_size_in_bytes
244+
if stream_specification.stream_max_segment_size_bytes is not None:
245+
args["x-stream-max-segment-size-bytes"] = (
246+
stream_specification.stream_max_segment_size_bytes
235247
)
236248

237-
if stream_specification.filter_size is not None:
238-
args["x-filter-size"] = stream_specification.filter_size
249+
if stream_specification.stream_filter_size_bytes is not None:
250+
args["x-stream-filter-size-bytes"] = (
251+
stream_specification.stream_filter_size_bytes
252+
)
239253

240254
if stream_specification.initial_group_size is not None:
241255
args["x-initial-group-size"] = stream_specification.initial_group_size
242256

243257
if stream_specification.leader_locator is not None:
244-
args["x-leader-locator"] = stream_specification.leader_locator
258+
args["x-queue-leader-locator"] = stream_specification.leader_locator
245259

246260
body["arguments"] = args
247261

@@ -276,7 +290,6 @@ def delete_queue(self, name: str) -> None:
276290
def _validate_reponse_code(
277291
self, response_code: int, expected_response_codes: list[int]
278292
) -> None:
279-
logger.debug("response_code received: " + str(response_code))
280293
if response_code == CommonValues.response_code_409.value:
281294
raise ValidationCodeException("ErrPreconditionFailed")
282295

rabbitmq_amqp_python_client/queues.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,44 @@
11
from dataclasses import dataclass
2+
from datetime import timedelta
23
from typing import Optional
34

45

56
@dataclass
67
class QueueSpecification:
78
name: str
8-
expires: Optional[int] = None
9-
message_ttl: Optional[int] = None
10-
overflow: Optional[str] = None
9+
auto_expires: Optional[timedelta] = None
10+
message_ttl: Optional[timedelta] = None
11+
overflow_behaviour: Optional[str] = None
1112
single_active_consumer: Optional[bool] = None
1213
dead_letter_exchange: Optional[str] = None
1314
dead_letter_routing_key: Optional[str] = None
1415
max_len: Optional[int] = None
1516
max_len_bytes: Optional[int] = None
1617
leader_locator: Optional[str] = None
17-
is_auto_delete: bool = False
18-
is_durable: bool = True
1918

2019

2120
@dataclass
2221
class ClassicQueueSpecification(QueueSpecification):
23-
maximum_priority: Optional[int] = None
22+
max_priority: Optional[int] = None
23+
is_auto_delete: bool = False
24+
is_exclusive: bool = False
25+
is_durable: bool = True
2426

2527

2628
@dataclass
2729
class QuorumQueueSpecification(QueueSpecification):
2830
deliver_limit: Optional[int] = None
2931
dead_letter_strategy: Optional[str] = None
3032
quorum_initial_group_size: Optional[int] = None
31-
cluster_target_size: Optional[int] = None
33+
cluster_target_group_size: Optional[int] = None
3234

3335

3436
@dataclass
3537
class StreamSpecification:
3638
name: str
3739
max_len_bytes: Optional[int] = None
38-
max_time_retention: Optional[int] = None
39-
max_segment_size_in_bytes: Optional[int] = None
40-
filter_size: Optional[int] = None
40+
max_age: Optional[timedelta] = None
41+
stream_max_segment_size_bytes: Optional[int] = None
42+
stream_filter_size_bytes: Optional[int] = None
4143
initial_group_size: Optional[int] = None
4244
leader_locator: Optional[str] = None

0 commit comments

Comments
 (0)