Skip to content

NETOBSERV-1248: informers-only component #848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/flowlogs-pipeline
/confgenerator
/flp-informers
/bin/
cover.out
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)

# Image building tool (docker / podman) - docker is preferred in CI
OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null)
OCI_BUILD_OPTS ?=

ifneq ($(CLEAN_BUILD),)
Expand All @@ -42,6 +42,7 @@ GOLANGCI_LINT_VERSION = v1.61.0

FLP_BIN_FILE=flowlogs-pipeline
CG_BIN_FILE=confgenerator
FLP_INFORMERS_BIN_FILE=flp-informers
NETFLOW_GENERATOR=nflow-generator
CMD_DIR=./cmd/
FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml
Expand Down Expand Up @@ -111,9 +112,10 @@ lint: prereqs ## Lint the code
compile: ## Compile main flowlogs-pipeline and config generator
GOARCH=${GOARCH} go build "${CMD_DIR}${FLP_BIN_FILE}"
GOARCH=${GOARCH} go build "${CMD_DIR}${CG_BIN_FILE}"
GOARCH=${GOARCH} go build "${CMD_DIR}${FLP_INFORMERS_BIN_FILE}"

.PHONY: build
build: lint compile docs ## Build flowlogs-pipeline executable and update the docs
build: lint compile docs ## Build flowlogs-pipeline executables and update the docs

.PHONY: docs
docs: FORCE ## Update flowlogs-pipeline documentation
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ General
Develop
lint Lint the code
compile Compile main flowlogs-pipeline and config generator
build Build flowlogs-pipeline executable and update the docs
build Build flowlogs-pipeline executables and update the docs
docs Update flowlogs-pipeline documentation
clean Clean
tests-unit Unit tests
Expand Down
7 changes: 6 additions & 1 deletion cmd/flowlogs-pipeline/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (

"github.com/stretchr/testify/require"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers"
)

func TestTheMain(t *testing.T) {
Expand All @@ -48,7 +50,10 @@ func TestTheMain(t *testing.T) {

func TestPipelineConfigSetup(t *testing.T) {
// Kube init mock
kubernetes.MockInformers()
kubernetes.ResetGlobals(
informers.NewInformersMock(),
informers.NewConfig(api.NetworkTransformKubeConfig{}),
)

js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
Expand Down
84 changes: 84 additions & 0 deletions cmd/flp-informers/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)

var (
buildVersion = "unknown"
buildDate = "unknown"
app = "flp-informers"
configPath = flag.String("config", "", "path to a config file")
versionFlag = flag.Bool("v", false, "print version")
log = logrus.WithField("module", "main")
)

func main() {
flag.Parse()

appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate)
if *versionFlag {
fmt.Println(appVersion)
os.Exit(0)
}

cfg, err := readConfig(*configPath)
if err != nil {
log.WithError(err).Fatal("error reading config file")
}

lvl, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
log.Errorf("Log level %s not recognized, using info", cfg.LogLevel)
lvl = logrus.InfoLevel
}
logrus.SetLevel(lvl)
log.Infof("Starting %s at log level %s", appVersion, lvl)
log.Infof("Configuration: %#v", cfg)

if cfg.PProfPort != 0 {
go func() {
log.WithField("port", cfg.PProfPort).Info("starting PProf HTTP listener")
err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.PProfPort), nil)
log.WithError(err).Error("PProf HTTP listener stopped working")
}()
}

opMetrics := operational.NewMetrics(&cfg.MetricsSettings)
err = kubernetes.InitInformerDatasource(cfg.KubeConfig, &cfg.KafkaConfig, opMetrics)
if err != nil {
log.WithError(err).Fatal("error initializing Kubernetes & informers")
}

stopCh := utils.SetupElegantExit()
<-stopCh
}

func readConfig(path string) (*config.Informers, error) {
var cfg config.Informers
if len(path) == 0 {
return &cfg, nil
}
yamlFile, err := os.ReadFile(path)
if err != nil {
return nil, err
}
err = yaml.Unmarshal(yamlFile, &cfg)
if err != nil {
return nil, err
}

return &cfg, err
}
2 changes: 2 additions & 0 deletions contrib/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ COPY cmd/ cmd/
COPY pkg/ pkg/

RUN GOARCH=$TARGETARCH go build -ldflags "$LDFLAGS" -mod vendor -o flowlogs-pipeline cmd/flowlogs-pipeline/main.go
RUN GOARCH=$TARGETARCH go build -ldflags "$LDFLAGS" -mod vendor -o flp-informers cmd/flp-informers/main.go

# final stage
FROM --platform=linux/$TARGETARCH registry.access.redhat.com/ubi9/ubi-minimal:9.5-1738816775

COPY --from=builder /app/flowlogs-pipeline /app/
COPY --from=builder /app/flp-informers /app/

