Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
60 changes: 60 additions & 0 deletions common/monitoring/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package monitoring

import (
"context"
"fmt"
"net"

"github.com/hyperledger/fabric-lib-go/common/flogging"
"github.com/hyperledger/fabric-x-orderer/common/types"
)

type Monitor struct {
Provider *Provider
logger types.Logger
endpoint Endpoint
// stop is used to stop the monitoring service
stop context.CancelFunc
listener net.Listener
}

func NewMonitor(endpoint Endpoint, prefix string) *Monitor {
logger := flogging.MustGetLogger(fmt.Sprintf("%s.monitoring", prefix))
return &Monitor{Provider: NewProvider(logger), endpoint: endpoint, logger: logger}
}

func (m *Monitor) Start() {
ctx, cancel := context.WithCancel(context.Background())
m.stop = cancel

var err error
serverConfig := ServerConfig{Endpoint: &m.endpoint, logger: m.logger}
m.listener, err = serverConfig.Listener()
if err != nil {
m.logger.Panicf("%v", err)
}
m.endpoint.Port = serverConfig.Endpoint.Port

go func() {
m.Provider.StartPrometheusServer(ctx, m.listener)
}()
}

func (m *Monitor) Stop() {
if m.stop != nil {
m.stop()
}
if m.listener != nil {
m.listener.Close()
}
}

func (m *Monitor) Address() string {
return fmt.Sprintf("http://%s/metrics", m.endpoint.Address())
}
193 changes: 193 additions & 0 deletions common/monitoring/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package monitoring

import (
"context"
"net"
"net/http"
"net/url"
"time"

"github.com/hyperledger/fabric-lib-go/common/metrics"
"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
)

const (
scheme = "http://"
metricsSubPath = "/metrics"
)

// Provider is a prometheus metrics provider.
type Provider struct {
logger types.Logger
registry *prometheus.Registry
url string
}

// NewProvider creates a new prometheus metrics provider.
func NewProvider(logger types.Logger) *Provider {
return &Provider{logger: logger, registry: prometheus.NewRegistry()}
}

// StartPrometheusServer starts a prometheus server.
// It also starts the given monitoring methods. Their context will cancel once the server is cancelled.
// This method returns once the server is shutdown and all monitoring methods returns.
func (p *Provider) StartPrometheusServer(
ctx context.Context, listener net.Listener, monitor ...func(context.Context),
) error {
p.logger.Debugf("Creating prometheus server")
mux := http.NewServeMux()
mux.Handle(
metricsSubPath,
promhttp.HandlerFor(
p.Registry(),
promhttp.HandlerOpts{
Registry: p.Registry(),
},
),
)
server := &http.Server{
ReadTimeout: 30 * time.Second,
Handler: mux,
}

var err error
p.url, err = MakeMetricsURL(listener.Addr().String())
if err != nil {
return errors.Wrap(err, "failed formatting URL")
}

g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
p.logger.Infof("Prometheus serving on URL: %s", p.url)
defer p.logger.Infof("Prometheus stopped serving")
return server.Serve(listener)
})

// The following ensures the method does not return before all monitor methods return.
for _, m := range monitor {
g.Go(func() error {
m(gCtx)
return nil
})
}

// The following ensures the method does not return before the close procedure is complete.
stopAfter := context.AfterFunc(ctx, func() {
g.Go(func() error {
if errClose := server.Close(); errClose != nil {
return errors.Wrap(errClose, "failed to close prometheus server")
}
return nil
})
})
defer stopAfter()

if err = g.Wait(); !errors.Is(err, http.ErrServerClosed) {
return errors.Wrap(err, "prometheus server stopped with an error")
}
return nil
}

// URL returns the prometheus server URL.
func (p *Provider) URL() string {
return p.url
}

// MakeMetricsURL construct the Prometheus metrics URL.
func MakeMetricsURL(address string) (string, error) {
return url.JoinPath(scheme, address, metricsSubPath)
}

func (p *Provider) NewCounter(o metrics.CounterOpts) metrics.Counter {
c := &Counter{
cv: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
},
o.LabelNames,
),
}

p.registry.MustRegister(c.cv)
return c
}

func (p *Provider) NewGauge(o metrics.GaugeOpts) metrics.Gauge {
g := &Gauge{
gv: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
},
o.LabelNames,
),
}

p.registry.MustRegister(g.gv)
return g
}

func (p *Provider) NewHistogram(o metrics.HistogramOpts) metrics.Histogram {
h := &Histogram{
hv: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
Buckets: o.Buckets,
},
o.LabelNames,
),
}

p.registry.MustRegister(h.hv)
return h
}

type Counter struct {
prometheus.Counter
cv *prometheus.CounterVec
}

func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{Counter: c.cv.WithLabelValues(labelValues...)}
}

type Gauge struct {
prometheus.Gauge
gv *prometheus.GaugeVec
}

