53
53
import software .amazon .awssdk .services .s3 .model .NoSuchKeyException ;
54
54
import software .amazon .awssdk .services .s3 .model .ObjectAttributes ;
55
55
import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
56
- import software .amazon .awssdk .services .s3 .model .ServerSideEncryption ;
57
56
import software .amazon .awssdk .services .s3 .model .UploadPartRequest ;
58
57
import software .amazon .awssdk .services .s3 .model .UploadPartResponse ;
59
58
import software .amazon .awssdk .services .s3 .paginators .ListObjectsV2Iterable ;
110
109
import static org .opensearch .repositories .s3 .S3Repository .MAX_FILE_SIZE ;
111
110
import static org .opensearch .repositories .s3 .S3Repository .MAX_FILE_SIZE_USING_MULTIPART ;
112
111
import static org .opensearch .repositories .s3 .S3Repository .MIN_PART_SIZE_USING_MULTIPART ;
112
+ import static org .opensearch .repositories .s3 .utils .SseKmsUtil .configureEncryptionSettings ;
113
113
114
114
class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer {
115
115
@@ -129,7 +129,13 @@ public boolean blobExists(String blobName) {
129
129
try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
130
130
SocketAccess .doPrivileged (
131
131
() -> clientReference .get ()
132
- .headObject (HeadObjectRequest .builder ().bucket (blobStore .bucket ()).key (buildKey (blobName )).build ())
132
+ .headObject (
133
+ HeadObjectRequest .builder ()
134
+ .bucket (blobStore .bucket ())
135
+ .key (buildKey (blobName ))
136
+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
137
+ .build ()
138
+ )
133
139
);
134
140
return true ;
135
141
} catch (NoSuchKeyException e ) {
@@ -214,7 +220,12 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
214
220
writeContext .doRemoteDataIntegrityCheck (),
215
221
writeContext .getExpectedChecksum (),
216
222
blobStore .isUploadRetryEnabled (),
217
- writeContext .getMetadata ()
223
+ writeContext .getMetadata (),
224
+ blobStore .serverSideEncryptionType (),
225
+ blobStore .serverSideEncryptionKmsKey (),
226
+ blobStore .serverSideEncryptionBucketKey (),
227
+ blobStore .serverSideEncryptionEncryptionContext (),
228
+ blobStore .expectedBucketOwner ()
218
229
);
219
230
try {
220
231
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
@@ -498,6 +509,7 @@ private ListObjectsV2Request listObjectsRequest(String keyPath) {
498
509
.prefix (keyPath )
499
510
.delimiter ("/" )
500
511
.overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().listObjectsMetricPublisher ))
512
+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
501
513
.build ();
502
514
}
503
515
@@ -534,14 +546,13 @@ void executeSingleUpload(
534
546
.contentLength (blobSize )
535
547
.storageClass (blobStore .getStorageClass ())
536
548
.acl (blobStore .getCannedACL ())
537
- .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().putObjectMetricPublisher ));
549
+ .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().putObjectMetricPublisher ))
550
+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
538
551
539
552
if (CollectionUtils .isNotEmpty (metadata )) {
540
553
putObjectRequestBuilder = putObjectRequestBuilder .metadata (metadata );
541
554
}
542
- if (blobStore .serverSideEncryption ()) {
543
- putObjectRequestBuilder .serverSideEncryption (ServerSideEncryption .AES256 );
544
- }
555
+ configureEncryptionSettings (putObjectRequestBuilder , blobStore );
545
556
546
557
PutObjectRequest putObjectRequest = putObjectRequestBuilder .build ();
547
558
try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
@@ -591,15 +602,14 @@ void executeMultipartUpload(
591
602
.key (blobName )
592
603
.storageClass (blobStore .getStorageClass ())
593
604
.acl (blobStore .getCannedACL ())
594
- .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ));
605
+ .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ))
606
+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
595
607
596
608
if (CollectionUtils .isNotEmpty (metadata )) {
597
609
createMultipartUploadRequestBuilder .metadata (metadata );
598
610
}
599
611
600
- if (blobStore .serverSideEncryption ()) {
601
- createMultipartUploadRequestBuilder .serverSideEncryption (ServerSideEncryption .AES256 );
602
- }
612
+ configureEncryptionSettings (createMultipartUploadRequestBuilder , blobStore );
603
613
604
614
final InputStream requestInputStream ;
605
615
if (blobStore .isUploadRetryEnabled ()) {
@@ -628,6 +638,7 @@ void executeMultipartUpload(
628
638
.partNumber (i )
629
639
.contentLength ((i < nbParts ) ? partSize : lastPartSize )
630
640
.overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ))
641
+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
631
642
.build ();
632
643
633
644
bytesCount += uploadPartRequest .contentLength ();
@@ -650,6 +661,7 @@ void executeMultipartUpload(
650
661
.uploadId (uploadId .get ())
651
662
.multipartUpload (CompletedMultipartUpload .builder ().parts (parts ).build ())
652
663
.overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ))
664
+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
653
665
.build ();
654
666
655
667
SocketAccess .doPrivilegedVoid (() -> clientReference .get ().completeMultipartUpload (completeMultipartUploadRequest ));
@@ -663,6 +675,7 @@ void executeMultipartUpload(
663
675
.bucket (bucketName )
664
676
.key (blobName )
665
677
.uploadId (uploadId .get ())
678
+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
666
679
.build ();
667
680
try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
668
681
SocketAccess .doPrivilegedVoid (() -> clientReference .get ().abortMultipartUpload (abortRequest ));
@@ -729,12 +742,14 @@ CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
729
742
@ Nullable Integer partNumber
730
743
) {
731
744
final boolean isMultipartObject = partNumber != null ;
732
- final GetObjectRequest .Builder getObjectRequestBuilder = GetObjectRequest .builder ().bucket (bucketName ).key (blobKey );
745
+ final GetObjectRequest .Builder getObjectRequestBuilder = GetObjectRequest .builder ()
746
+ .bucket (bucketName )
747
+ .key (blobKey )
748
+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
733
749
734
750
if (isMultipartObject ) {
735
751
getObjectRequestBuilder .partNumber (partNumber );
736
752
}
737
-
738
753
return SocketAccess .doPrivileged (
739
754
() -> s3AsyncClient .getObject (getObjectRequestBuilder .build (), AsyncResponseTransformer .toBlockingInputStream ())
740
755
.thenApply (response -> transformResponseToInputStreamContainer (response , isMultipartObject ))
@@ -775,6 +790,7 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A
775
790
.bucket (bucketName )
776
791
.key (blobName )
777
792
.objectAttributes (ObjectAttributes .CHECKSUM , ObjectAttributes .OBJECT_SIZE , ObjectAttributes .OBJECT_PARTS )
793
+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
778
794
.build ();
779
795
780
796
return SocketAccess .doPrivileged (() -> s3AsyncClient .getObjectAttributes (getObjectAttributesRequest ));
0 commit comments