diff --git a/Cargo.toml b/Cargo.toml index ff36571a..ce107d32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ xmltree = "0.11.0" [dev-dependencies] async-std = { version = "1.13.0", features = ["attributes", "tokio1"] } -clap = { version = "4.5.23", features = ["derive"] } +clap = { version = "4.5.27", features = ["derive"] } quickcheck = "1.0.3" [[example]] diff --git a/src/s3/builders/put_object.rs b/src/s3/builders/put_object.rs index 3719340a..731466d1 100644 --- a/src/s3/builders/put_object.rs +++ b/src/s3/builders/put_object.rs @@ -35,7 +35,7 @@ use crate::s3::{ use super::{ObjectContent, SegmentedBytes}; /// Argument for -/// [create_multipart_upload()](crate::s3::client::Client::create_multipart_upload) +/// [create_multipart_upload()](Client::create_multipart_upload) /// API #[derive(Clone, Debug, Default)] pub struct CreateMultipartUpload { @@ -170,7 +170,7 @@ impl S3Api for CreateMultipartUpload { } /// Argument for -/// [abort_multipart_upload()](crate::s3::client::Client::abort_multipart_upload) +/// [abort_multipart_upload()](Client::abort_multipart_upload) /// API #[derive(Clone, Debug, Default)] pub struct AbortMultipartUpload { @@ -252,7 +252,7 @@ impl S3Api for AbortMultipartUpload { } /// Argument for -/// [complete_multipart_upload()](crate::s3::client::Client::complete_multipart_upload) +/// [complete_multipart_upload()](Client::complete_multipart_upload) /// API #[derive(Clone, Debug, Default)] pub struct CompleteMultipartUpload { @@ -368,7 +368,7 @@ impl S3Api for CompleteMultipartUpload { type S3Response = CompleteMultipartUploadResponse2; } -/// Argument for [upload_part()](crate::s3::client::Client::upload_part) S3 API +/// Argument for [upload_part()](Client::upload_part) S3 API #[derive(Debug, Clone, Default)] pub struct UploadPart { client: Option, @@ -1126,23 +1126,23 @@ mod tests { // Validate that basic invalid sizes return the expected error. if let Size::Known(v) = part_size { if v < MIN_PART_SIZE { - match res { - Err(Error::InvalidMinPartSize(v_err)) => return v == v_err, - _ => return false, + return match res { + Err(Error::InvalidMinPartSize(v_err)) => v == v_err, + _ => false, } } if v > MAX_PART_SIZE { - match res { - Err(Error::InvalidMaxPartSize(v_err)) => return v == v_err, - _ => return false, + return match res { + Err(Error::InvalidMaxPartSize(v_err)) => v == v_err, + _ => false, } } } if let Size::Known(v) = object_size { if v > MAX_OBJECT_SIZE { - match res { - Err(Error::InvalidObjectSize(v_err)) => return v == v_err, - _ => return false, + return match res { + Err(Error::InvalidObjectSize(v_err)) => v == v_err, + _ => false, } } } @@ -1167,23 +1167,23 @@ mod tests { if psize > object_size { return false; } - part_count > 0 && part_count <= MAX_MULTIPART_COUNT + (part_count > 0) && (part_count <= MAX_MULTIPART_COUNT) } (Size::Known(_), Size::Unknown, _) => false, (Size::Known(object_size), Size::Known(part_size), res) => { - if part_size > object_size || (part_size * (MAX_MULTIPART_COUNT as u64)) < object_size { - match res { + if (part_size > object_size) || ((part_size * (MAX_MULTIPART_COUNT as u64)) < object_size) { + return match res { Err(Error::InvalidPartCount(v1, v2, v3)) => { - return v1 == object_size && v2 == part_size && v3 == MAX_MULTIPART_COUNT; + (v1 == object_size) && (v2 == part_size) && (v3 == MAX_MULTIPART_COUNT) } - _ => return false, + _ => false, } } match res { Ok((psize, part_count)) => { let expected_part_count = (object_size as f64 / part_size as f64).ceil() as u16; - return psize == part_size && part_count == Some(expected_part_count); + (psize == part_size) && (part_count == Some(expected_part_count)) } _ => false, } diff --git a/tests/run-tests-windows.ps1 b/tests/run-tests-windows.ps1 new file mode 100644 index 00000000..3652b987 --- /dev/null +++ b/tests/run-tests-windows.ps1 @@ -0,0 +1,11 @@ +# Set environment variables to run tests on play.min.io +$Env:SERVER_ENDPOINT = "http://localhost:9000/" +$Env:ACCESS_KEY = "minioadmin" +$Env:SECRET_KEY = "minioadmin" +$Env:ENABLE_HTTPS = "false" +$Env:SSL_CERT_FILE = "./tests/public.crt" +$Env:IGNORE_CERT_CHECK = "false" +$Env:SERVER_REGION = "" + +# Run tests +cargo test \ No newline at end of file diff --git a/tests/tests.rs b/tests/tests.rs index ae79007a..bfafab52 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -15,26 +15,23 @@ use async_std::task; use bytes::Bytes; -use chrono::Duration; -use futures_util::Stream; use http::header; use hyper::http::Method; - -use minio::s3::builders::{ObjectContent, ObjectToDelete, VersioningStatus}; -use rand::{ - distributions::{Alphanumeric, DistString}, - rngs::SmallRng, - SeedableRng, -}; +use rand::distributions::{Alphanumeric, DistString}; +use rand::prelude::SmallRng; +use rand::SeedableRng; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::io::BufReader; use std::path::{Path, PathBuf}; -use std::{fs, io}; -use tokio::{io::AsyncRead, sync::mpsc}; -use tokio_stream::StreamExt; +use std::{fs, io, thread}; +use tokio::io::AsyncRead; +use tokio::sync::mpsc; +use tokio::time::timeout; +use tokio_stream::{Stream, StreamExt}; use minio::s3::args::*; +use minio::s3::builders::{ObjectContent, ObjectToDelete, VersioningStatus}; use minio::s3::client::Client; use minio::s3::creds::StaticProvider; use minio::s3::error::Error; @@ -57,8 +54,8 @@ impl RandReader { } } -impl std::io::Read for RandReader { - fn read(&mut self, buf: &mut [u8]) -> Result { +impl io::Read for RandReader { + fn read(&mut self, buf: &mut [u8]) -> Result { let bytes_read = match self.size > buf.len() { true => buf.len(), false => self.size, @@ -88,7 +85,7 @@ impl RandSrc { } impl Stream for RandSrc { - type Item = Result; + type Item = Result; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -118,9 +115,9 @@ impl Stream for RandSrc { impl AsyncRead for RandSrc { fn poll_read( self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, + _cx: &mut task::Context<'_>, read_buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { + ) -> task::Poll> { let buf = read_buf.initialize_unfilled(); let bytes_read = match self.size > (buf.len() as u64) { true => buf.len(), @@ -137,7 +134,7 @@ impl AsyncRead for RandSrc { this.size -= bytes_read as u64; read_buf.advance(bytes_read); - std::task::Poll::Ready(Ok(())) + task::Poll::Ready(Ok(())) } } @@ -151,902 +148,1055 @@ fn rand_object_name() -> String { Alphanumeric.sample_string(&mut rand::thread_rng(), 8) } -struct ClientTest { +#[derive(Clone)] +struct TestContext { base_url: BaseUrl, access_key: String, secret_key: String, ignore_cert_check: Option, ssl_cert_file: Option, client: Client, - test_bucket: String, } -impl ClientTest { +impl TestContext { const SQS_ARN: &'static str = "arn:minio:sqs::miniojavatest:webhook"; - fn new( - base_url: BaseUrl, - access_key: String, - secret_key: String, - static_provider: StaticProvider, - ignore_cert_check: Option, - ssl_cert_file: Option<&Path>, - ) -> ClientTest { - let client = Client::new( - base_url.clone(), - Some(Box::new(static_provider)), - ssl_cert_file.as_ref().cloned(), - ignore_cert_check, - ) - .unwrap(); + pub fn new_from_env() -> Self { + let run_on_ci: bool = std::env::var("CI") + .unwrap_or("false".into()) + .parse() + .unwrap_or(false); + if run_on_ci { + let host = std::env::var("SERVER_ENDPOINT").unwrap(); + let access_key = std::env::var("ACCESS_KEY").unwrap(); + let secret_key = std::env::var("SECRET_KEY").unwrap(); + let secure = std::env::var("ENABLE_HTTPS").is_ok(); + let value = std::env::var("SSL_CERT_FILE").unwrap(); + let mut ssl_cert_file = None; + if !value.is_empty() { + ssl_cert_file = Some(Path::new(&value)); + } + let ignore_cert_check = std::env::var("IGNORE_CERT_CHECK").is_ok(); + let region = std::env::var("SERVER_REGION").ok(); - ClientTest { - base_url, - access_key, - secret_key, - ignore_cert_check, - ssl_cert_file: ssl_cert_file.map(PathBuf::from), - client, - test_bucket: rand_bucket_name(), - } - } + let mut base_url: BaseUrl = host.parse().unwrap(); + base_url.https = secure; + if let Some(v) = region { + base_url.region = v; + } - async fn init(&self) { - self.client - .make_bucket(&MakeBucketArgs::new(&self.test_bucket).unwrap()) - .await + let static_provider = StaticProvider::new(&access_key, &secret_key, None); + let client = Client::new( + base_url.clone(), + Some(Box::new(static_provider)), + ssl_cert_file, + Some(ignore_cert_check), + ) .unwrap(); - } - async fn drop(&self) { - self.client - .remove_bucket(&RemoveBucketArgs::new(&self.test_bucket).unwrap()) - .await - .unwrap(); - } + Self { + base_url, + access_key, + secret_key, + ignore_cert_check: Some(ignore_cert_check), + ssl_cert_file: ssl_cert_file.map(PathBuf::from), + client, + } + } else { + const DEFAULT_SERVER_ENDPOINT: &str = "https://play.min.io/"; + const DEFAULT_ACCESS_KEY: &str = "minioadmin"; + const DEFAULT_SECRET_KEY: &str = "minioadmin"; + const DEFAULT_ENABLE_HTTPS: &str = "true"; + const DEFAULT_SSL_CERT_FILE: &str = "./tests/public.crt"; + const DEFAULT_IGNORE_CERT_CHECK: &str = "false"; + const DEFAULT_SERVER_REGION: &str = ""; + + let host: String = + std::env::var("SERVER_ENDPOINT").unwrap_or(DEFAULT_SERVER_ENDPOINT.to_string()); + log::info!("SERVER_ENDPOINT={}", host); + let access_key: String = + std::env::var("ACCESS_KEY").unwrap_or(DEFAULT_ACCESS_KEY.to_string()); + log::info!("ACCESS_KEY={}", access_key); + let secret_key: String = + std::env::var("SECRET_KEY").unwrap_or(DEFAULT_SECRET_KEY.to_string()); + log::info!("SECRET_KEY=*****"); + let secure: bool = std::env::var("ENABLE_HTTPS") + .unwrap_or(DEFAULT_ENABLE_HTTPS.to_string()) + .parse() + .unwrap_or(false); + log::info!("ENABLE_HTTPS={}", secure); + let ssl_cert: String = + std::env::var("SSL_CERT_FILE").unwrap_or(DEFAULT_SSL_CERT_FILE.to_string()); + log::info!("SSL_CERT_FILE={}", ssl_cert); + let ssl_cert_file: PathBuf = ssl_cert.into(); + let ignore_cert_check: bool = std::env::var("IGNORE_CERT_CHECK") + .unwrap_or(DEFAULT_IGNORE_CERT_CHECK.to_string()) + .parse() + .unwrap_or(true); + log::info!("IGNORE_CERT_CHECK={}", ignore_cert_check); + let region: String = + std::env::var("SERVER_REGION").unwrap_or(DEFAULT_SERVER_REGION.to_string()); + log::info!("SERVER_REGION={:?}", region); + + let mut base_url: BaseUrl = host.parse().unwrap(); + base_url.https = secure; + base_url.region = region; - async fn bucket_exists(&self) { - let bucket_name = rand_bucket_name(); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - let exists = self - .client - .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(exists); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await + let static_provider = StaticProvider::new(&access_key, &secret_key, None); + let client = Client::new( + base_url.clone(), + Some(Box::new(static_provider)), + Some(&*ssl_cert_file), + Some(ignore_cert_check), + ) .unwrap(); - } - async fn list_buckets(&self) { - let mut names: Vec = Vec::new(); - for _ in 1..=3 { - names.push(rand_bucket_name()); + Self { + base_url, + access_key, + secret_key, + ignore_cert_check: Some(ignore_cert_check), + ssl_cert_file: Some(ssl_cert_file), + client, + } } + } +} - for b in names.iter() { - self.client - .make_bucket(&MakeBucketArgs::new(b).unwrap()) - .await - .unwrap(); - } +async fn create_bucket_helper(ctx: &TestContext) -> (String, CleanupGuard) { + let bucket_name = rand_bucket_name(); + ctx.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + let guard = CleanupGuard::new(ctx, &bucket_name); + (bucket_name, guard) +} - let mut count = 0; - let resp = self.client.list_buckets().send().await.unwrap(); - for bucket in resp.buckets.iter() { - if names.contains(&bucket.name) { - count += 1; - } - } - assert_eq!(count, 3); +// Cleanup guard that removes the bucket when it is dropped +struct CleanupGuard { + ctx: TestContext, + bucket_name: String, +} - for b in names.iter() { - self.client - .remove_bucket(&RemoveBucketArgs::new(b).unwrap()) - .await - .unwrap(); +impl CleanupGuard { + fn new(ctx: &TestContext, bucket_name: &str) -> Self { + Self { + ctx: ctx.clone(), + bucket_name: bucket_name.to_string(), } } +} - async fn put_object(&self) { - let object_name = rand_object_name(); - let size = 16_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut RandReader::new(size), - Some(size), - None, +impl Drop for CleanupGuard { + fn drop(&mut self) { + let ctx = self.ctx.clone(); + let bucket_name = self.bucket_name.clone(); + //println!("Going to remove bucket {}", bucket_name); + + // Spawn the cleanup task in a way that detaches it from the current runtime + thread::spawn(move || { + // Create a new runtime for this thread + let rt = tokio::runtime::Runtime::new().unwrap(); + + // Execute the async cleanup in this new runtime + rt.block_on(async { + //clean_bucket(&bucket_name, &ctx).await; + + // do the actual removal of the bucket + match timeout( + std::time::Duration::from_secs(60), + ctx.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()), ) - .unwrap(), - ) - .await - .unwrap(); - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.bucket_name, self.test_bucket); - assert_eq!(resp.object_name, object_name); - assert_eq!(resp.size, size); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - // Validate delete succeeded. - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await; - match resp.err().unwrap() { - Error::S3Error(er) => { - assert_eq!(er.code, "NoSuchKey") - } - e => panic!("Unexpected error {:?}", e), - } + .await + { + Ok(result) => match result { + Ok(_) => { + //println!("Bucket {} removed successfully", bucket_name), + } + Err(e) => println!("Error removing bucket {}: {:?}", bucket_name, e), + }, + Err(_) => println!("Timeout after 15s while removing bucket {}", bucket_name), + } + }); + }) + .join() + .unwrap(); // This blocks the current thread until cleanup is done } +} - async fn put_object_multipart(&self) { - let object_name = rand_object_name(); - let size: usize = 16 + 5 * 1024 * 1024; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), - ) - .await - .unwrap(); - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.bucket_name, self.test_bucket); - assert_eq!(resp.object_name, object_name); - assert_eq!(resp.size, size); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn bucket_exists() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + + let exists = ctx + .client + .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + assert!(exists); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn list_buckets() { + const N_BUCKETS: usize = 3; + let ctx = TestContext::new_from_env(); + + let mut names: Vec = Vec::new(); + let mut guards: Vec = Vec::new(); + for _ in 1..=N_BUCKETS { + let (bucket_name, guard) = create_bucket_helper(&ctx).await; + names.push(bucket_name); + guards.push(guard); } - async fn put_object_content(&self) { - let object_name = rand_object_name(); - let sizes = [16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024]; - - for size in sizes.iter() { - let data_src = RandSrc::new(*size); - let rsp = self - .client - .put_object_content( - &self.test_bucket, - &object_name, - ObjectContent::new_from_stream(data_src, Some(*size)), - ) - .content_type(String::from("image/jpeg")) - .send() - .await - .unwrap(); - assert_eq!(rsp.object_size, *size); - let etag = rsp.etag; - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.size, *size as usize); - assert_eq!(resp.etag, etag); - assert_eq!( - resp.headers.get(header::CONTENT_TYPE).unwrap(), - "image/jpeg" - ); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - } + assert_eq!(names.len(), N_BUCKETS); - // Repeat test with no size specified in ObjectContent - for size in sizes.iter() { - let data_src = RandSrc::new(*size); - let rsp = self - .client - .put_object_content( - &self.test_bucket, - &object_name, - ObjectContent::new_from_stream(data_src, None), - ) - .part_size(Some(5 * 1024 * 1024)) // Set part size to 5MB - .send() - .await - .unwrap(); - assert_eq!(rsp.object_size, *size); - let etag = rsp.etag; - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.size, *size as usize); - assert_eq!(resp.etag, etag); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); + let mut count = 0; + let resp = ctx.client.list_buckets().send().await.unwrap(); + + for bucket in resp.buckets.iter() { + if names.contains(&bucket.name) { + count += 1; } } + assert_eq!(guards.len(), N_BUCKETS); + assert_eq!(count, N_BUCKETS); +} - // Test sending ObjectContent across async tasks. - async fn put_object_content_2(&self) { - let object_name = rand_object_name(); - let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024]; - - let (sender, mut receiver): (mpsc::Sender, mpsc::Receiver) = - mpsc::channel(2); - - let sender_handle = { - let sizes = sizes.clone(); - tokio::spawn(async move { - for size in sizes.iter() { - let data_src = RandSrc::new(*size); - sender - .send(ObjectContent::new_from_stream(data_src, Some(*size))) - .await - .unwrap(); - } - }) - }; +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn create_delete_bucket() { + let ctx = TestContext::new_from_env(); + let bucket_name = rand_bucket_name(); - let uploader_handler = { - let sizes = sizes.clone(); - let object_name = object_name.clone(); - let client = self.client.clone(); - let test_bucket = self.test_bucket.clone(); - tokio::spawn(async move { - let mut idx = 0; - while let Some(item) = receiver.recv().await { - let rsp = client - .put_object_content(&test_bucket, &object_name, item) - .send() - .await - .unwrap(); - assert_eq!(rsp.object_size, sizes[idx]); - let etag = rsp.etag; - let resp = client - .stat_object(&StatObjectArgs::new(&test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.size, sizes[idx] as usize); - assert_eq!(resp.etag, etag); - client - .remove_object(&test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - - idx += 1; - } - }) - }; + ctx.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let exists = ctx + .client + .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(exists); - sender_handle.await.unwrap(); - uploader_handler.await.unwrap(); + ctx.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let exists = ctx + .client + .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(!exists); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn put_object() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let size = 16_usize; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) + .await + .unwrap(); + let resp = ctx + .client + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.bucket_name, bucket_name); + assert_eq!(resp.object_name, object_name); + assert_eq!(resp.size, size); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); + // Validate delete succeeded. + let resp = ctx + .client + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) + .await; + match resp.err().unwrap() { + Error::S3Error(er) => { + assert_eq!(er.code, "NoSuchKey") + } + e => panic!("Unexpected error {:?}", e), } +} - async fn get_object_old(&self) { - let object_name = rand_object_name(); - let data = "hello, world"; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut BufReader::new(data.as_bytes()), - Some(data.len()), - None, - ) - .unwrap(), +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn put_object_multipart() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let size: usize = 16 + 5 * 1024 * 1024; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) + .await + .unwrap(); + let resp = ctx + .client + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.bucket_name, bucket_name); + assert_eq!(resp.object_name, object_name); + assert_eq!(resp.size, size); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn put_object_content() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let sizes = [16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024]; + + for size in sizes.iter() { + let data_src = RandSrc::new(*size); + let rsp = ctx + .client + .put_object_content( + &bucket_name, + &object_name, + ObjectContent::new_from_stream(data_src, Some(*size)), ) + .content_type(String::from("image/jpeg")) + .send() .await .unwrap(); - let resp = self + assert_eq!(rsp.object_size, *size); + let etag = rsp.etag; + let resp = ctx .client - .get_object_old(&GetObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) .await .unwrap(); - let got = resp.text().await.unwrap(); - assert_eq!(got, data); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) + assert_eq!(resp.size, *size as usize); + assert_eq!(resp.etag, etag); + assert_eq!( + resp.headers.get(header::CONTENT_TYPE).unwrap(), + "image/jpeg" + ); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) .send() .await .unwrap(); } - async fn get_object(&self) { - let object_name = rand_object_name(); - let data = Bytes::from("hello, world".to_string().into_bytes()); - self.client - .put_object_content(&self.test_bucket, &object_name, data.clone()) + // Repeat test with no size specified in ObjectContent + for size in sizes.iter() { + let data_src = RandSrc::new(*size); + let rsp = ctx + .client + .put_object_content( + &bucket_name, + &object_name, + ObjectContent::new_from_stream(data_src, None), + ) + .part_size(Some(5 * 1024 * 1024)) // Set part size to 5MB .send() .await .unwrap(); - let resp = self + assert_eq!(rsp.object_size, *size); + let etag = rsp.etag; + let resp = ctx .client - .get_object(&self.test_bucket, &object_name) - .send() + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) .await .unwrap(); - let got = resp.content.to_segmented_bytes().await.unwrap().to_bytes(); - assert_eq!(got, data); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) + assert_eq!(resp.size, *size as usize); + assert_eq!(resp.etag, etag); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) .send() .await .unwrap(); } +} - fn get_hash(filename: &String) -> String { - let mut hasher = Sha256::new(); - let mut file = fs::File::open(filename).unwrap(); - io::copy(&mut file, &mut hasher).unwrap(); - format!("{:x}", hasher.finalize()) - } - - async fn upload_download_object(&self) { - let object_name = rand_object_name(); - let size = 16_usize; - let mut file = fs::File::create(&object_name).unwrap(); - io::copy(&mut RandReader::new(size), &mut file).unwrap(); - file.sync_all().unwrap(); - self.client - .upload_object( - &UploadObjectArgs::new(&self.test_bucket, &object_name, &object_name).unwrap(), - ) - .await - .unwrap(); - - let filename = rand_object_name(); - self.client - .download_object( - &DownloadObjectArgs::new(&self.test_bucket, &object_name, &filename).unwrap(), - ) - .await - .unwrap(); - assert!(ClientTest::get_hash(&object_name) == ClientTest::get_hash(&filename)); +/// Test sending ObjectContent across async tasks. +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn put_object_content_2() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024]; + + let (sender, mut receiver): (mpsc::Sender, mpsc::Receiver) = + mpsc::channel(2); + + let sender_handle = { + let sizes = sizes.clone(); + tokio::spawn(async move { + for size in sizes.iter() { + let data_src = RandSrc::new(*size); + sender + .send(ObjectContent::new_from_stream(data_src, Some(*size))) + .await + .unwrap(); + } + }) + }; + + let uploader_handler = { + let sizes = sizes.clone(); + let object_name = object_name.clone(); + let client = ctx.client.clone(); + let test_bucket = bucket_name.clone(); + tokio::spawn(async move { + let mut idx = 0; + while let Some(item) = receiver.recv().await { + let rsp = client + .put_object_content(&test_bucket, &object_name, item) + .send() + .await + .unwrap(); + assert_eq!(rsp.object_size, sizes[idx]); + let etag = rsp.etag; + let resp = client + .stat_object(&StatObjectArgs::new(&test_bucket, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.size, sizes[idx] as usize); + assert_eq!(resp.etag, etag); + client + .remove_object(&test_bucket, object_name.as_str()) + .send() + .await + .unwrap(); + + idx += 1; + } + }) + }; - fs::remove_file(&object_name).unwrap(); - fs::remove_file(&filename).unwrap(); + sender_handle.await.unwrap(); + uploader_handler.await.unwrap(); +} - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn get_object_old() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); - let object_name = rand_object_name(); - let size: usize = 16 + 5 * 1024 * 1024; - let mut file = fs::File::create(&object_name).unwrap(); - io::copy(&mut RandReader::new(size), &mut file).unwrap(); - file.sync_all().unwrap(); - self.client - .upload_object( - &UploadObjectArgs::new(&self.test_bucket, &object_name, &object_name).unwrap(), + let data = "hello, world"; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut BufReader::new(data.as_bytes()), + Some(data.len()), + None, ) - .await - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); + let resp = ctx + .client + .get_object_old(&GetObjectArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + let got = resp.text().await.unwrap(); + assert_eq!(got, data); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); +} - let filename = rand_object_name(); - self.client - .download_object( - &DownloadObjectArgs::new(&self.test_bucket, &object_name, &filename).unwrap(), - ) - .await - .unwrap(); - assert!(ClientTest::get_hash(&object_name) == ClientTest::get_hash(&filename)); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn get_object() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let data = Bytes::from("hello, world".to_string().into_bytes()); + ctx.client + .put_object_content(&bucket_name, &object_name, data.clone()) + .send() + .await + .unwrap(); + let resp = ctx + .client + .get_object(&bucket_name, &object_name) + .send() + .await + .unwrap(); + let got = resp.content.to_segmented_bytes().await.unwrap().to_bytes(); + assert_eq!(got, data); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); +} - fs::remove_file(&object_name).unwrap(); - fs::remove_file(&filename).unwrap(); +fn get_hash(filename: &String) -> String { + let mut hasher = Sha256::new(); + let mut file = fs::File::open(filename).unwrap(); + io::copy(&mut file, &mut hasher).unwrap(); + format!("{:x}", hasher.finalize()) +} - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - } +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn upload_download_object() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let size = 16_usize; + let mut file = fs::File::create(&object_name).unwrap(); + io::copy(&mut RandReader::new(size), &mut file).unwrap(); + file.sync_all().unwrap(); + ctx.client + .upload_object(&UploadObjectArgs::new(&bucket_name, &object_name, &object_name).unwrap()) + .await + .unwrap(); - async fn remove_objects(&self) { - let bucket_name = rand_bucket_name(); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + let filename = rand_object_name(); + ctx.client + .download_object(&DownloadObjectArgs::new(&bucket_name, &object_name, &filename).unwrap()) + .await + .unwrap(); + assert_eq!(get_hash(&object_name), get_hash(&filename)); - let mut names: Vec = Vec::new(); - for _ in 1..=3 { - let object_name = rand_object_name(); - let size = 0_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), - ) - .await - .unwrap(); - names.push(object_name); - } - let del_items: Vec = names - .iter() - .map(|v| ObjectToDelete::from(v.as_str())) - .collect(); + fs::remove_file(&object_name).unwrap(); + fs::remove_file(&filename).unwrap(); - let mut resp = self - .client - .remove_objects(&self.test_bucket, del_items.into_iter()) - .verbose_mode(true) - .to_stream() - .await; - - let mut del_count = 0; - while let Some(item) = resp.next().await { - let res = item.unwrap(); - for obj in res.result.iter() { - assert!(obj.is_deleted()); - } - del_count += res.result.len(); - } - assert_eq!(del_count, 3); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } + let object_name = rand_object_name(); + let size: usize = 16 + 5 * 1024 * 1024; + let mut file = fs::File::create(&object_name).unwrap(); + io::copy(&mut RandReader::new(size), &mut file).unwrap(); + file.sync_all().unwrap(); + ctx.client + .upload_object(&UploadObjectArgs::new(&bucket_name, &object_name, &object_name).unwrap()) + .await + .unwrap(); - async fn list_objects(&self) { - let bucket_name = rand_bucket_name(); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + let filename = rand_object_name(); + ctx.client + .download_object(&DownloadObjectArgs::new(&bucket_name, &object_name, &filename).unwrap()) + .await + .unwrap(); + assert_eq!(get_hash(&object_name), get_hash(&filename)); - let mut names: Vec = Vec::new(); - for _ in 1..=3 { - let object_name = rand_object_name(); - let size = 0_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), - ) - .await - .unwrap(); - names.push(object_name); - } + fs::remove_file(&object_name).unwrap(); + fs::remove_file(&filename).unwrap(); - let mut stream = self - .client - .list_objects(&self.test_bucket) - .to_stream() - .await; - - let mut count = 0; - while let Some(items) = stream.next().await { - let items = items.unwrap().contents; - for item in items.iter() { - assert!(names.contains(&item.name)); - count += 1; - } - } - assert!(count == 3); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); +} - let del_items: Vec = names - .iter() - .map(|v| ObjectToDelete::from(v.as_str())) - .collect(); - let mut resp = self - .client - .remove_objects(&self.test_bucket, del_items.into_iter()) - .verbose_mode(true) - .to_stream() - .await; - while let Some(item) = resp.next().await { - let res = item.unwrap(); - for obj in res.result.iter() { - assert!(obj.is_deleted()); - } - } +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn remove_objects() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + let mut names: Vec = Vec::new(); + for _ in 1..=3 { + let object_name = rand_object_name(); + let size = 0_usize; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) .await .unwrap(); + names.push(object_name); } + let del_items: Vec = names + .iter() + .map(|v| ObjectToDelete::from(v.as_str())) + .collect(); + + let mut resp = ctx + .client + .remove_objects(&bucket_name, del_items.into_iter()) + .verbose_mode(true) + .to_stream() + .await; + + let mut del_count = 0; + while let Some(item) = resp.next().await { + let res = item.unwrap(); + for obj in res.result.iter() { + assert!(obj.is_deleted()); + } + del_count += res.result.len(); + } + assert_eq!(del_count, 3); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn list_objects() { + const N_OBJECTS: usize = 3; + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - async fn select_object_content(&self) { + let mut names: Vec = Vec::new(); + for _ in 1..=N_OBJECTS { let object_name = rand_object_name(); - let mut data = String::new(); - data.push_str("1997,Ford,E350,\"ac, abs, moon\",3000.00\n"); - data.push_str("1999,Chevy,\"Venture \"\"Extended Edition\"\"\",,4900.00\n"); - data.push_str("1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n"); - data.push_str("1996,Jeep,Grand Cherokee,\"MUST SELL!\n"); - data.push_str("air, moon roof, loaded\",4799.00\n"); - let body = String::from("Year,Make,Model,Description,Price\n") + &data; - - self.client + let size = 0_usize; + ctx.client .put_object_old( &mut PutObjectArgs::new( - &self.test_bucket, + &bucket_name, &object_name, - &mut BufReader::new(body.as_bytes()), - Some(body.len()), + &mut RandReader::new(size), + Some(size), None, ) .unwrap(), ) .await .unwrap(); + names.push(object_name); + } - let request = SelectRequest::new_csv_input_output( - "select * from S3Object", - CsvInputSerialization { - compression_type: None, - allow_quoted_record_delimiter: false, - comments: None, - field_delimiter: None, - file_header_info: Some(FileHeaderInfo::USE), - quote_character: None, - quote_escape_character: None, - record_delimiter: None, - }, - CsvOutputSerialization { - field_delimiter: None, - quote_character: None, - quote_escape_character: None, - quote_fields: Some(QuoteFields::ASNEEDED), - record_delimiter: None, - }, - ) - .unwrap(); - let mut resp = self - .client - .select_object_content( - &SelectObjectContentArgs::new(&self.test_bucket, &object_name, &request).unwrap(), - ) - .await - .unwrap(); - let mut got = String::new(); - let mut buf = [0_u8; 512]; - loop { - let size = resp.read(&mut buf).await.unwrap(); - if size == 0 { - break; - } - got += core::str::from_utf8(&buf[..size]).unwrap(); + let mut stream = ctx.client.list_objects(&bucket_name).to_stream().await; + + let mut count = 0; + while let Some(items) = stream.next().await { + let items = items.unwrap().contents; + for item in items.iter() { + assert!(names.contains(&item.name)); + count += 1; } - assert_eq!(got, data); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); } + assert_eq!(count, N_OBJECTS); + + let del_items: Vec = names + .iter() + .map(|v| ObjectToDelete::from(v.as_str())) + .collect(); + let mut resp = ctx + .client + .remove_objects(&bucket_name, del_items.into_iter()) + .verbose_mode(true) + .to_stream() + .await; + while let Some(item) = resp.next().await { + let res = item.unwrap(); + for obj in res.result.iter() { + assert!(obj.is_deleted()); + } + } +} - async fn listen_bucket_notification(&self) { - let object_name = rand_object_name(); - - let name = object_name.clone(); - let (sender, mut receiver): (mpsc::UnboundedSender, mpsc::UnboundedReceiver) = - mpsc::unbounded_channel(); - - let access_key = self.access_key.clone(); - let secret_key = self.secret_key.clone(); - let base_url = self.base_url.clone(); - let ignore_cert_check = self.ignore_cert_check; - let ssl_cert_file = self.ssl_cert_file.clone(); - let test_bucket = self.test_bucket.clone(); - - let listen_task = move || async move { - let static_provider = StaticProvider::new(&access_key, &secret_key, None); - let ssl_cert_file = &ssl_cert_file; - let client = Client::new( - base_url, - Some(Box::new(static_provider)), - ssl_cert_file.as_deref(), - ignore_cert_check, +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn select_object_content() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let mut data = String::new(); + data.push_str("1997,Ford,E350,\"ac, abs, moon\",3000.00\n"); + data.push_str("1999,Chevy,\"Venture \"\"Extended Edition\"\"\",,4900.00\n"); + data.push_str("1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n"); + data.push_str("1996,Jeep,Grand Cherokee,\"MUST SELL!\n"); + data.push_str("air, moon roof, loaded\",4799.00\n"); + let body = String::from("Year,Make,Model,Description,Price\n") + &data; + + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut BufReader::new(body.as_bytes()), + Some(body.len()), + None, ) - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); - let event_fn = |event: NotificationRecords| { - let record = event.records.first(); - if let Some(record) = record { - let key = &record.s3.object.key; - if name == *key { - sender.send(true).unwrap(); - return false; - } - } - sender.send(false).unwrap(); - false - }; + let request = SelectRequest::new_csv_input_output( + "select * from S3Object", + CsvInputSerialization { + compression_type: None, + allow_quoted_record_delimiter: false, + comments: None, + field_delimiter: None, + file_header_info: Some(FileHeaderInfo::USE), + quote_character: None, + quote_escape_character: None, + record_delimiter: None, + }, + CsvOutputSerialization { + field_delimiter: None, + quote_character: None, + quote_escape_character: None, + quote_fields: Some(QuoteFields::ASNEEDED), + record_delimiter: None, + }, + ) + .unwrap(); + let mut resp = ctx + .client + .select_object_content( + &SelectObjectContentArgs::new(&bucket_name, &object_name, &request).unwrap(), + ) + .await + .unwrap(); + let mut got = String::new(); + let mut buf = [0_u8; 512]; + loop { + let size = resp.read(&mut buf).await.unwrap(); + if size == 0 { + break; + } + got += core::str::from_utf8(&buf[..size]).unwrap(); + } + assert_eq!(got, data); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); +} - let (_, mut event_stream) = client - .listen_bucket_notification(&test_bucket) - .send() - .await - .unwrap(); - while let Some(event) = event_stream.next().await { - let event = event.unwrap(); - if !event_fn(event) { - break; +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn listen_bucket_notification() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); + + let name = object_name.clone(); + let (sender, mut receiver): (mpsc::UnboundedSender, mpsc::UnboundedReceiver) = + mpsc::unbounded_channel(); + + let access_key = ctx.access_key.clone(); + let secret_key = ctx.secret_key.clone(); + let base_url = ctx.base_url.clone(); + let ignore_cert_check = ctx.ignore_cert_check; + let ssl_cert_file = ctx.ssl_cert_file.clone(); + let test_bucket = bucket_name.clone(); + + let listen_task = move || async move { + let static_provider = StaticProvider::new(&access_key, &secret_key, None); + let ssl_cert_file = &ssl_cert_file; + let client = Client::new( + base_url, + Some(Box::new(static_provider)), + ssl_cert_file.as_deref(), + ignore_cert_check, + ) + .unwrap(); + + let event_fn = |event: NotificationRecords| { + let record = event.records.first(); + if let Some(record) = record { + let key = &record.s3.object.key; + if name == *key { + sender.send(true).unwrap(); + return false; } } + sender.send(false).unwrap(); + false }; - let spawned_task = task::spawn(listen_task()); - task::sleep(std::time::Duration::from_millis(200)).await; - - let size = 16_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), - ) - .await - .unwrap(); - - self.client - .remove_object(&self.test_bucket, object_name.as_str()) + let (_, mut event_stream) = client + .listen_bucket_notification(&test_bucket) .send() .await .unwrap(); + while let Some(event) = event_stream.next().await { + let event = event.unwrap(); + if !event_fn(event) { + break; + } + } + }; - spawned_task.await; - assert!(receiver.recv().await.unwrap()); - } + let spawned_task = task::spawn(listen_task()); + task::sleep(std::time::Duration::from_millis(200)).await; - async fn copy_object(&self) { - let src_object_name = rand_object_name(); + let size = 16_usize; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) + .await + .unwrap(); - let size = 16_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &src_object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); + + spawned_task.await; + assert!(receiver.recv().await.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn copy_object() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let src_object_name = rand_object_name(); + + let size = 16_usize; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &src_object_name, + &mut RandReader::new(size), + Some(size), + None, ) - .await - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); - let object_name = rand_object_name(); - self.client - .copy_object( - &CopyObjectArgs::new( - &self.test_bucket, - &object_name, - CopySource::new(&self.test_bucket, &src_object_name).unwrap(), - ) - .unwrap(), + let object_name = rand_object_name(); + ctx.client + .copy_object( + &CopyObjectArgs::new( + &bucket_name, + &object_name, + CopySource::new(&bucket_name, &src_object_name).unwrap(), ) - .await - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.size, size); + let resp = ctx + .client + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.size, size); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - self.client - .remove_object(&self.test_bucket, src_object_name.as_str()) - .send() - .await - .unwrap(); - } + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); + ctx.client + .remove_object(&bucket_name, src_object_name.as_str()) + .send() + .await + .unwrap(); +} - async fn compose_object(&self) { - let src_object_name = rand_object_name(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn compose_object() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let src_object_name = rand_object_name(); - let size = 16_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &src_object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), + let size = 16_usize; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &src_object_name, + &mut RandReader::new(size), + Some(size), + None, ) - .await - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); - let mut s1 = ComposeSource::new(&self.test_bucket, &src_object_name).unwrap(); - s1.offset = Some(3); - s1.length = Some(5); - let mut sources: Vec = Vec::new(); - sources.push(s1); + let mut s1 = ComposeSource::new(&bucket_name, &src_object_name).unwrap(); + s1.offset = Some(3); + s1.length = Some(5); + let mut sources: Vec = Vec::new(); + sources.push(s1); - let object_name = rand_object_name(); + let object_name = rand_object_name(); - self.client - .compose_object( - &mut ComposeObjectArgs::new(&self.test_bucket, &object_name, &mut sources).unwrap(), - ) - .await - .unwrap(); + ctx.client + .compose_object( + &mut ComposeObjectArgs::new(&bucket_name, &object_name, &mut sources).unwrap(), + ) + .await + .unwrap(); - let resp = self - .client - .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.size, 5); + let resp = ctx + .client + .stat_object(&StatObjectArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.size, 5); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - self.client - .remove_object(&self.test_bucket, src_object_name.as_str()) - .send() - .await - .unwrap(); - } + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); + ctx.client + .remove_object(&bucket_name, src_object_name.as_str()) + .send() + .await + .unwrap(); +} - async fn set_get_delete_bucket_notification(&self) { - let bucket_name = rand_bucket_name(); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_delete_bucket_notification() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - self.client - .set_bucket_notification( - &SetBucketNotificationArgs::new( - &bucket_name, - &NotificationConfig { - cloud_func_config_list: None, - queue_config_list: Some(vec![QueueConfig { - events: vec![ - String::from("s3:ObjectCreated:Put"), - String::from("s3:ObjectCreated:Copy"), - ], - id: None, - prefix_filter_rule: Some(PrefixFilterRule { - value: String::from("images"), - }), - suffix_filter_rule: Some(SuffixFilterRule { - value: String::from("pg"), - }), - queue: String::from(ClientTest::SQS_ARN), - }]), - topic_config_list: None, - }, - ) - .unwrap(), + ctx.client + .set_bucket_notification( + &SetBucketNotificationArgs::new( + &bucket_name, + &NotificationConfig { + cloud_func_config_list: None, + queue_config_list: Some(vec![QueueConfig { + events: vec![ + String::from("s3:ObjectCreated:Put"), + String::from("s3:ObjectCreated:Copy"), + ], + id: None, + prefix_filter_rule: Some(PrefixFilterRule { + value: String::from("images"), + }), + suffix_filter_rule: Some(SuffixFilterRule { + value: String::from("pg"), + }), + queue: String::from(TestContext::SQS_ARN), + }]), + topic_config_list: None, + }, ) - .await - .unwrap(); - - let resp = self - .client - .get_bucket_notification(&GetBucketNotificationArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.config.queue_config_list.as_ref().unwrap().len(), 1); - assert!(resp.config.queue_config_list.as_ref().unwrap()[0] - .events - .contains(&String::from("s3:ObjectCreated:Put"))); - assert!(resp.config.queue_config_list.as_ref().unwrap()[0] - .events - .contains(&String::from("s3:ObjectCreated:Copy"))); - assert_eq!( - resp.config.queue_config_list.as_ref().unwrap()[0] - .prefix_filter_rule - .as_ref() - .unwrap() - .value, - "images" - ); - assert_eq!( - resp.config.queue_config_list.as_ref().unwrap()[0] - .suffix_filter_rule - .as_ref() - .unwrap() - .value, - "pg" - ); - assert_eq!( - resp.config.queue_config_list.as_ref().unwrap()[0].queue, - ClientTest::SQS_ARN - ); + .unwrap(), + ) + .await + .unwrap(); - self.client - .delete_bucket_notification(&DeleteBucketNotificationArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + let resp = ctx + .client + .get_bucket_notification(&GetBucketNotificationArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config.queue_config_list.as_ref().unwrap().len(), 1); + assert!(resp.config.queue_config_list.as_ref().unwrap()[0] + .events + .contains(&String::from("s3:ObjectCreated:Put"))); + assert!(resp.config.queue_config_list.as_ref().unwrap()[0] + .events + .contains(&String::from("s3:ObjectCreated:Copy"))); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0] + .prefix_filter_rule + .as_ref() + .unwrap() + .value, + "images" + ); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0] + .suffix_filter_rule + .as_ref() + .unwrap() + .value, + "pg" + ); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0].queue, + TestContext::SQS_ARN + ); - let resp = self - .client - .get_bucket_notification(&GetBucketNotificationArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(resp.config.queue_config_list.is_none()); + ctx.client + .delete_bucket_notification(&DeleteBucketNotificationArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } + let resp = ctx + .client + .get_bucket_notification(&GetBucketNotificationArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(resp.config.queue_config_list.is_none()); +} - async fn set_get_delete_bucket_policy(&self) { - let bucket_name = rand_bucket_name(); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_delete_bucket_policy() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - let config = r#" + let config = r#" { "Version": "2012-10-17", "Statement": [ @@ -1068,446 +1218,318 @@ impl ClientTest { ] } "# - .replace("", &bucket_name); - - self.client - .set_bucket_policy(&SetBucketPolicyArgs::new(&bucket_name, &config).unwrap()) - .await - .unwrap(); - - let resp = self - .client - .get_bucket_policy(&GetBucketPolicyArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(!resp.config.is_empty()); + .replace("", &bucket_name); - self.client - .delete_bucket_policy(&DeleteBucketPolicyArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + ctx.client + .set_bucket_policy(&SetBucketPolicyArgs::new(&bucket_name, &config).unwrap()) + .await + .unwrap(); - let resp = self - .client - .get_bucket_policy(&GetBucketPolicyArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert_eq!(resp.config, "{}"); + let resp = ctx + .client + .get_bucket_policy(&GetBucketPolicyArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(!resp.config.is_empty()); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } + ctx.client + .delete_bucket_policy(&DeleteBucketPolicyArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); - async fn set_get_delete_bucket_tags(&self) { - let bucket_name = rand_bucket_name(); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + let resp = ctx + .client + .get_bucket_policy(&GetBucketPolicyArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config, "{}"); +} - let tags = HashMap::from([ - (String::from("Project"), String::from("Project One")), - (String::from("User"), String::from("jsmith")), - ]); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_delete_bucket_tags() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - self.client - .set_bucket_tags(&SetBucketTagsArgs::new(&bucket_name, &tags).unwrap()) - .await - .unwrap(); + let tags = HashMap::from([ + (String::from("Project"), String::from("Project One")), + (String::from("User"), String::from("jsmith")), + ]); - let resp = self - .client - .get_bucket_tags(&GetBucketTagsArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(resp.tags.len() == tags.len() && resp.tags.keys().all(|k| tags.contains_key(k))); + ctx.client + .set_bucket_tags(&SetBucketTagsArgs::new(&bucket_name, &tags).unwrap()) + .await + .unwrap(); - self.client - .delete_bucket_tags(&DeleteBucketTagsArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + let resp = ctx + .client + .get_bucket_tags(&GetBucketTagsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(resp.tags.len() == tags.len() && resp.tags.keys().all(|k| tags.contains_key(k))); - let resp = self - .client - .get_bucket_tags(&GetBucketTagsArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(resp.tags.is_empty()); + ctx.client + .delete_bucket_tags(&DeleteBucketTagsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } + let resp = ctx + .client + .get_bucket_tags(&GetBucketTagsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(resp.tags.is_empty()); +} - async fn set_get_delete_object_lock_config(&self) { - let bucket_name = rand_bucket_name(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_delete_object_lock_config() { + let ctx = TestContext::new_from_env(); + let bucket_name = rand_bucket_name(); - let mut args = MakeBucketArgs::new(&bucket_name).unwrap(); - args.object_lock = true; - self.client.make_bucket(&args).await.unwrap(); + let mut args = MakeBucketArgs::new(&bucket_name).unwrap(); + args.object_lock = true; + ctx.client.make_bucket(&args).await.unwrap(); + let _cleanup = CleanupGuard::new(&ctx, &bucket_name); - self.client - .set_object_lock_config( - &SetObjectLockConfigArgs::new( - &bucket_name, - &ObjectLockConfig::new(RetentionMode::GOVERNANCE, Some(7), None).unwrap(), - ) - .unwrap(), + ctx.client + .set_object_lock_config( + &SetObjectLockConfigArgs::new( + &bucket_name, + &ObjectLockConfig::new(RetentionMode::GOVERNANCE, Some(7), None).unwrap(), ) - .await - .unwrap(); - - let resp = self - .client - .get_object_lock_config(&GetObjectLockConfigArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(match resp.config.retention_mode { - Some(r) => matches!(r, RetentionMode::GOVERNANCE), - _ => false, - }); - - assert!(resp.config.retention_duration_days == Some(7)); - assert!(resp.config.retention_duration_years.is_none()); - - self.client - .delete_object_lock_config(&DeleteObjectLockConfigArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); - let resp = self - .client - .get_object_lock_config(&GetObjectLockConfigArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - assert!(resp.config.retention_mode.is_none()); + let resp = ctx + .client + .get_object_lock_config(&GetObjectLockConfigArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(match resp.config.retention_mode { + Some(r) => matches!(r, RetentionMode::GOVERNANCE), + _ => false, + }); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } + assert_eq!(resp.config.retention_duration_days, Some(7)); + assert!(resp.config.retention_duration_years.is_none()); - async fn set_get_delete_object_tags(&self) { - let object_name = rand_object_name(); + ctx.client + .delete_object_lock_config(&DeleteObjectLockConfigArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); - let size = 16_usize; - self.client - .put_object_old( - &mut PutObjectArgs::new( - &self.test_bucket, - &object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), - ) - .await - .unwrap(); + let resp = ctx + .client + .get_object_lock_config(&GetObjectLockConfigArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert!(resp.config.retention_mode.is_none()); +} - let tags = HashMap::from([ - (String::from("Project"), String::from("Project One")), - (String::from("User"), String::from("jsmith")), - ]); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_delete_object_tags() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; + let object_name = rand_object_name(); - self.client - .set_object_tags( - &SetObjectTagsArgs::new(&self.test_bucket, &object_name, &tags).unwrap(), + let size = 16_usize; + ctx.client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, ) - .await - .unwrap(); + .unwrap(), + ) + .await + .unwrap(); - let resp = self - .client - .get_object_tags(&GetObjectTagsArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert!(resp.tags.len() == tags.len() && resp.tags.keys().all(|k| tags.contains_key(k))); + let tags = HashMap::from([ + (String::from("Project"), String::from("Project One")), + (String::from("User"), String::from("jsmith")), + ]); - self.client - .delete_object_tags( - &DeleteObjectTagsArgs::new(&self.test_bucket, &object_name).unwrap(), - ) - .await - .unwrap(); + ctx.client + .set_object_tags(&SetObjectTagsArgs::new(&bucket_name, &object_name, &tags).unwrap()) + .await + .unwrap(); - let resp = self - .client - .get_object_tags(&GetObjectTagsArgs::new(&self.test_bucket, &object_name).unwrap()) - .await - .unwrap(); - assert!(resp.tags.is_empty()); + let resp = ctx + .client + .get_object_tags(&GetObjectTagsArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert!(resp.tags.len() == tags.len() && resp.tags.keys().all(|k| tags.contains_key(k))); - self.client - .remove_object(&self.test_bucket, object_name.as_str()) - .send() - .await - .unwrap(); - } + ctx.client + .delete_object_tags(&DeleteObjectTagsArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); - async fn set_get_bucket_versioning(&self) { - let bucket_name = rand_bucket_name(); + let resp = ctx + .client + .get_object_tags(&GetObjectTagsArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert!(resp.tags.is_empty()); - self.client - .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); + ctx.client + .remove_object(&bucket_name, object_name.as_str()) + .send() + .await + .unwrap(); +} - self.client - .set_bucket_versioning(&bucket_name) - .versioning_status(VersioningStatus::Enabled) - .send() - .await - .unwrap(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_bucket_versioning() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - let resp: GetBucketVersioningResponse = self - .client - .get_bucket_versioning(&bucket_name) - .send() - .await - .unwrap(); - assert_eq!(resp.status, Some(VersioningStatus::Enabled)); + ctx.client + .set_bucket_versioning(&bucket_name) + .versioning_status(VersioningStatus::Enabled) + .send() + .await + .unwrap(); - self.client - .set_bucket_versioning(&bucket_name) - .versioning_status(VersioningStatus::Suspended) - .send() - .await - .unwrap(); + let resp: GetBucketVersioningResponse = ctx + .client + .get_bucket_versioning(&bucket_name) + .send() + .await + .unwrap(); + assert_eq!(resp.status, Some(VersioningStatus::Enabled)); - let resp: GetBucketVersioningResponse = self - .client - .get_bucket_versioning(&bucket_name) - .send() - .await - .unwrap(); - assert_eq!(resp.status, Some(VersioningStatus::Suspended)); + ctx.client + .set_bucket_versioning(&bucket_name) + .versioning_status(VersioningStatus::Suspended) + .send() + .await + .unwrap(); - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } + let resp: GetBucketVersioningResponse = ctx + .client + .get_bucket_versioning(&bucket_name) + .send() + .await + .unwrap(); + assert_eq!(resp.status, Some(VersioningStatus::Suspended)); +} - async fn set_get_object_retention(&self) { - let bucket_name = rand_bucket_name(); +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn set_get_object_retention() { + let ctx = TestContext::new_from_env(); + let bucket_name = rand_bucket_name(); - let mut args = MakeBucketArgs::new(&bucket_name).unwrap(); - args.object_lock = true; - self.client.make_bucket(&args).await.unwrap(); + let mut args = MakeBucketArgs::new(&bucket_name).unwrap(); + args.object_lock = true; + ctx.client.make_bucket(&args).await.unwrap(); + let _cleanup = CleanupGuard::new(&ctx, &bucket_name); - let object_name = rand_object_name(); + let object_name = rand_object_name(); - let size = 16_usize; - let obj_resp = self - .client - .put_object_old( - &mut PutObjectArgs::new( - &bucket_name, - &object_name, - &mut RandReader::new(size), - Some(size), - None, - ) - .unwrap(), + let size = 16_usize; + let obj_resp = ctx + .client + .put_object_old( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, ) - .await - .unwrap(); - - let retain_until_date = utc_now() + Duration::days(1); - let args = SetObjectRetentionArgs::new( - &bucket_name, - &object_name, - Some(RetentionMode::GOVERNANCE), - Some(retain_until_date), + .unwrap(), ) + .await .unwrap(); - self.client.set_object_retention(&args).await.unwrap(); - - let resp = self - .client - .get_object_retention(&GetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap()) - .await - .unwrap(); - assert!(match resp.retention_mode { - Some(v) => matches!(v, RetentionMode::GOVERNANCE), - _ => false, - }); - assert!(match resp.retain_until_date { - Some(v) => to_iso8601utc(v) == to_iso8601utc(retain_until_date), - _ => false, - },); - - let mut args = SetObjectRetentionArgs::new(&bucket_name, &object_name, None, None).unwrap(); - args.bypass_governance_mode = true; - self.client.set_object_retention(&args).await.unwrap(); - - let resp = self - .client - .get_object_retention(&GetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap()) - .await - .unwrap(); - assert!(resp.retention_mode.is_none()); - assert!(resp.retain_until_date.is_none()); - - self.client - .remove_object( - &bucket_name, - (object_name.as_str(), obj_resp.version_id.as_deref()), - ) - .send() - .await - .unwrap(); - - self.client - .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) - .await - .unwrap(); - } - - async fn get_presigned_object_url(&self) { - let object_name = rand_object_name(); - let resp = self - .client - .get_presigned_object_url( - &GetPresignedObjectUrlArgs::new(&self.test_bucket, &object_name, Method::GET) - .unwrap(), - ) - .await - .unwrap(); - assert!(resp.url.contains("X-Amz-Signature=")); - } - - async fn get_presigned_post_form_data(&self) { - let object_name = rand_object_name(); - let expiration = utc_now() + Duration::days(5); - - let mut policy = PostPolicy::new(&self.test_bucket, &expiration).unwrap(); - policy.add_equals_condition("key", &object_name).unwrap(); - policy - .add_content_length_range_condition(1024 * 1024, 4 * 1024 * 1024) - .unwrap(); + let retain_until_date = utc_now() + chrono::Duration::days(1); + let args = SetObjectRetentionArgs::new( + &bucket_name, + &object_name, + Some(RetentionMode::GOVERNANCE), + Some(retain_until_date), + ) + .unwrap(); + + ctx.client.set_object_retention(&args).await.unwrap(); + + let resp = ctx + .client + .get_object_retention(&GetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert!(match resp.retention_mode { + Some(v) => matches!(v, RetentionMode::GOVERNANCE), + _ => false, + }); + assert!(match resp.retain_until_date { + Some(v) => to_iso8601utc(v) == to_iso8601utc(retain_until_date), + _ => false, + },); + + let mut args = SetObjectRetentionArgs::new(&bucket_name, &object_name, None, None).unwrap(); + args.bypass_governance_mode = true; + ctx.client.set_object_retention(&args).await.unwrap(); + + let resp = ctx + .client + .get_object_retention(&GetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert!(resp.retention_mode.is_none()); + assert!(resp.retain_until_date.is_none()); - let form_data = self - .client - .get_presigned_post_form_data(&policy) - .await - .unwrap(); - assert!(form_data.contains_key("x-amz-signature")); - assert!(form_data.contains_key("policy")); - } + ctx.client + .remove_object( + &bucket_name, + (object_name.as_str(), obj_resp.version_id.as_deref()), + ) + .send() + .await + .unwrap(); } -#[tokio::main] -#[test] -async fn s3_tests() -> Result<(), Box> { - let host = std::env::var("SERVER_ENDPOINT")?; - let access_key = std::env::var("ACCESS_KEY")?; - let secret_key = std::env::var("SECRET_KEY")?; - let secure = std::env::var("ENABLE_HTTPS").is_ok(); - let value = std::env::var("SSL_CERT_FILE")?; - let mut ssl_cert_file = None; - if !value.is_empty() { - ssl_cert_file = Some(Path::new(&value)); - } - let ignore_cert_check = std::env::var("IGNORE_CERT_CHECK").is_ok(); - let region = std::env::var("SERVER_REGION").ok(); - - let mut base_url: BaseUrl = host.parse().unwrap(); - base_url.https = secure; - if let Some(v) = region { - base_url.region = v; - } - - let static_provider = StaticProvider::new(&access_key, &secret_key, None); - let ctest = ClientTest::new( - base_url, - access_key, - secret_key, - static_provider, - Some(ignore_cert_check), - ssl_cert_file, - ); - ctest.init().await; - - println!("make_bucket() + bucket_exists() + remove_bucket()"); - ctest.bucket_exists().await; - - println!("list_buckets()"); - ctest.list_buckets().await; +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn get_presigned_object_url() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - println!("put_object() + stat_object() + remove_object()"); - ctest.put_object().await; - - println!("[Multipart] put_object()"); - ctest.put_object_multipart().await; - - println!("put_object_stream()"); - ctest.put_object_content().await; - - println!("put_object_stream_2()"); - ctest.put_object_content_2().await; - - println!("get_object_old()"); - ctest.get_object_old().await; - - println!("get_object()"); - ctest.get_object().await; - - println!("{{upload,download}}_object()"); - ctest.upload_download_object().await; - - println!("remove_objects()"); - ctest.remove_objects().await; - - println!("list_objects()"); - ctest.list_objects().await; - - println!("select_object_content()"); - ctest.select_object_content().await; - - println!("listen_bucket_notification()"); - ctest.listen_bucket_notification().await; - - println!("copy_object()"); - ctest.copy_object().await; - - println!("compose_object()"); - ctest.compose_object().await; - - println!("{{set,get,delete}}_bucket_notification()"); - ctest.set_get_delete_bucket_notification().await; - - println!("{{set,get,delete}}_bucket_policy()"); - ctest.set_get_delete_bucket_policy().await; - - println!("{{set,get,delete}}_bucket_tags()"); - ctest.set_get_delete_bucket_tags().await; - - println!("{{set,get,delete}}_object_lock_config()"); - ctest.set_get_delete_object_lock_config().await; - - println!("{{set,get,delete}}_object_tags()"); - ctest.set_get_delete_object_tags().await; - - println!("{{set,get}}_bucket_versioning()"); - ctest.set_get_bucket_versioning().await; - - println!("{{set,get}}_object_retention()"); - ctest.set_get_object_retention().await; + let object_name = rand_object_name(); + let resp = ctx + .client + .get_presigned_object_url( + &GetPresignedObjectUrlArgs::new(&bucket_name, &object_name, Method::GET).unwrap(), + ) + .await + .unwrap(); + assert!(resp.url.contains("X-Amz-Signature=")); +} - println!("get_presigned_object_url()"); - ctest.get_presigned_object_url().await; +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn get_presigned_post_form_data() { + let ctx = TestContext::new_from_env(); + let (bucket_name, _cleanup) = create_bucket_helper(&ctx).await; - println!("get_presigned_post_form_data()"); - ctest.get_presigned_post_form_data().await; + let object_name = rand_object_name(); + let expiration = utc_now() + chrono::Duration::days(5); - ctest.drop().await; + let mut policy = PostPolicy::new(&bucket_name, &expiration).unwrap(); + policy.add_equals_condition("key", &object_name).unwrap(); + policy + .add_content_length_range_condition(1024 * 1024, 4 * 1024 * 1024) + .unwrap(); - Ok(()) + let form_data = ctx + .client + .get_presigned_post_form_data(&policy) + .await + .unwrap(); + assert!(form_data.contains_key("x-amz-signature")); + assert!(form_data.contains_key("policy")); }