Skip to content

NETOBSERV-2307: NETOBSERV-2315: fix several IPFIX issues #1019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions contrib/kubernetes/ipfix-collector-stdout.yaml
Original file line number Diff line number Diff line change
@@ -1,54 +1,45 @@
apiVersion: apps/v1
kind: Deployment
apiVersion: v1
kind: Pod
metadata:
name: flowlogs-pipeline
name: flp-ipfix-stdout
labels:
app: flowlogs-pipeline
app: flp-ipfix-stdout
spec:
replicas: 1
selector:
matchLabels:
app: flowlogs-pipeline
template:
metadata:
labels:
app: flowlogs-pipeline
spec:
containers:
- name: flowlogs-pipeline
image: quay.io/netobserv/flowlogs-pipeline:main
args:
- "--config=/etc/flowlogs-pipeline/config.yaml"
ports:
- containerPort: 2055
imagePullPolicy: IfNotPresent
volumeMounts:
- name: configuration
mountPath: "/etc/flowlogs-pipeline/"
volumes:
containers:
- name: flp-ipfix-stdout
image: quay.io/jotak/flowlogs-pipeline:main
imagePullPolicy: Always
args:
- "--config=/etc/flowlogs-pipeline/config.yaml"
ports:
- containerPort: 2055
volumeMounts:
- name: configuration
configMap:
name: flp-config
mountPath: "/etc/flowlogs-pipeline/"
volumes:
- name: configuration
configMap:
name: flp-ipfix-stdout-config
---
apiVersion: v1
kind: Service
metadata:
name: flowlogs-pipeline
name: flp-ipfix-stdout
labels:
app: flowlogs-pipeline
app: flp-ipfix-stdout
spec:
ports:
- port: 2055
targetPort: 2055
protocol: UDP
name: ipfix
selector:
app: flowlogs-pipeline
app: flp-ipfix-stdout
---
apiVersion: v1
kind: ConfigMap
metadata:
name: flp-config
name: flp-ipfix-stdout-config
data:
config.yaml: |
log-level: info
Expand All @@ -59,10 +50,26 @@ data:
parameters:
- name: ingest
ingest:
type: collector
collector:
hostName: 0.0.0.0
type: ipfix
ipfix:
port: 2055
mapping:
- penprovided: true
pen: 2
field: 7733
destination: CustomBytes_1
- penprovided: true
pen: 2
field: 7734
destination: CustomBytes_2
- penprovided: true
pen: 2
field: 7735
destination: CustomBytes_3
- penprovided: true
pen: 2
field: 7736
destination: CustomBytes_4
- name: write
write:
type: stdout
16 changes: 16 additions & 0 deletions contrib/local/ipfix-collector-stdout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Use it with:
# ./flowlogs-pipeline --config=contrib/local/ipfix-collector-stdout.yaml
log-level: info
pipeline:
- name: ingest
- name: write
follows: ingest
parameters:
- name: ingest
ingest:
type: ipfix
ipfix:
port: 2055
- name: write
write:
type: stdout
11 changes: 6 additions & 5 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ Following is the supported API format for S3 encode:
secure: true for https, false for http (default: false)
objectHeaderParameters: parameters to include in object header (key/value pairs)
</pre>
## Ingest collector API
## Ingest NetFlow/IPFIX API
Following is the supported API format for the NetFlow / IPFIX collector:

<pre>
collector:
hostName: the hostname to listen on
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion
ipfix:
hostName: the hostname to listen on; defaults to 0.0.0.0
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055
portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
batchMaxLen: the number of accumulated flows before being forwarded for processing
workers: the number of netflow/ipfix decoding workers
sockets: the number of listening sockets
mapping: custom field mapping
</pre>
## Ingest Kafka API
Following is the supported API format for the kafka ingest:
Expand Down Expand Up @@ -340,6 +340,7 @@ Following is the supported API format for writing to an IPFIX collector:
targetPort: IPFIX Collector host target port
transport: Transport protocol (tcp/udp) to be used for the IPFIX connection
enterpriseId: Enterprise ID for exporting transformations
tplSendInterval: Interval for resending templates to the collector (default: 1m)
</pre>
## Aggregate metrics API
Following is the supported API format for specifying metrics aggregations:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250704140557-a3d746019169
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jotak/go-ipfix v0.0.0-20250704140557-a3d746019169 h1:S3VDdAZlSH3xZ2ORXrIkWZ5US+RrfNKFU978woykoD0=
github.com/jotak/go-ipfix v0.0.0-20250704140557-a3d746019169/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -380,8 +382,6 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd
github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/vladimirvivien/gexe v0.5.0 h1:AWBVaYnrTsGYBktXvcO0DfWPeSiZxn6mnQ5nvL+A1/A=
github.com/vladimirvivien/gexe v0.5.0/go.mod h1:3gjgTqE2c0VyHnU5UOIwk7gyNzZDGulPb/DJPgcw64E=
github.com/vmware/go-ipfix v0.15.0 h1:F/3BjFoODvCHEHYk36jy3TGmQzJ7rF2bC7ZG+c/lng8=
github.com/vmware/go-ipfix v0.15.0/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
FileLoopType = "file_loop"
FileChunksType = "file_chunks"
SyntheticType = "synthetic"
CollectorType = "collector"
CollectorType = "collector" // deprecated: use 'ipfix' instead
StdinType = "stdin"
GRPCType = "grpc"
FakeType = "fake"
Expand Down Expand Up @@ -53,7 +53,7 @@ type API struct {
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
S3Encode EncodeS3 `yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"`
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
IngestIpfix IngestIpfix `yaml:"ipfix" doc:"## Ingest NetFlow/IPFIX API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be carefull since this is still used in multiple places such in:
https://github.com/netobserv/netobserv-ebpf-agent/blob/ba5c7c6ffaea7dfaaf61aeb3111bdd14a86c871d/e2e/ipfix/manifests/20-flp-transformer.yml#L53

collector:
port: 2055
portLegacy: 2056
hostName: 0.0.0.0

collector:
hostName: 0.0.0.0
port: 2055
portLegacy: 2056

collector:
hostName: localhost
port: 4739 # Use this for IPFIX / netflow v9
portLegacy: 2055 # Use this for legacy v5 netflow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done (except the first which is a different thing, it's the conf-generator tool, which I didn't touch here)
Also added a warn log for deprecated API usage

IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
IngestStdin IngestStdin `yaml:"stdin" doc:"## Ingest Standard Input\nFollowing is the supported API format for the standard input ingest:\n"`
Expand Down
27 changes: 0 additions & 27 deletions pkg/api/ingest_collector.go

This file was deleted.

64 changes: 64 additions & 0 deletions pkg/api/ingest_ipfix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

import (
"fmt"

"github.com/netsampler/goflow2/producer"
)

type IngestIpfix struct {
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on; defaults to 0.0.0.0"`
Port uint `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055"`
PortLegacy uint `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
Workers uint `yaml:"workers,omitempty" json:"workers,omitempty" doc:"the number of netflow/ipfix decoding workers"`
Sockets uint `yaml:"sockets,omitempty" json:"sockets,omitempty" doc:"the number of listening sockets"`
Mapping []producer.NetFlowMapField `yaml:"mapping,omitempty" json:"mapping,omitempty" doc:"custom field mapping"`
}

func (i *IngestIpfix) SetDefaults() {
if i.HostName == "" {
i.HostName = "0.0.0.0"
}
if i.Port == 0 && i.PortLegacy == 0 {
i.Port = 2055
}
if i.Workers == 0 {
i.Workers = 1
}
if i.Sockets == 0 {
i.Sockets = 1
}
}

func (i *IngestIpfix) String() string {
hasMapping := "no"
if len(i.Mapping) > 0 {
hasMapping = "yes"
}
return fmt.Sprintf(
"hostname=%s, port=%d, portLegacy=%d, workers=%d, sockets=%d, mapping=%s",
i.HostName,
i.Port,
i.PortLegacy,
i.Workers,
i.Sockets,
hasMapping,
)
}
13 changes: 9 additions & 4 deletions pkg/api/write_ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package api

import (
"errors"
"time"
)

type WriteIpfix struct {
TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"`
TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"`
Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"`
EnterpriseID int `yaml:"enterpriseId,omitempty" json:"EnterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"`
TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"`
TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"`
Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"`
EnterpriseID int `yaml:"enterpriseId,omitempty" json:"enterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"`
TplSendInterval Duration `yaml:"tplSendInterval,omitempty" json:"tplSendInterval,omitempty" doc:"Interval for resending templates to the collector (default: 1m)"`
}

