Skip to content

Commit 546c449

Browse files
authored
Merge pull request #41794 from sappusaketh/f-aws_kinesis_firehose_delivery_stream-msk_source_configuration-read_from_timestamp
feat: add read_from_timestamp option to msk_source_configuration for Firehose delivery stream
2 parents 8eeb933 + 98d2567 commit 546c449

File tree

4 files changed

+95
-0
lines changed

4 files changed

+95
-0
lines changed

.changelog/41794.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
resource/aws_kinesis_firehose_delivery_stream: Add `msk_source_configuration.read_from_timestamp` argument
3+
```

internal/service/firehose/delivery_stream.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,12 @@ func resourceDeliveryStream() *schema.Resource {
957957
ForceNew: true,
958958
ValidateFunc: verify.ValidARN,
959959
},
960+
"read_from_timestamp": {
961+
Type: schema.TypeString,
962+
Optional: true,
963+
ForceNew: true,
964+
ValidateFunc: validation.IsRFC3339Time,
965+
},
960966
"topic_name": {
961967
Type: schema.TypeString,
962968
Required: true,
@@ -3493,6 +3499,11 @@ func expandMSKSourceConfiguration(tfMap map[string]interface{}) *types.MSKSource
34933499
apiObject.MSKClusterARN = aws.String(v)
34943500
}
34953501

3502+
if v, ok := tfMap["read_from_timestamp"].(string); ok && v != "" {
3503+
v, _ := time.Parse(time.RFC3339, v)
3504+
apiObject.ReadFromTimestamp = aws.Time(v)
3505+
}
3506+
34963507
if v, ok := tfMap["topic_name"].(string); ok && v != "" {
34973508
apiObject.TopicName = aws.String(v)
34983509
}
@@ -3533,6 +3544,10 @@ func flattenMSKSourceDescription(apiObject *types.MSKSourceDescription) map[stri
35333544
tfMap["msk_cluster_arn"] = aws.ToString(v)
35343545
}
35353546

3547+
if v := apiObject.ReadFromTimestamp; v != nil {
3548+
tfMap["read_from_timestamp"] = aws.ToTime(v).Format(time.RFC3339)
3549+
}
3550+
35363551
if v := apiObject.TopicName; v != nil {
35373552
tfMap["topic_name"] = aws.ToString(v)
35383553
}

internal/service/firehose/delivery_stream_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ import (
2424
"github.com/hashicorp/terraform-provider-aws/names"
2525
)
2626

27+
func init() {
28+
acctest.RegisterServiceErrorCheckFunc(names.FirehoseServiceID, testAccErrorCheckSkip)
29+
}
30+
31+
func testAccErrorCheckSkip(t *testing.T) resource.ErrorCheckFunc {
32+
return acctest.ErrorCheckSkipMessagesContaining(t,
33+
"Read from timestamp feature is not currently available",
34+
)
35+
}
36+
2737
func TestAccFirehoseDeliveryStream_basic(t *testing.T) {
2838
ctx := acctest.Context(t)
2939
var stream types.DeliveryStreamDescription
@@ -996,6 +1006,42 @@ func TestAccFirehoseDeliveryStream_ExtendedS3_mskClusterSource(t *testing.T) {
9961006
resource.TestCheckResourceAttrSet(resourceName, "msk_source_configuration.0.authentication_configuration.0.role_arn"),
9971007
resource.TestCheckResourceAttrSet(resourceName, "msk_source_configuration.0.msk_cluster_arn"),
9981008
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.0.topic_name", "test"),
1009+
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.0.read_from_timestamp", ""),
1010+
),
1011+
},
1012+
{
1013+
ResourceName: resourceName,
1014+
ImportState: true,
1015+
ImportStateVerify: true,
1016+
},
1017+
},
1018+
})
1019+
}
1020+
1021+
func TestAccFirehoseDeliveryStream_ExtendedS3_readFromTimestamp(t *testing.T) {
1022+
ctx := acctest.Context(t)
1023+
var stream types.DeliveryStreamDescription
1024+
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
1025+
resourceName := "aws_kinesis_firehose_delivery_stream.test"
1026+
1027+
resource.ParallelTest(t, resource.TestCase{
1028+
PreCheck: func() { acctest.PreCheck(ctx, t) },
1029+
ErrorCheck: acctest.ErrorCheck(t, names.FirehoseServiceID),
1030+
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
1031+
CheckDestroy: testAccCheckDeliveryStreamDestroy(ctx),
1032+
Steps: []resource.TestStep{
1033+
{
1034+
Config: testAccDeliveryStreamConfig_readFromTimestamp(rName),
1035+
Check: resource.ComposeTestCheckFunc(
1036+
testAccCheckDeliveryStreamExists(ctx, resourceName, &stream),
1037+
resource.TestCheckResourceAttr(resourceName, "kinesis_source_configuration.#", "0"),
1038+
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.#", "1"),
1039+
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.0.authentication_configuration.#", "1"),
1040+
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.0.authentication_configuration.0.connectivity", "PRIVATE"),
1041+
resource.TestCheckResourceAttrSet(resourceName, "msk_source_configuration.0.authentication_configuration.0.role_arn"),
1042+
resource.TestCheckResourceAttrSet(resourceName, "msk_source_configuration.0.msk_cluster_arn"),
1043+
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.0.topic_name", "test"),
1044+
resource.TestCheckResourceAttr(resourceName, "msk_source_configuration.0.read_from_timestamp", "2025-03-11T14:30:00Z"),
9991045
),
10001046
},
10011047
{
@@ -3303,6 +3349,36 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
33033349
`, rName))
33043350
}
33053351

