diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index e09e62d9e..3a6b6f804 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -20,8 +20,8 @@ pipeline: parameters: - name: ingest_collector ingest: - type: collector - collector: + type: ipfix + ipfix: hostName: 0.0.0.0 port: 2055 portLegacy: 2056 diff --git a/contrib/kubernetes/ipfix-collector-stdout.yaml b/contrib/kubernetes/ipfix-collector-stdout.yaml index d7211b006..6093025ca 100644 --- a/contrib/kubernetes/ipfix-collector-stdout.yaml +++ b/contrib/kubernetes/ipfix-collector-stdout.yaml @@ -1,41 +1,32 @@ -apiVersion: apps/v1 -kind: Deployment +apiVersion: v1 +kind: Pod metadata: - name: flowlogs-pipeline + name: flp-ipfix-stdout labels: - app: flowlogs-pipeline + app: flp-ipfix-stdout spec: - replicas: 1 - selector: - matchLabels: - app: flowlogs-pipeline - template: - metadata: - labels: - app: flowlogs-pipeline - spec: - containers: - - name: flowlogs-pipeline - image: quay.io/netobserv/flowlogs-pipeline:main - args: - - "--config=/etc/flowlogs-pipeline/config.yaml" - ports: - - containerPort: 2055 - imagePullPolicy: IfNotPresent - volumeMounts: - - name: configuration - mountPath: "/etc/flowlogs-pipeline/" - volumes: + containers: + - name: flp-ipfix-stdout + image: quay.io/jotak/flowlogs-pipeline:main + imagePullPolicy: Always + args: + - "--config=/etc/flowlogs-pipeline/config.yaml" + ports: + - containerPort: 2055 + volumeMounts: - name: configuration - configMap: - name: flp-config + mountPath: "/etc/flowlogs-pipeline/" + volumes: + - name: configuration + configMap: + name: flp-ipfix-stdout-config --- apiVersion: v1 kind: Service metadata: - name: flowlogs-pipeline + name: flp-ipfix-stdout labels: - app: flowlogs-pipeline + app: flp-ipfix-stdout spec: ports: - port: 2055 @@ -43,12 +34,12 @@ spec: protocol: UDP name: ipfix selector: - app: flowlogs-pipeline + app: flp-ipfix-stdout --- apiVersion: v1 kind: ConfigMap metadata: - name: flp-config + name: flp-ipfix-stdout-config data: config.yaml: | log-level: info @@ -59,10 +50,26 @@ data: parameters: - name: ingest ingest: - type: collector - collector: - hostName: 0.0.0.0 + type: ipfix + ipfix: port: 2055 + mapping: + - penprovided: true + pen: 2 + field: 7733 + destination: CustomBytes_1 + - penprovided: true + pen: 2 + field: 7734 + destination: CustomBytes_2 + - penprovided: true + pen: 2 + field: 7735 + destination: CustomBytes_3 + - penprovided: true + pen: 2 + field: 7736 + destination: CustomBytes_4 - name: write write: type: stdout diff --git a/contrib/local/ipfix-collector-stdout.yaml b/contrib/local/ipfix-collector-stdout.yaml new file mode 100644 index 000000000..0622b7d26 --- /dev/null +++ b/contrib/local/ipfix-collector-stdout.yaml @@ -0,0 +1,16 @@ +# Use it with: +# ./flowlogs-pipeline --config=contrib/local/ipfix-collector-stdout.yaml +log-level: info +pipeline: + - name: ingest + - name: write + follows: ingest +parameters: + - name: ingest + ingest: + type: ipfix + ipfix: + port: 2055 + - name: write + write: + type: stdout diff --git a/docs/api.md b/docs/api.md index bcc526ff2..824cc338a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -81,17 +81,17 @@ Following is the supported API format for S3 encode: secure: true for https, false for http (default: false) objectHeaderParameters: parameters to include in object header (key/value pairs) -## Ingest collector API +## Ingest NetFlow/IPFIX API Following is the supported API format for the 
NetFlow / IPFIX collector:
- collector: - hostName: the hostname to listen on - port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion + ipfix: + hostName: the hostname to listen on; defaults to 0.0.0.0 + port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055 portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion - batchMaxLen: the number of accumulated flows before being forwarded for processing workers: the number of netflow/ipfix decoding workers sockets: the number of listening sockets + mapping: custom field mapping## Ingest Kafka API Following is the supported API format for the kafka ingest: @@ -340,6 +340,7 @@ Following is the supported API format for writing to an IPFIX collector: targetPort: IPFIX Collector host target port transport: Transport protocol (tcp/udp) to be used for the IPFIX connection enterpriseId: Enterprise ID for exporting transformations + tplSendInterval: Interval for resending templates to the collector (default: 1m) ## Aggregate metrics API Following is the supported API format for specifying metrics aggregations: diff --git a/go.mod b/go.mod index 996d6680e..e480571be 100644 --- a/go.mod +++ b/go.mod @@ -51,12 +51,11 @@ require ( sigs.k8s.io/e2e-framework v0.6.0 ) -require github.com/cenkalti/backoff/v5 v5.0.2 // indirect - require ( github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/cenkalti/hub v1.0.2 // indirect github.com/cenkalti/rpc2 v0.0.0-20210604223624-c1acbc6ec984 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -170,3 +169,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) + +replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 diff --git a/go.sum b/go.sum index 08b4108b8..e2bf87417 100644 --- a/go.sum +++ b/go.sum @@ -180,6 +180,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 h1:tpaHjydMAy2MTukKIUAVK4xIFUpL12xuexA0FuTVpuo= +github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -380,8 +382,6 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/vladimirvivien/gexe v0.5.0 h1:AWBVaYnrTsGYBktXvcO0DfWPeSiZxn6mnQ5nvL+A1/A= github.com/vladimirvivien/gexe v0.5.0/go.mod h1:3gjgTqE2c0VyHnU5UOIwk7gyNzZDGulPb/DJPgcw64E= -github.com/vmware/go-ipfix v0.15.0 h1:F/3BjFoODvCHEHYk36jy3TGmQzJ7rF2bC7ZG+c/lng8= -github.com/vmware/go-ipfix v0.15.0/go.mod 
h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig= github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= diff --git a/hack/examples/docker-ipfix-config.yaml b/hack/examples/docker-ipfix-config.yaml index acfe2da73..c420675b0 100644 --- a/hack/examples/docker-ipfix-config.yaml +++ b/hack/examples/docker-ipfix-config.yaml @@ -19,8 +19,8 @@ pipeline: parameters: - name: ingest ingest: # use nflow generator to simulate flows: ./nflow-generator -t localhost -p 2055 - type: collector - collector: + type: ipfix + ipfix: hostName: localhost port: 4739 # Use this for IPFIX / netflow v9 portLegacy: 2055 # Use this for legacy v5 netflow diff --git a/pkg/api/api.go b/pkg/api/api.go index a1d1bac39..574d9f723 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -22,7 +22,7 @@ const ( FileLoopType = "file_loop" FileChunksType = "file_chunks" SyntheticType = "synthetic" - CollectorType = "collector" + CollectorType = "collector" // deprecated: use 'ipfix' instead StdinType = "stdin" GRPCType = "grpc" FakeType = "fake" @@ -53,7 +53,7 @@ type API struct { PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"` KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"` S3Encode EncodeS3 `yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"` - IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"` + IngestIpfix IngestIpfix `yaml:"ipfix" doc:"## Ingest NetFlow/IPFIX API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"` IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"` IngestStdin IngestStdin `yaml:"stdin" doc:"## Ingest Standard Input\nFollowing is the supported API format for the standard input ingest:\n"` diff --git a/pkg/api/ingest_collector.go b/pkg/api/ingest_collector.go deleted file mode 100644 index 090b3fdb0..000000000 --- a/pkg/api/ingest_collector.go +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (C) 2022 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package api - -type IngestCollector struct { - HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on"` - Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. 
Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"` - PortLegacy int `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"` - BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"` - Workers int `yaml:"workers,omitempty" json:"workers,omitempty" doc:"the number of netflow/ipfix decoding workers"` - Sockets int `yaml:"sockets,omitempty" json:"sockets,omitempty" doc:"the number of listening sockets"` -} diff --git a/pkg/api/ingest_ipfix.go b/pkg/api/ingest_ipfix.go new file mode 100644 index 000000000..6453c36db --- /dev/null +++ b/pkg/api/ingest_ipfix.go @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "fmt" + + "github.com/netsampler/goflow2/producer" +) + +type IngestIpfix struct { + HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on; defaults to 0.0.0.0"` + Port uint `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055"` + PortLegacy uint `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. 
Omit or set to 0 to disable NetFlow v5 ingestion"` + Workers uint `yaml:"workers,omitempty" json:"workers,omitempty" doc:"the number of netflow/ipfix decoding workers"` + Sockets uint `yaml:"sockets,omitempty" json:"sockets,omitempty" doc:"the number of listening sockets"` + Mapping []producer.NetFlowMapField `yaml:"mapping,omitempty" json:"mapping,omitempty" doc:"custom field mapping"` +} + +func (i *IngestIpfix) SetDefaults() { + if i.HostName == "" { + i.HostName = "0.0.0.0" + } + if i.Port == 0 && i.PortLegacy == 0 { + i.Port = 2055 + } + if i.Workers == 0 { + i.Workers = 1 + } + if i.Sockets == 0 { + i.Sockets = 1 + } +} + +func (i *IngestIpfix) String() string { + hasMapping := "no" + if len(i.Mapping) > 0 { + hasMapping = "yes" + } + return fmt.Sprintf( + "hostname=%s, port=%d, portLegacy=%d, workers=%d, sockets=%d, mapping=%s", + i.HostName, + i.Port, + i.PortLegacy, + i.Workers, + i.Sockets, + hasMapping, + ) +} diff --git a/pkg/api/write_ipfix.go b/pkg/api/write_ipfix.go index 059952415..e3ea6064f 100644 --- a/pkg/api/write_ipfix.go +++ b/pkg/api/write_ipfix.go @@ -2,19 +2,24 @@ package api import ( "errors" + "time" ) type WriteIpfix struct { - TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"` - TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"` - Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"` - EnterpriseID int `yaml:"enterpriseId,omitempty" json:"EnterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"` + TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"` + TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"` + Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"` + EnterpriseID int `yaml:"enterpriseId,omitempty" json:"enterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"` + TplSendInterval Duration `yaml:"tplSendInterval,omitempty" json:"tplSendInterval,omitempty" doc:"Interval for resending templates to the collector (default: 1m)"` } func (w *WriteIpfix) SetDefaults() { if w.Transport == "" { w.Transport = "tcp" } + if w.TplSendInterval.Duration == 0 { + w.TplSendInterval.Duration = time.Minute + } } func (w *WriteIpfix) Validate() error { diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index a1ee1ba4a..1d420db6e 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -115,11 +115,11 @@ func Test_RunShortConfGen(t *testing.T) { ) // Expects ingest - require.Equal(t, &api.IngestCollector{ + require.Equal(t, &api.IngestIpfix{ HostName: "0.0.0.0", Port: 2155, PortLegacy: 2156, - }, out.Parameters[0].Ingest.Collector) + }, out.Parameters[0].Ingest.Ipfix) // Expects transform network require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) @@ -207,11 +207,11 @@ func Test_RunConfGenNoAgg(t *testing.T) { ) // Expects ingest - require.Equal(t, &api.IngestCollector{ + require.Equal(t, &api.IngestIpfix{ HostName: "0.0.0.0", Port: 2155, PortLegacy: 2156, - }, out.Parameters[0].Ingest.Collector) + }, out.Parameters[0].Ingest.Ipfix) // Expects transform network require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) @@ -295,11 +295,11 @@ func Test_RunLongConfGen(t 
*testing.T) { ) // Expects ingest - require.Equal(t, &api.IngestCollector{ + require.Equal(t, &api.IngestIpfix{ HostName: "0.0.0.0", Port: 2155, PortLegacy: 2156, - }, out.Parameters[0].Ingest.Collector) + }, out.Parameters[0].Ingest.Ipfix) // Expects transform generic require.Equal(t, api.ReplaceKeys, out.Parameters[1].Transform.Generic.Policy) diff --git a/pkg/confgen/config_test.go b/pkg/confgen/config_test.go index 02a446c9c..10b205a22 100644 --- a/pkg/confgen/config_test.go +++ b/pkg/confgen/config_test.go @@ -35,7 +35,7 @@ func expectedConfig() *Config { }, }, Ingest: config.Ingest{ - Collector: &api.IngestCollector{ + Collector: &api.IngestIpfix{ Port: 8888, }, }, diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index c31635cda..9faeda45a 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -72,12 +72,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { }) } +// Deprecated: old function that only manages IPFIX ingestion func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam { parameters := make([]config.StageParam, len(cg.opts.GenerateStages)) for i, stage := range cg.opts.GenerateStages { switch stage { case "ingest": - parameters[i] = config.NewCollectorParams("ingest_collector", *cg.config.Ingest.Collector) + parameters[i] = config.NewIPFIXParams("ingest_collector", *cg.config.Ingest.Collector) case "transform_generic": parameters[i] = config.NewTransformGenericParams("transform_generic", *cg.config.Transform.Generic) case "transform_network": diff --git a/pkg/config/config.go b/pkg/config/config.go index d08b14715..9c0d4897a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -102,7 +102,8 @@ type StageParam struct { type Ingest struct { Type string `yaml:"type" json:"type"` File *File `yaml:"file,omitempty" json:"file,omitempty"` - Collector *api.IngestCollector `yaml:"collector,omitempty" json:"collector,omitempty"` + Collector *api.IngestIpfix `yaml:"collector,omitempty" json:"collector,omitempty"` // deprecated: use 'ipfix' instead + Ipfix *api.IngestIpfix `yaml:"ipfix,omitempty" json:"ipfix,omitempty"` Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"` GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"` Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"` diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index e28e95df5..9989fef9b 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -52,8 +52,11 @@ const PresetIngesterStage = "preset-ingester" // NewPipeline creates a new pipeline from an existing ingest func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { - if ingest.Collector != nil { - return NewCollectorPipeline(name, *ingest.Collector), nil + if ingest.Ipfix != nil { + return NewIPFIXPipeline(name, *ingest.Ipfix), nil + } + if ingest.Collector != nil { // for retro-compatibility + return NewIPFIXPipeline(name, *ingest.Collector), nil } if ingest.GRPC != nil { return NewGRPCPipeline(name, *ingest.GRPC), nil @@ -64,13 +67,13 @@ func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { return PipelineBuilderStage{}, errors.New("missing ingest params") } -// NewCollectorPipeline creates a new pipeline from an `IngestCollector` initial stage (listening for NetFlows / IPFIX) +// NewCollectorPipeline creates a new pipeline from an `IngestIpfix` initial stage 
(listening for NetFlows / IPFIX) // //nolint:golint,gocritic -func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuilderStage { +func NewIPFIXPipeline(name string, ingest api.IngestIpfix) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{NewCollectorParams(name, ingest)}, + config: []StageParam{NewIPFIXParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index deb1a77bc..b14fa7dc9 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -27,7 +27,7 @@ import ( ) func TestLokiPipeline(t *testing.T) { - pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) + pl := NewIPFIXPipeline("ingest", api.IngestIpfix{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ @@ -54,7 +54,7 @@ func TestLokiPipeline(t *testing.T) { b, err = json.Marshal(params[0]) require.NoError(t, err) - require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b)) + require.Equal(t, `{"name":"ingest","ingest":{"type":"ipfix","ipfix":{"hostName":"127.0.0.1","port":9999}}}`, string(b)) b, err = json.Marshal(params[1]) require.NoError(t, err) @@ -176,7 +176,7 @@ func TestKafkaPromPipeline(t *testing.T) { } func TestForkPipeline(t *testing.T) { - plFork := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) + plFork := NewIPFIXPipeline("ingest", api.IngestIpfix{HostName: "127.0.0.1", Port: 9999}) plFork.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) plFork.WriteStdout("stdout", api.WriteStdout{}) stages := plFork.GetStages() @@ -191,7 +191,7 @@ func TestForkPipeline(t *testing.T) { b, err = json.Marshal(params[0]) require.NoError(t, err) - require.JSONEq(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b)) + require.JSONEq(t, `{"name":"ingest","ingest":{"type":"ipfix","ipfix":{"hostName":"127.0.0.1","port":9999}}}`, string(b)) b, err = json.Marshal(params[1]) require.NoError(t, err) @@ -203,7 +203,7 @@ func TestForkPipeline(t *testing.T) { } func TestIPFIXPipeline(t *testing.T) { - pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) + pl := NewIPFIXPipeline("ingest", api.IngestIpfix{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ @@ -218,10 +218,11 @@ func TestIPFIXPipeline(t *testing.T) { }, }}}) pl = pl.WriteIpfix("ipfix", api.WriteIpfix{ - TargetHost: "ipfix-receiver-test", - TargetPort: 5999, - Transport: "tcp", - EnterpriseID: 1, + TargetHost: "ipfix-receiver-test", + TargetPort: 5999, + Transport: "tcp", + TplSendInterval: api.Duration{Duration: 40 * time.Second}, + EnterpriseID: 1, }) stages := pl.GetStages() require.Len(t, stages, 3) @@ -235,7 +236,7 @@ func TestIPFIXPipeline(t *testing.T) { b, err = json.Marshal(params[0]) require.NoError(t, err) - require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b)) + require.Equal(t, `{"name":"ingest","ingest":{"type":"ipfix","ipfix":{"hostName":"127.0.0.1","port":9999}}}`, 
string(b)) b, err = json.Marshal(params[1]) require.NoError(t, err) @@ -243,5 +244,5 @@ func TestIPFIXPipeline(t *testing.T) { b, err = json.Marshal(params[2]) require.NoError(t, err) - require.JSONEq(t, `{"name":"ipfix","write":{"type":"ipfix","ipfix":{"targetHost":"ipfix-receiver-test","targetPort":5999,"transport":"tcp","EnterpriseId":1}}}`, string(b)) + require.JSONEq(t, `{"name":"ipfix","write":{"type":"ipfix","ipfix":{"targetHost":"ipfix-receiver-test","targetPort":5999,"tplSendInterval":"40s","transport":"tcp","enterpriseId":1}}}`, string(b)) } diff --git a/pkg/config/stage_params.go b/pkg/config/stage_params.go index 6c7809fe9..438d4d415 100644 --- a/pkg/config/stage_params.go +++ b/pkg/config/stage_params.go @@ -21,8 +21,8 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" ) -func NewCollectorParams(name string, ingest api.IngestCollector) StageParam { - return StageParam{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}} +func NewIPFIXParams(name string, ingest api.IngestIpfix) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.IpfixType, Ipfix: &ingest}} } func NewGRPCParams(name string, ingest api.IngestGRPCProto) StageParam { diff --git a/pkg/pipeline/encode/encode_kafka_test.go b/pkg/pipeline/encode/encode_kafka_test.go index 3dc790fd8..f6e2f3817 100644 --- a/pkg/pipeline/encode/encode_kafka_test.go +++ b/pkg/pipeline/encode/encode_kafka_test.go @@ -88,7 +88,7 @@ func Test_EncodeKafka(t *testing.T) { func Test_TLSConfigEmpty(t *testing.T) { test.ResetPromRegistry() - pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{}) + pipeline := config.NewIPFIXPipeline("ingest", api.IngestIpfix{}) pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{ Address: "any", Topic: "topic", @@ -103,7 +103,7 @@ func Test_TLSConfigCA(t *testing.T) { test.ResetPromRegistry() ca, cleanup := test.CreateCACert(t) defer cleanup() - pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{}) + pipeline := config.NewIPFIXPipeline("ingest", api.IngestIpfix{}) pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{ Address: "any", Topic: "topic", @@ -124,7 +124,7 @@ func Test_MutualTLSConfig(t *testing.T) { test.ResetPromRegistry() ca, user, userKey, cleanup := test.CreateAllCerts(t) defer cleanup() - pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{}) + pipeline := config.NewIPFIXPipeline("ingest", api.IngestIpfix{}) pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{ Address: "any", Topic: "topic", diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_ipfix.go similarity index 61% rename from pkg/pipeline/ingest/ingest_collector.go rename to pkg/pipeline/ingest/ingest_ipfix.go index f6d4b246c..adf2a66a2 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_ipfix.go @@ -20,7 +20,6 @@ package ingest import ( "context" "encoding/binary" - "fmt" "net" ms "github.com/mitchellh/mapstructure" @@ -34,8 +33,9 @@ import ( goflowCommonFormat "github.com/netsampler/goflow2/format/common" _ "github.com/netsampler/goflow2/format/protobuf" // required for goflow protobuf goflowpb "github.com/netsampler/goflow2/pb" + "github.com/netsampler/goflow2/producer" "github.com/netsampler/goflow2/utils" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) @@ -43,15 +43,15 @@ const ( channelSize = 1000 ) -type ingestCollector struct { - hostname string - port int - portLegacy int - workers int - sockets int - in chan 
map[string]interface{} - exitChan <-chan struct{} - metrics *metrics +var ( + ilog = logrus.WithField("component", "ingest.Ipfix") +) + +type ingestIPFIX struct { + *api.IngestIpfix + in chan map[string]interface{} + exitChan <-chan struct{} + metrics *metrics } // TransportWrapper is an implementation of the goflow2 transport interface @@ -91,7 +91,7 @@ func (w *TransportWrapper) Send(_, data []byte) error { // temporary fix // A PR was submitted to log this error from goflow2: // https://github.com/netsampler/goflow2/pull/86 - log.Error(err) + ilog.Error(err) return err } renderedMsg, err := RenderMessage(&message) @@ -102,7 +102,7 @@ func (w *TransportWrapper) Send(_, data []byte) error { } // Ingest ingests entries from a network collector using goflow2 library (https://github.com/netsampler/goflow2) -func (c *ingestCollector) Ingest(out chan<- config.GenericMap) { +func (c *ingestIPFIX) Ingest(out chan<- config.GenericMap) { ctx := context.Background() c.metrics.createOutQueueLen(out) @@ -113,57 +113,67 @@ func (c *ingestCollector) Ingest(out chan<- config.GenericMap) { c.processLogLines(out) } -func (c *ingestCollector) initCollectorListener(ctx context.Context) { +func (c *ingestIPFIX) initCollectorListener(ctx context.Context) { transporter := NewWrapper(c.in) formatter, err := goflowFormat.FindFormat(ctx, "pb") if err != nil { - log.Fatal(err) + ilog.Fatal(err) } - if c.port > 0 { + if c.Port > 0 { // cf https://github.com/netsampler/goflow2/pull/49 tpl, err := templates.FindTemplateSystem(ctx, "memory") if err != nil { - log.Fatalf("goflow2 error: could not find memory template system: %v", err) + ilog.Fatalf("goflow2 error: could not find memory template system: %v", err) } defer tpl.Close(ctx) - log.Infof("listening for netflow on host %s, port = %d", c.hostname, c.port) - for i := 0; i < c.sockets; i++ { + ilog.Infof("listening for netflow on host %s, port = %d", c.HostName, c.Port) + for i := uint(0); i < c.Sockets; i++ { go func() { sNF := utils.NewStateNetFlow() sNF.Format = formatter sNF.Transport = transporter - sNF.Logger = log.StandardLogger() + sNF.Logger = logrus.StandardLogger() sNF.TemplateSystem = tpl - - err = sNF.FlowRoutine(c.workers, c.hostname, c.port, c.sockets > 1) - log.Fatal(err) + if len(c.Mapping) > 0 { + sNF.Config = &producer.ProducerConfig{ + IPFIX: producer.IPFIXProducerConfig{ + Mapping: c.Mapping, + }, + NetFlowV9: producer.NetFlowV9ProducerConfig{ + Mapping: c.Mapping, + }, + } + } + + err = sNF.FlowRoutine(int(c.Workers), c.HostName, int(c.Port), c.Sockets > 1) + ilog.Fatal(err) }() } } - if c.portLegacy > 0 { - log.Infof("listening for legacy netflow on host %s, port = %d", c.hostname, c.portLegacy) - for i := 0; i < c.sockets; i++ { + if c.PortLegacy > 0 { + ilog.Infof("listening for legacy netflow on host %s, port = %d", c.HostName, c.PortLegacy) + for i := uint(0); i < c.Sockets; i++ { go func() { sLegacyNF := utils.NewStateNFLegacy() sLegacyNF.Format = formatter sLegacyNF.Transport = transporter - sLegacyNF.Logger = log.StandardLogger() + sLegacyNF.Logger = logrus.StandardLogger() - err = sLegacyNF.FlowRoutine(c.workers, c.hostname, c.portLegacy, c.sockets > 1) - log.Fatal(err) + err = sLegacyNF.FlowRoutine(int(c.Workers), c.HostName, int(c.PortLegacy), c.Sockets > 1) + ilog.Fatal(err) }() } } } -func (c *ingestCollector) processLogLines(out chan<- config.GenericMap) { +func (c *ingestIPFIX) processLogLines(out chan<- config.GenericMap) { for { select { case <-c.exitChan: - log.Debugf("exiting ingestCollector because of signal") + 
ilog.Infof("Exit signal received, stop processing input") return case record := <-c.in: out <- record @@ -171,42 +181,26 @@ func (c *ingestCollector) processLogLines(out chan<- config.GenericMap) { } } -// NewIngestCollector create a new ingester -func NewIngestCollector(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) { - jsonIngestCollector := api.IngestCollector{} - if params.Ingest != nil && params.Ingest.Collector != nil { - jsonIngestCollector = *params.Ingest.Collector - } - if jsonIngestCollector.HostName == "" { - return nil, fmt.Errorf("ingest hostname not specified") - } - if jsonIngestCollector.Port == 0 && jsonIngestCollector.PortLegacy == 0 { - return nil, fmt.Errorf("no ingest port specified") - } - if jsonIngestCollector.Workers == 0 { - jsonIngestCollector.Workers = 1 - } - if jsonIngestCollector.Sockets == 0 { - jsonIngestCollector.Sockets = 1 +// NewIngestIPFIX create a new ingester +func NewIngestIPFIX(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) { + cfg := api.IngestIpfix{} + if params.Ingest != nil && params.Ingest.Ipfix != nil { + cfg = *params.Ingest.Ipfix + } else if params.Ingest != nil && params.Ingest.Collector != nil { + // For backward compatibility + cfg = *params.Ingest.Collector } - log.Infof("hostname = %s", jsonIngestCollector.HostName) - log.Infof("port = %d", jsonIngestCollector.Port) - log.Infof("portLegacy = %d", jsonIngestCollector.PortLegacy) - log.Infof("workers = %d", jsonIngestCollector.Workers) - log.Infof("sockets = %d", jsonIngestCollector.Sockets) + cfg.SetDefaults() + ilog.Infof("Ingest IPFIX config: [%s]", cfg.String()) in := make(chan map[string]interface{}, channelSize) metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) }) - return &ingestCollector{ - hostname: jsonIngestCollector.HostName, - port: jsonIngestCollector.Port, - portLegacy: jsonIngestCollector.PortLegacy, - workers: jsonIngestCollector.Workers, - sockets: jsonIngestCollector.Sockets, - exitChan: pUtils.ExitChannel(), - in: in, - metrics: metrics, + return &ingestIPFIX{ + IngestIpfix: &cfg, + exitChan: pUtils.ExitChannel(), + in: in, + metrics: metrics, }, nil } diff --git a/pkg/pipeline/ingest/ingest_collector_test.go b/pkg/pipeline/ingest/ingest_ipfix_test.go similarity index 90% rename from pkg/pipeline/ingest/ingest_collector_test.go rename to pkg/pipeline/ingest/ingest_ipfix_test.go index 38b755d60..e2fdf155b 100644 --- a/pkg/pipeline/ingest/ingest_collector_test.go +++ b/pkg/pipeline/ingest/ingest_ipfix_test.go @@ -34,11 +34,10 @@ const timeout = 5 * time.Second func TestIngest(t *testing.T) { collectorPort, err := test.UDPPort() require.NoError(t, err) - stage := config.NewCollectorPipeline("ingest-ipfix", api.IngestCollector{ - HostName: "0.0.0.0", - Port: collectorPort, + stage := config.NewIPFIXPipeline("ingest-ipfix", api.IngestIpfix{ + Port: collectorPort, }) - ic, err := NewIngestCollector(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) + ic, err := NewIngestIPFIX(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) require.NoError(t, err) forwarded := make(chan config.GenericMap) diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index cfffc894f..a53630639 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -397,7 +397,10 @@ func getIngester(opMetrics *operational.Metrics, params config.StageParam) (inge case api.SyntheticType: ingester, err = 
ingest.NewIngestSynthetic(opMetrics, params) case api.CollectorType: - ingester, err = ingest.NewIngestCollector(opMetrics, params) + log.Warnf("Stage %s uses deprecated 'collector' API. It should be renamed 'ipfix', as 'collector' will be removed in a future release.", params.Name) + ingester, err = ingest.NewIngestIPFIX(opMetrics, params) + case api.IpfixType: + ingester, err = ingest.NewIngestIPFIX(opMetrics, params) case api.StdinType: ingester, err = ingest.NewIngestStdin(opMetrics, params) case api.KafkaType: diff --git a/pkg/pipeline/pipeline_builder_test.go b/pkg/pipeline/pipeline_builder_test.go index b227d4906..59e30aff2 100644 --- a/pkg/pipeline/pipeline_builder_test.go +++ b/pkg/pipeline/pipeline_builder_test.go @@ -2,6 +2,7 @@ package pipeline import ( "errors" + "reflect" "testing" "github.com/netobserv/flowlogs-pipeline/pkg/test" @@ -30,6 +31,27 @@ func TestConnectionVerification_Pass(t *testing.T) { assert.NoError(t, err) } +func TestPipelineIPFIXBackwardCompatible(t *testing.T) { + _, cfg := test.InitConfig(t, ` +pipeline: + - follows: ingest1 + name: write1 +parameters: + - name: ingest1 + ingest: + type: collector + - name: write1 + write: + type: none +`) + p, err := NewPipeline(cfg) + assert.NoError(t, err) + + assert.NotNil(t, p.pipelineStages[0].Ingester) + ty := reflect.TypeOf(p.pipelineStages[0].Ingester) + assert.Equal(t, "*ingest.ingestIPFIX", ty.String()) +} + func TestConnectionVerification(t *testing.T) { type testCase struct { description string diff --git a/pkg/pipeline/write/testnorace/write_ipfix_test.go b/pkg/pipeline/write/testnorace/write_ipfix_test.go index 7e0bcec7c..2a3e9a59c 100644 --- a/pkg/pipeline/write/testnorace/write_ipfix_test.go +++ b/pkg/pipeline/write/testnorace/write_ipfix_test.go @@ -10,9 +10,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + "github.com/netsampler/goflow2/producer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/vmware/go-ipfix/pkg/collector" @@ -25,7 +30,7 @@ import ( var ( startTime = time.Now() endTime = startTime.Add(7 * time.Second) - FullPBFlow = pbflow.Record{ + fullPBFlow = pbflow.Record{ Direction: pbflow.Direction_EGRESS, Bytes: 1024, DataLink: &pbflow.DataLink{ @@ -45,7 +50,7 @@ var ( EthProtocol: 2048, Packets: 3, Transport: &pbflow.Transport{ - Protocol: 17, + Protocol: 6, SrcPort: 23000, DstPort: 443, }, @@ -79,13 +84,57 @@ var ( }, }, } + + icmpPBFlow = pbflow.Record{ + Direction: pbflow.Direction_INGRESS, + Bytes: 1024, + DataLink: &pbflow.DataLink{ + DstMac: 0x112233445566, + SrcMac: 0x010203040506, + }, + Network: &pbflow.Network{ + SrcAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}, + }, + DstAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708}, + }, + }, + EthProtocol: 2048, + Packets: 3, + Transport: &pbflow.Transport{ + Protocol: 1, + }, + TimeFlowStart: timestamppb.New(startTime), + TimeFlowEnd: timestamppb.New(endTime), + + AgentIp: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a090807}, + }, + Flags: 0x110, + IcmpCode: 10, + IcmpType: 8, + DupList: []*pbflow.DupMapEntry{ + 
{ + Interface: "eth0", + Direction: pbflow.Direction_EGRESS, + }, + { + Interface: "a1234567", + Direction: pbflow.Direction_INGRESS, + }, + }, + } ) func TestEnrichedIPFIXFlow(t *testing.T) { cp := startCollector(t) addr := cp.GetAddress().(*net.UDPAddr) - flow := decode.PBFlowToMap(&FullPBFlow) + flow := decode.PBFlowToMap(&fullPBFlow) + + // Convert TCP flags + flow["Flags"] = utils.DecodeTCPFlags(uint(fullPBFlow.Flags)) // Add enrichment flow["SrcK8S_Name"] = "pod A" @@ -119,8 +168,12 @@ func TestEnrichedIPFIXFlow(t *testing.T) { cp.Stop() expectedFields := write.IPv4IANAFields - expectedFields = append(expectedFields, write.KubeFields...) - expectedFields = append(expectedFields, write.CustomNetworkFields...) + for _, f := range write.KubeFields { + expectedFields = append(expectedFields, f.Name) + } + for _, f := range write.CustomNetworkFields { + expectedFields = append(expectedFields, f.Name) + } // Check template assert.Equal(t, uint16(10), tplv4Msg.GetVersion()) @@ -146,7 +199,7 @@ func TestEnrichedIPFIXPartialFlow(t *testing.T) { cp := startCollector(t) addr := cp.GetAddress().(*net.UDPAddr) - flow := decode.PBFlowToMap(&FullPBFlow) + flow := decode.PBFlowToMap(&fullPBFlow) // Add partial enrichment flow["SrcK8S_Name"] = "pod A" @@ -180,8 +233,12 @@ func TestEnrichedIPFIXPartialFlow(t *testing.T) { cp.Stop() expectedFields := write.IPv4IANAFields - expectedFields = append(expectedFields, write.KubeFields...) - expectedFields = append(expectedFields, write.CustomNetworkFields...) + for _, f := range write.KubeFields { + expectedFields = append(expectedFields, f.Name) + } + for _, f := range write.CustomNetworkFields { + expectedFields = append(expectedFields, f.Name) + } // Check template assert.Equal(t, uint16(10), tplv4Msg.GetVersion()) @@ -207,7 +264,7 @@ func TestBasicIPFIXFlow(t *testing.T) { cp := startCollector(t) addr := cp.GetAddress().(*net.UDPAddr) - flow := decode.PBFlowToMap(&FullPBFlow) + flow := decode.PBFlowToMap(&fullPBFlow) // Add partial enrichment (must be ignored) flow["SrcK8S_Name"] = "pod A" @@ -257,9 +314,65 @@ func TestBasicIPFIXFlow(t *testing.T) { } // Make sure enriched fields are absent - for _, name := range write.KubeFields { + for _, f := range write.KubeFields { + element, _, exist := record.GetInfoElementWithValue(f.Name) + assert.Falsef(t, exist, "element with name %s should NOT exist in the record", f.Name) + assert.Nil(t, element) + } +} + +func TestICMPIPFIXFlow(t *testing.T) { + cp := startCollector(t) + addr := cp.GetAddress().(*net.UDPAddr) + + flow := decode.PBFlowToMap(&icmpPBFlow) + + writer, err := write.NewWriteIpfix(config.StageParam{ + Write: &config.Write{ + Ipfix: &api.WriteIpfix{ + TargetHost: addr.IP.String(), + TargetPort: addr.Port, + Transport: addr.Network(), + // No enterprise ID here + }, + }, + }) + require.NoError(t, err) + + writer.Write(flow) + + // Read collector + // 1st = IPv4 template + tplv4Msg := <-cp.GetMsgChan() + // 2nd = IPv6 template (ignore) + <-cp.GetMsgChan() + // 3rd = data record + dataMsg := <-cp.GetMsgChan() + cp.Stop() + + // Check template + assert.Equal(t, uint16(10), tplv4Msg.GetVersion()) + templateSet := tplv4Msg.GetSet() + templateElements := templateSet.GetRecords()[0].GetOrderedElementList() + assert.Len(t, templateElements, len(write.IPv4IANAFields)) + assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId) + + // Check data + assert.Equal(t, uint16(10), dataMsg.GetVersion()) + dataSet := dataMsg.GetSet() + record := dataSet.GetRecords()[0] + + for _, name := range 
write.IPv4IANAFields { element, _, exist := record.GetInfoElementWithValue(name) - assert.Falsef(t, exist, "element with name %s should NOT exist in the record", name) + assert.Truef(t, exist, "element with name %s should exist in the record", name) + assert.NotNil(t, element) + matchElement(t, element, flow) + } + + // Make sure enriched fields are absent + for _, f := range write.KubeFields { + element, _, exist := record.GetInfoElementWithValue(f.Name) + assert.Falsef(t, exist, "element with name %s should NOT exist in the record", f.Name) assert.Nil(t, element) } } @@ -319,3 +432,84 @@ func startCollector(t *testing.T) *collector.CollectingProcess { return cp } + +func TestIngestEnriched(t *testing.T) { + var pen uint32 = 2 + collectorPort, err := test.UDPPort() + require.NoError(t, err) + stage := config.NewIPFIXPipeline("ingest-ipfix", api.IngestIpfix{ + Port: collectorPort, + Mapping: generateWriteMapping(pen), + }) + ic, err := ingest.NewIngestIPFIX(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) + require.NoError(t, err) + forwarded := make(chan config.GenericMap) + + go ic.Ingest(forwarded) + + flow := decode.PBFlowToMap(&fullPBFlow) + + // Convert TCP flags + flow["Flags"] = utils.DecodeTCPFlags(uint(fullPBFlow.Flags)) + + // Add enrichment + flow["SrcK8S_Name"] = "pod A" + flow["SrcK8S_Namespace"] = "ns1" + flow["DstK8S_Name"] = "pod B" + flow["DstK8S_Namespace"] = "ns2" + + writer, err := write.NewWriteIpfix(config.StageParam{ + Write: &config.Write{ + Ipfix: &api.WriteIpfix{ + TargetHost: "0.0.0.0", + TargetPort: int(collectorPort), + Transport: "udp", + EnterpriseID: int(pen), + }, + }, + }) + require.NoError(t, err) + writer.Write(flow) + + // Wait for flow + for { + select { + case received := <-forwarded: + assert.Equal(t, "1.2.3.4", received["SrcAddr"]) + assert.Equal(t, "127.0.0.1", received["SamplerAddress"]) + assert.Equal(t, []byte("ns1"), received["CustomBytes_1"]) + assert.Equal(t, []byte("pod A"), received["CustomBytes_2"]) + assert.Equal(t, []byte("ns2"), received["CustomBytes_3"]) + assert.Equal(t, []byte("pod B"), received["CustomBytes_4"]) + return + default: + // nothing yet received + time.Sleep(50 * time.Millisecond) + } + } +} + +func generateWriteMapping(pen uint32) []producer.NetFlowMapField { + var mapping []producer.NetFlowMapField + allCustom := []entities.InfoElement{} + allCustom = append(allCustom, write.KubeFields...) + allCustom = append(allCustom, write.CustomNetworkFields...) 
+ countString := 0 + countOther := 0 + for _, in := range allCustom { + out := producer.NetFlowMapField{ + PenProvided: true, + Pen: pen, + Type: in.ElementId, + } + if in.DataType == entities.String { + countString++ + out.Destination = fmt.Sprintf("CustomBytes_%d", countString) + } else { + countOther++ + out.Destination = fmt.Sprintf("CustomInteger_%d", countOther) + } + mapping = append(mapping, out) + } + return mapping +} diff --git a/pkg/pipeline/write/write_ipfix.go b/pkg/pipeline/write/write_ipfix.go index 779de34fc..5aaa2d4ad 100644 --- a/pkg/pipeline/write/write_ipfix.go +++ b/pkg/pipeline/write/write_ipfix.go @@ -25,6 +25,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/sirupsen/logrus" "github.com/vmware/go-ipfix/pkg/entities" ipfixExporter "github.com/vmware/go-ipfix/pkg/exporter" @@ -32,22 +33,20 @@ import ( ) type writeIpfix struct { - hostPort string - transport string - templateIDv4 uint16 - templateIDv6 uint16 - enrichEnterpriseID uint32 - exporter *ipfixExporter.ExportingProcess - entitiesV4 []entities.InfoElementWithValue - entitiesV6 []entities.InfoElementWithValue + templateIDv4 uint16 + templateIDv6 uint16 + exporter *ipfixExporter.ExportingProcess + tplV4 entities.Set + tplV6 entities.Set + entitiesV4 []entities.InfoElementWithValue + entitiesV6 []entities.InfoElementWithValue } type FieldMap struct { - Key string - Getter func(entities.InfoElementWithValue) any - Setter func(entities.InfoElementWithValue, any) - Matcher func(entities.InfoElementWithValue, any) bool - Optional bool + Key string + Getter func(entities.InfoElementWithValue) any + Setter func(entities.InfoElementWithValue, any) + Matcher func(entities.InfoElementWithValue, any) bool } // IPv6Type value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml @@ -68,28 +67,33 @@ var ( "flowEndMilliseconds", "packetDeltaCount", "interfaceName", + "tcpControlBits", } IPv4IANAFields = append([]string{ "sourceIPv4Address", "destinationIPv4Address", + "icmpTypeIPv4", + "icmpCodeIPv4", }, IANAFields...) IPv6IANAFields = append([]string{ "sourceIPv6Address", "destinationIPv6Address", "nextHeaderIPv6", + "icmpTypeIPv6", + "icmpCodeIPv6", }, IANAFields...) 
- KubeFields = []string{ - "sourcePodNamespace", - "sourcePodName", - "destinationPodNamespace", - "destinationPodName", - "sourceNodeName", - "destinationNodeName", - } - CustomNetworkFields = []string{ - "timeFlowRttNs", - "interfaces", - "directions", + KubeFields = []entities.InfoElement{ + {Name: "sourcePodNamespace", ElementId: 7733, DataType: entities.String, Len: 65535}, + {Name: "sourcePodName", ElementId: 7734, DataType: entities.String, Len: 65535}, + {Name: "destinationPodNamespace", ElementId: 7735, DataType: entities.String, Len: 65535}, + {Name: "destinationPodName", ElementId: 7736, DataType: entities.String, Len: 65535}, + {Name: "sourceNodeName", ElementId: 7737, DataType: entities.String, Len: 65535}, + {Name: "destinationNodeName", ElementId: 7738, DataType: entities.String, Len: 65535}, + } + CustomNetworkFields = []entities.InfoElement{ + {Name: "timeFlowRttNs", ElementId: 7740, DataType: entities.Unsigned64, Len: 8}, + {Name: "interfaces", ElementId: 7741, DataType: entities.String, Len: 65535}, + {Name: "directions", ElementId: 7742, DataType: entities.String, Len: 65535}, } MapIPFIXKeys = map[string]FieldMap{ @@ -218,6 +222,60 @@ var ( return elt.GetStringValue() == ifs[0] }, }, + "tcpControlBits": { + Key: "Flags", + Getter: func(elt entities.InfoElementWithValue) any { + return elt.GetUnsigned16Value() + }, + Setter: func(elt entities.InfoElementWithValue, rec any) { + if decoded, isDecoded := rec.([]string); isDecoded { + // reencode for ipfix + reencoded := utils.EncodeTCPFlags(decoded) + elt.SetUnsigned16Value(uint16(reencoded)) + } else if raw, isRaw := rec.(uint16); isRaw { + elt.SetUnsigned16Value(raw) + } + }, + Matcher: func(elt entities.InfoElementWithValue, expected any) bool { + received := elt.GetUnsigned16Value() + if expSlice, isSlice := expected.([]string); isSlice { + decoded := utils.DecodeTCPFlags(uint(received)) + if len(expSlice) != len(decoded) { + return false + } + for i := 0; i < len(expSlice); i++ { + if expSlice[i] != decoded[i] { + return false + } + } + return true + } + if expected == nil { + return received == 0 + } + return received == expected + }, + }, + "icmpTypeIPv4": { + Key: "IcmpType", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) }, + }, + "icmpCodeIPv4": { + Key: "IcmpCode", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) }, + }, + "icmpTypeIPv6": { + Key: "IcmpType", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) }, + }, + "icmpCodeIPv6": { + Key: "IcmpCode", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) }, + }, "interfaces": { Key: "Interfaces", Getter: func(elt entities.InfoElementWithValue) any { return strings.Split(elt.GetStringValue(), ",") }, @@ -228,46 +286,39 @@ var ( }, }, "sourcePodNamespace": { - Key: "SrcK8S_Namespace", - Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, - Optional: true, + Key: "SrcK8S_Namespace", + Getter: func(elt 
entities.InfoElementWithValue) any { return elt.GetStringValue() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, }, "sourcePodName": { - Key: "SrcK8S_Name", - Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, - Optional: true, + Key: "SrcK8S_Name", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, }, "destinationPodNamespace": { - Key: "DstK8S_Namespace", - Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, - Optional: true, + Key: "DstK8S_Namespace", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, }, "destinationPodName": { - Key: "DstK8S_Name", - Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, - Optional: true, + Key: "DstK8S_Name", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, }, "sourceNodeName": { - Key: "SrcK8S_HostName", - Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, - Optional: true, + Key: "SrcK8S_HostName", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, }, "destinationNodeName": { - Key: "DstK8S_HostName", - Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, - Optional: true, + Key: "DstK8S_HostName", + Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) }, }, "timeFlowRttNs": { - Key: "TimeFlowRttNs", - Getter: func(elt entities.InfoElementWithValue) any { return int64(elt.GetUnsigned64Value()) }, - Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(uint64(rec.(int64))) }, - Optional: true, + Key: "TimeFlowRttNs", + Getter: func(elt entities.InfoElementWithValue) any { return int64(elt.GetUnsigned64Value()) }, + Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(uint64(rec.(int64))) }, }, } ) @@ -289,7 +340,7 @@ func addElementToTemplate(elementName string, value []byte, elements *[]entities func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, registryID uint32) error { for _, field := range CustomNetworkFields { - if err := addElementToTemplate(field, nil, elements, registryID); err != nil { + if err := addElementToTemplate(field.Name, nil, elements, registryID); err != nil { return err } } @@ -298,7 +349,7 @@ func addNetworkEnrichmentToTemplate(elements *[]entities.InfoElementWithValue, r func addKubeContextToTemplate(elements 
*[]entities.InfoElementWithValue, registryID uint32) error { for _, field := range KubeFields { - if err := addElementToTemplate(field, nil, elements, registryID); err != nil { + if err := addElementToTemplate(field.Name, nil, elements, registryID); err != nil { return err } } @@ -311,133 +362,52 @@ func loadCustomRegistry(enterpriseID uint32) error { ilog.WithError(err).Errorf("Failed to initialize registry") return err } - err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodNamespace", 7733, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("sourcePodName", 7734, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodNamespace", 7735, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("destinationPodName", 7736, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("sourceNodeName", 7737, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("destinationNodeName", 7738, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("timeFlowRttNs", 7740, entities.Unsigned64, enterpriseID, 8)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("interfaces", 7741, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - err = registry.PutInfoElement((*entities.NewInfoElement("directions", 7742, entities.String, enterpriseID, 65535)), enterpriseID) - if err != nil { - ilog.WithError(err).Errorf("Failed to register element") - return err - } - return nil -} - -func SendTemplateRecordv4(exporter *ipfixExporter.ExportingProcess, enrichEnterpriseID uint32) (uint16, []entities.InfoElementWithValue, error) { - templateID := exporter.NewTemplateID() - templateSet := entities.NewSet(false) - err := templateSet.PrepareSet(entities.Template, templateID) - if err != nil { - ilog.WithError(err).Error("Failed in PrepareSet") - return 0, nil, err - } - elements := make([]entities.InfoElementWithValue, 0) - - for _, field := range IPv4IANAFields { - err = addElementToTemplate(field, nil, &elements, registry.IANAEnterpriseID) - if err != nil { - return 0, nil, err - } - } - if enrichEnterpriseID != 0 { - err = addKubeContextToTemplate(&elements, enrichEnterpriseID) - if err != nil { - return 0, nil, err - } - err = addNetworkEnrichmentToTemplate(&elements, enrichEnterpriseID) + allCustom := []entities.InfoElement{} + allCustom = append(allCustom, KubeFields...) + allCustom = append(allCustom, CustomNetworkFields...) 
+ for _, f := range allCustom { + f.EnterpriseId = enterpriseID + err = registry.PutInfoElement(f, enterpriseID) if err != nil { - return 0, nil, err + ilog.WithError(err).Errorf("Failed to register element: %s", f.Name) + return err } } - err = templateSet.AddRecord(elements, templateID) - if err != nil { - ilog.WithError(err).Error("Failed in Add Record") - return 0, nil, err - } - _, err = exporter.SendSet(templateSet) - if err != nil { - ilog.WithError(err).Error("Failed to send template record") - return 0, nil, err - } - - return templateID, elements, nil + return nil } -func SendTemplateRecordv6(exporter *ipfixExporter.ExportingProcess, enrichEnterpriseID uint32) (uint16, []entities.InfoElementWithValue, error) { - templateID := exporter.NewTemplateID() +func prepareTemplate(templateID uint16, enrichEnterpriseID uint32, fields []string) (entities.Set, []entities.InfoElementWithValue, error) { templateSet := entities.NewSet(false) err := templateSet.PrepareSet(entities.Template, templateID) if err != nil { - return 0, nil, err + return nil, nil, err } elements := make([]entities.InfoElementWithValue, 0) - for _, field := range IPv6IANAFields { + for _, field := range fields { err = addElementToTemplate(field, nil, &elements, registry.IANAEnterpriseID) if err != nil { - return 0, nil, err + return nil, nil, err } } if enrichEnterpriseID != 0 { err = addKubeContextToTemplate(&elements, enrichEnterpriseID) if err != nil { - return 0, nil, err + return nil, nil, err } err = addNetworkEnrichmentToTemplate(&elements, enrichEnterpriseID) if err != nil { - return 0, nil, err + return nil, nil, err } } - err = templateSet.AddRecord(elements, templateID) if err != nil { - return 0, nil, err - } - _, err = exporter.SendSet(templateSet) - if err != nil { - return 0, nil, err + return nil, nil, err } - return templateID, elements, nil + return templateSet, elements, nil } -//nolint:cyclop func setElementValue(record config.GenericMap, ieValPtr *entities.InfoElementWithValue) error { ieVal := *ieValPtr name := ieVal.GetName() @@ -447,8 +417,6 @@ func setElementValue(record config.GenericMap, ieValPtr *entities.InfoElementWit } if value := record[mapping.Key]; value != nil { mapping.Setter(ieVal, value) - } else if !mapping.Optional { - return fmt.Errorf("unable to find %s (%s) in record", name, mapping.Key) } return nil } @@ -462,6 +430,7 @@ func setEntities(record config.GenericMap, elements *[]entities.InfoElementWithV } return nil } + func (t *writeIpfix) sendDataRecord(record config.GenericMap, v6 bool) error { dataSet := entities.NewSet(false) var templateID uint16 @@ -502,7 +471,6 @@ func (t *writeIpfix) sendDataRecord(record config.GenericMap, v6 bool) error { // Write writes a flow before being stored func (t *writeIpfix) Write(entry config.GenericMap) { - ilog.Tracef("entering writeIpfix Write") if IPv6Type == entry["Etype"].(uint16) { err := t.sendDataRecord(entry, true) if err != nil { @@ -518,8 +486,6 @@ func (t *writeIpfix) Write(entry config.GenericMap) { // NewWriteIpfix creates a new write func NewWriteIpfix(params config.StageParam) (Writer, error) { - ilog.Debugf("entering NewWriteIpfix") - ipfixConfigIn := api.WriteIpfix{} if params.Write != nil && params.Write.Ipfix != nil { ipfixConfigIn = *params.Write.Ipfix @@ -530,46 +496,59 @@ func NewWriteIpfix(params config.StageParam) (Writer, error) { if err := ipfixConfigIn.Validate(); err != nil { return nil, fmt.Errorf("the provided config is not valid: %w", err) } - writeIpfix := &writeIpfix{} - if params.Write != nil && 
-		writeIpfix.transport = params.Write.Ipfix.Transport
-		writeIpfix.hostPort = fmt.Sprintf("%s:%d", params.Write.Ipfix.TargetHost, params.Write.Ipfix.TargetPort)
-		writeIpfix.enrichEnterpriseID = uint32(params.Write.Ipfix.EnterpriseID)
-	}
-	// Initialize IPFIX registry and send templates
-	registry.LoadRegistry()
-	var err error
-	if params.Write != nil && params.Write.Ipfix != nil && params.Write.Ipfix.EnterpriseID != 0 {
-		err = loadCustomRegistry(writeIpfix.enrichEnterpriseID)
-		if err != nil {
-			ilog.Fatalf("Failed to load Custom(%d) Registry", writeIpfix.enrichEnterpriseID)
-		}
-	}
 	// Create exporter using local server info
 	input := ipfixExporter.ExporterInput{
-		CollectorAddress:    writeIpfix.hostPort,
-		CollectorProtocol:   writeIpfix.transport,
+		CollectorAddress:    fmt.Sprintf("%s:%d", ipfixConfigIn.TargetHost, ipfixConfigIn.TargetPort),
+		CollectorProtocol:   ipfixConfigIn.Transport,
 		ObservationDomainID: 1,
-		TempRefTimeout:      1,
+		TempRefTimeout:      uint32(ipfixConfigIn.TplSendInterval.Duration.Seconds()),
 	}
-	writeIpfix.exporter, err = ipfixExporter.InitExportingProcess(input)
+
+	exporter, err := ipfixExporter.InitExportingProcess(input)
 	if err != nil {
-		ilog.Fatalf("Got error when connecting to server %s: %v", writeIpfix.hostPort, err)
-		return nil, err
+		return nil, fmt.Errorf("error when connecting to IPFIX collector %s: %w", input.CollectorAddress, err)
+	}
+	ilog.Infof("Created IPFIX exporter connecting to server with address: %s", input.CollectorAddress)
+
+	eeid := uint32(ipfixConfigIn.EnterpriseID)
+
+	registry.LoadRegistry()
+	if eeid != 0 {
+		if err := loadCustomRegistry(eeid); err != nil {
+			return nil, fmt.Errorf("failed to load custom registry with EnterpriseID=%d: %w", eeid, err)
+		}
 	}
-	ilog.Infof("Created exporter connecting to server with address: %s", writeIpfix.hostPort)
-	writeIpfix.templateIDv4, writeIpfix.entitiesV4, err = SendTemplateRecordv4(writeIpfix.exporter, writeIpfix.enrichEnterpriseID)
+	idV4 := exporter.NewTemplateID()
+	setV4, entitiesV4, err := prepareTemplate(idV4, eeid, IPv4IANAFields)
 	if err != nil {
-		ilog.WithError(err).Error("Failed in send IPFIX template v4 record")
-		return nil, err
+		return nil, fmt.Errorf("failed to prepare IPv4 template: %w", err)
 	}
-	writeIpfix.templateIDv6, writeIpfix.entitiesV6, err = SendTemplateRecordv6(writeIpfix.exporter, writeIpfix.enrichEnterpriseID)
+	idV6 := exporter.NewTemplateID()
+	setV6, entitiesV6, err := prepareTemplate(idV6, eeid, IPv6IANAFields)
 	if err != nil {
-		ilog.WithError(err).Error("Failed in send IPFIX template v6 record")
-		return nil, err
+		return nil, fmt.Errorf("failed to prepare IPv6 template: %w", err)
 	}
+
+	// Send both templates synchronously, before any data record
+	if _, err := exporter.SendSet(setV4); err != nil {
+		return nil, fmt.Errorf("failed to send IPv4 template: %w", err)
+	}
+	if _, err := exporter.SendSet(setV6); err != nil {
+		return nil, fmt.Errorf("failed to send IPv6 template: %w", err)
+	}
+
+	writeIpfix := &writeIpfix{
+		exporter:     exporter,
+		templateIDv4: idV4,
+		tplV4:        setV4,
+		entitiesV4:   entitiesV4,
+		templateIDv6: idV6,
+		tplV6:        setV6,
+		entitiesV6:   entitiesV6,
+	}
+
 	return writeIpfix, nil
 }
diff --git a/pkg/pipeline/write/write_stdout.go b/pkg/pipeline/write/write_stdout.go
index cb24db278..073342ac7 100644
--- a/pkg/pipeline/write/write_stdout.go
+++ b/pkg/pipeline/write/write_stdout.go
@@ -40,7 +40,8 @@ func (t *writeStdout) Write(v config.GenericMap) {
 }

 func formatter(format string, reorder bool) func(config.GenericMap) string {
-	if format == "json" {
+	switch format {
+	case "json":
"json": jconf := jsonIter.Config{ SortMapKeys: reorder, }.Froze() @@ -48,7 +49,7 @@ func formatter(format string, reorder bool) func(config.GenericMap) string { b, _ := jconf.Marshal(v) return string(b) } - } else if format == "fields" { + case "fields": return func(v config.GenericMap) string { var sb strings.Builder var order sort.StringSlice diff --git a/pkg/test/ipfix.go b/pkg/test/ipfix.go index 600c31cc6..733a8c251 100644 --- a/pkg/test/ipfix.go +++ b/pkg/test/ipfix.go @@ -26,7 +26,7 @@ type IPFIXClient struct { } // NewIPFIXClient returns an IPFIXClient that sends data to the given port -func NewIPFIXClient(port int) (*IPFIXClient, error) { +func NewIPFIXClient(port uint) (*IPFIXClient, error) { conn, err := net.Dial("udp", fmt.Sprintf(":%d", port)) if err != nil { return nil, fmt.Errorf("can't open UDP connection on port %d :%w", @@ -100,7 +100,7 @@ func (ke *IPFIXClient) sendMessage(set entities.Set) error { } // UDPPort asks the kernel for a free open port that is ready to use. -func UDPPort() (int, error) { +func UDPPort() (uint, error) { addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { return 0, err @@ -111,5 +111,5 @@ func UDPPort() (int, error) { return 0, err } defer l.Close() - return l.LocalAddr().(*net.UDPAddr).Port, nil + return uint(l.LocalAddr().(*net.UDPAddr).Port), nil } diff --git a/pkg/utils/tcp_flags.go b/pkg/utils/tcp_flags.go index 4bcc55d0c..0ef14c660 100644 --- a/pkg/utils/tcp_flags.go +++ b/pkg/utils/tcp_flags.go @@ -18,6 +18,24 @@ var tcpFlags = []tcpFlag{ {value: 512, name: "FIN_ACK"}, {value: 1024, name: "RST_ACK"}, } +var flagsMap map[string]uint + +func init() { + flagsMap = make(map[string]uint, len(tcpFlags)) + for _, flag := range tcpFlags { + flagsMap[flag.name] = flag.value + } +} + +func EncodeTCPFlags(flags []string) uint { + var bf uint + for _, flag := range flags { + if v, ok := flagsMap[flag]; ok { + bf |= v + } + } + return bf +} func DecodeTCPFlags(bitfield uint) []string { var values []string diff --git a/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go b/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go index 8308755da..4d0fd8dd5 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go @@ -52,6 +52,7 @@ type templateValue struct { // maxMsgSize is not set correctly, the message may be fragmented. 
 type ExportingProcess struct {
 	connToCollector net.Conn
+	connMut         sync.Mutex
 	obsDomainID     uint32
 	seqNumber       uint32
 	templateID      uint16
@@ -176,8 +177,7 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
 			}
 			conn, err = tls.Dial(input.CollectorProtocol, input.CollectorAddress, config)
 			if err != nil {
-				klog.Errorf("Cannot the create the tls connection to the Collector %s: %v", input.CollectorAddress, err)
-				return nil, err
+				return nil, fmt.Errorf("cannot create the TLS connection to the Collector %q: %w", input.CollectorAddress, err)
 			}
 		case "udp": // use DTLS
 			// TODO: support client authentication
@@ -209,15 +209,13 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
 			}
 			conn, err = dtls.Dial(udpAddr.Network(), udpAddr, config)
 			if err != nil {
-				klog.Errorf("Cannot the create the dtls connection to the Collector %s: %v", udpAddr.String(), err)
-				return nil, err
+				return nil, fmt.Errorf("cannot create the DTLS connection to the Collector %q: %w", udpAddr.String(), err)
 			}
 		}
 	} else {
 		conn, err = net.Dial(input.CollectorProtocol, input.CollectorAddress)
 		if err != nil {
-			klog.Errorf("Cannot the create the connection to the Collector %s: %v", input.CollectorAddress, err)
-			return nil, err
+			return nil, fmt.Errorf("cannot create the connection to the Collector %q: %w", input.CollectorAddress, err)
 		}
 	}
 	var isIPv6 bool
@@ -299,14 +297,24 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
 			case <-expProc.stopCh:
 				return
 			case <-ticker.C:
+				// Dial again (e.g. host name resolving to a different IP)
+				klog.V(2).Info("Refreshing connection")
+				conn, err = net.Dial(input.CollectorProtocol, input.CollectorAddress)
+				if err != nil {
+					klog.Errorf("Cannot connect to the collector %s: %v", input.CollectorAddress, err)
+				} else {
+					expProc.connMut.Lock()
+					expProc.connToCollector = conn
+					expProc.connMut.Unlock()
+				}
+				klog.V(2).Info("Sending refreshed templates to the collector")
 				err := expProc.sendRefreshedTemplates()
 				if err != nil {
-					klog.Errorf("Error when sending refreshed templates, closing the connection to the collector: %v", err)
-					expProc.closeConnToCollector()
-					return
+					klog.Errorf("Error when sending refreshed templates: %v", err)
+				} else {
+					klog.V(2).Info("Sent refreshed templates to the collector")
 				}
-				klog.V(2).Info("Sent refreshed templates to the collector")
 			}
 		}
 	}()
@@ -444,6 +452,8 @@ func (ep *ExportingProcess) closeConnToCollector() {
 	}
 	klog.Info("Closing connection to the collector")
 	close(ep.stopCh)
+	ep.connMut.Lock()
+	defer ep.connMut.Unlock()
 	if err := ep.connToCollector.Close(); err != nil {
 		// Just log the error that happened when closing the connection. Not returning error
 		// as we do not expect library consumers to exit their programs with this error.
@@ -454,6 +464,8 @@
 // checkConnToCollector checks whether the connection from exporter is still open
 // by trying to read from connection. Closed connection will return EOF from read.
 func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool {
+	ep.connMut.Lock()
+	defer ep.connMut.Unlock()
 	ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond))
 	if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF {
 		return false
@@ -479,6 +491,8 @@
 	}

 	// Send the message on the exporter connection.
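+	// Take the connection mutex: the template-refresh goroutine may swap connToCollector concurrently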
+	ep.connMut.Lock()
+	defer ep.connMut.Unlock()
 	bytesSent, err := ep.connToCollector.Write(buf.Bytes())

 	if err != nil {
@@ -503,6 +517,8 @@ func (ep *ExportingProcess) createAndSendJSONRecords(records []entities.Record,
 	bytesSent := 0
 	elements := make(map[string]interface{})
 	message := make(map[string]interface{}, 2)
+	ep.connMut.Lock()
+	defer ep.connMut.Unlock()
 	for _, record := range records {
 		clear(elements)
 		orderedElements := record.GetOrderedElementList()
diff --git a/vendor/modules.txt b/vendor/modules.txt
index acc87730b..02291ddb0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -589,7 +589,7 @@ github.com/vladimirvivien/gexe/net
 github.com/vladimirvivien/gexe/prog
 github.com/vladimirvivien/gexe/str
 github.com/vladimirvivien/gexe/vars
-# github.com/vmware/go-ipfix v0.15.0
+# github.com/vmware/go-ipfix v0.15.0 => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101
 ## explicit; go 1.23.0
 github.com/vmware/go-ipfix/pkg/collector
 github.com/vmware/go-ipfix/pkg/entities
@@ -1392,3 +1392,4 @@ sigs.k8s.io/structured-merge-diff/v4/value
 ## explicit; go 1.12
 sigs.k8s.io/yaml
 sigs.k8s.io/yaml/goyaml.v2
+# github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101