diff --git a/.changelog/2474.txt b/.changelog/2474.txt new file mode 100644 index 0000000000..fc7943c4a1 --- /dev/null +++ b/.changelog/2474.txt @@ -0,0 +1,11 @@ +```release-note:enhancement +resource/mongodbatlas_stream_connection: Adds `networking` attribute +``` + +```release-note:enhancement +data-source/mongodbatlas_stream_connection: Adds `networking` attribute +``` + +```release-note:enhancement +data-source/mongodbatlas_stream_connections: Adds `networking` attribute +``` \ No newline at end of file diff --git a/docs/data-sources/stream_connection.md b/docs/data-sources/stream_connection.md index 242837f186..9085f16cb2 100644 --- a/docs/data-sources/stream_connection.md +++ b/docs/data-sources/stream_connection.md @@ -31,6 +31,7 @@ If `type` is of value `Kafka` the following additional attributes are defined: * `bootstrap_servers` - Comma separated list of server addresses. * `config` - A map of Kafka key-value pairs for optional configuration. This is a flat object, and keys can have '.' characters. * `security` - Properties for the secure transport connection to Kafka. For SSL, this can include the trusted certificate to use. See [security](#security). +* `networking` - Networking Access Type can either be `PUBLIC` (default) or `VPC`. See [networking](#networking). ### Authentication @@ -48,5 +49,12 @@ If `type` is of value `Kafka` the following additional attributes are defined: * `role` - The name of the role to use. Can be a built in role or a custom role. * `type` - Type of the DB role. Can be either BUILT_IN or CUSTOM. +### Networking +* `access` - Information about the networking access. See [access](#access). + +### Access +* `name` - Id of the vpc peer when the type is `VPC`. +* `type` - Selected networking type. Either `PUBLIC` or `VPC`. Defaults to `PUBLIC`. + To learn more, see: [MongoDB Atlas API - Stream Connection](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Streams/operation/getStreamConnection) Documentation. The [Terraform Provider Examples Section](https://github.com/mongodb/terraform-provider-mongodbatlas/blob/master/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md) also contains details on the overall support for Atlas Streams Processing in Terraform. diff --git a/docs/data-sources/stream_connections.md b/docs/data-sources/stream_connections.md index 6bdbfe2261..c25ee9a7b4 100644 --- a/docs/data-sources/stream_connections.md +++ b/docs/data-sources/stream_connections.md @@ -43,6 +43,7 @@ If `type` is of value `Kafka` the following additional attributes are defined: * `bootstrap_servers` - Comma separated list of server addresses. * `config` - A map of Kafka key-value pairs for optional configuration. This is a flat object, and keys can have '.' characters. * `security` - Properties for the secure transport connection to Kafka. For SSL, this can include the trusted certificate to use. See [security](#security). +* `networking` - Networking Access Type can either be `PUBLIC` (default) or `VPC`. See [networking](#networking). ### Authentication @@ -60,5 +61,12 @@ If `type` is of value `Kafka` the following additional attributes are defined: * `role` - The name of the role to use. Can be a built in role or a custom role. * `type` - Type of the DB role. Can be either BUILT_IN or CUSTOM. +### Networking +* `access` - Information about the networking access. See [access](#access). + +### Access +* `name` - Id of the vpc peer when the type is `VPC`. +* `type` - Networking type. Either `PUBLIC` or `VPC`. Default is `PUBLIC`. + To learn more, see: [MongoDB Atlas API - Stream Connection](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Streams/operation/listStreamConnections) Documentation. The [Terraform Provider Examples Section](https://github.com/mongodb/terraform-provider-mongodbatlas/blob/master/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md) also contains details on the overall support for Atlas Streams Processing in Terraform. diff --git a/docs/resources/stream_connection.md b/docs/resources/stream_connection.md index 9fd9d50454..b97b4de655 100644 --- a/docs/resources/stream_connection.md +++ b/docs/resources/stream_connection.md @@ -82,6 +82,7 @@ If `type` is of value `Kafka` the following additional arguments are defined: * `bootstrap_servers` - Comma separated list of server addresses. * `config` - A map of Kafka key-value pairs for optional configuration. This is a flat object, and keys can have '.' characters. * `security` - Properties for the secure transport connection to Kafka. For SSL, this can include the trusted certificate to use. See [security](#security). +* `networking` - Networking Access Type can either be `PUBLIC` (default) or `VPC`. See [networking](#networking). ### Authentication @@ -99,6 +100,13 @@ If `type` is of value `Kafka` the following additional arguments are defined: * `role` - The name of the role to use. Value can be `atlasAdmin`, `readWriteAnyDatabase`, or `readAnyDatabase` if `type` is set to `BUILT_IN`, or the name of a user-defined role if `type` is set to `CUSTOM`. * `type` - Type of the DB role. Can be either BUILT_IN or CUSTOM. +### Networking +* `access` - Information about the networking access. See [access](#access). + +### Access +* `name` - Id of the vpc peer when the type is `VPC`. +* `type` - Selected networking type. Either `PUBLIC` or `VPC`. Defaults to `PUBLIC`. + ## Import You can import a stream connection resource using the instance name, project ID, and connection name. The format must be `INSTANCE_NAME-PROJECT_ID-CONNECTION_NAME`. For example: diff --git a/examples/mongodbatlas_stream_connection/main.tf b/examples/mongodbatlas_stream_connection/main.tf index 0dc56c6cad..31e4ebdfdf 100644 --- a/examples/mongodbatlas_stream_connection/main.tf +++ b/examples/mongodbatlas_stream_connection/main.tf @@ -36,6 +36,11 @@ resource "mongodbatlas_stream_connection" "example-kafka-plaintext" { security = { protocol = "PLAINTEXT" } + networking = { + access = { + type = "PUBLIC" + } + } } resource "mongodbatlas_stream_connection" "example-kafka-ssl" { diff --git a/internal/service/streamconnection/data_source_stream_connection_test.go b/internal/service/streamconnection/data_source_stream_connection_test.go index de6e62e828..612f7c8779 100644 --- a/internal/service/streamconnection/data_source_stream_connection_test.go +++ b/internal/service/streamconnection/data_source_stream_connection_test.go @@ -20,8 +20,8 @@ func TestAccStreamDSStreamConnection_kafkaPlaintext(t *testing.T) { CheckDestroy: CheckDestroyStreamConnection, Steps: []resource.TestStep{ { - Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false)), - Check: kafkaStreamConnectionAttributeChecks(dataSourceName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false, false), + Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", kafkaNetworkingPublic, false)), + Check: kafkaStreamConnectionAttributeChecks(dataSourceName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", networkingTypePublic, false, false), }, }, }) @@ -39,8 +39,8 @@ func TestAccStreamDSStreamConnection_kafkaSSL(t *testing.T) { CheckDestroy: CheckDestroyStreamConnection, Steps: []resource.TestStep{ { - Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true)), - Check: kafkaStreamConnectionAttributeChecks(dataSourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true, false), + Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", kafkaNetworkingPublic, true)), + Check: kafkaStreamConnectionAttributeChecks(dataSourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", networkingTypePublic, true, false), }, }, }) diff --git a/internal/service/streamconnection/data_source_stream_connections_test.go b/internal/service/streamconnection/data_source_stream_connections_test.go index 9603eb6b93..094743dfd6 100644 --- a/internal/service/streamconnection/data_source_stream_connections_test.go +++ b/internal/service/streamconnection/data_source_stream_connections_test.go @@ -21,7 +21,7 @@ func TestAccStreamDSStreamConnections_basic(t *testing.T) { CheckDestroy: CheckDestroyStreamConnection, Steps: []resource.TestStep{ { - Config: streamConnectionsDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false)), + Config: streamConnectionsDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", kafkaNetworkingPublic, false)), Check: streamConnectionsAttributeChecks(dataSourceName, nil, nil, 1), }, }, @@ -40,7 +40,7 @@ func TestAccStreamDSStreamConnections_withPageConfig(t *testing.T) { CheckDestroy: CheckDestroyStreamConnection, Steps: []resource.TestStep{ { - Config: streamConnectionsWithPageAttrDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false)), + Config: streamConnectionsWithPageAttrDataSourceConfig(kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", kafkaNetworkingPublic, false)), Check: streamConnectionsAttributeChecks(dataSourceName, admin.PtrInt(2), admin.PtrInt(1), 0), }, }, diff --git a/internal/service/streamconnection/model_stream_connection.go b/internal/service/streamconnection/model_stream_connection.go index 80fc6d5697..036bd24fc3 100644 --- a/internal/service/streamconnection/model_stream_connection.go +++ b/internal/service/streamconnection/model_stream_connection.go @@ -60,6 +60,18 @@ func NewStreamConnectionReq(ctx context.Context, plan *TFStreamConnectionModel) } } + if !plan.Networking.IsNull() && !plan.Networking.IsUnknown() { + networkingModel := &TFNetworkingModel{} + if diags := plan.Networking.As(ctx, networkingModel, basetypes.ObjectAsOptions{}); diags.HasError() { + return nil, diags + } + streamConnection.Networking = &admin.StreamsKafkaNetworking{ + Access: &admin.StreamsKafkaNetworkingAccess{ + Type: networkingModel.Access.Type.ValueStringPointer(), + }, + } + } + return &streamConnection, nil } @@ -114,6 +126,19 @@ func NewTFStreamConnection(ctx context.Context, projID, instanceName string, cur connectionModel.DBRoleToExecute = dbRoleToExecuteModel } + connectionModel.Networking = types.ObjectNull(NetworkingObjectType.AttrTypes) + if apiResp.Networking != nil { + networkingModel, diags := types.ObjectValueFrom(ctx, NetworkingObjectType.AttrTypes, TFNetworkingModel{ + Access: TFNetworkingAccessModel{ + Type: types.StringPointerValue(apiResp.Networking.Access.Type), + }, + }) + if diags.HasError() { + return nil, diags + } + connectionModel.Networking = networkingModel + } + return &connectionModel, nil } diff --git a/internal/service/streamconnection/model_stream_connection_test.go b/internal/service/streamconnection/model_stream_connection_test.go index f206b5d47c..09b5cffab2 100644 --- a/internal/service/streamconnection/model_stream_connection_test.go +++ b/internal/service/streamconnection/model_stream_connection_test.go @@ -24,6 +24,7 @@ const ( dbRole = "customRole" dbRoleType = "CUSTOM" sampleConnectionName = "sample_stream_solar" + networkingType = "PUBLIC" ) var configMap = map[string]string{ @@ -67,6 +68,7 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) { Config: types.MapNull(types.StringType), Security: types.ObjectNull(streamconnection.ConnectionSecurityObjectType.AttrTypes), DBRoleToExecute: tfDBRoleToExecuteObject(t, dbRole, dbRoleType), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, }, { @@ -98,6 +100,7 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) { Config: tfConfigMap(t, configMap), Security: tfSecurityObject(t, DummyCACert, securityProtocol), DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, }, { @@ -118,6 +121,7 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) { Config: types.MapNull(types.StringType), Security: types.ObjectNull(streamconnection.ConnectionSecurityObjectType.AttrTypes), DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, }, { @@ -149,6 +153,7 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) { Config: tfConfigMap(t, configMap), Security: tfSecurityObject(t, DummyCACert, securityProtocol), DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, }, { @@ -168,6 +173,7 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) { Config: types.MapNull(types.StringType), Security: types.ObjectNull(streamconnection.ConnectionSecurityObjectType.AttrTypes), DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, }, } @@ -212,6 +218,11 @@ func TestStreamConnectionsSDKToTFModel(t *testing.T) { Protocol: admin.PtrString(securityProtocol), BrokerPublicCertificate: admin.PtrString(DummyCACert), }, + Networking: &admin.StreamsKafkaNetworking{ + Access: &admin.StreamsKafkaNetworkingAccess{ + Type: admin.PtrString(networkingType), + }, + }, }, { Name: admin.PtrString(connectionName), @@ -253,6 +264,7 @@ func TestStreamConnectionsSDKToTFModel(t *testing.T) { Config: tfConfigMap(t, configMap), Security: tfSecurityObject(t, DummyCACert, securityProtocol), DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes), + Networking: tfNetworkingObject(t, networkingType), }, { ID: types.StringValue(fmt.Sprintf("%s-%s-%s", instanceName, dummyProjectID, connectionName)), @@ -265,6 +277,7 @@ func TestStreamConnectionsSDKToTFModel(t *testing.T) { Config: types.MapNull(types.StringType), Security: types.ObjectNull(streamconnection.ConnectionSecurityObjectType.AttrTypes), DBRoleToExecute: tfDBRoleToExecuteObject(t, dbRole, dbRoleType), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, { ID: types.StringValue(fmt.Sprintf("%s-%s-%s", instanceName, dummyProjectID, sampleConnectionName)), @@ -277,6 +290,7 @@ func TestStreamConnectionsSDKToTFModel(t *testing.T) { Config: types.MapNull(types.StringType), Security: types.ObjectNull(streamconnection.ConnectionSecurityObjectType.AttrTypes), DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes), + Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes), }, }, }, @@ -470,3 +484,16 @@ func tfDBRoleToExecuteObject(t *testing.T, role, roleType string) types.Object { } return auth } + +func tfNetworkingObject(t *testing.T, networkingType string) types.Object { + t.Helper() + networking, diags := types.ObjectValueFrom(context.Background(), streamconnection.NetworkingObjectType.AttrTypes, streamconnection.TFNetworkingModel{ + Access: streamconnection.TFNetworkingAccessModel{ + Type: types.StringValue(networkingType), + }, + }) + if diags.HasError() { + t.Errorf("failed to create terraform data model: %s", diags.Errors()[0].Summary()) + } + return networking +} diff --git a/internal/service/streamconnection/resource_schema.go b/internal/service/streamconnection/resource_schema.go index be3ec15d42..c28cc4ca83 100644 --- a/internal/service/streamconnection/resource_schema.go +++ b/internal/service/streamconnection/resource_schema.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/objectplanmodifier" "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" "github.com/hashicorp/terraform-plugin-framework/schema/validator" @@ -16,6 +17,9 @@ func ResourceSchema(ctx context.Context) schema.Schema { Attributes: map[string]schema.Attribute{ "id": schema.StringAttribute{ Computed: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, }, "project_id": schema.StringAttribute{ Required: true, @@ -95,6 +99,23 @@ func ResourceSchema(ctx context.Context) schema.Schema { }, }, }, + "networking": schema.SingleNestedAttribute{ + Optional: true, + Computed: true, + PlanModifiers: []planmodifier.Object{ + objectplanmodifier.UseStateForUnknown(), + }, + Attributes: map[string]schema.Attribute{ + "access": schema.SingleNestedAttribute{ + Required: true, + Attributes: map[string]schema.Attribute{ + "type": schema.StringAttribute{ + Required: true, + }, + }, + }, + }, + }, }, } } diff --git a/internal/service/streamconnection/resource_stream_connection.go b/internal/service/streamconnection/resource_stream_connection.go index f4f571ce23..56ac82bb2f 100644 --- a/internal/service/streamconnection/resource_stream_connection.go +++ b/internal/service/streamconnection/resource_stream_connection.go @@ -5,12 +5,14 @@ import ( "errors" "net/http" "regexp" + "time" "github.com/hashicorp/terraform-plugin-framework/attr" "github.com/hashicorp/terraform-plugin-framework/path" "github.com/hashicorp/terraform-plugin-framework/resource" - "github.com/hashicorp/terraform-plugin-framework/types" "github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion" + + "github.com/hashicorp/terraform-plugin-framework/types" "github.com/mongodb/terraform-provider-mongodbatlas/internal/config" ) @@ -43,6 +45,7 @@ type TFStreamConnectionModel struct { Config types.Map `tfsdk:"config"` Security types.Object `tfsdk:"security"` DBRoleToExecute types.Object `tfsdk:"db_role_to_execute"` + Networking types.Object `tfsdk:"networking"` } type TFConnectionAuthenticationModel struct { @@ -77,6 +80,22 @@ var DBRoleToExecuteObjectType = types.ObjectType{AttrTypes: map[string]attr.Type "type": types.StringType, }} +type TFNetworkingAccessModel struct { + Type types.String `tfsdk:"type"` +} + +var NetworkingAccessObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{ + "type": types.StringType, +}} + +type TFNetworkingModel struct { + Access TFNetworkingAccessModel `tfsdk:"access"` +} + +var NetworkingObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{ + "access": NetworkingAccessObjectType, +}} + func (r *streamConnectionRS) Schema(ctx context.Context, req resource.SchemaRequest, resp *resource.SchemaResponse) { resp.Schema = ResourceSchema(ctx) conversion.UpdateSchemaDescription(&resp.Schema) @@ -181,7 +200,7 @@ func (r *streamConnectionRS) Delete(ctx context.Context, req resource.DeleteRequ projectID := streamConnectionState.ProjectID.ValueString() instanceName := streamConnectionState.InstanceName.ValueString() connectionName := streamConnectionState.ConnectionName.ValueString() - if _, _, err := connV2.StreamsApi.DeleteStreamConnection(ctx, projectID, instanceName, connectionName).Execute(); err != nil { + if err := DeleteStreamConnection(ctx, connV2.StreamsApi, projectID, instanceName, connectionName, time.Minute); err != nil { resp.Diagnostics.AddError("error deleting resource", err.Error()) return } diff --git a/internal/service/streamconnection/resource_stream_connection_migration_test.go b/internal/service/streamconnection/resource_stream_connection_migration_test.go index 77202bbd65..de1596577f 100644 --- a/internal/service/streamconnection/resource_stream_connection_migration_test.go +++ b/internal/service/streamconnection/resource_stream_connection_migration_test.go @@ -4,76 +4,15 @@ import ( _ "embed" "testing" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" - "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/acc" "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/mig" ) func TestMigStreamRSStreamConnection_kafkaPlaintext(t *testing.T) { - var ( - resourceName = "mongodbatlas_stream_connection.test" - projectID = acc.ProjectIDExecution(t) - instanceName = acc.RandomName() - config = kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false) - ) mig.SkipIfVersionBelow(t, "1.16.0") // when reached GA - - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { acc.PreCheckBasic(t) }, - CheckDestroy: CheckDestroyStreamConnection, - Steps: []resource.TestStep{ - { - ExternalProviders: mig.ExternalProviders(), - Config: config, - Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false, true), - }, - mig.TestStepCheckEmptyPlan(config), - }, - }) -} - -func TestMigStreamRSStreamConnection_kafkaSSL(t *testing.T) { - var ( - resourceName = "mongodbatlas_stream_connection.test" - projectID = acc.ProjectIDExecution(t) - instanceName = acc.RandomName() - config = kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true) - ) - mig.SkipIfVersionBelow(t, "1.16.0") // when reached GA - - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { acc.PreCheckBasic(t) }, - CheckDestroy: CheckDestroyStreamConnection, - Steps: []resource.TestStep{ - { - ExternalProviders: mig.ExternalProviders(), - Config: config, - Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true, true), - }, - mig.TestStepCheckEmptyPlan(config), - }, - }) + mig.CreateAndRunTest(t, testCaseKafkaPlaintext(t)) } func TestMigStreamRSStreamConnection_cluster(t *testing.T) { - var ( - resourceName = "mongodbatlas_stream_connection.test" - projectID, clusterName = acc.ClusterNameExecution(t) - instanceName = acc.RandomName() - config = clusterStreamConnectionConfig(projectID, instanceName, clusterName) - ) mig.SkipIfVersionBelow(t, "1.16.0") // when reached GA - - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { acc.PreCheckBasic(t) }, - CheckDestroy: CheckDestroyStreamConnection, - Steps: []resource.TestStep{ - { - ExternalProviders: mig.ExternalProviders(), - Config: config, - Check: clusterStreamConnectionAttributeChecks(resourceName, clusterName), - }, - mig.TestStepCheckEmptyPlan(config), - }, - }) + mig.CreateAndRunTest(t, testCaseCluster(t)) } diff --git a/internal/service/streamconnection/resource_stream_connection_test.go b/internal/service/streamconnection/resource_stream_connection_test.go index f65dce618b..f7cebd7074 100644 --- a/internal/service/streamconnection/resource_stream_connection_test.go +++ b/internal/service/streamconnection/resource_stream_connection_test.go @@ -9,12 +9,63 @@ import ( "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/terraform" "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/acc" + "github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/mig" ) -//go:embed testdata/dummy-ca.pem -var DummyCACert string +var ( + //go:embed testdata/dummy-ca.pem + DummyCACert string + networkingTypeVPC = "VPC" + networkingTypePublic = "PUBLIC" + kafkaNetworkingVPC = fmt.Sprintf(`networking = { + access = { + type = %[1]q + } + }`, networkingTypeVPC) + kafkaNetworkingPublic = fmt.Sprintf(`networking = { + access = { + type = %[1]q + } + }`, networkingTypePublic) +) func TestAccStreamRSStreamConnection_kafkaPlaintext(t *testing.T) { + testCase := testCaseKafkaPlaintext(t) + resource.ParallelTest(t, *testCase) +} + +func testCaseKafkaPlaintext(t *testing.T) *resource.TestCase { + t.Helper() + var ( + resourceName = "mongodbatlas_stream_connection.test" + projectID = acc.ProjectIDExecution(t) + instanceName = acc.RandomName() + ) + return &resource.TestCase{ + PreCheck: func() { acc.PreCheckBasic(t) }, + ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, + CheckDestroy: CheckDestroyStreamConnection, + Steps: []resource.TestStep{ + { + Config: kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", "", false), + Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", networkingTypePublic, false, true), + }, + { + Config: kafkaStreamConnectionConfig(projectID, instanceName, "user2", "otherpassword", "localhost:9093", "latest", kafkaNetworkingPublic, false), + Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user2", "otherpassword", "localhost:9093", "latest", networkingTypePublic, false, true), + }, + { + ResourceName: resourceName, + ImportStateIdFunc: checkStreamConnectionImportStateIDFunc(resourceName), + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"authentication.password"}, + }, + }, + } +} + +func TestAccStreamRSStreamConnection_kafkaNetworkingVPC(t *testing.T) { var ( resourceName = "mongodbatlas_stream_connection.test" projectID = acc.ProjectIDExecution(t) @@ -26,12 +77,14 @@ func TestAccStreamRSStreamConnection_kafkaPlaintext(t *testing.T) { CheckDestroy: CheckDestroyStreamConnection, Steps: []resource.TestStep{ { - Config: kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false), - Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false, true), + Config: kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", kafkaNetworkingPublic, true), + Check: resource.ComposeAggregateTestCheckFunc( + kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", networkingTypePublic, true, true), + ), }, { - Config: kafkaStreamConnectionConfig(projectID, instanceName, "user2", "otherpassword", "localhost:9093", "latest", false), - Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user2", "otherpassword", "localhost:9093", "latest", false, true), + Config: kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", kafkaNetworkingVPC, true), + Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", networkingTypeVPC, true, true), }, { ResourceName: resourceName, @@ -56,8 +109,8 @@ func TestAccStreamRSStreamConnection_kafkaSSL(t *testing.T) { CheckDestroy: CheckDestroyStreamConnection, Steps: []resource.TestStep{ { - Config: kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true), - Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true, true), + Config: kafkaStreamConnectionConfig(projectID, instanceName, "user", "rawpassword", "localhost:9092", "earliest", kafkaNetworkingPublic, true), + Check: kafkaStreamConnectionAttributeChecks(resourceName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", networkingTypePublic, true, true), }, { ResourceName: resourceName, @@ -71,12 +124,18 @@ func TestAccStreamRSStreamConnection_kafkaSSL(t *testing.T) { } func TestAccStreamRSStreamConnection_cluster(t *testing.T) { + testCase := testCaseCluster(t) + resource.ParallelTest(t, *testCase) +} + +func testCaseCluster(t *testing.T) *resource.TestCase { + t.Helper() var ( resourceName = "mongodbatlas_stream_connection.test" projectID, clusterName = acc.ClusterNameExecution(t) instanceName = acc.RandomName() ) - resource.ParallelTest(t, resource.TestCase{ + return &resource.TestCase{ PreCheck: func() { acc.PreCheckBasic(t) }, ProtoV6ProviderFactories: acc.TestAccProviderV6Factories, CheckDestroy: CheckDestroyStreamConnection, @@ -92,7 +151,7 @@ func TestAccStreamRSStreamConnection_cluster(t *testing.T) { ImportStateVerify: true, }, }, - }) + } } func TestAccStreamRSStreamConnection_sample(t *testing.T) { @@ -121,7 +180,7 @@ func TestAccStreamRSStreamConnection_sample(t *testing.T) { }) } -func kafkaStreamConnectionConfig(projectID, instanceName, username, password, bootstrapServers, configValue string, useSSL bool) string { +func kafkaStreamConnectionConfig(projectID, instanceName, username, password, bootstrapServers, configValue, networkingConfig string, useSSL bool) string { projectAndStreamInstanceConfig := acc.StreamInstanceConfig(projectID, instanceName, "VIRGINIA_USA", "AWS") securityConfig := ` security = { @@ -135,7 +194,6 @@ func kafkaStreamConnectionConfig(projectID, instanceName, username, password, bo protocol = "SSL" }`, DummyCACert) } - return fmt.Sprintf(` %[1]s @@ -154,8 +212,9 @@ func kafkaStreamConnectionConfig(projectID, instanceName, username, password, bo "auto.offset.reset": %[5]q } %[6]s + %[7]s } - `, projectAndStreamInstanceConfig, username, password, bootstrapServers, configValue, securityConfig) + `, projectAndStreamInstanceConfig, username, password, bootstrapServers, configValue, networkingConfig, securityConfig) } func sampleStreamConnectionConfig(projectID, instanceName, sampleName string) string { @@ -186,18 +245,21 @@ func sampleStreamConnectionAttributeChecks( } func kafkaStreamConnectionAttributeChecks( - resourceName, instanceName, username, password, bootstrapServers, configValue string, usesSSL, checkPassword bool) resource.TestCheckFunc { + resourceName, instanceName, username, password, bootstrapServers, configValue, networkingType string, usesSSL, checkPassword bool) resource.TestCheckFunc { resourceChecks := []resource.TestCheckFunc{ checkStreamConnectionExists(), resource.TestCheckResourceAttrSet(resourceName, "project_id"), - resource.TestCheckResourceAttrSet(resourceName, "instance_name"), resource.TestCheckResourceAttrSet(resourceName, "connection_name"), resource.TestCheckResourceAttr(resourceName, "type", "Kafka"), + resource.TestCheckResourceAttr(resourceName, "instance_name", instanceName), resource.TestCheckResourceAttr(resourceName, "authentication.mechanism", "PLAIN"), resource.TestCheckResourceAttr(resourceName, "authentication.username", username), resource.TestCheckResourceAttr(resourceName, "bootstrap_servers", bootstrapServers), resource.TestCheckResourceAttr(resourceName, "config.auto.offset.reset", configValue), } + if mig.IsProviderVersionAtLeast("1.25.0") { + resourceChecks = append(resourceChecks, resource.TestCheckResourceAttr(resourceName, "networking.access.type", networkingType)) + } if checkPassword { resourceChecks = append(resourceChecks, resource.TestCheckResourceAttr(resourceName, "authentication.password", password)) } diff --git a/internal/service/streamconnection/state_transition.go b/internal/service/streamconnection/state_transition.go new file mode 100644 index 0000000000..6a21ca9b49 --- /dev/null +++ b/internal/service/streamconnection/state_transition.go @@ -0,0 +1,25 @@ +package streamconnection + +import ( + "context" + "time" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "go.mongodb.org/atlas-sdk/v20241113003/admin" +) + +func DeleteStreamConnection(ctx context.Context, api admin.StreamsApi, projectID, instanceName, connectionName string, timeout time.Duration) error { + return retry.RetryContext(ctx, timeout, func() *retry.RetryError { + _, resp, err := api.DeleteStreamConnection(ctx, projectID, instanceName, connectionName).Execute() + if err == nil { + return nil + } + if admin.IsErrorCode(err, "STREAM_KAFKA_CONNECTION_IS_DEPLOYING") { + return retry.RetryableError(err) + } + if resp != nil && resp.StatusCode == 404 { + return nil + } + return retry.NonRetryableError(err) + }) +} diff --git a/internal/service/streamconnection/state_transition_test.go b/internal/service/streamconnection/state_transition_test.go new file mode 100644 index 0000000000..e42a263e95 --- /dev/null +++ b/internal/service/streamconnection/state_transition_test.go @@ -0,0 +1,51 @@ +package streamconnection_test + +import ( + "context" + "net/http" + "testing" + "time" + + "go.mongodb.org/atlas-sdk/v20241113003/admin" + "go.mongodb.org/atlas-sdk/v20241113003/mockadmin" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamconnection" +) + +func TestStreamConnectionDeletion(t *testing.T) { + var ( + m = mockadmin.NewStreamsApi(t) + projectID = "projectID" + instanceName = "instanceName" + connectionName = "connectionName" + errDeleteInProgress = admin.ApiError{ + ErrorCode: "STREAM_KAFKA_CONNECTION_IS_DEPLOYING", + Error: 409, + } + genericErr = admin.GenericOpenAPIError{} + ) + genericErr.SetError("error") + genericErr.SetModel(errDeleteInProgress) + m.EXPECT().DeleteStreamConnection(mock.Anything, projectID, instanceName, connectionName).Return(admin.DeleteStreamConnectionApiRequest{ApiService: m}).Times(3) + m.EXPECT().DeleteStreamConnectionExecute(mock.Anything).Once().Return(nil, nil, &genericErr) + m.EXPECT().DeleteStreamConnectionExecute(mock.Anything).Once().Return(nil, nil, &genericErr) + m.EXPECT().DeleteStreamConnectionExecute(mock.Anything).Once().Return(nil, nil, nil) + err := streamconnection.DeleteStreamConnection(context.Background(), m, projectID, instanceName, connectionName, time.Minute) + assert.NoError(t, err) +} + +func TestStreamConnectionDeletion404(t *testing.T) { + var ( + m = mockadmin.NewStreamsApi(t) + projectID = "projectID" + instanceName = "instanceName" + connectionName = "connectionName" + ) + m.EXPECT().DeleteStreamConnection(mock.Anything, projectID, instanceName, connectionName).Return(admin.DeleteStreamConnectionApiRequest{ApiService: m}).Once() + m.EXPECT().DeleteStreamConnectionExecute(mock.Anything).Once().Return(nil, &http.Response{StatusCode: 404}, nil) + err := streamconnection.DeleteStreamConnection(context.Background(), m, projectID, instanceName, connectionName, time.Minute) + assert.NoError(t, err) +}