diff --git a/src/client.rs b/src/client.rs index fd3fded5f..40a59b6e4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -379,19 +379,36 @@ impl Client { .generate_oauth_token(oauthbearer_config.as_deref())?; let token = CString::new(token_info.token)?; let principal_name = CString::new(token_info.principal_name)?; - Ok((token, principal_name, token_info.lifetime_ms)) + let extension_strings = token_info + .extensions + .iter() + .flat_map(|(key, value)| [CString::new(key.as_str()), CString::new(value.as_str())]) + .collect::, _>>()?; + Ok(( + token, + principal_name, + token_info.lifetime_ms, + extension_strings, + )) })(); match res { - Ok((token, principal_name, lifetime_ms)) => { + Ok((token, principal_name, lifetime_ms, extension_strings)) => { let mut err_buf = ErrBuf::new(); + let mut extension_ptrs: Vec<*const c_char> = + extension_strings.iter().map(|s| s.as_ptr()).collect(); + let (extensions_ptr, extensions_count) = if extension_ptrs.is_empty() { + (ptr::null_mut(), 0) + } else { + (extension_ptrs.as_mut_ptr(), extension_ptrs.len()) + }; let code = unsafe { rdkafka_sys::rd_kafka_oauthbearer_set_token( self.native_ptr(), token.as_ptr(), lifetime_ms, principal_name.as_ptr(), - ptr::null_mut(), - 0, + extensions_ptr, + extensions_count, err_buf.as_mut_ptr(), err_buf.capacity(), ) @@ -624,8 +641,6 @@ impl NativeQueue { /// When using the `OAUTHBEARER` SASL authentication method, this type is /// returned from [`ClientContext::generate_oauth_token`]. The token and /// principal name must not contain embedded null characters. -/// -/// Specifying SASL extensions is not currently supported. pub struct OAuthToken { /// The token value to set. pub token: String, @@ -633,6 +648,8 @@ pub struct OAuthToken { pub principal_name: String, /// When the token expires, in number of milliseconds since the Unix epoch. pub lifetime_ms: i64, + /// Optional SASL extensions as key-value pairs. + pub extensions: Vec<(String, String)>, } #[cfg(test)]