Skip to content

Commit 5912edf

Browse files
authored
NETOBSERV-1208 & NETOBSERV-1233 Aggregators skip missing fields (#470)
* report missing aggregator option * filter toGenericMap instead of aggregator * skip bytes / packets zeros
1 parent 31e9d82 commit 5912edf

File tree

7 files changed

+134
-40
lines changed

7 files changed

+134
-40
lines changed

docs/api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ Following is the supported API format for specifying connection tracking:
241241
last: last
242242
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
243243
input: The input field to base the operation on. When omitted, 'name' is used
244+
reportMissing: When true, missing input will produce MissingFieldError metric and error logs
244245
scheduling: list of timeouts and intervals to apply per selector
245246
selector: key-value map to match against connection fields to apply this scheduling
246247
endConnectionTimeout: duration of time to wait from the last flow log to end a connection

pkg/api/conntrack.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,11 @@ type ConnTrackHash struct {
7070
}
7171

7272
type OutputField struct {
73-
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"`
74-
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
75-
SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
76-
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
73+
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"`
74+
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
75+
SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
76+
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
77+
ReportMissing bool `yaml:"reportMissing,omitempty" json:"reportMissing,omitempty" doc:"When true, missing input will produce MissingFieldError metric and error logs"`
7778
}
7879

7980
type ConnTrackOperationEnum struct {

pkg/pipeline/decode/decode_protobuf.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,11 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
4040
}
4141
out := config.GenericMap{
4242
"FlowDirection": int(flow.Direction.Number()),
43-
"Bytes": flow.Bytes,
4443
"SrcAddr": ipToStr(flow.Network.GetSrcAddr()),
4544
"DstAddr": ipToStr(flow.Network.GetDstAddr()),
4645
"SrcMac": macToStr(flow.DataLink.GetSrcMac()),
4746
"DstMac": macToStr(flow.DataLink.GetDstMac()),
4847
"Etype": flow.EthProtocol,
49-
"Packets": flow.Packets,
5048
"Duplicate": flow.Duplicate,
5149
"Proto": flow.Transport.GetProtocol(),
5250
"TimeFlowStartMs": flow.TimeFlowStart.AsTime().UnixMilli(),
@@ -56,6 +54,14 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
5654
"AgentIP": ipToStr(flow.AgentIp),
5755
}
5856

57+
if flow.Bytes != 0 {
58+
out["Bytes"] = flow.Bytes
59+
}
60+
61+
if flow.Packets != 0 {
62+
out["Packets"] = flow.Packets
63+
}
64+
5965
proto := flow.Transport.GetProtocol()
6066
if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 {
6167
out["IcmpType"] = flow.GetIcmpType()

pkg/pipeline/extract/conntrack/aggregator.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ type aggregator interface {
3737
}
3838

3939
type aggregateBase struct {
40-
inputField string
41-
outputField string
42-
splitAB bool
43-
initVal interface{}
44-
metrics *metricsType
40+
inputField string
41+
outputField string
42+
splitAB bool
43+
initVal interface{}
44+
metrics *metricsType
45+
reportMissing bool
4546
}
4647

4748
type aSum struct{ aggregateBase }
@@ -64,7 +65,7 @@ func newAggregator(of api.OutputField, metrics *metricsType) (aggregator, error)
6465
} else {
6566
inputField = of.Name
6667
}
67-
aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics}
68+
aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics, reportMissing: of.ReportMissing}
6869
var agg aggregator
6970
switch of.Operation {
7071
case api.ConnTrackOperationName("Sum"):
@@ -109,10 +110,15 @@ func (agg *aggregateBase) getOutputField(d direction) string {
109110
func (agg *aggregateBase) getInputFieldValue(flowLog config.GenericMap) (float64, error) {
110111
rawValue, ok := flowLog[agg.inputField]
111112
if !ok {
112-
if agg.metrics != nil {
113-
agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc()
113+
// error only if explicitly specified as FLP skip empty fields by default to reduce storage size
114+
if agg.reportMissing {
115+
if agg.metrics != nil {
116+
agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc()
117+
}
118+
return 0, fmt.Errorf("missing field %v", agg.inputField)
114119
}
115-
return 0, fmt.Errorf("missing field %v", agg.inputField)
120+
// fallback on 0 without error
121+
return 0, nil
116122
}
117123
floatValue, err := utils.ConvertToFloat64(rawValue)
118124
if err != nil {
@@ -185,5 +191,7 @@ func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction
185191
}
186192

187193
func (cp *aLast) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
188-
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
194+
if flowLog[cp.inputField] != nil {
195+
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
196+
}
189197
}

pkg/pipeline/extract/conntrack/aggregator_test.go

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,52 +65,52 @@ func TestNewAggregator_Valid(t *testing.T) {
6565
{
6666
name: "Default SplitAB",
6767
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
68-
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
68+
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, false}},
6969
},
7070
{
7171
name: "Default input",
7272
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true},
73-
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil}},
73+
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil, false}},
7474
},
7575
{
7676
name: "Custom input",
7777
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"},
78-
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil}},
78+
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil, false}},
7979
},
8080
{
81-
name: "OperationType sum",
82-
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
83-
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
81+
name: "OperationType sum with errors",
82+
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", ReportMissing: true},
83+
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, true}},
8484
},
8585
{
86-
name: "OperationType count",
87-
outputField: api.OutputField{Name: "MyAgg", Operation: "count"},
88-
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
86+
name: "OperationType count with errors",
87+
outputField: api.OutputField{Name: "MyAgg", Operation: "count", ReportMissing: true},
88+
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, true}},
8989
},
9090
{
9191
name: "OperationType max",
9292
outputField: api.OutputField{Name: "MyAgg", Operation: "max"},
93-
expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil}},
93+
expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil, false}},
9494
},
9595
{
9696
name: "OperationType min",
9797
outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
98-
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil}},
98+
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil, false}},
9999
},
100100
{
101101
name: "Default first",
102102
outputField: api.OutputField{Name: "MyCp", Operation: "first"},
103-
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
103+
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil, false}},
104104
},
105105
{
106106
name: "Custom input first",
107107
outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
108-
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil}},
108+
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil, false}},
109109
},
110110
{
111111
name: "Default last",
112112
outputField: api.OutputField{Name: "MyCp", Operation: "last"},
113-
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
113+
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil, false}},
114114
},
115115
}
116116

