Skip to content

Commit 40ea644

Browse files
authored
Reduce size of sqs backend futures (#115)
1 parent 5193490 commit 40ea644

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

omniqueue/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,6 @@ rabbitmq-with-message-ids = ["rabbitmq", "dep:svix-ksuid"]
5050
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
5151
redis_cluster = ["redis", "redis/cluster-async"]
5252
redis_sentinel = ["redis", "redis/sentinel"]
53-
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
53+
sqs = ["dep:aws-config", "dep:aws-sdk-sqs", "dep:futures-util"]
5454
azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"]
5555
beta = []

omniqueue/src/backends/sqs.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use aws_sdk_sqs::{
1010
types::{error::ReceiptHandleIsInvalid, Message, SendMessageBatchRequestEntry},
1111
Client,
1212
};
13+
use futures_util::FutureExt as _;
1314
use serde::Serialize;
1415

1516
#[allow(deprecated)]
@@ -49,10 +50,17 @@ impl SqsConfigFull {
4950
&aws_config::from_env()
5051
.endpoint_url(&self.queue_dsn)
5152
.load()
53+
// Segment the async state machine. load future is >7kb at the time of writing.
54+
.boxed()
5255
.await,
5356
)
5457
} else {
55-
aws_sdk_sqs::Config::from(&aws_config::load_from_env().await)
58+
aws_sdk_sqs::Config::from(
59+
&aws_config::load_from_env()
60+
// Same as above
61+
.boxed()
62+
.await,
63+
)
5664
}
5765
}
5866
}
@@ -205,6 +213,8 @@ impl Acker for SqsAcker {
205213
.queue_url(&self.queue_dsn)
206214
.receipt_handle(receipt_handle)
207215
.send()
216+
// Segment the async state machine. send future is >5kb at the time of writing.
217+
.boxed()
208218
.await
209219
.map_err(aws_to_queue_error)?;
210220

@@ -241,6 +251,8 @@ impl Acker for SqsAcker {
241251
.queue_url(&self.queue_dsn)
242252
.receipt_handle(receipt_handle)
243253
.send()
254+
// Segment the async state machine. send future is >5kb at the time of writing.
255+
.boxed()
244256
.await
245257
.map_err(aws_to_queue_error)?;
246258

@@ -294,6 +306,8 @@ impl SqsProducer {
294306
.message_body(payload)
295307
.delay_seconds(delay.as_secs().try_into().map_err(QueueError::generic)?)
296308
.send()
309+
// Segment the async state machine. send future is >5kb at the time of writing.
310+
.boxed()
297311
.await
298312
.map_err(aws_to_queue_error)?;
299313

@@ -349,6 +363,8 @@ impl SqsProducer {
349363
.queue_url(&self.queue_dsn)
350364
.set_entries(Some(entries))
351365
.send()
366+
// Segment the async state machine. send future is >5kb at the time of writing.
367+
.boxed()
352368
.await
353369
.map_err(aws_to_queue_error)?;
354370
}
@@ -414,6 +430,8 @@ impl SqsConsumer {
414430
.set_max_number_of_messages(Some(1))
415431
.queue_url(&self.queue_dsn)
416432
.send()
433+
// Segment the async state machine. send future is >5kb at the time of writing.
434+
.boxed()
417435
.await
418436
.map_err(aws_to_queue_error)?;
419437

@@ -438,6 +456,8 @@ impl SqsConsumer {
438456
.set_max_number_of_messages(Some(max_messages.try_into().map_err(QueueError::generic)?))
439457
.queue_url(&self.queue_dsn)
440458
.send()
459+
// Segment the async state machine. send future is >5kb at the time of writing.
460+
.boxed()
441461
.await
442462
.map_err(aws_to_queue_error)?;
443463

0 commit comments

Comments
 (0)