Skip to content

Commit e0c5e44

Browse files
authored
Fix Redis monitoring version (#109)
1 parent c880c51 commit e0c5e44

File tree

4 files changed

+164
-20
lines changed

4 files changed

+164
-20
lines changed

docker-compose.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,9 @@ services:
1717
- '8500:8500'
1818
- '8600:8600'
1919
- '8600:8600/udp'
20+
21+
redis:
22+
container_name: harvester_redis_dev
23+
image: redis:alpine
24+
ports:
25+
- '6379:6379'

monitor/redis/watcher.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package redis
33

44
import (
55
"context"
6+
"crypto/md5"
7+
"encoding/hex"
68
"errors"
79
"time"
810

@@ -16,6 +18,8 @@ import (
1618
type Watcher struct {
1719
client redis.UniversalClient
1820
keys []string
21+
versions []uint64
22+
hashes []string
1923
pollInterval time.Duration
2024
}
2125

@@ -34,6 +38,8 @@ func New(client redis.UniversalClient, pollInterval time.Duration, keys []string
3438
return &Watcher{
3539
client: client,
3640
keys: keys,
41+
versions: make([]uint64, len(keys)),
42+
hashes: make([]string, len(keys)),
3743
pollInterval: pollInterval,
3844
}, nil
3945
}
@@ -77,8 +83,26 @@ func (w *Watcher) getValues(ctx context.Context, ch chan<- []*change.Change) {
7783
continue
7884
}
7985

80-
changes = append(changes, change.New(config.SourceRedis, key, values[i].(string), 0))
86+
value := values[i].(string)
87+
hash := w.hash(value)
88+
if hash == w.hashes[i] {
89+
continue
90+
}
91+
92+
w.versions[i]++
93+
w.hashes[i] = hash
94+
95+
changes = append(changes, change.New(config.SourceRedis, key, value, w.versions[i]))
96+
}
97+
98+
if len(changes) == 0 {
99+
return
81100
}
82101

83102
ch <- changes
84103
}
104+
105+
func (w *Watcher) hash(value string) string {
106+
hash := md5.Sum([]byte(value))
107+
return hex.EncodeToString(hash[:])
108+
}

monitor/redis/watcher_integration_test.go

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build integration
12
// +build integration
23

34
package redis
@@ -8,6 +9,7 @@ import (
89
"time"
910

1011
"github.com/beatlabs/harvester/change"
12+
"github.com/beatlabs/harvester/config"
1113
"github.com/go-redis/redis/v8"
1214
"github.com/stretchr/testify/assert"
1315
"github.com/stretchr/testify/require"
@@ -27,40 +29,57 @@ func TestWatch(t *testing.T) {
2729
val3 = "value3"
2830
)
2931

30-
set(t, client, key1, val1)
31-
set(t, client, key2, val2)
32-
set(t, client, key3, val3)
3332
defer func() {
3433
del(t, client, key1)
3534
del(t, client, key2)
3635
del(t, client, key3)
3736
}()
3837

39-
ch := make(chan []*change.Change)
4038
w, err := New(client, 10*time.Millisecond, []string{key1, key2, key3})
4139
require.NoError(t, err)
4240
require.NotNil(t, w)
41+
4342
ctx, cnl := context.WithCancel(context.Background())
4443
defer cnl()
44+
45+
// Initial values, set even before watching - not all keys have a value
46+
set(t, client, key1, val1)
47+
set(t, client, key2, val1)
48+
49+
// Start watching
50+
ch := make(chan []*change.Change, 100)
51+
defer close(ch)
4552
err = w.Watch(ctx, ch)
4653
require.NoError(t, err)
4754

48-
for i := 0; i < 2; i++ {
49-
cc := <-ch
50-
for _, cng := range cc {
51-
switch cng.Key() {
52-
case key1:
53-
assert.Equal(t, val1, cng.Value())
54-
case key2:
55-
assert.Equal(t, val2, cng.Value())
56-
case key3:
57-
assert.Equal(t, val3, cng.Value())
58-
default:
59-
assert.Fail(t, "key invalid", cng.Key())
60-
}
61-
assert.True(t, cng.Version() == 0)
62-
}
55+
// First values update
56+
time.Sleep(1 * time.Second)
57+
set(t, client, key1, val1) // Same value
58+
set(t, client, key2, val2)
59+
set(t, client, key3, val1) // First value for this key
60+
61+
// Second values update
62+
time.Sleep(1 * time.Second)
63+
set(t, client, key1, val1) // Same value
64+
set(t, client, key2, val1) // Second value - same as the initial value
65+
set(t, client, key3, val3)
66+
67+
time.Sleep(1 * time.Second)
68+
69+
found := transformChangesToSlices(ch)
70+
expected := []*change.Change{
71+
// Initial values
72+
change.New(config.SourceRedis, key1, val1, 1),
73+
change.New(config.SourceRedis, key2, val1, 1),
74+
// First update
75+
change.New(config.SourceRedis, key2, val2, 2),
76+
change.New(config.SourceRedis, key3, val1, 1),
77+
// Second update
78+
change.New(config.SourceRedis, key2, val1, 3),
79+
change.New(config.SourceRedis, key3, val3, 2),
6380
}
81+
82+
assert.Equal(t, expected, found)
6483
}
6584

6685
func set(t *testing.T, client redis.UniversalClient, key string, val string) {
@@ -74,3 +93,15 @@ func del(t *testing.T, client redis.UniversalClient, key string) {
7493
require.NoError(t, err)
7594
require.Equal(t, int64(1), delResult)
7695
}
96+
97+
func transformChangesToSlices(ch chan []*change.Change) []*change.Change {
98+
changes := make([]*change.Change, 0)
99+
for {
100+
select {
101+
case cc := <-ch:
102+
changes = append(changes, cc...)
103+
default:
104+
return changes
105+
}
106+
}
107+
}

monitor/redis/watcher_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package redis
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

89
"github.com/beatlabs/harvester/change"
10+
"github.com/beatlabs/harvester/config"
911
"github.com/go-redis/redis/v8"
1012
"github.com/stretchr/testify/assert"
1113
"github.com/stretchr/testify/require"
@@ -65,3 +67,84 @@ func TestWatcher_Watch(t *testing.T) {
6567
})
6668
}
6769
}
70+
71+
func TestWatcher_Versioning(t *testing.T) {
72+
client := (&clientStub{t: t}).
73+
WithValues("val1.1", "val2.1", "val3.1"). // Initial values
74+
WithValues("val1.1", "val2.2", "val3.2"). // Only keys 2 and 3 are updated
75+
WithValues("val1.1", "val2.1", "val3.2") // Only 2 is updated, to its previous value
76+
77+
expected := [][]*change.Change{
78+
{
79+
change.New(config.SourceRedis, "key1", "val1.1", 1),
80+
change.New(config.SourceRedis, "key2", "val2.1", 1),
81+
change.New(config.SourceRedis, "key3", "val3.1", 1),
82+
},
83+
{
84+
change.New(config.SourceRedis, "key2", "val2.2", 2),
85+
change.New(config.SourceRedis, "key3", "val3.2", 2),
86+
},
87+
{
88+
change.New(config.SourceRedis, "key2", "val2.1", 3),
89+
},
90+
}
91+
92+
w, err := New(client, 1*time.Millisecond, []string{"key1", "key2", "key3"})
93+
require.NoError(t, err)
94+
assert.Equal(t, []uint64{0, 0, 0}, w.versions)
95+
assert.Equal(t, []string{"", "", ""}, w.hashes)
96+
97+
ctx, cancel := context.WithCancel(context.Background())
98+
99+
ch := make(chan []*change.Change, 10)
100+
err = w.Watch(ctx, ch)
101+
assert.NoError(t, err)
102+
103+
time.Sleep(100 * time.Millisecond)
104+
105+
cancel()
106+
107+
found := make([][]*change.Change, 0)
108+
109+
wg := sync.WaitGroup{}
110+
wg.Add(1)
111+
112+
go func() {
113+
for {
114+
select {
115+
case cc := <-ch:
116+
if len(cc) == 0 {
117+
break
118+
}
119+
found = append(found, cc)
120+
default:
121+
wg.Done()
122+
return
123+
}
124+
}
125+
}()
126+
wg.Wait()
127+
128+
assert.Equal(t, expected, found)
129+
}
130+
131+
type clientStub struct {
132+
t *testing.T
133+
*redis.Client
134+
135+
cmds []*redis.SliceCmd
136+
}
137+
138+
func (c *clientStub) WithValues(values ...interface{}) *clientStub {
139+
c.cmds = append(c.cmds, redis.NewSliceResult(values, nil))
140+
return c
141+
}
142+
143+
func (c *clientStub) MGet(_ context.Context, keys ...string) *redis.SliceCmd {
144+
if len(c.cmds) == 0 {
145+
return redis.NewSliceResult(make([]interface{}, len(keys)), nil)
146+
}
147+
shifted := c.cmds[0]
148+
c.cmds = c.cmds[1:]
149+
return shifted
150+
}

0 commit comments

Comments
 (0)