Skip to content

Commit f6972d4

Browse files
authored
Added retry function to reconnect and retry after an unsuccessful flush to prevent the client to stay in a permanent disconnected state. (#2)
1 parent d590abc commit f6972d4

File tree

4 files changed

+40
-5
lines changed

4 files changed

+40
-5
lines changed

aggregator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Aggregator interface {
2020
SetInactive(string)
2121
Run(time.Duration, chan bool) Aggregator
2222
Flush() (int, error)
23+
Retry() (int, error)
2324
}
2425

2526
type aggregator struct {
@@ -33,6 +34,13 @@ func (a *aggregator) GetMetrics() map[string]Metric {
3334
return a.metrics
3435
}
3536

37+
// Retry tries to retry the flush of metrics in case something went wrong. If this
38+
// retry went wrong it won't try a third time.
39+
func (a *aggregator) Retry() (int, error) {
40+
a.client.Reconnect()
41+
return a.Flush()
42+
}
43+
3644
func (a *aggregator) getMetric(path string, defaultMetric Metric) Metric {
3745
metricPath := a.config.getMetricPath(path)
3846
if metric, exists := a.metrics[metricPath]; exists {
@@ -80,6 +88,9 @@ func (a *aggregator) run(period time.Duration, stopSendingMetrics chan bool) {
8088
case <-ticker.C:
8189
if _, err := a.Flush(); err != nil {
8290
log.Printf("Unable to send metrics: %s\n", err.Error())
91+
if _, err := a.Retry(); err != nil {
92+
log.Printf("Unable to send metrics after reconnecting neither: %s\n", err.Error())
93+
}
8394
}
8495
case <-stopSendingMetrics:
8596
return

aggregator_mock_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type MockAggregator struct {
1414
MethodSetInactive func(*MockAggregator, string)
1515
MethodRun func(*MockAggregator, time.Duration, chan bool) Aggregator
1616
MethodFlush func(*MockAggregator) (int, error)
17+
MethodRetry func(*MockAggregator) (int, error)
1718
}
1819

1920
// AddSum is an implementation of Aggregator interface to be used with the mocking object.
@@ -76,3 +77,11 @@ func (m *MockAggregator) Flush() (int, error) {
7677
}
7778
return 0, nil
7879
}
80+
81+
// Retry is an implementation of Aggregator interface to be used with the mocking object.
82+
func (m *MockAggregator) Retry() (int, error) {
83+
if m.MethodRetry != nil {
84+
return m.MethodRetry(m)
85+
}
86+
return 0, nil
87+
}

aggregator_test.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"bytes"
55
"errors"
66
"fmt"
7-
. "github.com/onsi/ginkgo"
8-
. "github.com/onsi/gomega"
97
"strconv"
108
"sync"
119
"time"
10+
11+
. "github.com/onsi/ginkgo"
12+
. "github.com/onsi/gomega"
1213
)
1314

1415
var _ = Describe("graphite aggregator", func() {
@@ -48,7 +49,12 @@ var _ = Describe("graphite aggregator", func() {
4849

4950
BeforeEach(func() {
5051
client = &MockGraphite{
51-
Data: map[string]string{},
52+
Extra: map[string]interface{}{},
53+
Data: map[string]string{},
54+
MethodReconnect: func(m *MockGraphite) error {
55+
m.Extra["reconnected"] = "yes, it tried!"
56+
return nil
57+
},
5258
MethodSendBuffer: func(m *MockGraphite, buffer *bytes.Buffer) (int, error) {
5359
mutex.Lock()
5460
defer mutex.Unlock()
@@ -272,13 +278,21 @@ var _ = Describe("graphite aggregator", func() {
272278
agg.Run(2*time.Second, stop)
273279
agg.AddSum(failMetric, 15)
274280
time.Sleep(3 * time.Second)
275-
Expect(getFlushSent(client)).To(Equal(1))
281+
Expect(getFlushSent(client)).To(Equal(2))
276282
agg.AddSum(failMetric, 25)
277283
time.Sleep(2 * time.Second)
278-
Expect(getFlushSent(client)).To(Equal(2))
284+
Expect(getFlushSent(client)).To(Equal(4))
279285
metrics := agg.(*aggregator).GetMetrics()
280286
Expect(metrics[failMetric].Calculate()).To(Equal("40"))
281287
})
288+
289+
It("retries once if something went wrong", func() {
290+
agg.Run(2*time.Second, stop)
291+
agg.AddSum(failMetric, 15)
292+
time.Sleep(3 * time.Second)
293+
Expect(getFlushSent(client)).To(Equal(2))
294+
Expect(client.(*MockGraphite).Extra).To(HaveKey("reconnected"))
295+
})
282296
})
283297

284298
Context("uses client configuration", func() {

graphite_mock_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
// MockGraphite implements the interface Graphite
99
type MockGraphite struct {
10+
Extra map[string]interface{}
1011
Data map[string]string
1112
MethodSend func(*MockGraphite, string, string) (int, error)
1213
MethodSendBuffer func(*MockGraphite, *bytes.Buffer) (int, error)

0 commit comments

Comments
 (0)