func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{Gauge: g.gv.WithLabelValues(labelValues...)}
}

type Histogram struct {
prometheus.Observer
hv *prometheus.HistogramVec
}

func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{Observer: h.hv.WithLabelValues(labelValues...)}
}

// Registry returns the prometheus registry.
func (p *Provider) Registry() *prometheus.Registry {
return p.registry
}
75 changes: 75 additions & 0 deletions common/monitoring/server_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package monitoring

import (
"net"
"os/exec"
"strconv"
"strings"

"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/pkg/errors"
)

const protocol = "tcp"

type Endpoint struct {
Host string
Port int
}

// Address returns a string representation of the endpoint's address.
func (e *Endpoint) Address() string {
return net.JoinHostPort(e.Host, strconv.Itoa(e.Port))
}

type ServerConfig struct {
Endpoint *Endpoint
preAllocatedListener net.Listener
logger types.Logger
}

// Listener instantiate a [net.Listener] and updates the config port with the effective port.
func (s *ServerConfig) Listener() (net.Listener, error) {
if s.preAllocatedListener != nil {
return s.preAllocatedListener, nil
}
listener, err := net.Listen(protocol, s.Endpoint.Address())
if err != nil {
return nil, errors.Wrap(err, "failed to listen")
}

addr := listener.Addr()
tcpAddress, ok := addr.(*net.TCPAddr)
if !ok {
return nil, errors.New(strings.Join([]string{"failed to cast to TCP address", listener.Close().Error()}, "; "))
}
s.Endpoint.Port = tcpAddress.Port

s.logger.Infof("Listening on: %s://%s", protocol, s.Endpoint.Address())
return listener, nil
}

// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
// It stores the listener object internally to be reused on subsequent calls to Listener().
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
listener, err := c.Listener()
if err != nil {
return nil, err
}
c.preAllocatedListener = listener
return listener, nil
}

func FQDN() (string, error) {
out, err := exec.Command("hostname", "--fqdn").Output()
if err != nil {
return "", err
}
return string(out), nil
}
30 changes: 18 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func (config *Configuration) ExtractRouterConfig(configBlock *common.Block) *nod
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
Bundle: bundle,
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
}
return routerConfig
}
Expand Down Expand Up @@ -284,8 +286,9 @@ func (config *Configuration) ExtractBatcherConfig(configBlock *common.Block) *no
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
SubmitTimeout: config.LocalConfig.NodeLocalConfig.BatcherParams.SubmitTimeout,
BatchSequenceGap: types.BatchSequence(config.LocalConfig.NodeLocalConfig.BatcherParams.BatchSequenceGap),
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
}

if batcherConfig.FirstStrikeThreshold, err = time.ParseDuration(config.SharedConfig.BatchingConfig.BatchTimeouts.FirstStrikeThreshold); err != nil {
Expand Down Expand Up @@ -317,17 +320,18 @@ func (config *Configuration) ExtractConsenterConfig() *nodeconfig.ConsenterNodeC
panic(fmt.Sprintf("error launching consenter, failed extracting consenter config: %s", err))
}
consenterConfig := &nodeconfig.ConsenterNodeConfig{
Shards: config.ExtractShards(),
Consenters: config.ExtractConsenters(),
Directory: config.LocalConfig.NodeLocalConfig.FileStore.Path,
ListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenPort)),
PartyId: config.LocalConfig.NodeLocalConfig.PartyID,
TLSPrivateKeyFile: config.LocalConfig.TLSConfig.PrivateKey,
TLSCertificateFile: config.LocalConfig.TLSConfig.Certificate,
SigningPrivateKey: signingPrivateKey,
WALDir: DefaultConsenterNodeConfigParams(config.LocalConfig.NodeLocalConfig.FileStore.Path).WALDir,
BFTConfig: BFTConfig,
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
Shards: config.ExtractShards(),
Consenters: config.ExtractConsenters(),
Directory: config.LocalConfig.NodeLocalConfig.FileStore.Path,
ListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenPort)),
PartyId: config.LocalConfig.NodeLocalConfig.PartyID,
TLSPrivateKeyFile: config.LocalConfig.TLSConfig.PrivateKey,
TLSCertificateFile: config.LocalConfig.TLSConfig.Certificate,
SigningPrivateKey: signingPrivateKey,
WALDir: DefaultConsenterNodeConfigParams(config.LocalConfig.NodeLocalConfig.FileStore.Path).WALDir,
BFTConfig: BFTConfig,
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
}
return consenterConfig
}
Expand Down Expand Up @@ -358,6 +362,8 @@ func (config *Configuration) ExtractAssemblerConfig() *nodeconfig.AssemblerNodeC
Consenter: consenterFromMyParty,
UseTLS: config.LocalConfig.TLSConfig.Enabled,
ClientAuthRequired: config.LocalConfig.TLSConfig.ClientAuthRequired,
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
}
return assemblerConfig
}
Expand Down
Loading