Skip to content

Commit e718afd

Browse files
Merge branch 'main' into pulsar-subscription-mode
2 parents bf064af + 649483d commit e718afd

File tree

10 files changed

+491
-44
lines changed

10 files changed

+491
-44
lines changed

bindings/azure/eventhubs/metadata.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,9 @@ metadata:
162162
Storage container name.
163163
example: '"myeventhubstoragecontainer"'
164164
- name: getAllMessageProperties
165+
type: bool
165166
required: false
166-
default: "false"
167+
default: false
167168
example: "false"
168169
binding:
169170
input: true

bindings/postgres/metadata.yaml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,38 @@ metadata:
124124
- "simple_protocol"
125125
example: "cache_describe"
126126
default: ""
127+
- name: host
128+
required: false
129+
description: The host of the PostgreSQL database
130+
example: "localhost"
131+
type: string
132+
- name: hostaddr
133+
required: false
134+
description: The host address of the PostgreSQL database
135+
example: "127.0.0.1"
136+
type: string
137+
- name: port
138+
required: false
139+
description: The port of the PostgreSQL database
140+
example: "5432"
141+
type: string
142+
- name: database
143+
required: false
144+
description: The database of the PostgreSQL database
145+
example: "postgres"
146+
type: string
147+
- name: user
148+
required: false
149+
description: The user of the PostgreSQL database
150+
example: "postgres"
151+
type: string
152+
- name: password
153+
required: false
154+
description: The password of the PostgreSQL database
155+
example: "password"
156+
type: string
157+
- name: sslRootCert
158+
required: false
159+
description: The path to the SSL root certificate file
160+
example: "/path/to/ssl/root/cert.pem"
161+
type: string

