diff --git a/.gitignore b/.gitignore index 16ce1e8da..7ac2b0f05 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /flowlogs-pipeline /confgenerator +/flp-informers /bin/ cover.out diff --git a/Makefile b/Makefile index 0cd56ac13..4d639da25 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION) # Image building tool (docker / podman) - docker is preferred in CI OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman) -OCI_BIN ?= $(shell basename ${OCI_BIN_PATH}) +OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null) OCI_BUILD_OPTS ?= ifneq ($(CLEAN_BUILD),) @@ -42,6 +42,7 @@ GOLANGCI_LINT_VERSION = v1.61.0 FLP_BIN_FILE=flowlogs-pipeline CG_BIN_FILE=confgenerator +FLP_INFORMERS_BIN_FILE=flp-informers NETFLOW_GENERATOR=nflow-generator CMD_DIR=./cmd/ FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -111,9 +112,10 @@ lint: prereqs ## Lint the code compile: ## Compile main flowlogs-pipeline and config generator GOARCH=${GOARCH} go build "${CMD_DIR}${FLP_BIN_FILE}" GOARCH=${GOARCH} go build "${CMD_DIR}${CG_BIN_FILE}" + GOARCH=${GOARCH} go build "${CMD_DIR}${FLP_INFORMERS_BIN_FILE}" .PHONY: build -build: lint compile docs ## Build flowlogs-pipeline executable and update the docs +build: lint compile docs ## Build flowlogs-pipeline executables and update the docs .PHONY: docs docs: FORCE ## Update flowlogs-pipeline documentation diff --git a/README.md b/README.md index 193e4e579..048c72657 100644 --- a/README.md +++ b/README.md @@ -928,7 +928,7 @@ General Develop lint Lint the code compile Compile main flowlogs-pipeline and config generator - build Build flowlogs-pipeline executable and update the docs + build Build flowlogs-pipeline executables and update the docs docs Update flowlogs-pipeline documentation clean Clean tests-unit Unit tests diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index 2f0a3c8ae..0f8f29d3f 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -26,9 +26,11 @@ import ( "github.com/stretchr/testify/require" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" ) func TestTheMain(t *testing.T) { @@ -48,7 +50,10 @@ func TestTheMain(t *testing.T) { func TestPipelineConfigSetup(t *testing.T) { // Kube init mock - kubernetes.MockInformers() + kubernetes.ResetGlobals( + informers.NewInformersMock(), + informers.NewConfig(api.NetworkTransformKubeConfig{}), + ) js := `{ "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", diff --git a/cmd/flp-informers/main.go b/cmd/flp-informers/main.go new file mode 100644 index 000000000..668c86e5d --- /dev/null +++ b/cmd/flp-informers/main.go @@ -0,0 +1,84 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + _ "net/http/pprof" + "os" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + + "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" +) + +var ( + buildVersion = "unknown" + buildDate = "unknown" + app = 
"flp-informers" + configPath = flag.String("config", "", "path to a config file") + versionFlag = flag.Bool("v", false, "print version") + log = logrus.WithField("module", "main") +) + +func main() { + flag.Parse() + + appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate) + if *versionFlag { + fmt.Println(appVersion) + os.Exit(0) + } + + cfg, err := readConfig(*configPath) + if err != nil { + log.WithError(err).Fatal("error reading config file") + } + + lvl, err := logrus.ParseLevel(cfg.LogLevel) + if err != nil { + log.Errorf("Log level %s not recognized, using info", cfg.LogLevel) + lvl = logrus.InfoLevel + } + logrus.SetLevel(lvl) + log.Infof("Starting %s at log level %s", appVersion, lvl) + log.Infof("Configuration: %#v", cfg) + + if cfg.PProfPort != 0 { + go func() { + log.WithField("port", cfg.PProfPort).Info("starting PProf HTTP listener") + err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.PProfPort), nil) + log.WithError(err).Error("PProf HTTP listener stopped working") + }() + } + + opMetrics := operational.NewMetrics(&cfg.MetricsSettings) + err = kubernetes.InitInformerDatasource(cfg.KubeConfig, &cfg.KafkaConfig, opMetrics) + if err != nil { + log.WithError(err).Fatal("error initializing Kubernetes & informers") + } + + stopCh := utils.SetupElegantExit() + <-stopCh +} + +func readConfig(path string) (*config.Informers, error) { + var cfg config.Informers + if len(path) == 0 { + return &cfg, nil + } + yamlFile, err := os.ReadFile(path) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(yamlFile, &cfg) + if err != nil { + return nil, err + } + + return &cfg, err +} diff --git a/contrib/docker/Dockerfile b/contrib/docker/Dockerfile index 5861a9b45..50bcb91ba 100644 --- a/contrib/docker/Dockerfile +++ b/contrib/docker/Dockerfile @@ -13,10 +13,12 @@ COPY cmd/ cmd/ COPY pkg/ pkg/ RUN GOARCH=$TARGETARCH go build -ldflags "$LDFLAGS" -mod vendor -o flowlogs-pipeline cmd/flowlogs-pipeline/main.go +RUN GOARCH=$TARGETARCH go build -ldflags "$LDFLAGS" -mod vendor -o flp-informers cmd/flp-informers/main.go # final stage FROM --platform=linux/$TARGETARCH registry.access.redhat.com/ubi9/ubi-minimal:9.5-1738816775 COPY --from=builder /app/flowlogs-pipeline /app/ +COPY --from=builder /app/flp-informers /app/ ENTRYPOINT ["/app/flowlogs-pipeline"] diff --git a/contrib/docker/Dockerfile.downstream b/contrib/docker/Dockerfile.downstream index a33b871f2..5969ad142 100644 --- a/contrib/docker/Dockerfile.downstream +++ b/contrib/docker/Dockerfile.downstream @@ -11,16 +11,13 @@ WORKDIR /app # Copy source code COPY go.mod . COPY go.sum . -COPY Makefile . -COPY .mk/ .mk/ COPY vendor/ vendor/ -COPY .git/ .git/ COPY cmd/ cmd/ COPY pkg/ pkg/ -RUN git status --porcelain ENV GOEXPERIMENT strictfipsruntime RUN GOARCH=$TARGETARCH go build -tags strictfipsruntime -ldflags "-X main.BuildVersion=$BUILDVERSION -X main.BuildDate=$DATE" "./cmd/flowlogs-pipeline" +RUN GOARCH=$TARGETARCH go build -tags strictfipsruntime -ldflags "-X main.BuildVersion=$BUILDVERSION -X main.BuildDate=$DATE" "./cmd/flp-informers" # final stage FROM --platform=linux/$TARGETARCH registry.access.redhat.com/ubi9/ubi-minimal:9.5-1738816775 @@ -28,8 +25,8 @@ FROM --platform=linux/$TARGETARCH registry.access.redhat.com/ubi9/ubi-minimal:9. 
ARG COMMIT COPY --from=builder /app/flowlogs-pipeline /app/ +COPY --from=builder /app/flp-informers /app/ -# expose ports ENTRYPOINT ["/app/flowlogs-pipeline"] LABEL com.redhat.component="network-observability-flowlogs-pipeline-container" diff --git a/docs/api.md b/docs/api.md index 37ff16bd7..b0eae8a32 100644 --- a/docs/api.md +++ b/docs/api.md @@ -297,6 +297,32 @@ Following is the supported API format for network transformations: name: name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status' index: fields to use for indexing, must be any combination of 'mac', 'ip', 'interface', or 'udn' managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn + kafkaCacheConfig: Kafka config for informers cache (optional) + brokers: list of kafka broker addresses + topic: kafka topic to listen on + groupid: separate groupid for each consumer on specified topic + groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity) + startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition + batchReadTimeout: how often (in milliseconds) to process input + decoder: decoder to use (E.g. json or protobuf) + type: (enum) one of the following: + json: JSON decoder + protobuf: Protobuf decoder + batchMaxLen: the number of accumulated flows before being forwarded for processing + pullQueueCapacity: the capacity of the queue use to store pulled flows + pullMaxBytes: the maximum number of bytes being pulled from kafka + commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously. + tls: TLS client configuration (optional) + insecureSkipVerify: skip client verifying the server's certificate chain and host name + caCertPath: path to the CA certificate + userCertPath: path to the user certificate + userKeyPath: path to the user private key + sasl: SASL configuration (optional) + type: SASL type + plain: Plain SASL + scramSHA512: SCRAM/SHA512 SASL + clientIDPath: path to the client ID / SASL username + clientSecretPath: path to the client secret / SASL password servicesFile: path to services file (optional, default: /etc/services) protocolsFile: path to protocols file (optional, default: /etc/protocols) subnetLabels: configure subnet and IPs custom labels diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index e174df344..7fdbe7a63 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -46,6 +46,7 @@ type NetworkTransformKubeConfig struct { ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"` SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"` ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. 
Currently supported: ovn"` + KafkaCacheConfig *IngestKafka `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"` } type TransformNetworkOperationEnum string diff --git a/pkg/config/informers_config.go b/pkg/config/informers_config.go new file mode 100644 index 000000000..aa47dfde0 --- /dev/null +++ b/pkg/config/informers_config.go @@ -0,0 +1,11 @@ +package config + +import "github.com/netobserv/flowlogs-pipeline/pkg/api" + +type Informers struct { + KubeConfig api.NetworkTransformKubeConfig `yaml:"kubeConfig"` + KafkaConfig api.EncodeKafka `yaml:"kafkaConfig"` + MetricsSettings MetricsSettings `yaml:"metricsSettings"` + PProfPort int32 `yaml:"pprofPort"` + LogLevel string `yaml:"logLevel"` +} diff --git a/pkg/pipeline/transform/kubernetes/datasource/datasource.go b/pkg/pipeline/transform/kubernetes/datasource/datasource.go index 6ba7ccf31..7535353eb 100644 --- a/pkg/pipeline/transform/kubernetes/datasource/datasource.go +++ b/pkg/pipeline/transform/kubernetes/datasource/datasource.go @@ -1,28 +1,11 @@ package datasource import ( - "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" ) -type Datasource struct { - Informers informers.InformersInterface -} - -func NewInformerDatasource(kubeconfig string, infConfig informers.Config, opMetrics *operational.Metrics) (*Datasource, error) { - inf := &informers.Informers{} - if err := inf.InitFromConfig(kubeconfig, infConfig, opMetrics); err != nil { - return nil, err - } - return &Datasource{Informers: inf}, nil -} - -func (d *Datasource) IndexLookup(potentialKeys []cni.SecondaryNetKey, ip string) *model.ResourceMetaData { - return d.Informers.IndexLookup(potentialKeys, ip) -} - -func (d *Datasource) GetNodeByName(name string) (*model.ResourceMetaData, error) { - return d.Informers.GetNodeByName(name) +type Datasource interface { + IndexLookup([]cni.SecondaryNetKey, string) *model.ResourceMetaData + GetNodeByName(string) (*model.ResourceMetaData, error) } diff --git a/pkg/pipeline/transform/kubernetes/datasource/informers_ds.go b/pkg/pipeline/transform/kubernetes/datasource/informers_ds.go new file mode 100644 index 000000000..6d6306a0e --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/datasource/informers_ds.go @@ -0,0 +1,15 @@ +package datasource + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" +) + +func NewInformerDatasource(kubeconfig string, infConfig informers.Config, kafkaConfig *api.EncodeKafka, opMetrics *operational.Metrics) (Datasource, error) { + inf := &informers.Informers{} + if err := inf.InitFromConfig(kubeconfig, infConfig, kafkaConfig, opMetrics); err != nil { + return nil, err + } + return inf, nil +} diff --git a/pkg/pipeline/transform/kubernetes/datasource/kafka_ds.go b/pkg/pipeline/transform/kubernetes/datasource/kafka_ds.go new file mode 100644 index 000000000..1abeb50ee --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/datasource/kafka_ds.go @@ -0,0 +1,120 @@ +package datasource + +import ( + "context" + "sync" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/kafka" + 
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("component", "transform.Network.Kubernetes.KafkaDS") + +type KafkaDS struct { + Datasource + // We use map+mutex rather than sync.Map for better performance on writes, since the lock is acquired once to perform several writes. + kafkaIPCacheMut sync.RWMutex + kafkaIPCache map[string]model.ResourceMetaData + kafkaNodeNameCacheMut sync.RWMutex + kafkaNodeNameCache map[string]model.ResourceMetaData +} + +func NewKafkaCacheDatasource(kafkaConfig *api.IngestKafka) (Datasource, error) { + // Init Kafka reader + log.Debug("Initializing Kafka reader datasource") + kafkaReader, _, err := kafka.NewReader(kafkaConfig) + if err != nil { + return nil, err + } + + d := KafkaDS{ + kafkaIPCache: make(map[string]model.ResourceMetaData), + kafkaNodeNameCache: make(map[string]model.ResourceMetaData), + } + exitChan := utils.ExitChannel() + go func() { + for { + select { + case <-exitChan: + log.Info("gracefully exiting") + return + default: + } + // Blocking + msg, err := kafkaReader.ReadMessage(context.Background()) + if err != nil { + log.Errorln(err) + continue + } + if len(msg.Value) > 0 { + content, err := model.MessageFromBytes(msg.Value) + if err != nil { + log.Errorln(err) + continue + } + log.Debugf("Kafka reader: got message %v", content) + d.updateCache(content) + } else { + log.Debug("Kafka reader: empty message") + } + } + }() + + return &d, nil +} + +func (d *KafkaDS) updateCache(msg *model.KafkaCacheMessage) { + // TODO: manage secondary network keys + switch msg.Operation { + case model.OperationAdd, model.OperationUpdate: + d.kafkaIPCacheMut.Lock() + for _, ip := range msg.Resource.IPs { + d.kafkaIPCache[ip] = *msg.Resource + } + d.kafkaIPCacheMut.Unlock() + if msg.Resource.Kind == model.KindNode { + d.kafkaNodeNameCacheMut.Lock() + d.kafkaNodeNameCache[msg.Resource.Name] = *msg.Resource + d.kafkaNodeNameCacheMut.Unlock() + } + case model.OperationDelete: + d.kafkaIPCacheMut.Lock() + for _, ip := range msg.Resource.IPs { + delete(d.kafkaIPCache, ip) + } + d.kafkaIPCacheMut.Unlock() + if msg.Resource.Kind == model.KindNode { + d.kafkaNodeNameCacheMut.Lock() + delete(d.kafkaNodeNameCache, msg.Resource.Name) + d.kafkaNodeNameCacheMut.Unlock() + } + } +} + +func (d *KafkaDS) IndexLookup(potentialKeys []cni.SecondaryNetKey, ip string) *model.ResourceMetaData { + d.kafkaIPCacheMut.RLock() + defer d.kafkaIPCacheMut.RUnlock() + for _, key := range potentialKeys { + if obj, ok := d.kafkaIPCache[key.Key]; ok { + return &obj + } + } + if obj, ok := d.kafkaIPCache[ip]; ok { + return &obj + } + return nil +} + +func (d *KafkaDS) GetNodeByName(name string) (*model.ResourceMetaData, error) { + d.kafkaNodeNameCacheMut.RLock() + defer d.kafkaNodeNameCacheMut.RUnlock() + if obj, ok := d.kafkaNodeNameCache[name]; ok { + return &obj, nil + } + return nil, nil +} diff --git a/pkg/pipeline/transform/kubernetes/datasource/kafka_ds_test.go b/pkg/pipeline/transform/kubernetes/datasource/kafka_ds_test.go new file mode 100644 index 000000000..7b390ef53 --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/datasource/kafka_ds_test.go @@ -0,0 +1,123 @@ +package datasource + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" + "github.com/stretchr/testify/require" + v1 
"k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + node1 = model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ + Name: "node-1", + }, + Kind: model.KindNode, + IPs: []string{"1.2.3.4", "5.6.7.8"}, + } + pod1 = model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }, + Kind: model.KindPod, + IPs: []string{"10.0.0.1", "10.0.0.2"}, + } + svc1 = model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ + Name: "svc-1", + Namespace: "ns-1", + }, + Kind: model.KindService, + IPs: []string{"192.168.0.1"}, + } +) + +func TestCacheUpdate(t *testing.T) { + ds := KafkaDS{ + kafkaIPCache: make(map[string]model.ResourceMetaData), + kafkaNodeNameCache: make(map[string]model.ResourceMetaData), + } + var err error + + res := ds.IndexLookup(nil, "1.2.3.4") + require.Nil(t, res) + res, err = ds.GetNodeByName("node-1") + require.NoError(t, err) + require.Nil(t, res) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &node1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &pod1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &svc1, + }) + + res = ds.IndexLookup(nil, "1.2.3.4") + require.Equal(t, node1, *res) + res, err = ds.GetNodeByName("node-1") + require.NoError(t, err) + require.Equal(t, node1, *res) + + res = ds.IndexLookup(nil, "5.6.7.8") + require.Equal(t, node1, *res) + + res = ds.IndexLookup(nil, "10.0.0.1") + require.Equal(t, pod1, *res) + + res = ds.IndexLookup(nil, "192.168.0.1") + require.Equal(t, svc1, *res) + + svc2 := svc1 + svc2.Labels = map[string]string{"label": "value"} + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationUpdate, + Resource: &svc2, + }) + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationDelete, + Resource: &node1, + }) + + res = ds.IndexLookup(nil, "192.168.0.1") + require.Equal(t, map[string]string{"label": "value"}, res.Labels) + + res = ds.IndexLookup(nil, "1.2.3.4") + require.Nil(t, res) + res, err = ds.GetNodeByName("node-1") + require.NoError(t, err) + require.Nil(t, res) +} + +func BenchmarkPromEncode(b *testing.B) { + ds := KafkaDS{ + kafkaIPCache: make(map[string]model.ResourceMetaData), + kafkaNodeNameCache: make(map[string]model.ResourceMetaData), + } + + for i := 0; i < b.N; i++ { + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &node1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &pod1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &svc1, + }) + } +} diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go index 56561167f..bb7f41042 100644 --- a/pkg/pipeline/transform/kubernetes/enrich.go +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -12,24 +12,30 @@ import ( "github.com/sirupsen/logrus" ) -var ds *datasource.Datasource +var ds datasource.Datasource var infConfig informers.Config // For testing -func MockInformers() { - infConfig = informers.NewConfig(api.NetworkTransformKubeConfig{}) - ds = &datasource.Datasource{Informers: informers.NewInformersMock()} +func ResetGlobals(newDS datasource.Datasource, newInfConfig informers.Config) { + ds = newDS + infConfig = newInfConfig } -func InitInformerDatasource(config api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { +func InitInformerDatasource(config api.NetworkTransformKubeConfig, kafkaConfig 
*api.EncodeKafka, opMetrics *operational.Metrics) error { var err error infConfig = informers.NewConfig(config) if ds == nil { - ds, err = datasource.NewInformerDatasource(config.ConfigPath, infConfig, opMetrics) + ds, err = datasource.NewInformerDatasource(config.ConfigPath, infConfig, kafkaConfig, opMetrics) } return err } +func InitKafkaCacheDatasource(kafkaConfig *api.IngestKafka) error { + var err error + ds, err = datasource.NewKafkaCacheDatasource(kafkaConfig) + return err +} + func Enrich(outputEntry config.GenericMap, rule *api.K8sRule) { ip, ok := outputEntry.LookupString(rule.IPField) if !ok { diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index 8ad81e409..998a1ac77 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -5,7 +5,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/datasource" inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/stretchr/testify/assert" @@ -104,8 +103,7 @@ var rules = api.NetworkTransformRules{ func setupStubs(ipInfo, customKeysInfo, nodes map[string]*model.ResourceMetaData) { cfg, informers := inf.SetupStubs(ipInfo, customKeysInfo, nodes) - ds = &datasource.Datasource{Informers: informers} - infConfig = cfg + ResetGlobals(informers, cfg) } func TestEnrich(t *testing.T) { diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go index a56455a24..6cd2a09db 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers.go @@ -22,13 +22,15 @@ import ( "net" "time" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/kafka" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/sirupsen/logrus" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -375,7 +377,7 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar return nil } -func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetrics *operational.Metrics) error { +func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, kafkaConfig *api.EncodeKafka, opMetrics *operational.Metrics) error { // Initialization variables k.stopChan = make(chan struct{}) k.mdStopChan = make(chan struct{}) @@ -396,7 +398,7 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric } k.indexerHitMetric = opMetrics.CreateIndexerHitCounter() - err = k.initInformers(kubeClient, metaKubeClient, infConfig) + err = k.initInformers(kubeClient, metaKubeClient, infConfig, kafkaConfig) if err != nil { return err } @@ -404,7 +406,7 @@ func (k *Informers) InitFromConfig(kubeconfig string, infConfig Config, opMetric return nil } -func (k *Informers) initInformers(client kubernetes.Interface, metaClient 
metadata.Interface, cfg Config) error { +func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, cfg Config, kafkaConfig *api.EncodeKafka) error { informerFactory := inf.NewSharedInformerFactory(client, syncTime) metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory, cfg) @@ -424,21 +426,39 @@ func (k *Informers) initInformers(client kubernetes.Interface, metaClient metada return err } - // Informers expose an indexer - log.Debugf("adding indexers") - byIP := cache.Indexers{IndexIP: ipIndexer} - byIPAndCustom := cache.Indexers{ - IndexIP: ipIndexer, - IndexCustom: customKeyIndexer, - } - if err := k.nodes.AddIndexers(byIP); err != nil { - return fmt.Errorf("can't add indexers to Nodes informer: %w", err) - } - if err := k.pods.AddIndexers(byIPAndCustom); err != nil { - return fmt.Errorf("can't add indexers to Pods informer: %w", err) - } - if err := k.services.AddIndexers(byIP); err != nil { - return fmt.Errorf("can't add indexers to Services informer: %w", err) + if kafkaConfig != nil { + log.Debugf("adding event handlers for Kafka") + kafkaWriter, err := kafka.NewWriter(kafkaConfig) + if err != nil { + return err + } + // Informers will publish updates to Kafka + for _, inf := range []cache.SharedIndexInformer{k.nodes, k.pods, k.services} { + // Note that the update handler is called whenever the resource was updated, even if the update doesn't affect + // the transformed data (model.ResourceMetaData). We may want to further optimize this later. On the flip side, this helps + // to keep everything in sync in case of missed events. + _, err := inf.AddEventHandler(getKafkaEventHandlers(kafkaWriter)) + if err != nil { + return err + } + } + } else { + // Informers expose an indexer + log.Debugf("adding indexers") + byIP := cache.Indexers{IndexIP: ipIndexer} + byIPAndCustom := cache.Indexers{ + IndexIP: ipIndexer, + IndexCustom: customKeyIndexer, + } + if err := k.nodes.AddIndexers(byIP); err != nil { + return fmt.Errorf("can't add indexers to Nodes informer: %w", err) + } + if err := k.pods.AddIndexers(byIPAndCustom); err != nil { + return fmt.Errorf("can't add indexers to Pods informer: %w", err) + } + if err := k.services.AddIndexers(byIP); err != nil { + return fmt.Errorf("can't add indexers to Services informer: %w", err) + } } log.Debugf("starting kubernetes informers, waiting for synchronization") diff --git a/pkg/pipeline/transform/kubernetes/informers/kafka_cache_writer.go b/pkg/pipeline/transform/kubernetes/informers/kafka_cache_writer.go new file mode 100644 index 000000000..6b5f38ba3 --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/informers/kafka_cache_writer.go @@ -0,0 +1,52 @@ +package informers + +import ( + "context" + + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" + kafkago "github.com/segmentio/kafka-go" + "k8s.io/client-go/tools/cache" +) + +func getKafkaEventHandlers(kafka *kafkago.Writer) cache.ResourceEventHandler { + return &cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if res, ok := obj.(*model.ResourceMetaData); ok { + publish(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: res, + }, kafka) + } + }, + UpdateFunc: func(_, new any) { + if res, ok := new.(*model.ResourceMetaData); ok { + publish(&model.KafkaCacheMessage{ + Operation: model.OperationUpdate, + Resource: res, + }, kafka) + } + }, + DeleteFunc: func(obj any) { + if res, ok := obj.(*model.ResourceMetaData); ok { + 
publish(&model.KafkaCacheMessage{ + Operation: model.OperationDelete, + Resource: res, + }, kafka) + } + }, + } +} + +func publish(content *model.KafkaCacheMessage, kafka *kafkago.Writer) { + log.Debugf("Publishing to Kafka: %v", content.Resource) + b, err := content.ToBytes() + if err != nil { + log.Errorf("kafka publish, encoding error: %v", err) + return + } + msg := kafkago.Message{Value: b} + err = kafka.WriteMessages(context.Background(), msg) + if err != nil { + log.Errorf("kafka publish, write error: %v", err) + } +} diff --git a/pkg/pipeline/transform/kubernetes/model/model.go b/pkg/pipeline/transform/kubernetes/model/model.go index 92bc523bd..96fb93f7d 100644 --- a/pkg/pipeline/transform/kubernetes/model/model.go +++ b/pkg/pipeline/transform/kubernetes/model/model.go @@ -1,6 +1,9 @@ package model import ( + "bytes" + "encoding/gob" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -27,3 +30,35 @@ type ResourceMetaData struct { IPs []string SecondaryNetKeys []string } + +type Operation string + +const ( + OperationAdd Operation = "add" + OperationDelete Operation = "delete" + OperationUpdate Operation = "update" +) + +type KafkaCacheMessage struct { + Operation Operation + Resource *ResourceMetaData +} + +func (i *KafkaCacheMessage) ToBytes() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(i); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func MessageFromBytes(b []byte) (*KafkaCacheMessage, error) { + var msg KafkaCacheMessage + buf := bytes.NewReader(b) + dec := gob.NewDecoder(buf) + if err := dec.Decode(&msg); err != nil { + return nil, err + } + return &msg, nil +} diff --git a/pkg/pipeline/transform/kubernetes/model/model_test.go b/pkg/pipeline/transform/kubernetes/model/model_test.go new file mode 100644 index 000000000..c5599921f --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/model/model_test.go @@ -0,0 +1,32 @@ +package model + +import ( + "testing" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEncodeDecode(t *testing.T) { + msg := KafkaCacheMessage{ + Operation: OperationAdd, + Resource: &ResourceMetaData{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "label-1": "value-1", + }, + }, + Kind: KindNode, + IPs: []string{"1.2.3.4", "4.5.6.7"}, + }, + } + + b, err := msg.ToBytes() + require.NoError(t, err) + + decoded, err := MessageFromBytes(b) + require.NoError(t, err) + + require.Equal(t, msg, *decoded) +} diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 510d83444..6826c4bb3 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -214,9 +214,16 @@ func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metric } if needToInitKubeData { - err := kubernetes.InitInformerDatasource(jsonNetworkTransform.KubeConfig, opMetrics) - if err != nil { - return nil, err + if jsonNetworkTransform.KubeConfig.KafkaCacheConfig != nil { + // Get kube data from Kafka rather than informers + if err := kubernetes.InitKafkaCacheDatasource(jsonNetworkTransform.KubeConfig.KafkaCacheConfig); err != nil { + return nil, err + } + } else { + // Init informers + if err := kubernetes.InitInformerDatasource(jsonNetworkTransform.KubeConfig, nil, opMetrics); err != nil { + return nil, err + } } } diff --git a/pkg/pipeline/utils/exit.go b/pkg/pipeline/utils/exit.go index 7bbfb7cc2..667026a9d 100644 --- 
a/pkg/pipeline/utils/exit.go +++ b/pkg/pipeline/utils/exit.go @@ -42,8 +42,7 @@ func CloseExitChannel() { close(exitChannel) } -func SetupElegantExit() { - logrus.Debugf("entering SetupElegantExit") +func SetupElegantExit() (stopCh <-chan struct{}) { // handle elegant exit; create support for channels of go routines that want to exit cleanly exitChannel = make(chan struct{}) exitSigChan := make(chan os.Signal, 1) @@ -54,7 +53,6 @@ func SetupElegantExit() { sig := <-exitSigChan logrus.Debugf("received exit signal = %v", sig) close(exitChannel) - logrus.Debugf("exiting SetupElegantExit go function") }() - logrus.Debugf("exiting SetupElegantExit") + return exitChannel }
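
For reference, a minimal sketch of how the two sides of this change could be wired together: a config file for the new flp-informers binary (publishing the informers cache to Kafka), and the matching kafkaCacheConfig on a flowlogs-pipeline transform/network consumer. This is illustrative only: field names follow the YAML tags in config.Informers and the documented EncodeKafka / IngestKafka options (address, brokers, topic), while broker addresses, topic names, and port values are placeholders, not part of this patch.

    # flp-informers side (config.Informers), e.g. passed via "-config informers.yaml"
    logLevel: info
    pprofPort: 6060                       # placeholder; 0 disables the pprof listener
    kubeConfig:
      configPath: ""                      # empty: use in-cluster configuration
    kafkaConfig:                          # api.EncodeKafka: where informers publish cache updates
      address: "kafka.example:9092"       # placeholder broker address
      topic: "netobserv-kube-cache"       # placeholder topic name

    # flowlogs-pipeline side (transform/network kubeConfig): read the cache from Kafka
    # instead of running local informers
    kubeConfig:
      kafkaCacheConfig:                   # api.IngestKafka
        brokers: ["kafka.example:9092"]   # placeholder, should match the flp-informers topic/brokers
        topic: "netobserv-kube-cache"

With this split, only the flp-informers deployment needs watch permissions on Nodes, Pods and Services; the transform/network stages consume the gob-encoded KafkaCacheMessage stream and serve lookups from their local in-memory cache (KafkaDS).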