Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

[![Project Status](https://opensource.box.com/badges/active.svg)](https://opensource.box.com/badges)
[![Build Status](https://travis-ci.com/box/kube-iptables-tailer.svg?token=xQMR2mqCqLKhWA2AL639&branch=master)](https://travis-ci.com/box/kube-iptables-tailer)
[![Go Report Card](https://goreportcard.com/badge/github.com/box/kube-iptables-tailer)](https://goreportcard.com/report/github.com/box/kube-iptables-tailer)

kube-iptables-tailer is a service that gives you better visibility on networking issues in your Kubernetes cluster by detecting the traffic denied by iptables and surfacing corresponding information to the affected Pods via Kubernetes events.

Expand Down
13 changes: 6 additions & 7 deletions drop/parser.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package drop

import (
"errors"
"fmt"
"github.com/box/kube-iptables-tailer/util"
"github.com/golang/glog"
Expand All @@ -24,7 +23,7 @@ type PacketDrop struct {

var fieldCount = reflect.ValueOf(PacketDrop{}).NumField()

// Check if PacketDrop is expired
// IsExpired returns true if PacketDrop is expired
func (pd PacketDrop) IsExpired() bool {
logTime, err := pd.GetLogTime()
if err != nil {
Expand All @@ -37,12 +36,12 @@ func (pd PacketDrop) IsExpired() bool {
return curTime.Sub(logTime).Minutes() > expiredMinutes
}

// Get the time object of PacketDrop log time
// GetLogTime returns the time object of PacketDrop log time
func (pd PacketDrop) GetLogTime() (time.Time, error) {
return time.Parse(PacketDropLogTimeLayout, pd.LogTime)
}

// Parse the logs from given channel and insert objects of PacketDrop as parsing result to another channel
// RunParsing parses the logs from given channel and inserts objects of PacketDrop as parsing result to another channel
func RunParsing(logPrefix string, logChangeCh <-chan string, packetDropCh chan<- PacketDrop) {
for log := range logChangeCh {
parseErr := parse(logPrefix, log, packetDropCh)
Expand Down Expand Up @@ -117,7 +116,7 @@ func getPacketDropLogFields(packetDropLog string) ([]string, error) {
logFields := strings.Fields(packetDropLog)
// check if the logFields contain enough information about a packet drop
if len(logFields) < fieldCount {
return []string{}, errors.New(fmt.Sprintf("Invalid packet drop: log=%+v", packetDropLog))
return []string{}, fmt.Errorf("error invalid packet drop: log=%+v", packetDropLog)
}
return logFields, nil
}
Expand All @@ -128,11 +127,11 @@ func getFieldValue(logFields []string, fieldName string) (string, error) {
if strings.HasPrefix(field, fieldName) {
fieldStrs := strings.Split(field, "=")
if len(fieldStrs) < 2 {
return "", errors.New(fmt.Sprintf("Missing value: field=%+v", fieldName))
return "", fmt.Errorf("error missing value: field=%+v", fieldName)
}
return fieldStrs[1], nil

}
}
return "", errors.New(fmt.Sprintf("Missing field=%+v", fieldName))
return "", fmt.Errorf("error missing field=%+v", fieldName)
}
7 changes: 3 additions & 4 deletions drop/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package drop

import (
"bufio"
"errors"
"fmt"
"github.com/golang/glog"
"io"
Expand All @@ -20,7 +19,7 @@ type Watcher struct {
curFingerprint string
}

// Init a watcher object and return its pointer
// InitWatcher inits a watcher object and return its pointer
func InitWatcher(watchFileName string, watchInterval time.Duration) *Watcher {
watcher := Watcher{watchFileName: watchFileName, watchInterval: watchInterval}
return &watcher
Expand Down Expand Up @@ -84,10 +83,10 @@ func (watcher *Watcher) checkRotation(input io.Reader) error {
sizeRead, err := input.Read(bytes)
// check sizeRead before err according to documentation of Reader.Read()
if sizeRead < fingerprintSize {
return errors.New(fmt.Sprint("Error getting fingerprint, insufficient content."))
return fmt.Errorf("error getting fingerprint due to insufficient content, size read:%v", sizeRead)
}
if err != nil {
return errors.New(fmt.Sprintf("Error checking rotation: error=%+v", err.Error()))
return fmt.Errorf("error checking rotation: error=%+v", err.Error())
}
fingerprint := string(bytes[:])
if fingerprint != watcher.curFingerprint {
Expand Down
12 changes: 7 additions & 5 deletions event/locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package event
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/box/kube-iptables-tailer/util"
"github.com/golang/glog"
Expand All @@ -20,10 +19,12 @@ type DnsResolver interface {
LookupAddr(context context.Context, ip string) (names []string, err error)
}

// NodeGetter has an method to get a worker node in Kubernetes
type NodeGetter interface {
Get(name string, options metav1.GetOptions) (*v1.Node, error)
}

// TrafficDirection is used while generating message for Kubernetes events
type TrafficDirection int

const (
Expand All @@ -44,6 +45,7 @@ func (td TrafficDirection) String() string {

const indexerName = "podIp"

// Locator has methods to work with locating Pods in Kubernetes
type Locator interface {
Run(stopCh <-chan struct{})
LocatePod(ip string) (*v1.Pod, error)
Expand All @@ -54,9 +56,7 @@ type PodLocator struct {
informer cache.SharedIndexInformer
}

/*
* Returns a locator that pulls pod data from the apiserver
*/
// NewApiServerPodLocator returns a locator that pulls pod data from the apiserver
func NewApiServerPodLocator(client *kubernetes.Clientset) (*PodLocator, error) {
listWatch := cache.NewListWatchFromClient(
client.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
Expand Down Expand Up @@ -97,6 +97,7 @@ func podIPIndexer() func(obj interface{}) ([]string, error) {
return indexFunc
}

// Run process of locating Pods
func (locator *PodLocator) Run(stopCh <-chan struct{}) {
go locator.informer.Run(stopCh)

Expand All @@ -106,10 +107,11 @@ func (locator *PodLocator) Run(stopCh <-chan struct{}) {
}
}

// LocatePod locates the pod by given IP address
func (locator *PodLocator) LocatePod(ip string) (*v1.Pod, error) {
items, err := locator.informer.GetIndexer().ByIndex(indexerName, ip)
if err != nil {
return nil, errors.New(fmt.Sprintf("Error looking up pod: ip=%v", ip))
return nil, fmt.Errorf("error looking up Pod: ip=%v", ip)
} else if len(items) > 0 {
if pod, ok := items[0].(*v1.Pod); ok {
return pod, nil
Expand Down
5 changes: 2 additions & 3 deletions event/poster.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package event

import (
"errors"
"fmt"
"github.com/box/kube-iptables-tailer/drop"
"github.com/box/kube-iptables-tailer/metrics"
Expand Down Expand Up @@ -29,7 +28,7 @@ type Poster struct {
locator Locator
}

// Init Poster and return its pointer
// InitPoster inits a Poster and return its pointer
func InitPoster() (*Poster, error) {
kubeClient, err := initKubeClient()
if err != nil {
Expand All @@ -45,7 +44,7 @@ func InitPoster() (*Poster, error) {

locator, err := NewApiServerPodLocator(kubeClient)
if err != nil {
return nil, errors.New(fmt.Sprintf("Error creating locator: %+v", err))
return nil, fmt.Errorf("error creating locator: %+v", err)
}

return &Poster{
Expand Down
6 changes: 3 additions & 3 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Metrics struct {
packetDropsCount *prometheus.CounterVec
}

// Return the singleton instance of metrics
// GetInstance returns the singleton instance of metrics
func GetInstance() *Metrics {
once.Do(initMetricsSingleton) // thread-safe way to construct the singleton instance
return instance
Expand All @@ -43,13 +43,13 @@ func initMetricsSingleton() {
instance = &Metrics{packetDropsCount: packetDropCountsVec, registry: r}
}

// Return the handler of metrics
// GetHandler returns the handler of metrics
func (m *Metrics) GetHandler() http.Handler {
// need to specify registry to avoid getting extra data sent in prometheus
return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{})
}

// Update the metrics by given service name
// ProcessPacketDrop updates the metrics by given service name
func (m *Metrics) ProcessPacketDrop(src, dst string) {
m.packetDropsCount.With(prometheus.Labels{
"src": src,
Expand Down
12 changes: 3 additions & 9 deletions util/envar.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
DefaultWatchLogsIntervalSecond = 5
)

// GetRequiredEnvString returns string environment variable of the given key
func GetRequiredEnvString(key string) string {
val := os.Getenv(key)
if len(val) == 0 {
Expand All @@ -46,15 +47,7 @@ func GetRequiredEnvString(key string) string {
return val
}

func GetRequiredEnvInt(key string) int {
stringVal := GetRequiredEnvString(key)
intVal, err := strconv.Atoi(stringVal)
if err != nil {
log.Fatalf("Error converting environment variable %s to int: %v", stringVal, err)
}
return intVal
}

// GetRequiredEnvString returns integer environment variable of the given key or default value if key does not exist
func GetEnvIntOrDefault(key string, def int) int {
if env := os.Getenv(key); env != "" {
val, err := strconv.Atoi(env)
Expand All @@ -67,6 +60,7 @@ func GetEnvIntOrDefault(key string, def int) int {
return def
}

// GetEnvStringOrDefault returns string environment variable of the given key or default value if key does not exist
func GetEnvStringOrDefault(key string, def string) string {
if val := os.Getenv(key); len(val) > 0 {
return val
Expand Down
7 changes: 4 additions & 3 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"time"
)

// PrettyPrint returns the string in JSON format
func PrettyPrint(i interface{}) string {
s, err := json.MarshalIndent(i, "", " ")
if err != nil {
glog.Errorf("Error marshaling JSON: obj=%+v", i)
return fmt.Sprintf("%+v", i)
} else {
return string(s)
}
return string(s)

}

// Utility functions for packet drop testing
// GetExpiredTimeInString returns an expired time which is used in packet drop testing
func GetExpiredTimeInString(expirationMinutes int, timeFormat string) string {
duration := time.Duration(expirationMinutes) * time.Minute
// add one more minute than given expiration to make sure the time is expired
Expand Down