3352+
func testAccDeliveryStreamConfig_readFromTimestamp(rName string) string {
3353+
return acctest.ConfigCompose(
3354+
testAccDeliveryStreamConfig_base(rName),
3355+
testAccDeliveryStreamConfig_baseMSKClusterSource(rName),
3356+
fmt.Sprintf(`
3357+
resource "aws_kinesis_firehose_delivery_stream" "test" {
3358+
depends_on = [aws_iam_role_policy.firehose, aws_iam_role_policy.msk_source, aws_msk_cluster_policy.test]
3359+
name = %[1]q
3360+
3361+
msk_source_configuration {
3362+
authentication_configuration {
3363+
connectivity = "PRIVATE"
3364+
role_arn = aws_iam_role.msk_source.arn
3365+
}
3366+
3367+
msk_cluster_arn = aws_msk_serverless_cluster.test.arn
3368+
topic_name = "test"
3369+
read_from_timestamp = "2025-03-11T14:30:00Z"
3370+
}
3371+
3372+
destination = "extended_s3"
3373+
3374+
extended_s3_configuration {
3375+
role_arn = aws_iam_role.firehose.arn
3376+
bucket_arn = aws_s3_bucket.bucket.arn
3377+
}
3378+
}
3379+
`, rName))
3380+
}
3381+
33063382
func testAccDeliveryStreamConfig_extendedS3DataFormatConversionConfigurationEnabled(rName string, enabled bool) string {
33073383
return acctest.ConfigCompose(testAccDeliveryStreamConfig_base(rName), fmt.Sprintf(`
33083384
resource "aws_glue_catalog_database" "test" {

website/docs/r/kinesis_firehose_delivery_stream.html.markdown

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,7 @@ The `msk_source_configuration` configuration block supports the following argume
735735
* `authentication_configuration` - (Required) The authentication configuration of the Amazon MSK cluster. See [`authentication_configuration` block](#authentication_configuration-block) below for details.
736736
* `msk_cluster_arn` - (Required) The ARN of the Amazon MSK cluster.
737737
* `topic_name` - (Required) The topic name within the Amazon MSK cluster.
738+
* `read_from_timestamp` - (Optional) The start date and time in UTC for the offset position within your MSK topic from where Firehose begins to read. By default, this is set to timestamp when Firehose becomes Active. If you want to create a Firehose stream with Earliest start position set the `read_from_timestamp` parameter to Epoch (1970-01-01T00:00:00Z).
738739

739740
### `authentication_configuration` block
740741

0 commit comments

Comments
 (0)