Skip to content

Commit 2e4b418

Browse files
committed
Add noop and plain authentication
1 parent f798fd6 commit 2e4b418

File tree

16 files changed

+483
-50
lines changed

16 files changed

+483
-50
lines changed

README.md

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ MQTT Proxy allows MQTT clients to send messages to other messaging systems
1515
* [x] [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
1616
* [ ] [MQTT 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)
1717
* Publisher
18+
* [x] Noop
1819
* [x] [Apache Kafka](https://kafka.apache.org/)
1920
* [ ] [Apache Pulsar](https://pulsar.apache.org/)
2021
* [ ] Others
21-
* [ ] Authentication
22+
* Authentication
23+
* [x] Noop
24+
* [x] Plain
25+
* [ ] Others
2226
* [x] Helm chart
2327

2428
## Build
@@ -111,6 +115,38 @@ prerequisites
111115
mosquitto_pub -m "on" -t "dummy" -k 20 -i mqtt-proxy.clientv --repeat 1 -q 1 -h <ec2-ip> -p 1884
112116
```
113117
118+
### plain authenticator
119+
120+
1. start server with `plain` authenticator
121+
* with credentials file
122+
123+
```
124+
cat <<EOF > mqtt-credentials.csv
125+
alice,alice-secret
126+
"bob","bob-secret"
127+
EOF
128+
```
129+
130+
```
131+
mqtt-proxy server --mqtt.publisher.name=noop \
132+
--mqtt.handler.auth.name=plain \
133+
--mqtt.handler.auth.plain.credentials-file=mqtt-credentials.csv
134+
```
135+
* providing credentials as parameters
136+
```
137+
mqtt-proxy server --mqtt.publisher.name=noop \
138+
--mqtt.handler.auth.name=plain \
139+
--mqtt.handler.auth.plain.credentials=alice=alice-secret \
140+
--mqtt.handler.auth.plain.credentials=bob=bob-secret
141+
```
142+
2. publish
143+
144+
```
145+
mosquitto_pub -m "on" -t "dummy" -u alice -P alice-secret
146+
mosquitto_pub -L mqtt://bob:bob-secret@localhost:1883/dummy -m "on"
147+
```
148+
149+
114150
## Configuration
115151
116152
### Kafka publisher
@@ -121,7 +157,6 @@ Kafka producer configuration properties used by [librdkafka](https://github.com/
121157
--mqtt.publisher.kafka.config=producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_SSL,producer.sasl.username=myuser,producer.sasl.password=mypasswd
122158
```
123159
124-
125160
### Examples
126161
127162
- Ignore subscribe / unsubscribe requests
@@ -141,4 +176,5 @@ metric | labels | description
141176
|mqtt_proxy_server_connections_total| |Total number of TCP connections from clients to server.|
142177
|mqtt_proxy_handler_requests_total|type|Total number of MQTT requests labeled by package control type. |
143178
|mqtt_proxy_handler_responses_total|type|Total number of MQTT responses labeled by package control type. |
144-
|mqtt_proxy_publisher_publish_duration_seconds | name, type, qos | Histogram tracking latencies for publish requests. |
179+
|mqtt_proxy_publisher_publish_duration_seconds | name, type, qos | Histogram tracking latencies for publish requests. |
180+
|mqtt_proxy_authenticator_login_duration_seconds | name, code, err | Histogram tracking latencies for login requests. |

apis/auth.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
package apis
22

3-
import "context"
3+
import (
4+
"context"
5+
6+
mqttcodec "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec"
7+
)
8+
9+
const (
10+
AuthAccepted = mqttcodec.Accepted
11+
AuthUnauthorized = mqttcodec.RefusedNotAuthorized
12+
)
413

514
type UserPasswordAuthRequest struct {
615
Username string
@@ -12,10 +21,11 @@ type UserPasswordAuthResponse struct {
1221
}
1322

1423
type UserPasswordAuthenticator interface {
24+
Name() string
1525
Login(context.Context, *UserPasswordAuthRequest) (*UserPasswordAuthResponse, error)
1626
Close() error
1727
}
1828

1929
type UserPasswordAuthenticatorFactory interface {
20-
New(context.Context) (UserPasswordAuthenticator, error)
30+
New(params []string) (UserPasswordAuthenticator, error)
2131
}

apis/publish.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,5 @@ type Publisher interface {
3232
}
3333

3434
type PublisherFactory interface {
35-
New(context.Context) (Publisher, error)
35+
New(params []string) (Publisher, error)
3636
}

cmd/server.go

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package cmd
22

33
import (
44
"github.com/grepplabs/mqtt-proxy/apis"
5+
authinst "github.com/grepplabs/mqtt-proxy/pkg/auth/instrument"
6+
authnoop "github.com/grepplabs/mqtt-proxy/pkg/auth/noop"
7+
authplain "github.com/grepplabs/mqtt-proxy/pkg/auth/plain"
58
"github.com/grepplabs/mqtt-proxy/pkg/config"
69
"github.com/grepplabs/mqtt-proxy/pkg/log"
710
mqtthandler "github.com/grepplabs/mqtt-proxy/pkg/mqtt/handler"
811
"github.com/grepplabs/mqtt-proxy/pkg/prober"
9-
"github.com/grepplabs/mqtt-proxy/pkg/publisher/instrument"
10-
"github.com/grepplabs/mqtt-proxy/pkg/publisher/kafka"
11-
"github.com/grepplabs/mqtt-proxy/pkg/publisher/noop"
12+
pubinst "github.com/grepplabs/mqtt-proxy/pkg/publisher/instrument"
13+
pubkafka "github.com/grepplabs/mqtt-proxy/pkg/publisher/kafka"
14+
pubnoop "github.com/grepplabs/mqtt-proxy/pkg/publisher/noop"
1215
httpserver "github.com/grepplabs/mqtt-proxy/pkg/server/http"
1316
mqttserver "github.com/grepplabs/mqtt-proxy/pkg/server/mqtt"
1417
"github.com/grepplabs/mqtt-proxy/pkg/tls"
@@ -25,6 +28,7 @@ func registerServer(m map[string]setupFunc, app *kingpin.Application) {
2528
cmd := app.Command(command, "mqtt-proxy server")
2629

2730
cfg := new(config.Server)
31+
cfg.Init()
2832

2933
cmd.Flag("http.listen-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:9090").StringVar(&cfg.HTTP.ListenAddress)
3034
cmd.Flag("http.grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("10s").DurationVar(&cfg.HTTP.GracePeriod)
@@ -49,12 +53,17 @@ func registerServer(m map[string]setupFunc, app *kingpin.Application) {
4953
cmd.Flag("mqtt.handler.publish.async.at-least-once", "Async publish for AT_LEAST_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.AtLeastOnce)
5054
cmd.Flag("mqtt.handler.publish.async.exactly-once", "Async publish for EXACTLY_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.ExactlyOnce)
5155

52-
cmd.Flag("mqtt.publisher.name", "Publisher name. One of: [noop, kafka]").Default(config.Noop).EnumVar(&cfg.MQTT.Publisher.Name, config.Noop, config.Kafka)
56+
cmd.Flag("mqtt.handler.auth.name", "Authenticator name. One of: [noop, plain]").Default(config.AuthNoop).EnumVar(&cfg.MQTT.Handler.Authenticator.Name, config.AuthNoop, config.AuthPlain)
57+
cmd.Flag("mqtt.handler.auth.plain.credentials", "List of username and password fields.").Default("USERNAME=PASSWORD").StringMapVar(&cfg.MQTT.Handler.Authenticator.Plain.Credentials)
58+
cmd.Flag("mqtt.handler.auth.plain.credentials-file", "Location of a headerless CSV file containing `usernanme,password` records").Default("").StringVar(&cfg.MQTT.Handler.Authenticator.Plain.CredentialsFile)
59+
60+
cmd.Flag("mqtt.publisher.name", "Publisher name. One of: [noop, kafka]").Default(config.PublisherNoop).EnumVar(&cfg.MQTT.Publisher.Name, config.PublisherNoop, config.PublisherKafka)
5361
cmd.Flag("mqtt.publisher.kafka.config", "Comma separated list of properties").PlaceHolder("PROP=VAL").SetValue(&cfg.MQTT.Publisher.Kafka.ConfArgs)
5462
cmd.Flag("mqtt.publisher.kafka.bootstrap-servers", "Kafka bootstrap servers").Default("localhost:9092").StringVar(&cfg.MQTT.Publisher.Kafka.BootstrapServers)
5563
cmd.Flag("mqtt.publisher.kafka.grace-period", "Time to wait after an interrupt received for Kafka publisher.").Default("10s").DurationVar(&cfg.MQTT.Publisher.Kafka.GracePeriod)
5664
cmd.Flag("mqtt.publisher.kafka.default-topic", "Default Kafka topic for MQTT publish messages").Default("").StringVar(&cfg.MQTT.Publisher.Kafka.DefaultTopic)
5765
cmd.Flag("mqtt.publisher.kafka.topic-mappings", "Comma separated list of Kafka topic to MQTT topic mappings").PlaceHolder("TOPIC=REGEX").SetValue(&cfg.MQTT.Publisher.Kafka.TopicMappings)
66+
cmd.Flag("mqtt.publisher.kafka.workers", "Number of kafka publisher workers").Default("1").IntVar(&cfg.MQTT.Publisher.Kafka.Workers)
5867

5968
m[command] = func(group *run.Group, logger log.Logger, registry *prometheus.Registry) error {
6069
return runServer(group, logger, registry, cfg)
@@ -91,33 +100,58 @@ func runServer(
91100
srv.Shutdown(err)
92101
})
93102
}
103+
104+
var authenticator apis.UserPasswordAuthenticator
105+
{
106+
logger.Infof("setting up authenticator %s", cfg.MQTT.Handler.Authenticator.Name)
107+
108+
switch cfg.MQTT.Handler.Authenticator.Name {
109+
case config.AuthNoop:
110+
authenticator = authnoop.New(logger, registry)
111+
case config.AuthPlain:
112+
authenticator, err = authplain.New(logger, registry,
113+
authplain.WithCredentials(cfg.MQTT.Handler.Authenticator.Plain.Credentials),
114+
authplain.WithCredentialsFile(cfg.MQTT.Handler.Authenticator.Plain.CredentialsFile),
115+
)
116+
if err != nil {
117+
return errors.Wrap(err, "setup plain authenticator")
118+
}
119+
default:
120+
return errors.Errorf("unknown authenticator %s", cfg.MQTT.Handler.Authenticator.Name)
121+
}
122+
authenticator = authinst.New(authenticator, registry)
123+
defer func() {
124+
err := authenticator.Close()
125+
if err != nil {
126+
logger.WithError(err).Warnf("authenticator close failed")
127+
}
128+
}()
129+
}
94130
var publisher apis.Publisher
95131
{
96-
logger.Infof("setting publisher")
132+
logger.Infof("setting up publisher %s", cfg.MQTT.Publisher.Name)
97133

98134
var err error
99135

100136
switch cfg.MQTT.Publisher.Name {
101-
case config.Noop:
102-
publisher, err = noop.New(logger, registry)
103-
if err != nil {
104-
return errors.Wrap(err, "setup noop publisher")
105-
}
106-
case config.Kafka:
107-
publisher, err = kafka.New(logger, registry,
108-
kafka.WithBootstrapServers(cfg.MQTT.Publisher.Kafka.BootstrapServers),
109-
kafka.WithDefaultTopic(cfg.MQTT.Publisher.Kafka.DefaultTopic),
110-
kafka.WithTopicMappings(cfg.MQTT.Publisher.Kafka.TopicMappings),
111-
kafka.WithConfigMap(cfg.MQTT.Publisher.Kafka.ConfArgs.ConfigMap()),
112-
kafka.WithGracePeriod(cfg.MQTT.Publisher.Kafka.GracePeriod),
137+
case config.PublisherNoop:
138+
publisher = pubnoop.New(logger, registry)
139+
case config.PublisherKafka:
140+
publisher, err = pubkafka.New(logger, registry,
141+
pubkafka.WithBootstrapServers(cfg.MQTT.Publisher.Kafka.BootstrapServers),
142+
pubkafka.WithDefaultTopic(cfg.MQTT.Publisher.Kafka.DefaultTopic),
143+
pubkafka.WithTopicMappings(cfg.MQTT.Publisher.Kafka.TopicMappings),
144+
pubkafka.WithConfigMap(cfg.MQTT.Publisher.Kafka.ConfArgs.ConfigMap()),
145+
pubkafka.WithGracePeriod(cfg.MQTT.Publisher.Kafka.GracePeriod),
146+
pubkafka.WithWorkers(cfg.MQTT.Publisher.Kafka.Workers),
113147
)
114148
if err != nil {
115149
return errors.Wrap(err, "setup kafka publisher")
116150
}
117151
default:
118-
return errors.Errorf("Unknown publisher %s", cfg.MQTT.Publisher.Name)
152+
return errors.Errorf("unknown publisher %s", cfg.MQTT.Publisher.Name)
119153
}
120-
publisher = instrument.New(publisher, registry)
154+
publisher = pubinst.New(publisher, registry)
121155

122156
group.Add(func() error {
123157
return publisher.Serve()
@@ -140,6 +174,7 @@ func runServer(
140174
mqtthandler.WithPublishAsyncAtMostOnce(cfg.MQTT.Handler.Publish.Async.AtMostOnce),
141175
mqtthandler.WithPublishAsyncAtLeastOnce(cfg.MQTT.Handler.Publish.Async.AtLeastOnce),
142176
mqtthandler.WithPublishAsyncExactlyOnce(cfg.MQTT.Handler.Publish.Async.ExactlyOnce),
177+
mqtthandler.WithAuthenticator(authenticator),
143178
)
144179

145180
srv := mqttserver.New(logger, registry, httpProbe,

pkg/auth/instrument/instrument.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package instrument
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"time"
7+
8+
"github.com/grepplabs/mqtt-proxy/apis"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
)
12+
13+
type authenticator struct {
14+
delegate apis.UserPasswordAuthenticator
15+
metrics *authenticatorMetrics
16+
}
17+
18+
type authenticatorMetrics struct {
19+
loginDuration *prometheus.HistogramVec
20+
}
21+
22+
func New(delegate apis.UserPasswordAuthenticator, registry *prometheus.Registry) apis.UserPasswordAuthenticator {
23+
return &authenticator{
24+
delegate: delegate,
25+
metrics: newAuthenticatorMetrics(delegate.Name(), registry),
26+
}
27+
}
28+
29+
func (p *authenticator) Name() string {
30+
return p.delegate.Name()
31+
}
32+
33+
func (p *authenticator) Login(ctx context.Context, authRequest *apis.UserPasswordAuthRequest) (*apis.UserPasswordAuthResponse, error) {
34+
start := time.Now()
35+
authResponse, err := p.delegate.Login(ctx, authRequest)
36+
code := ""
37+
if authResponse != nil {
38+
code = strconv.Itoa(int(authResponse.ReturnCode))
39+
}
40+
isError := "0"
41+
if err != nil {
42+
isError = "1"
43+
}
44+
p.metrics.loginDuration.WithLabelValues(code, isError).Observe(time.Since(start).Seconds())
45+
return authResponse, err
46+
}
47+
48+
func (p *authenticator) Close() error {
49+
return p.delegate.Close()
50+
}
51+
52+
func newAuthenticatorMetrics(name string, registry *prometheus.Registry) *authenticatorMetrics {
53+
loginDuration := promauto.With(registry).NewHistogramVec(prometheus.HistogramOpts{
54+
Name: "mqtt_proxy_authenticator_login_duration_seconds",
55+
Help: "Tracks the latencies for auth requests.",
56+
ConstLabels: prometheus.Labels{"name": name},
57+
}, []string{"code", "error"})
58+
59+
return &authenticatorMetrics{
60+
loginDuration: loginDuration,
61+
}
62+
}

pkg/auth/noop/noop.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package noop
2+
3+
import (
4+
"context"
5+
6+
"github.com/grepplabs/mqtt-proxy/apis"
7+
"github.com/grepplabs/mqtt-proxy/pkg/log"
8+
"github.com/prometheus/client_golang/prometheus"
9+
)
10+
11+
const (
12+
authName = "noop"
13+
)
14+
15+
type noopAuthenticator struct {
16+
logger log.Logger
17+
}
18+
19+
func New(logger log.Logger, _ *prometheus.Registry) apis.UserPasswordAuthenticator {
20+
return &noopAuthenticator{
21+
logger: logger.WithField("authenticator", authName),
22+
}
23+
}
24+
25+
func (p noopAuthenticator) Login(_ context.Context, _ *apis.UserPasswordAuthRequest) (*apis.UserPasswordAuthResponse, error) {
26+
return &apis.UserPasswordAuthResponse{
27+
ReturnCode: apis.AuthAccepted,
28+
}, nil
29+
}
30+
31+
func (p noopAuthenticator) Close() error {
32+
return nil
33+
}
34+
35+
func (p noopAuthenticator) Name() string {
36+
return authName
37+
}

0 commit comments

Comments
 (0)