Skip to content

Commit b62ee80

Browse files
authored
Merge pull request #58 from youzan/feature-compact-ttl
avoid receive raft logs if apply is slow to reduce the memory in raft storage
2 parents be9b967 + 38a4d95 commit b62ee80

File tree

15 files changed

+39
-1957
lines changed

15 files changed

+39
-1957
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ PREFIX=/usr/local
22
DESTDIR=
33
BINDIR=${PREFIX}/bin
44
PROJECT?=github.com/youzan/ZanRedisDB
5-
VERBINARY?= 0.7.2
5+
VERBINARY?= 0.8.0
66
COMMIT?=$(shell git rev-parse --short HEAD)
77
BUILD_TIME?=$(shell date '+%Y-%m-%d_%H:%M:%S-%Z')
88
GOFLAGS=-ldflags "-X ${PROJECT}/common.VerBinary=${VERBINARY} -X ${PROJECT}/common.Commit=${COMMIT} -X ${PROJECT}/common.BuildTime=${BUILD_TIME}"

common/type.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,6 @@ const (
9595
// do not need to care about the data expiration. Every node in the cluster should start the 'TTLChecker' of the storage system
9696
// with this policy.
9797
LocalDeletion ExpirationPolicy = iota
98-
99-
// ConsistencyDeletion indicates all the expired data should be deleted through Raft, the underlying storage system should
100-
// not delete any data and all the expired keys should be sent to the expired channel. Only the leader should starts
101-
// the 'TTLChecker' with this policy.
102-
ConsistencyDeletion
103-
10498
//
10599
PeriodicalRotation
106100

@@ -111,9 +105,8 @@ const (
111105
)
112106

113107
const (
114-
DefaultExpirationPolicy = "local_deletion"
115-
ConsistencyDeletionExpirationPolicy = "consistency_deletion"
116-
WaitCompactExpirationPolicy = "wait_compact"
108+
DefaultExpirationPolicy = "local_deletion"
109+
WaitCompactExpirationPolicy = "wait_compact"
117110
)
118111

119112
var (
@@ -125,8 +118,6 @@ func StringToExpirationPolicy(s string) (ExpirationPolicy, error) {
125118
switch s {
126119
case DefaultExpirationPolicy:
127120
return LocalDeletion, nil
128-
case ConsistencyDeletionExpirationPolicy:
129-
return ConsistencyDeletion, nil
130121
case WaitCompactExpirationPolicy:
131122
return WaitCompact, nil
132123
default:

node/keys_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func getTestKVNode(t *testing.T) (*KVNode, string, chan struct{}) {
5656
nsConf.Replicator = 1
5757
nsConf.RaftGroupConf.GroupID = 1000
5858
nsConf.RaftGroupConf.SeedNodes = append(nsConf.RaftGroupConf.SeedNodes, replica)
59-
nsConf.ExpirationPolicy = "consistency_deletion"
59+
nsConf.ExpirationPolicy = common.DefaultExpirationPolicy
6060

6161
mconf := &MachineConfig{
6262
BroadcastAddr: "127.0.0.1",

node/kvstore.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,6 @@ func (s *KVStore) Destroy() error {
8787
return nil
8888
}
8989

90-
func (s *KVStore) CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error {
91-
if s.opts.EngType == rockredis.EngType {
92-
return s.RockDB.CheckExpiredData(buffer, stop)
93-
}
94-
return nil
95-
}
96-
9790
func (s *KVStore) LocalLookup(key []byte) ([]byte, error) {
9891
value, err := s.KVGet(key)
9992
return value, err

node/node.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ type KVNode struct {
187187
commitC <-chan applyInfo
188188
appliedIndex uint64
189189
clusterInfo common.IClusterInfo
190-
expireHandler *ExpireHandler
191190
expirationPolicy common.ExpirationPolicy
192191
remoteSyncedStates *remoteSyncedStateMgr
193192
applyWait wait.WaitTime
@@ -255,7 +254,6 @@ func NewKVNode(kvopts *KVOptions, config *RaftConfig,
255254
}
256255

257256
s.clusterInfo = clusterInfo
258-
s.expireHandler = NewExpireHandler(s)
259257

260258
s.registerHandler()
261259

@@ -306,7 +304,6 @@ func (nd *KVNode) Start(standalone bool) error {
306304
nd.readIndexLoop()
307305
}()
308306

309-
nd.expireHandler.Start()
310307
return nil
311308
}
312309

@@ -324,7 +321,6 @@ func (nd *KVNode) Stop() {
324321
}
325322
defer close(nd.stopDone)
326323
close(nd.stopChan)
327-
nd.expireHandler.Stop()
328324
nd.wg.Wait()
329325
nd.rn.StopNode()
330326
nd.sm.Close()
@@ -1383,8 +1379,6 @@ func (nd *KVNode) ReportMeLeaderToCluster() {
13831379

13841380
// should not block long in this
13851381
func (nd *KVNode) OnRaftLeaderChanged() {
1386-
nd.expireHandler.LeaderChanged()
1387-
13881382
if nd.rn.IsLead() {
13891383
go nd.ReportMeLeaderToCluster()
13901384
}

node/raft.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,21 @@ func (rc *raftNode) serveChannels() {
870870
return
871871
case <-rc.node.EventNotifyCh():
872872
moreEntriesToApply := cap(rc.commitC)-len(rc.commitC) > 3
873-
rd, hasUpdate := rc.node.StepNode(moreEntriesToApply, rc.IsBusySnapshot())
873+
// we should slow down raft logs receiving while applying is slow, otherwise we
874+
// may have too much logs in memory if the applying is slow.
875+
busy := rc.IsBusySnapshot()
876+
if !busy {
877+
// note: if the lastIndex and FirstIndex is slow, we should avoid call it in every step
878+
last, err := rc.raftStorage.LastIndex()
879+
if err == nil {
880+
fi, _ := rc.raftStorage.FirstIndex()
881+
fi = fi - 1
882+
if last > fi && last-fi >= uint64(rc.config.SnapCatchup+rc.config.SnapCount)*10 {
883+
busy = true
884+
}
885+
}
886+
}
887+
rd, hasUpdate := rc.node.StepNode(moreEntriesToApply, busy)
874888
if !hasUpdate {
875889
continue
876890
}

node/state_machine.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ type StateMachine interface {
5252
GetStats(table string) common.NamespaceStats
5353
Start() error
5454
Close()
55-
CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error
5655
GetBatchOperator() IBatchOperator
5756
}
5857

@@ -356,10 +355,6 @@ func (kvsm *kvStoreSM) Destroy() {
356355
kvsm.store.Destroy()
357356
}
358357

359-
func (kvsm *kvStoreSM) CheckExpiredData(buffer common.ExpiredDataBuffer, stop chan struct{}) error {
360-
return kvsm.store.CheckExpiredData(buffer, stop)
361-
}
362-
363358
func (kvsm *kvStoreSM) UpdateSnapshotState(term uint64, index uint64) {
364359
if kvsm.store != nil {
365360
kvsm.store.SetLatestSnapIndex(index)

0 commit comments

Comments
 (0)