Skip to content
Merged
11 changes: 11 additions & 0 deletions .changelog/3766.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:enhancement
resource/mongodbatlas_stream_connection: Adds new authentication mechanism (OIDC) to the Kafka connection.
```

```release-note:enhancement
data-source/mongodbatlas_stream_connection: Adds new authentication mechanism (OIDC) to the Kafka connection.
```

```release-note:enhancement
data-source/mongodbatlas_stream_connections: Adds new authentication mechanism (OIDC) to the Kafka connection.
```
8 changes: 7 additions & 1 deletion docs/data-sources/stream_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Method of authentication. Value can be `PLAIN`, `SCRAM-256`, `SCRAM-512`, or `OAUTHBEARER`.
* `method` - SASL OAUTHBEARER authentication method. Value must be OIDC.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this value must be OIDC does the user really need to specify it here? or could it be implicit whenever mechanism = OAUTHBEARER

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just for now. We may add more method value in the future. Should I say Value must be OIDC currently

* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer (IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be either an HTTP or HTTPS URI? or more than one HTTP URI? might be useful to clarify

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one HTTP(S) URI.

* `client_id` - Public identifier for the Kafka client.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Scope of the access request to the broker specified by the Kafka clients.
* `sasl_oauthbearer_extensions` - Additional information to provide to the Kafka broker.

### Security

Expand Down
8 changes: 7 additions & 1 deletion docs/data-sources/stream_connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Method of authentication. Value can be `PLAIN`, `SCRAM-256`, `SCRAM-512`, or `OAUTHBEARER`.
* `method` - SASL OAUTHBEARER authentication method. Value must be OIDC.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer (IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Scope of the access request to the broker specified by the Kafka clients.
* `sasl_oauthbearer_extensions` - Additional information to provide to the Kafka broker.

### Security

Expand Down
40 changes: 39 additions & 1 deletion docs/resources/stream_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,38 @@ resource "mongodbatlas_stream_connection" "test" {
}
```

### Example Kafka SASL OAuthbearer Connection
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any Altas docs link we could add here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you show me an example of adding an atlas doc link to the docs? The atlas doc is WIP. But I think I can get the link for it. I followed the pattern in this file to add this example. But I'd like to add extra information if needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you show me an example of adding an atlas doc link to the docs

you should be able to find it in the advanced_cluster.md file. We usually link the public external link of the docs if there is any other further explanation of how things work.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LijieZhang1998 please reach out to @jvincent-mongodb he can create a dochub link (our redirect layer) so you can add a link that we can redirect to a different location on our end when the docs are live


```terraform
resource "mongodbatlas_stream_connection" "example-kafka-oauthbearer" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = "KafkaOAuthbearerConnection"
type = "Kafka"
authentication = {
mechanism = "OAUTHBEARER"
method = "OIDC"
token_endpoint_url = "https://example.com/oauth/token"
client_id = "auth0Client"
client_secret = var.kafka_client_secret
scope = "read:messages write:messages"
sasl_oauthbearer_extensions = "logicalCluster=lkc-kmom,identityPoolId=pool-lAr"
}
bootstrap_servers = "localhost:9092,localhost:9092"
config = {
"auto.offset.reset" : "earliest"
}
security = {
protocol = "SASL_PLAINTEXT"
}
networking = {
access = {
type = "PUBLIC"
}
}
}
```

### Example Kafka SASL SSL Connection

```terraform
Expand Down Expand Up @@ -145,9 +177,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Method of authentication. Value can be `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `method` - SASL OAUTHBEARER authentication method. Value must be OIDC.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer (IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Scope of the access request to the broker specified by the Kafka clients.
* `sasl_oauthbearer_extensions` - Additional information to provide to the Kafka broker.

### Security

Expand Down
28 changes: 28 additions & 0 deletions examples/mongodbatlas_stream_connection/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ resource "mongodbatlas_stream_connection" "example-kafka-plaintext" {
}
}

resource "mongodbatlas_stream_connection" "example-kafka-oauthbearer" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a README.md for this example? should we add more information there about this new resource?

Copy link
Collaborator Author

@LijieZhang1998 LijieZhang1998 Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this README.md file. Do you mean by adding some information to the file? I don't see we add any information about the example resources?

project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = "KafkaOAuthbearerConnection"
type = "Kafka"
authentication = {
mechanism = "OAUTHBEARER"
method = "OIDC"
token_endpoint_url = "https://example.com/oauth/token"
client_id = "auth0Client"
client_secret = var.kafka_client_secret
scope = "read:messages write:messages"
sasl_oauthbearer_extensions = "logicalCluster=lkc-kmom,identityPoolId=pool-lAr"
}
bootstrap_servers = "localhost:9092,localhost:9092"
config = {
"auto.offset.reset" : "earliest"
}
security = {
protocol = "SASL_PLAINTEXT"
}
networking = {
access = {
type = "PUBLIC"
}
}
}