func (w *WriteIpfix) SetDefaults() {
if w.Transport == "" {
w.Transport = "tcp"
}
if w.TplSendInterval.Duration == 0 {
w.TplSendInterval.Duration = time.Minute
}
}

func (w *WriteIpfix) Validate() error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/confgen/confgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ func Test_RunShortConfGen(t *testing.T) {
)

// Expects ingest
require.Equal(t, &api.IngestCollector{
require.Equal(t, &api.IngestIpfix{
HostName: "0.0.0.0",
Port: 2155,
PortLegacy: 2156,
}, out.Parameters[0].Ingest.Collector)
}, out.Parameters[0].Ingest.Ipfix)

// Expects transform network
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
Expand Down Expand Up @@ -207,11 +207,11 @@ func Test_RunConfGenNoAgg(t *testing.T) {
)

// Expects ingest
require.Equal(t, &api.IngestCollector{
require.Equal(t, &api.IngestIpfix{
HostName: "0.0.0.0",
Port: 2155,
PortLegacy: 2156,
}, out.Parameters[0].Ingest.Collector)
}, out.Parameters[0].Ingest.Ipfix)

// Expects transform network
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
Expand Down Expand Up @@ -295,11 +295,11 @@ func Test_RunLongConfGen(t *testing.T) {
)

// Expects ingest
require.Equal(t, &api.IngestCollector{
require.Equal(t, &api.IngestIpfix{
HostName: "0.0.0.0",
Port: 2155,
PortLegacy: 2156,
}, out.Parameters[0].Ingest.Collector)
}, out.Parameters[0].Ingest.Ipfix)

// Expects transform generic
require.Equal(t, api.ReplaceKeys, out.Parameters[1].Transform.Generic.Policy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/confgen/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func expectedConfig() *Config {
},
},
Ingest: config.Ingest{
Collector: &api.IngestCollector{
Collector: &api.IngestIpfix{
Port: 8888,
},
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
})
}

// Deprecated: old function that only manages IPFIX ingestion
func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam {
parameters := make([]config.StageParam, len(cg.opts.GenerateStages))
for i, stage := range cg.opts.GenerateStages {
switch stage {
case "ingest":
parameters[i] = config.NewCollectorParams("ingest_collector", *cg.config.Ingest.Collector)
parameters[i] = config.NewIPFIXParams("ingest_collector", *cg.config.Ingest.Collector)
case "transform_generic":
parameters[i] = config.NewTransformGenericParams("transform_generic", *cg.config.Transform.Generic)
case "transform_network":
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ type StageParam struct {
type Ingest struct {
Type string `yaml:"type" json:"type"`
File *File `yaml:"file,omitempty" json:"file,omitempty"`
Collector *api.IngestCollector `yaml:"collector,omitempty" json:"collector,omitempty"`
Collector *api.IngestIpfix `yaml:"collector,omitempty" json:"collector,omitempty"` // deprecated: use 'ipfix' instead
Ipfix *api.IngestIpfix `yaml:"ipfix,omitempty" json:"ipfix,omitempty"`
Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"`
Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"`
Expand Down
Loading