@@ -132,6 +132,7 @@ func TestAddField_and_Update(t *testing.T) {
132132
{Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"},
133133
{Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"},
134134
{Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"},
135+
{Name: "PktDropLatestDropCause", Operation: "last", Input: "PktDropLatestDropCause"},
135136
}
136137
var aggs []aggregator
137138
for _, of := range ofs {
@@ -158,21 +159,67 @@ func TestAddField_and_Update(t *testing.T) {
158159
name: "flowLog 1",
159160
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false),
160161
direction: dirAB,
161-
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0},
162+
expected: map[string]interface{}{
163+
"Bytes_AB": float64(100),
164+
"Bytes_BA": float64(0),
165+
"Packets": float64(10),
166+
"maxFlowLogBytes": float64(100),
167+
"minFlowLogBytes": float64(100),
168+
"numFlowLogs": float64(1),
169+
"FirstFlowDirection": 0,
170+
"LastFlowDirection": 0,
171+
"PktDropLatestDropCause": nil,
172+
},
162173
},
163174
{
164175
name: "flowLog 2",
165-
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false),
176+
flowLog: config.GenericMap{"SrcAddr": ipA, "DstAddr": ipB, "Bytes": 100, "FlowDirection": flowDirA, "PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET"},
177+
direction: dirAB,
178+
expected: map[string]interface{}{
179+
"Bytes_AB": float64(200), // updated bytes count
180+
"Bytes_BA": float64(0),
181+
"Packets": float64(10),
182+
"maxFlowLogBytes": float64(100),
183+
"minFlowLogBytes": float64(100),
184+
"numFlowLogs": float64(2), // updated flow count
185+
"FirstFlowDirection": 0,
186+
"LastFlowDirection": 0,
187+
"PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET", // added drop cause
188+
},
189+
},
190+
{
191+
name: "flowLog 3",
192+
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 300, 20, false),
166193
direction: dirBA,
167-
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1},
194+
expected: map[string]interface{}{
195+
"Bytes_AB": float64(200),
196+
"Bytes_BA": float64(300), // updated reverse direction byte count
197+
"Packets": float64(30),
198+
"maxFlowLogBytes": float64(300), // updated max bytes from any direction
199+
"minFlowLogBytes": float64(100),
200+
"numFlowLogs": float64(3), // updated count
201+
"FirstFlowDirection": 0,
202+
"LastFlowDirection": 1,
203+
"PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET", // missing field is kept to its last available value
204+
},
168205
},
169206
}
170207