ENTRYPOINT ["/app/flowlogs-pipeline"]
7 changes: 2 additions & 5 deletions contrib/docker/Dockerfile.downstream
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,22 @@ WORKDIR /app
# Copy source code
COPY go.mod .
COPY go.sum .
COPY Makefile .
COPY .mk/ .mk/
COPY vendor/ vendor/
COPY .git/ .git/
COPY cmd/ cmd/
COPY pkg/ pkg/

RUN git status --porcelain
ENV GOEXPERIMENT strictfipsruntime
RUN GOARCH=$TARGETARCH go build -tags strictfipsruntime -ldflags "-X main.BuildVersion=$BUILDVERSION -X main.BuildDate=$DATE" "./cmd/flowlogs-pipeline"
RUN GOARCH=$TARGETARCH go build -tags strictfipsruntime -ldflags "-X main.BuildVersion=$BUILDVERSION -X main.BuildDate=$DATE" "./cmd/flp-informers"

# final stage
FROM --platform=linux/$TARGETARCH registry.access.redhat.com/ubi9/ubi-minimal:9.5-1738816775

ARG COMMIT

COPY --from=builder /app/flowlogs-pipeline /app/
COPY --from=builder /app/flp-informers /app/

# expose ports
ENTRYPOINT ["/app/flowlogs-pipeline"]

LABEL com.redhat.component="network-observability-flowlogs-pipeline-container"
Expand Down
26 changes: 26 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,32 @@ Following is the supported API format for network transformations:
name: name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'
index: fields to use for indexing, must be any combination of 'mac', 'ip', 'interface', or 'udn'
managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn
kafkaCacheConfig: Kafka config for informers cache (optional)
brokers: list of kafka broker addresses
topic: kafka topic to listen on
groupid: separate groupid for each consumer on specified topic
groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
batchReadTimeout: how often (in milliseconds) to process input
decoder: decoder to use (E.g. json or protobuf)
type: (enum) one of the following:
json: JSON decoder
protobuf: Protobuf decoder
batchMaxLen: the number of accumulated flows before being forwarded for processing
pullQueueCapacity: the capacity of the queue use to store pulled flows
pullMaxBytes: the maximum number of bytes being pulled from kafka
commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
tls: TLS client configuration (optional)
insecureSkipVerify: skip client verifying the server's certificate chain and host name
caCertPath: path to the CA certificate
userCertPath: path to the user certificate
userKeyPath: path to the user private key
sasl: SASL configuration (optional)
type: SASL type
plain: Plain SASL
scramSHA512: SCRAM/SHA512 SASL
clientIDPath: path to the client ID / SASL username
clientSecretPath: path to the client secret / SASL password
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
subnetLabels: configure subnet and IPs custom labels
Expand Down
1 change: 1 addition & 0 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type NetworkTransformKubeConfig struct {
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"`
KafkaCacheConfig *IngestKafka `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"`
}

type TransformNetworkOperationEnum string
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/informers_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package config

import "github.com/netobserv/flowlogs-pipeline/pkg/api"

type Informers struct {
KubeConfig api.NetworkTransformKubeConfig `yaml:"kubeConfig"`
KafkaConfig api.EncodeKafka `yaml:"kafkaConfig"`
MetricsSettings MetricsSettings `yaml:"metricsSettings"`
PProfPort int32 `yaml:"pprofPort"`
LogLevel string `yaml:"logLevel"`
}
23 changes: 3 additions & 20 deletions pkg/pipeline/transform/kubernetes/datasource/datasource.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,11 @@
package datasource

import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model"
)

type Datasource struct {
Informers informers.InformersInterface
}

func NewInformerDatasource(kubeconfig string, infConfig informers.Config, opMetrics *operational.Metrics) (*Datasource, error) {
inf := &informers.Informers{}
if err := inf.InitFromConfig(kubeconfig, infConfig, opMetrics); err != nil {
return nil, err
}
return &Datasource{Informers: inf}, nil
}

func (d *Datasource) IndexLookup(potentialKeys []cni.SecondaryNetKey, ip string) *model.ResourceMetaData {
return d.Informers.IndexLookup(potentialKeys, ip)
}

func (d *Datasource) GetNodeByName(name string) (*model.ResourceMetaData, error) {
return d.Informers.GetNodeByName(name)
type Datasource interface {
IndexLookup([]cni.SecondaryNetKey, string) *model.ResourceMetaData
GetNodeByName(string) (*model.ResourceMetaData, error)
}
15 changes: 15 additions & 0 deletions pkg/pipeline/transform/kubernetes/datasource/informers_ds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package datasource

import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers"
)

func NewInformerDatasource(kubeconfig string, infConfig informers.Config, kafkaConfig *api.EncodeKafka, opMetrics *operational.Metrics) (Datasource, error) {
inf := &informers.Informers{}
if err := inf.InitFromConfig(kubeconfig, infConfig, kafkaConfig, opMetrics); err != nil {
return nil, err
}
return inf, nil
}
Loading