resource "mongodbatlas_stream_connection" "example-kafka-ssl" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
Expand Down
5 changes: 5 additions & 0 deletions examples/mongodbatlas_stream_connection/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ variable "kafka_password" {
type = string
}

variable "kafka_client_secret" {
description = "Secret known only to the Kafka client and the authorization server"
type = string
}

variable "kafka_ssl_cert" {
description = "Public certificate used for SASL_SSL configuration to connect to your Kafka cluster"
type = string
Expand Down
22 changes: 17 additions & 5 deletions internal/service/streamconnection/model_stream_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ func NewStreamConnectionReq(ctx context.Context, plan *TFStreamConnectionModel)
return nil, diags
}
streamConnection.Authentication = &admin.StreamsKafkaAuthentication{
Mechanism: authenticationModel.Mechanism.ValueStringPointer(),
Password: authenticationModel.Password.ValueStringPointer(),
Username: authenticationModel.Username.ValueStringPointer(),
Mechanism: authenticationModel.Mechanism.ValueStringPointer(),
Method: authenticationModel.Method.ValueStringPointer(),
Password: authenticationModel.Password.ValueStringPointer(),
Username: authenticationModel.Username.ValueStringPointer(),
TokenEndpointUrl: authenticationModel.TokenEndpointURL.ValueStringPointer(),
ClientId: authenticationModel.ClientID.ValueStringPointer(),
ClientSecret: authenticationModel.ClientSecret.ValueStringPointer(),
Scope: authenticationModel.Scope.ValueStringPointer(),
SaslOauthbearerExtensions: authenticationModel.SaslOauthbearerExtensions.ValueStringPointer(),
}
}
if !plan.Security.IsNull() {
Expand Down Expand Up @@ -215,8 +221,13 @@ func NewTFStreamConnection(ctx context.Context, projID, instanceName string, cur
func newTFConnectionAuthenticationModel(ctx context.Context, currAuthConfig *types.Object, authResp *admin.StreamsKafkaAuthentication) (*types.Object, diag.Diagnostics) {
if authResp != nil {
resultAuthModel := TFConnectionAuthenticationModel{
Mechanism: types.StringPointerValue(authResp.Mechanism),
Username: types.StringPointerValue(authResp.Username),
Mechanism: types.StringPointerValue(authResp.Mechanism),
Method: types.StringPointerValue(authResp.Method),
Username: types.StringPointerValue(authResp.Username),
TokenEndpointURL: types.StringPointerValue(authResp.TokenEndpointUrl),
ClientID: types.StringPointerValue(authResp.ClientId),
Scope: types.StringPointerValue(authResp.Scope),
SaslOauthbearerExtensions: types.StringPointerValue(authResp.SaslOauthbearerExtensions),
}

if currAuthConfig != nil && !currAuthConfig.IsNull() { // if config is available (create & update of resource) password value is set in new state
Expand All @@ -225,6 +236,7 @@ func newTFConnectionAuthenticationModel(ctx context.Context, currAuthConfig *typ
return nil, diags
}
resultAuthModel.Password = configAuthModel.Password
resultAuthModel.ClientSecret = configAuthModel.ClientSecret
}

resultObject, diags := types.ObjectValueFrom(ctx, ConnectionAuthenticationObjectType.AttrTypes, resultAuthModel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@ import (
)

const (
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authUsername = "user1"
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authUsername = "user1"
clientID = "auth0Client"
clientSecret = "secret"
// #nosec G101
tokenEndpointURL = "https://your-domain.com/"
scope = "read:messages write:messages"
saslOauthbearerExtentions = "logicalCluster=cluster-kmo17m,identityPoolId=pool-l7Arl"
method = "OIDC"
securityProtocol = "SASL_SSL"
bootstrapServers = "localhost:9092,another.host:9092"
dbRole = "customRole"
Expand Down Expand Up @@ -50,6 +57,7 @@ type sdkToTFModelTestCase struct {

func TestStreamConnectionSDKToTFModel(t *testing.T) {
var authConfigWithPasswordDefined = tfAuthenticationObject(t, authMechanism, authUsername, "raw password")
var authConfigWithOAuth = tfAuthenticationObjectForOAuth(t, authMechanism, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtentions, method)

testCases := []sdkToTFModelTestCase{
{
Expand Down Expand Up @@ -146,6 +154,44 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
Headers: types.MapNull(types.StringType),
},
},
{
name: "Kafka connection type SDK response for OAuthBearer authentication",
SDKResp: &admin.StreamsConnection{
Name: admin.PtrString(connectionName),
Type: admin.PtrString("Kafka"),
Authentication: &admin.StreamsKafkaAuthentication{
Mechanism: admin.PtrString(authMechanism),
Method: admin.PtrString(method),
ClientId: admin.PtrString(clientID),
TokenEndpointUrl: admin.PtrString(tokenEndpointURL),
Scope: admin.PtrString(scope),
SaslOauthbearerExtensions: admin.PtrString(saslOauthbearerExtentions),
},
BootstrapServers: admin.PtrString(bootstrapServers),
Config: &configMap,
Security: &admin.StreamsKafkaSecurity{
Protocol: admin.PtrString(securityProtocol),
BrokerPublicCertificate: admin.PtrString(DummyCACert),
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: &authConfigWithOAuth,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
ConnectionName: types.StringValue(connectionName),
Type: types.StringValue("Kafka"),
Authentication: tfAuthenticationObjectForOAuth(t, authMechanism, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtentions, method), // password value is obtained from config, not api resp.
BootstrapServers: types.StringValue(bootstrapServers),
Config: tfConfigMap(t, configMap),
Security: tfSecurityObject(t, DummyCACert, securityProtocol),
DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes),
Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes),
AWS: types.ObjectNull(streamconnection.AWSObjectType.AttrTypes),
Headers: types.MapNull(types.StringType),
},
},
{
name: "Kafka connection type SDK response with no optional values provided",
SDKResp: &admin.StreamsConnection{
Expand Down Expand Up @@ -596,6 +642,23 @@ func tfAuthenticationObject(t *testing.T, mechanism, username, password string)
return auth
}

func tfAuthenticationObjectForOAuth(t *testing.T, mechanism, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtensions, method string) types.Object {
t.Helper()
auth, diags := types.ObjectValueFrom(t.Context(), streamconnection.ConnectionAuthenticationObjectType.AttrTypes, streamconnection.TFConnectionAuthenticationModel{
Mechanism: types.StringValue(mechanism),
Method: types.StringValue(method),
ClientID: types.StringValue(clientID),
ClientSecret: types.StringValue(clientSecret),
TokenEndpointURL: types.StringValue(tokenEndpointURL),
Scope: types.StringValue(scope),
SaslOauthbearerExtensions: types.StringValue(saslOauthbearerExtensions),
})
if diags.HasError() {
t.Errorf("failed to create terraform data model: %s", diags.Errors()[0].Summary())
}
return auth
}

func tfAuthenticationObjectWithNoPassword(t *testing.T, mechanism, username string) types.Object {
t.Helper()
auth, diags := types.ObjectValueFrom(t.Context(), streamconnection.ConnectionAuthenticationObjectType.AttrTypes, streamconnection.TFConnectionAuthenticationModel{
Expand Down
19 changes: 19 additions & 0 deletions internal/service/streamconnection/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,32 @@ func ResourceSchema(ctx context.Context) schema.Schema {
"mechanism": schema.StringAttribute{
Optional: true,
},
"method": schema.StringAttribute{
Optional: true,
},
"password": schema.StringAttribute{
Optional: true,
Sensitive: true,
},
"username": schema.StringAttribute{
Optional: true,
},
"token_endpoint_url": schema.StringAttribute{
Optional: true,
},
"client_id": schema.StringAttribute{
Optional: true,
},
"client_secret": schema.StringAttribute{
Optional: true,
Sensitive: true,
},
"scope": schema.StringAttribute{
Optional: true,
},
"sasl_oauthbearer_extensions": schema.StringAttribute{
Optional: true,
},
},
},
"bootstrap_servers": schema.StringAttribute{
Expand Down
24 changes: 18 additions & 6 deletions internal/service/streamconnection/resource_stream_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,27 @@ type TFStreamConnectionModel struct {
}

type TFConnectionAuthenticationModel struct {
Mechanism types.String `tfsdk:"mechanism"`
Password types.String `tfsdk:"password"`
Username types.String `tfsdk:"username"`
Mechanism types.String `tfsdk:"mechanism"`
Method types.String `tfsdk:"method"`
Password types.String `tfsdk:"password"`
Username types.String `tfsdk:"username"`
TokenEndpointURL types.String `tfsdk:"token_endpoint_url"`
ClientID types.String `tfsdk:"client_id"`
ClientSecret types.String `tfsdk:"client_secret"`
Scope types.String `tfsdk:"scope"`
SaslOauthbearerExtensions types.String `tfsdk:"sasl_oauthbearer_extensions"`
}

var ConnectionAuthenticationObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{
"mechanism": types.StringType,
"password": types.StringType,
"username": types.StringType,
"mechanism": types.StringType,
"method": types.StringType,
"password": types.StringType,
"username": types.StringType,
"token_endpoint_url": types.StringType,
"client_id": types.StringType,
"client_secret": types.StringType,
"scope": types.StringType,
"sasl_oauthbearer_extensions": types.StringType,
}}

type TFConnectionSecurityModel struct {
Expand Down
Loading