Skip to content

Commit 5895dcb

Browse files
authored
NETOBSERV-2307: NETOBSERV-2315: fix several IPFIX issues (#1019)
* NETOBSERV-2307: NETOBSERV-2315: fix several IPFIX issues - Fixed flows without ports that generated errors in logs, and were not exported. It shouldn't matter that ports are missing (e.g. ICMP) - More generally, any missing field won't trigger an error anymore - Some fields were missing: icmp type/code, tcp flags - Fix resending templates in case of collector being restarted * fix sending decoded tcpflags on ipfix * Use patched go-ipfix, add write+read test... The go-ipfix patch regularly recheck the udp connection, e.g. to account for restarted host with a different resolved IP ... also: - allow to provide data mapping for ingesting with goflow2 - rename API collector => ipfix - simplify ingest API with more default values - write ipfix: do not resend templates every second! Instead, use configurable periodicity * Add simple example of ipfix ingester * Warn about deprecated API usage * fix race
1 parent eec8261 commit 5895dcb

30 files changed

+690
-390
lines changed

contrib/kubernetes/flowlogs-pipeline.conf.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ pipeline:
2020
parameters:
2121
- name: ingest_collector
2222
ingest:
23-
type: collector
24-
collector:
23+
type: ipfix
24+
ipfix:
2525
hostName: 0.0.0.0
2626
port: 2055
2727
portLegacy: 2056
Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,45 @@
1-
apiVersion: apps/v1
2-
kind: Deployment
1+
apiVersion: v1
2+
kind: Pod
33
metadata:
4-
name: flowlogs-pipeline
4+
name: flp-ipfix-stdout
55
labels:
6-
app: flowlogs-pipeline
6+
app: flp-ipfix-stdout
77
spec:
8-
replicas: 1
9-
selector:
10-
matchLabels:
11-
app: flowlogs-pipeline
12-
template:
13-
metadata:
14-
labels:
15-
app: flowlogs-pipeline
16-
spec:
17-
containers:
18-
- name: flowlogs-pipeline
19-
image: quay.io/netobserv/flowlogs-pipeline:main
20-
args:
21-
- "--config=/etc/flowlogs-pipeline/config.yaml"
22-
ports:
23-
- containerPort: 2055
24-
imagePullPolicy: IfNotPresent
25-
volumeMounts:
26-
- name: configuration
27-
mountPath: "/etc/flowlogs-pipeline/"
28-
volumes:
8+
containers:
9+
- name: flp-ipfix-stdout
10+
image: quay.io/jotak/flowlogs-pipeline:main
11+
imagePullPolicy: Always
12+
args:
13+
- "--config=/etc/flowlogs-pipeline/config.yaml"
14+
ports:
15+
- containerPort: 2055
16+
volumeMounts:
2917
- name: configuration
30-
configMap:
31-
name: flp-config
18+
mountPath: "/etc/flowlogs-pipeline/"
19+
volumes:
20+
- name: configuration
21+
configMap:
22+
name: flp-ipfix-stdout-config
3223
---
3324
apiVersion: v1
3425
kind: Service
3526
metadata:
36-
name: flowlogs-pipeline
27+
name: flp-ipfix-stdout
3728
labels:
38-
app: flowlogs-pipeline
29+
app: flp-ipfix-stdout
3930
spec:
4031
ports:
4132
- port: 2055
4233
targetPort: 2055
4334
protocol: UDP
4435
name: ipfix
4536
selector:
46-
app: flowlogs-pipeline
37+
app: flp-ipfix-stdout
4738
---
4839
apiVersion: v1
4940
kind: ConfigMap
5041
metadata:
51-
name: flp-config
42+
name: flp-ipfix-stdout-config
5243
data:
5344
config.yaml: |
5445
log-level: info
@@ -59,10 +50,26 @@ data:
5950
parameters:
6051
- name: ingest
6152
ingest:
62-
type: collector
63-
collector:
64-
hostName: 0.0.0.0
53+
type: ipfix
54+
ipfix:
6555
port: 2055
56+
mapping:
57+
- penprovided: true
58+
pen: 2
59+
field: 7733
60+
destination: CustomBytes_1
61+
- penprovided: true
62+
pen: 2
63+
field: 7734
64+
destination: CustomBytes_2
65+
- penprovided: true
66+
pen: 2
67+
field: 7735
68+
destination: CustomBytes_3
69+
- penprovided: true
70+
pen: 2
71+
field: 7736
72+
destination: CustomBytes_4
6673
- name: write
6774
write:
6875
type: stdout
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Use it with:
2+
# ./flowlogs-pipeline --config=contrib/local/ipfix-collector-stdout.yaml
3+
log-level: info
4+
pipeline:
5+
- name: ingest
6+
- name: write
7+
follows: ingest
8+
parameters:
9+
- name: ingest
10+
ingest:
11+
type: ipfix
12+
ipfix:
13+
port: 2055
14+
- name: write
15+
write:
16+
type: stdout

docs/api.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,17 @@ Following is the supported API format for S3 encode:
8181
secure: true for https, false for http (default: false)
8282
objectHeaderParameters: parameters to include in object header (key/value pairs)
8383
</pre>
84-
## Ingest collector API
84+
## Ingest NetFlow/IPFIX API
8585
Following is the supported API format for the NetFlow / IPFIX collector:
8686

8787
<pre>
88-
collector:
89-
hostName: the hostname to listen on
90-
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion
88+
ipfix:
89+
hostName: the hostname to listen on; defaults to 0.0.0.0
90+
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055
9191
portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
92-
batchMaxLen: the number of accumulated flows before being forwarded for processing
9392
workers: the number of netflow/ipfix decoding workers
9493
sockets: the number of listening sockets
94+
mapping: custom field mapping
9595
</pre>
9696
## Ingest Kafka API
9797
Following is the supported API format for the kafka ingest:
@@ -340,6 +340,7 @@ Following is the supported API format for writing to an IPFIX collector:
340340
targetPort: IPFIX Collector host target port
341341
transport: Transport protocol (tcp/udp) to be used for the IPFIX connection
342342
enterpriseId: Enterprise ID for exporting transformations
343+
tplSendInterval: Interval for resending templates to the collector (default: 1m)
343344
</pre>
344345
## Aggregate metrics API
345346
Following is the supported API format for specifying metrics aggregations:

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,11 @@ require (
5151
sigs.k8s.io/e2e-framework v0.6.0
5252
)
5353

54-
require github.com/cenkalti/backoff/v5 v5.0.2 // indirect
55-
5654
require (
5755
github.com/beorn7/perks v1.0.1 // indirect
5856
github.com/blang/semver/v4 v4.0.0 // indirect
5957
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
58+
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
6059
github.com/cenkalti/hub v1.0.2 // indirect
6160
github.com/cenkalti/rpc2 v0.0.0-20210604223624-c1acbc6ec984 // indirect
6261
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -170,3 +169,5 @@ require (
170169
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
171170
sigs.k8s.io/yaml v1.4.0 // indirect
172171
)
172+
173+
replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
180180
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
181181
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
182182
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
183+
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 h1:tpaHjydMAy2MTukKIUAVK4xIFUpL12xuexA0FuTVpuo=
184+
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
183185
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
184186
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
185187
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -380,8 +382,6 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd
380382
github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
381383
github.com/vladimirvivien/gexe v0.5.0 h1:AWBVaYnrTsGYBktXvcO0DfWPeSiZxn6mnQ5nvL+A1/A=
382384
github.com/vladimirvivien/gexe v0.5.0/go.mod h1:3gjgTqE2c0VyHnU5UOIwk7gyNzZDGulPb/DJPgcw64E=
383-
github.com/vmware/go-ipfix v0.15.0 h1:F/3BjFoODvCHEHYk36jy3TGmQzJ7rF2bC7ZG+c/lng8=
384-
github.com/vmware/go-ipfix v0.15.0/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
385385
github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
386386
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
387387
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=

hack/examples/docker-ipfix-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ pipeline:
1919
parameters:
2020
- name: ingest
2121
ingest: # use nflow generator to simulate flows: ./nflow-generator -t localhost -p 2055
22-
type: collector
23-
collector:
22+
type: ipfix
23+
ipfix:
2424
hostName: localhost
2525
port: 4739 # Use this for IPFIX / netflow v9
2626
portLegacy: 2055 # Use this for legacy v5 netflow

pkg/api/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const (
2222
FileLoopType = "file_loop"
2323
FileChunksType = "file_chunks"
2424
SyntheticType = "synthetic"
25-
CollectorType = "collector"
25+
CollectorType = "collector" // deprecated: use 'ipfix' instead
2626
StdinType = "stdin"
2727
GRPCType = "grpc"
2828
FakeType = "fake"
@@ -53,7 +53,7 @@ type API struct {
5353
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
5454
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
5555
S3Encode EncodeS3 `yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"`
56-
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
56+
IngestIpfix IngestIpfix `yaml:"ipfix" doc:"## Ingest NetFlow/IPFIX API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
5757
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
5858
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
5959
IngestStdin IngestStdin `yaml:"stdin" doc:"## Ingest Standard Input\nFollowing is the supported API format for the standard input ingest:\n"`

pkg/api/ingest_collector.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

pkg/api/ingest_ipfix.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
import (
21+
"fmt"
22+
23+
"github.com/netsampler/goflow2/producer"
24+
)
25+
26+
type IngestIpfix struct {
27+
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on; defaults to 0.0.0.0"`
28+
Port uint `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055"`
29+
PortLegacy uint `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
30+
Workers uint `yaml:"workers,omitempty" json:"workers,omitempty" doc:"the number of netflow/ipfix decoding workers"`
31+
Sockets uint `yaml:"sockets,omitempty" json:"sockets,omitempty" doc:"the number of listening sockets"`
32+
Mapping []producer.NetFlowMapField `yaml:"mapping,omitempty" json:"mapping,omitempty" doc:"custom field mapping"`
33+
}
34+
35+
func (i *IngestIpfix) SetDefaults() {
36+
if i.HostName == "" {
37+
i.HostName = "0.0.0.0"
38+
}
39+
if i.Port == 0 && i.PortLegacy == 0 {
40+
i.Port = 2055
41+
}
42+
if i.Workers == 0 {
43+
i.Workers = 1
44+
}
45+
if i.Sockets == 0 {
46+
i.Sockets = 1
47+
}
48+
}
49+
50+
func (i *IngestIpfix) String() string {
51+
hasMapping := "no"
52+
if len(i.Mapping) > 0 {
53+
hasMapping = "yes"
54+
}
55+
return fmt.Sprintf(
56+
"hostname=%s, port=%d, portLegacy=%d, workers=%d, sockets=%d, mapping=%s",
57+
i.HostName,
58+
i.Port,
59+
i.PortLegacy,
60+
i.Workers,
61+
i.Sockets,
62+
hasMapping,
63+
)
64+
}

0 commit comments

Comments
 (0)