171208
conn := NewConnBuilder(nil).Build()
172209
for _, agg := range aggs {
173210
agg.addField(conn)
174211
}
175-
expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil}
212+
expectedInits := map[string]interface{}{
213+
"Bytes_AB": float64(0),
214+
"Bytes_BA": float64(0),
215+
"Packets": float64(0),
216+
"maxFlowLogBytes": float64(-math.MaxFloat64),
217+
"minFlowLogBytes": float64(math.MaxFloat64),
218+
"numFlowLogs": float64(0),
219+
"FirstFlowDirection": nil,
220+
"LastFlowDirection": nil,
221+
"PktDropLatestDropCause": nil,
222+
}
176223
require.Equal(t, expectedInits, conn.(*connType).aggFields)
177224

178225
for i, test := range table {
@@ -188,7 +235,7 @@ func TestAddField_and_Update(t *testing.T) {
188235
func TestMissingFieldError(t *testing.T) {
189236
test.ResetPromRegistry()
190237
metrics := newMetrics(opMetrics)
191-
agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
238+
agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true, ReportMissing: true}, metrics)
192239
require.NoError(t, err)
193240

194241
conn := NewConnBuilder(metrics).Build()
@@ -201,6 +248,22 @@ func TestMissingFieldError(t *testing.T) {
201248
require.Contains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"} 1`)
202249
}
203250

251+
func TestSkipMissingFieldError(t *testing.T) {
252+
test.ResetPromRegistry()
253+
metrics := newMetrics(opMetrics)
254+
agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
255+
require.NoError(t, err)
256+
257+
conn := NewConnBuilder(metrics).Build()
258+
agg.addField(conn)
259+
260+
flowLog := config.GenericMap{}
261+
agg.update(conn, flowLog, dirAB, true)
262+
263+
exposed := test.ReadExposedMetrics(t)
264+
require.NotContains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"}`)
265+
}
266+
204267
func TestFloat64ConversionError(t *testing.T) {
205268
test.ResetPromRegistry()
206269
metrics := newMetrics(opMetrics)

pkg/pipeline/extract/conntrack/conn.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package conntrack
1919

2020
import (
2121
"fmt"
22+
"reflect"
2223
"time"
2324

2425
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
@@ -100,7 +101,9 @@ func (c *connType) getNextHeartbeatTime() time.Time {
100101
func (c *connType) toGenericMap() config.GenericMap {
101102
gm := config.GenericMap{}
102103
for k, v := range c.aggFields {
103-
gm[k] = v
104+
if v != nil && (reflect.TypeOf(v).Kind() != reflect.Float64 || v.(float64) != 0) {
105+
gm[k] = v
106+
}
104107
}
105108

106109
// In case of a conflict between the keys and the aggFields / cpFields, the keys should prevail.

pkg/pipeline/extract/conntrack/utils_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,26 @@ func newMockRecordConnAB(srcIP string, srcPort int, dstIP string, dstPort int, p
5959
"DstAddr": dstIP,
6060
"DstPort": dstPort,
6161
"Proto": protocol,
62-
"Bytes_AB": bytesAB,
63-
"Bytes_BA": bytesBA,
64-
"Packets_AB": packetsAB,
65-
"Packets_BA": packetsBA,
6662
"numFlowLogs": numFlowLogs,
6763
api.IsFirstFieldName: false,
6864
},
6965
}
66+
67+
if bytesAB != 0 {
68+
mock.record["Bytes_AB"] = bytesAB
69+
}
70+
71+
if bytesBA != 0 {
72+
mock.record["Bytes_BA"] = bytesBA
73+
}
74+
75+
if bytesAB != 0 {
76+
mock.record["Packets_AB"] = packetsAB
77+
}
78+
79+
if bytesBA != 0 {
80+
mock.record["Packets_BA"] = packetsBA
81+
}
7082
return mock
7183
}
7284

0 commit comments

Comments
 (0)