bindings/postgres/metadata_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestMetadata(t *testing.T) {
3434
t.Run("has connection string", func(t *testing.T) {
3535
m := psqlMetadata{}
3636
props := map[string]string{
37-
"connectionString": "foo",
37+
"connectionString": "foo=bar",
3838
}
3939

4040
err := m.InitWithMetadata(props)
@@ -44,7 +44,7 @@ func TestMetadata(t *testing.T) {
4444
t.Run("default timeout", func(t *testing.T) {
4545
m := psqlMetadata{}
4646
props := map[string]string{
47-
"connectionString": "foo",
47+
"connectionString": "foo=bar",
4848
}
4949

5050
err := m.InitWithMetadata(props)
@@ -55,7 +55,7 @@ func TestMetadata(t *testing.T) {
5555
t.Run("invalid timeout", func(t *testing.T) {
5656
m := psqlMetadata{}
5757
props := map[string]string{
58-
"connectionString": "foo",
58+
"connectionString": "foo=bar",
5959
"timeout": "NaN",
6060
}
6161

@@ -66,7 +66,7 @@ func TestMetadata(t *testing.T) {
6666
t.Run("positive timeout", func(t *testing.T) {
6767
m := psqlMetadata{}
6868
props := map[string]string{
69-
"connectionString": "foo",
69+
"connectionString": "foo=bar",
7070
"timeout": "42",
7171
}
7272

@@ -78,7 +78,7 @@ func TestMetadata(t *testing.T) {
7878
t.Run("zero timeout", func(t *testing.T) {
7979
m := psqlMetadata{}
8080
props := map[string]string{
81-
"connectionString": "foo",
81+
"connectionString": "foo=bar",
8282
"timeout": "0",
8383
}
8484

common/authentication/postgresql/metadata.go

Lines changed: 136 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"context"
1818
"errors"
1919
"fmt"
20+
"net/url"
21+
"strings"
2022
"time"
2123

2224
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
@@ -32,6 +34,13 @@ import (
3234
// PostgresAuthMetadata contains authentication metadata for PostgreSQL components.
3335
type PostgresAuthMetadata struct {
3436
ConnectionString string `mapstructure:"connectionString" mapstructurealiases:"url"`
37+
Host string `mapstructure:"host"`
38+
HostAddr string `mapstructure:"hostaddr"`
39+
Port string `mapstructure:"port"`
40+
Database string `mapstructure:"database"`
41+
User string `mapstructure:"user"`
42+
Password string `mapstructure:"password"`
43+
SslRootCert string `mapstructure:"sslRootCert"`
3544
ConnectionMaxIdleTime time.Duration `mapstructure:"connectionMaxIdleTime"`
3645
MaxConns int `mapstructure:"maxConns"`
3746
UseAzureAD bool `mapstructure:"useAzureAD"`
@@ -45,6 +54,13 @@ type PostgresAuthMetadata struct {
4554
// Reset the object.
4655
func (m *PostgresAuthMetadata) Reset() {
4756
m.ConnectionString = ""
57+
m.Host = ""
58+
m.HostAddr = ""
59+
m.Port = ""
60+
m.Database = ""
61+
m.User = ""
62+
m.Password = ""
63+
m.SslRootCert = ""
4864
m.ConnectionMaxIdleTime = 0
4965
m.MaxConns = 0
5066
m.UseAzureAD = false
@@ -62,8 +78,9 @@ type InitWithMetadataOpts struct {
6278
// This is different from the "useAzureAD" property from the user, which is provided by the user and instructs the component to authenticate using Azure AD.
6379
func (m *PostgresAuthMetadata) InitWithMetadata(meta map[string]string, opts InitWithMetadataOpts) (err error) {
6480
// Validate input
65-
if m.ConnectionString == "" {
66-
return errors.New("missing connection string")
81+
_, err = m.buildConnectionString()
82+
if err != nil {
83+
return err
6784
}
6885
switch {
6986
case opts.AzureADEnabled && m.UseAzureAD:
@@ -87,6 +104,118 @@ func (m *PostgresAuthMetadata) InitWithMetadata(meta map[string]string, opts Ini
87104
return nil
88105
}
89106

107+
// buildConnectionString builds the connection string from the metadata.
108+
// It supports both DSN-style and URL-style connection strings.
109+
// Metadata fields override existing values in the connection string.
110+
func (m *PostgresAuthMetadata) buildConnectionString() (string, error) {
111+
metadata := m.getConnectionStringMetadata()
112+
if strings.HasPrefix(m.ConnectionString, "postgres://") || strings.HasPrefix(m.ConnectionString, "postgresql://") {
113+
return m.buildURLConnectionString(metadata)
114+
}
115+
return m.buildDSNConnectionString(metadata)
116+
}
117+
118+
func (m *PostgresAuthMetadata) buildDSNConnectionString(metadata map[string]string) (string, error) {
119+
connectionString := ""
120+
parts := strings.Split(m.ConnectionString, " ")
121+
for _, part := range parts {
122+
kv := strings.SplitN(part, "=", 2)
123+
if len(kv) == 2 {
124+
key := kv[0]
125+
if value, ok := metadata[key]; ok {
126+
connectionString += fmt.Sprintf("%s=%s ", key, value)
127+
delete(metadata, key)
128+
} else {
129+
connectionString += fmt.Sprintf("%s=%s ", key, kv[1])
130+
}
131+
}
132+
}
133+
for k, v := range metadata {
134+
connectionString += fmt.Sprintf("%s=%s ", k, v)
135+
}
136+
137+
if connectionString == "" {
138+
return "", errors.New("failed to build connection string")
139+
}
140+
141+
return strings.TrimSpace(connectionString), nil
142+
}
143+
144+
func (m *PostgresAuthMetadata) getConnectionStringMetadata() map[string]string {
145+
metadata := make(map[string]string)
146+
if m.User != "" {
147+
metadata["user"] = m.User
148+
}
149+
if m.Host != "" {
150+
metadata["host"] = m.Host
151+
}
152+
if m.HostAddr != "" {
153+
metadata["hostaddr"] = m.HostAddr
154+
}
155+
if m.Port != "" {
156+
metadata["port"] = m.Port
157+
}
158+
if m.Database != "" {
159+
metadata["database"] = m.Database
160+
}
161+
if m.Password != "" {
162+
metadata["password"] = m.Password
163+
}
164+
if m.SslRootCert != "" {
165+
metadata["sslrootcert"] = m.SslRootCert
166+
}
167+
return metadata
168+
}
169+
170+
func (m *PostgresAuthMetadata) buildURLConnectionString(metadata map[string]string) (string, error) {
171+
u, err := url.Parse(m.ConnectionString)
172+
if err != nil {
173+
return "", fmt.Errorf("invalid URL connection string: %w", err)
174+
}
175+
176+
var username string
177+
var password string
178+
if u.User != nil {
179+
username = u.User.Username()
180+
pw, set := u.User.Password()
181+
if set {
182+
password = pw
183+
}
184+
}
185+
186+
if val, ok := metadata["user"]; ok {
187+
username = val
188+
}
189+
if val, ok := metadata["password"]; ok {
190+
password = val
191+
}
192+
if username != "" {
193+
u.User = url.UserPassword(username, password)
194+
}
195+
196+
if val, ok := metadata["host"]; ok {
197+
u.Host = val
198+
}
199+
if val, ok := metadata["hostaddr"]; ok {
200+
u.Host = val
201+
}
202+
if m.Port != "" {
203+
u.Host = fmt.Sprintf("%s:%s", u.Host, m.Port)
204+
}
205+
206+
if val, ok := metadata["database"]; ok {
207+
u.Path = "/" + strings.TrimPrefix(val, "/")
208+
}
209+
210+
q := u.Query()
211+
if val, ok := metadata["sslrootcert"]; ok {
212+
q.Set("sslrootcert", val)
213+
}
214+
u.RawQuery = q.Encode()
215+
216+
return u.String(), nil
217+
}
218+
90219
func (m *PostgresAuthMetadata) BuildAwsIamOptions(logger logger.Logger, properties map[string]string) (*aws.Options, error) {
91220
awsRegion, _ := metadata.GetMetadataProperty(m.awsEnv.Metadata, "AWSRegion")
92221
region, _ := metadata.GetMetadataProperty(m.awsEnv.Metadata, "region")
@@ -132,8 +261,11 @@ func (m *PostgresAuthMetadata) BuildAwsIamOptions(logger logger.Logger, properti
132261

133262
// GetPgxPoolConfig returns the pgxpool.Config object that contains the credentials for connecting to PostgreSQL.
134263
func (m *PostgresAuthMetadata) GetPgxPoolConfig() (*pgxpool.Config, error) {
135-
// Get the config from the connection string
136-
config, err := pgxpool.ParseConfig(m.ConnectionString)
264+
connectionString, err := m.buildConnectionString()
265+
if err != nil {
266+
return nil, err
267+
}
268+
config, err := pgxpool.ParseConfig(connectionString)
137269
if err != nil {
138270
return nil, fmt.Errorf("failed to parse connection string: %w", err)
139271
}

0 commit comments

Comments
 (0)