diff --git a/pkg/fileservice/fifocache/data_cache.go b/pkg/fileservice/fifocache/data_cache.go
index a0aa508e015fb..917559c6845c2 100644
--- a/pkg/fileservice/fifocache/data_cache.go
+++ b/pkg/fileservice/fifocache/data_cache.go
@@ -45,6 +45,7 @@ func shardCacheKey(key fscache.CacheKey) uint64 {
 	var hasher maphash.Hash
 	hasher.SetSeed(seed)
 	hasher.Write(util.UnsafeToBytes(&key.Offset))
+	hasher.Write(util.UnsafeToBytes(&key.Sz))
 	hasher.WriteString(key.Path)
 	return hasher.Sum64()
 }
@@ -52,9 +53,7 @@ func shardCacheKey(key fscache.CacheKey) uint64 {
 var _ fscache.DataCache = new(DataCache)
 
 func (d *DataCache) Available() int64 {
-	d.fifo.queueLock.RLock()
-	defer d.fifo.queueLock.RUnlock()
-	ret := d.fifo.capacity() - d.fifo.used1 - d.fifo.used2
+	ret := d.fifo.capacity() - d.fifo.Used()
 	if ret < 0 {
 		ret = 0
 	}
@@ -66,24 +65,20 @@ func (d *DataCache) Capacity() int64 {
 }
 
 func (d *DataCache) DeletePaths(ctx context.Context, paths []string) {
+	deletes := make([]*_CacheItem[fscache.CacheKey, fscache.Data], 0, 10)
 	for _, path := range paths {
-		for i := 0; i < len(d.fifo.shards); i++ {
-			d.deletePath(ctx, i, path)
-		}
+
+		key := fscache.CacheKey{Path: path}
+		d.fifo.htab.CompareAndDelete(key, func(key1, key2 fscache.CacheKey) bool {
+			return key1.Path == key2.Path
+		}, func(value *_CacheItem[fscache.CacheKey, fscache.Data]) {
+			deletes = append(deletes, value)
+		})
 	}
-}
-
-func (d *DataCache) deletePath(ctx context.Context, shardIndex int, path string) {
-	shard := &d.fifo.shards[shardIndex]
-	shard.Lock()
-	defer shard.Unlock()
-	for key, item := range shard.values {
-		if key.Path == path {
-			delete(shard.values, key)
-			if d.fifo.postEvict != nil {
-				d.fifo.postEvict(ctx, item.key, item.value, item.size)
-			}
-		}
+	// FSCACHEDATA RELEASE
+	for _, item := range deletes {
+		item.MarkAsDeleted(ctx, d.fifo.postEvict)
 	}
 }
 
@@ -109,7 +104,5 @@ func (d *DataCache) Set(ctx context.Context, key query.CacheKey, value fscache.D
 }
 
 func (d *DataCache) Used() int64 {
-	d.fifo.queueLock.RLock()
-	defer d.fifo.queueLock.RUnlock()
-	return d.fifo.used1 + d.fifo.used2
+	return d.fifo.Used()
 }
diff --git a/pkg/fileservice/fifocache/data_cache_test.go b/pkg/fileservice/fifocache/data_cache_test.go
index ac1692b729c68..4ddac5b25d506 100644
--- a/pkg/fileservice/fifocache/data_cache_test.go
+++ b/pkg/fileservice/fifocache/data_cache_test.go
@@ -63,7 +63,7 @@ func TestShardCacheKeyAllocs(t *testing.T) {
 		Offset: 3,
 		Path:   strings.Repeat("abc", 42),
 	}
-	if n := testing.AllocsPerRun(64, func() {
+	if n := testing.AllocsPerRun(64000, func() {
 		shardCacheKey(key)
 	}); n != 0 {
 		t.Fatalf("should not allocate")
diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go
index bb05456a1b199..c132433bc09cc 100644
--- a/pkg/fileservice/fifocache/fifo.go
+++ b/pkg/fileservice/fifocache/fifo.go
@@ -16,73 +16,148 @@ package fifocache
 
 import (
 	"context"
-	"runtime"
 	"sync"
 	"sync/atomic"
 
-	"golang.org/x/sys/cpu"
-
 	"github.com/matrixorigin/matrixone/pkg/fileservice/fscache"
 )
 
-const numShards = 256
-
-// Cache implements an in-memory cache with FIFO-based eviction
-// it's mostly like the S3-fifo, only without the ghost queue part
+// Cache implements an in-memory cache with S3-FIFO-based eviction.
+// The post-functions are critical: they increment and decrement the reference counter of the cache data, deallocating the memory once the counter reaches 0.
+// Make sure each postfn is invoked while holding the owning ShardMap shard mutex.
 type Cache[K comparable, V any] struct {
-	capacity     fscache.CapacityFunc
-	capacity1    fscache.CapacityFunc
-	keyShardFunc func(K) uint64
+	capacity fscache.CapacityFunc
+	capSmall fscache.CapacityFunc
 
 	postSet   func(ctx context.Context, key K, value V, size int64)
 	postGet   func(ctx context.Context, key K, value V, size int64)
 	postEvict func(ctx context.Context, key K, value V, size int64)
 
-	shards [numShards]struct {
-		sync.Mutex
-		values map[K]*_CacheItem[K, V]
-		_      cpu.CacheLinePad
-	}
-
-	itemQueue chan *_CacheItem[K, V]
+	htab *ShardMap[K, *_CacheItem[K, V]]
 
-	queueLock sync.RWMutex
-	used1     int64
-	queue1    Queue[*_CacheItem[K, V]]
-	used2     int64
-	queue2    Queue[*_CacheItem[K, V]]
-
-	capacityCut atomic.Int64
+	usedSmall atomic.Int64
+	small     Queue[*_CacheItem[K, V]]
+	usedMain  atomic.Int64
+	main      Queue[*_CacheItem[K, V]]
 }
 
 type _CacheItem[K comparable, V any] struct {
 	key   K
 	value V
 	size  int64
-	count atomic.Int32
+
+	// mu protects deleted, freq, and the post-function calls
+	mu      sync.Mutex
+	freq    int8
+	deleted bool // set once the item has been deleted, by either the hashtable or eviction
 }
 
-func (c *_CacheItem[K, V]) inc() {
-	for {
-		cur := c.count.Load()
-		if cur >= 3 {
-			return
-		}
-		if c.count.CompareAndSwap(cur, cur+1) {
-			return
-		}
+// Thread-safe
+func (c *_CacheItem[K, V]) Inc() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.freq < 3 {
+		c.freq += 1
 	}
 }
 
-func (c *_CacheItem[K, V]) dec() {
-	for {
-		cur := c.count.Load()
-		if cur <= 0 {
-			return
-		}
-		if c.count.CompareAndSwap(cur, cur-1) {
-			return
-		}
+// Thread-safe
+func (c *_CacheItem[K, V]) Dec() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.freq > 0 {
+		c.freq -= 1
+	}
+}
+
+// Thread-safe
+func (c *_CacheItem[K, V]) GetFreq() int8 {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.freq
+}
+
+// Thread-safe
+func (c *_CacheItem[K, V]) IsDeleted() bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.deleted
+}
+
+// Thread-safe.
+// The first call to MarkAsDeleted sets deleted = true, calls fn, and decrements the ref counter.
+// Subsequent calls do nothing.
+func (c *_CacheItem[K, V]) MarkAsDeleted(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// check whether the item is already deleted
+	if c.deleted {
+		// return false: there is nothing left to deallocate
+		return false
+	}
+
+	// set deleted = true
+	c.deleted = true
+
+	// call postEvict before decrementing the ref counter
+	if fn != nil {
+		fn(ctx, c.key, c.value, c.size)
+	}
+
+	// decrement the ref counter
+	c.releaseValue()
+	return true
+}
+
+// Thread-safe
+func (c *_CacheItem[K, V]) PostFn(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) {
+	if fn != nil {
+		c.mu.Lock()
+		defer c.mu.Unlock()
+		fn(ctx, c.key, c.value, c.size)
+	}
+}
+
+// Thread-safe
+func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// first check whether the item is already deleted
+	if c.deleted {
+		return false
+	}
+
+	// not deleted: increment the ref counter to pin the memory
+	c.retainValue()
+
+	// the value is now safe to access; call postfn
+	if fn != nil {
+		fn(ctx, c.key, c.value, c.size)
+	}
+
+	return true
+}
+
+// INTERNAL: not thread-safe.
+// If deleted is true, the value has already been released by this Cache and must NOT be used inside the Cache.
+// If deleted is false, retainValue increments the value's reference counter, making it safe to use.
+func (c *_CacheItem[K, V]) retainValue() {
+	cdata, ok := any(c.value).(fscache.Data)
+	if ok {
+		cdata.Retain()
+	}
+}
+
+// INTERNAL: not thread-safe.
+// releaseValue decrements the value's reference counter.
+func (c *_CacheItem[K, V]) releaseValue() {
+	cdata, ok := any(c.value).(fscache.Data)
+	if ok {
+		cdata.Release()
	}
 }
@@ -93,160 +168,92 @@ func New[K comparable, V any](
 	postGet func(ctx context.Context, key K, value V, size int64),
 	postEvict func(ctx context.Context, key K, value V, size int64),
 ) *Cache[K, V] {
+
 	ret := &Cache[K, V]{
 		capacity: capacity,
-		capacity1: func() int64 {
-			return capacity() / 10
+		capSmall: func() int64 {
+			cs := capacity() / 10
+			if cs == 0 {
+				cs = 1
+			}
+			return cs
 		},
-		itemQueue: make(chan *_CacheItem[K, V], runtime.GOMAXPROCS(0)*2),
-		queue1:    *NewQueue[*_CacheItem[K, V]](),
-		queue2:    *NewQueue[*_CacheItem[K, V]](),
-		keyShardFunc: keyShardFunc,
-		postSet:      postSet,
-		postGet:      postGet,
-		postEvict:    postEvict,
-	}
-	for i := range ret.shards {
-		ret.shards[i].values = make(map[K]*_CacheItem[K, V], 1024)
+		small:     *NewQueue[*_CacheItem[K, V]](),
+		main:      *NewQueue[*_CacheItem[K, V]](),
+		postSet:   postSet,
+		postGet:   postGet,
+		postEvict: postEvict,
+		htab:      NewShardMap[K, *_CacheItem[K, V]](keyShardFunc),
 	}
 	return ret
 }
 
-func (c *Cache[K, V]) set(ctx context.Context, key K, value V, size int64) *_CacheItem[K, V] {
-	shard := &c.shards[c.keyShardFunc(key)%numShards]
-	shard.Lock()
-	defer shard.Unlock()
-	_, ok := shard.values[key]
-	if ok {
-		// existed
-		return nil
-	}
+func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) {
 	item := &_CacheItem[K, V]{
 		key:   key,
 		value: value,
 		size:  size,
 	}
-	shard.values[key] = item
-	if c.postSet != nil {
-		c.postSet(ctx, key, value, size)
-	}
-	return item
-}
+	// FSCACHEDATA RETAIN
+	// increment the ref counter unconditionally, so the memory is pinned before htab.Set
+	item.Retain(ctx, nil)
 
-func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) {
-	if item := c.set(ctx, key, value, size); item != nil {
-		c.enqueue(item)
-		c.Evict(ctx, nil, 0)
+	ok := c.htab.Set(key, item, nil)
+	if !ok {
+		// key already exists
+		// FSCACHEDATA RELEASE
+		// decrement the ref counter to release the resource we just pinned
+		item.MarkAsDeleted(ctx, nil)
+		return
 	}
-}
 
-func (c *Cache[K, V]) enqueue(item *_CacheItem[K, V]) {
-	if !c.queueLock.TryLock() {
-		// try put itemQueue
-		select {
-		case c.itemQueue <- item:
-			// let the queueLock holder do the job
-			return
-		default:
-			// block until get lock
-			c.queueLock.Lock()
-			defer c.queueLock.Unlock()
-		}
-	} else {
-		defer c.queueLock.Unlock()
-	}
+	// postSet
+	item.PostFn(ctx, c.postSet)
+
+	// evict
+	c.evictAll(ctx, nil, 0)
 
 	// enqueue
-	c.queue1.enqueue(item)
-	c.used1 += item.size
-
-	// help enqueue
-	for {
-		select {
-		case item := <-c.itemQueue:
-			c.queue1.enqueue(item)
-			c.used1 += item.size
-		default:
-			return
-		}
-	}
+	c.small.enqueue(item)
+	c.usedSmall.Add(item.size)
 }
 
 func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) {
-	shard := &c.shards[c.keyShardFunc(key)%numShards]
-	shard.Lock()
 	var item *_CacheItem[K, V]
-	item, ok = shard.values[key]
+
+	item, ok = c.htab.Get(key, nil)
 	if !ok {
-		shard.Unlock()
 		return
 	}
-	if c.postGet != nil {
-		c.postGet(ctx, item.key, item.value, item.size)
+
+	// FSCACHEDATA RETAIN
+	ok = item.Retain(ctx, c.postGet)
+	if !ok {
+		return item.value, false
 	}
-	shard.Unlock()
-	item.inc()
+
+	// increment the frequency
+	item.Inc()
+
 	return item.value, true
 }
 
 func (c *Cache[K, V]) Delete(ctx context.Context, key K) {
-	shard := &c.shards[c.keyShardFunc(key)%numShards]
-	shard.Lock()
-	defer shard.Unlock()
-	item, ok := shard.values[key]
-	if !ok {
-		return
-	}
-	delete(shard.values, key)
-	if c.postEvict != nil {
-		c.postEvict(ctx, item.key, item.value, item.size)
+	item, ok := c.htab.GetAndDelete(key, nil)
+
+	if ok {
+		// MarkAsDeleted calls postEvict and releases the data (Bytes.Release())
+		// under the item mutex; item.deleted ensures postEvict runs only once.
+		item.MarkAsDeleted(ctx, c.postEvict)
 	}
-	// queues will be update in evict
+
 }
 
 func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut int64) {
-	if done == nil {
-		// can be async
-		if c.queueLock.TryLock() {
-			defer c.queueLock.Unlock()
-		} else {
-			if capacityCut > 0 {
-				// let the holder do more evict
-				c.capacityCut.Add(capacityCut)
-			}
-			return
-		}
-
-	} else {
-		if cap(done) < 1 {
-			panic("should be buffered chan")
-		}
-		c.queueLock.Lock()
-		defer c.queueLock.Unlock()
-	}
-
-	var target int64
-	for {
-		globalCapacityCut := c.capacityCut.Swap(0)
-		target = c.capacity() - capacityCut - globalCapacityCut
-		if target < 0 {
-			target = 0
-		}
-		if c.used1+c.used2 <= target {
-			break
-		}
-		target1 := c.capacity1() - capacityCut - globalCapacityCut
-		if target1 < 0 {
-			target1 = 0
-		}
-		if c.used1 > target1 {
-			c.evict1(ctx)
-		} else {
-			c.evict2(ctx)
-		}
-	}
+	target := c.evictAll(ctx, done, capacityCut)
 	if done != nil {
 		done <- target
 	}
@@ -254,64 +261,97 @@ func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut in
 
 // ForceEvict evicts n bytes despite capacity
 func (c *Cache[K, V]) ForceEvict(ctx context.Context, n int64) {
-	capacityCut := c.capacity() - c.used() + n
+	capacityCut := c.capacity() - c.Used() + n
 	c.Evict(ctx, nil, capacityCut)
 }
 
-func (c *Cache[K, V]) used() int64 {
-	c.queueLock.RLock()
-	defer c.queueLock.RUnlock()
-	return c.used1 + c.used2
+func (c *Cache[K, V]) Used() int64 {
+	return c.usedSmall.Load() + c.usedMain.Load()
 }
 
-func (c *Cache[K, V]) evict1(ctx context.Context) {
-	// queue 1
-	for {
-		item, ok := c.queue1.dequeue()
+func (c *Cache[K, V]) evictAll(ctx context.Context, done chan int64, capacityCut int64) int64 {
+	target := c.capacity() - capacityCut - 1
+	if target <= 0 {
+		target = 0
+	}
+	targetSmall := c.capSmall() - capacityCut - 1
+	if targetSmall <= 0 {
+		targetSmall = 0
+	}
+
+	usedsmall := c.usedSmall.Load()
+	usedmain := c.usedMain.Load()
+
+	for usedmain+usedsmall > target {
+		if usedsmall > targetSmall {
+			c.evictSmall(ctx)
+		} else {
+			c.evictMain(ctx)
+		}
+		usedsmall = c.usedSmall.Load()
+		usedmain = c.usedMain.Load()
+	}
+
+	return target + 1
+}
+
+func (c *Cache[K, V]) evictSmall(ctx context.Context) {
+	// small fifo
+	for c.usedSmall.Load() > 0 {
+		item, ok := c.small.dequeue()
 		if !ok {
 			// queue empty
 			return
 		}
-		if item.count.Load() > 1 {
-			// put queue2
-			c.queue2.enqueue(item)
-			c.used1 -= item.size
-			c.used2 += item.size
+
+		deleted := item.IsDeleted()
+		if deleted {
+			c.usedSmall.Add(-item.size)
+			return
+		}
+
+		if item.GetFreq() > 1 {
+			// promote to the main queue
+			c.main.enqueue(item)
+			c.usedSmall.Add(-item.size)
+			c.usedMain.Add(item.size)
 		} else {
 			// evict
-			c.deleteItem(ctx, item)
-			c.used1 -= item.size
+			c.htab.Remove(item.key)
+			c.usedSmall.Add(-item.size)
+			// mark the item as deleted; it must not be accessed again
+			item.MarkAsDeleted(ctx, c.postEvict)
 			return
 		}
 	}
 }
 
-func (c *Cache[K, V]) deleteItem(ctx context.Context, item *_CacheItem[K, V]) {
-	shard := &c.shards[c.keyShardFunc(item.key)%numShards]
-	shard.Lock()
-	defer shard.Unlock()
-	delete(shard.values, item.key)
-	if c.postEvict != nil {
-		c.postEvict(ctx, item.key, item.value, item.size)
-	}
-}
-
-func (c *Cache[K, V]) evict2(ctx context.Context) {
-	// queue 2
-	for {
-		item, ok := c.queue2.dequeue()
+func (c *Cache[K, V]) evictMain(ctx context.Context) {
+	// main fifo
+	for c.usedMain.Load() > 0 {
+		item, ok := c.main.dequeue()
 		if !ok {
 			// empty queue
-			break
+			return
 		}
-		if item.count.Load() > 0 {
+
+		deleted := item.IsDeleted()
+		if deleted {
+			c.usedMain.Add(-item.size)
+			return
+		}
+
+		if item.GetFreq() > 0 {
 			// re-enqueue
-			c.queue2.enqueue(item)
-			item.dec()
+			item.Dec()
+			c.main.enqueue(item)
 		} else {
 			// evict
-			c.deleteItem(ctx, item)
-			c.used2 -= item.size
+			c.htab.Remove(item.key)
+			c.usedMain.Add(-item.size)
+			// mark the item as deleted; it must not be accessed again
+			item.MarkAsDeleted(ctx, c.postEvict)
 			return
 		}
 	}
 }
diff --git a/pkg/fileservice/fifocache/fifo_test.go b/pkg/fileservice/fifocache/fifo_test.go
index 9e5ca3fcde918..35557c7bad319 100644
--- a/pkg/fileservice/fifocache/fifo_test.go
+++ b/pkg/fileservice/fifocache/fifo_test.go
@@ -45,15 +45,15 @@ func TestCacheEvict(t *testing.T) {
 	cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil)
 	for i := 0; i < 64; i++ {
 		cache.Set(ctx, i, i, 1)
-		if cache.used1+cache.used2 > cache.capacity() {
-			t.Fatalf("capacity %v, used1 %v used2 %v", cache.capacity(), cache.used1, cache.used2)
+		if cache.Used() > cache.capacity() {
+			t.Fatalf("capacity %v, usedSmall %v usedMain %v", cache.capacity(), cache.usedSmall.Load(), cache.usedMain.Load())
 		}
 	}
 }
 
 func TestCacheEvict2(t *testing.T) {
 	ctx := context.Background()
-	cache := New[int, int](fscache.ConstCapacity(2), ShardInt[int], nil, nil, nil)
+	cache := New[int, int](fscache.ConstCapacity(20), ShardInt[int], nil, nil, nil)
 
 	cache.Set(ctx, 1, 1, 1)
 	cache.Set(ctx, 2, 2, 1)
@@ -76,8 +76,8 @@ func TestCacheEvict2(t *testing.T) {
 	v, ok = cache.Get(ctx, 4)
 	assert.True(t, ok)
 	assert.Equal(t, 4, v)
-	assert.Equal(t, int64(1), cache.used1)
-	assert.Equal(t, int64(1), cache.used2)
+	assert.Equal(t, int64(4), cache.usedSmall.Load())
+	assert.Equal(t, int64(0), cache.usedMain.Load())
 }
 
 func TestCacheEvict3(t *testing.T) {
@@ -99,7 +99,7 @@ func TestCacheEvict3(t *testing.T) {
 		cache.Set(ctx, i, true, 1)
 		cache.Get(ctx, i)
 		cache.Get(ctx, i)
-		assert.True(t, cache.used1+cache.used2 <= 1024)
+		assert.True(t, cache.Used() <= 1024)
 	}
 	assert.Equal(t, 0, nEvict)
 	assert.Equal(t, 1024, nSet)
@@ -107,10 +107,10 @@ func TestCacheEvict3(t *testing.T) {
 
 	for i := 0; i < 1024; i++ {
 		cache.Set(ctx, 10000+i, true, 1)
-		assert.True(t, cache.used1+cache.used2 <= 1024)
+		assert.True(t, cache.Used() <= 1024)
 	}
-	assert.Equal(t, int64(102), cache.used1)
-	assert.Equal(t, int64(922), cache.used2)
+	assert.Equal(t, int64(102), cache.usedSmall.Load())
+	assert.Equal(t, int64(922), cache.usedMain.Load())
 	assert.Equal(t, 1024, nEvict)
 	assert.Equal(t, 2048, nSet)
 	assert.Equal(t, 2048, nGet)
diff --git a/pkg/fileservice/fifocache/queue.go b/pkg/fileservice/fifocache/queue.go
index fd9431d63e977..38fe4b5ce59be 100644
--- a/pkg/fileservice/fifocache/queue.go
+++ b/pkg/fileservice/fifocache/queue.go
@@ -17,9 +17,11 @@ package fifocache
 
 import "sync"
 
 type Queue[T any] struct {
+	mu       sync.Mutex // protects all queue operations
 	head     *queuePart[T]
 	tail     *queuePart[T]
 	partPool sync.Pool
+	size     int
 }
 
 type queuePart[T any] struct {
@@ -46,9 +48,9 @@ func NewQueue[T any]() *Queue[T] {
 	return queue
 }
 
+// empty is an internal helper; it assumes the lock is held
 func (p *Queue[T]) empty() bool {
-	return p.head == p.tail &&
-		p.head.begin == len(p.head.values)
+	return p.head == p.tail && len(p.head.values) == p.head.begin
 }
 
 func (p *queuePart[T]) reset() {
@@ -58,6 +60,9 @@ func (p *queuePart[T]) reset() {
 }
 
 func (p *Queue[T]) enqueue(v T) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
 	if len(p.head.values) >= maxQueuePartCapacity {
 		// extend
 		newPart := p.partPool.Get().(*queuePart[T])
@@ -66,9 +71,13 @@ func (p *Queue[T]) enqueue(v T) {
 		p.head = newPart
 	}
 	p.head.values = append(p.head.values, v)
+	p.size++
 }
 
 func (p *Queue[T]) dequeue() (ret T, ok bool) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
 	if p.empty() {
 		return
 	}
@@ -76,17 +85,27 @@ func (p *Queue[T]) dequeue() (ret T, ok bool) {
 	if p.tail.begin >= len(p.tail.values) {
 		// shrink
 		if p.tail.next == nil {
-			panic("impossible")
+			// This should not happen if the empty() check passed,
+			// but keep a safeguard instead of panicking.
+			// Consider logging an error here if it ever triggers.
+			return
 		}
 		part := p.tail
 		p.tail = p.tail.next
-		p.partPool.Put(part)
+		p.partPool.Put(part) // return the old part to the pool
 	}
 	ret = p.tail.values[p.tail.begin]
 	var zero T
 	p.tail.values[p.tail.begin] = zero
 	p.tail.begin++
+	p.size--
 	ok = true
 	return
 }
+
+func (p *Queue[T]) Len() int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.size
+}
diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go
new file mode 100644
index 0000000000000..df9ee5f49ecbc
--- /dev/null
+++ b/pkg/fileservice/fifocache/shardmap.go
@@ -0,0 +1,128 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fifocache
+
+import (
+	"sync"
+
+	"golang.org/x/sys/cpu"
+)
+
+const numShards = 256
+
+type ShardMap[K comparable, V any] struct {
+	shards [numShards]struct {
+		sync.RWMutex
+		values map[K]V
+		_      cpu.CacheLinePad
+	}
+	hashfn func(K) uint64
+}
+
+func NewShardMap[K comparable, V any](hashfn func(K) uint64) *ShardMap[K, V] {
+	m := &ShardMap[K, V]{hashfn: hashfn}
+
+	for i := range m.shards {
+		m.shards[i].values = make(map[K]V, 1024)
+	}
+	return m
+}
+
+func (m *ShardMap[K, V]) Set(key K, value V, postfn func(V)) bool {
+
+	s := &m.shards[m.hashfn(key)%numShards]
+	s.Lock()
+	defer s.Unlock()
+
+	_, ok := s.values[key]
+	if ok {
+		return false
+	}
+
+	s.values[key] = value
+
+	if postfn != nil {
+		// call postSet while holding the shard mutex
+		postfn(value)
+	}
+	return true
+}
+
+func (m *ShardMap[K, V]) Get(key K, postfn func(V)) (V, bool) {
+
+	s := &m.shards[m.hashfn(key)%numShards]
+	s.RLock()
+	defer s.RUnlock()
+	v, ok := s.values[key]
+
+	if !ok {
+		return v, ok
+	}
+
+	if postfn != nil {
+		// call postGet while holding the shard read lock
+		postfn(v)
+	}
+	return v, ok
+}
+
+func (m *ShardMap[K, V]) Remove(key K) {
+
+	s := &m.shards[m.hashfn(key)%numShards]
+	s.Lock()
+	defer s.Unlock()
+	delete(s.values, key)
+}
+
+func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn func(V)) {
+
+	for i := range m.shards {
+		s := &m.shards[i]
+		func() {
+			s.Lock()
+			defer s.Unlock()
+			for k, v := range s.values {
+				if fn(k, key) {
+					delete(s.values, k)
+					if postfn != nil {
+						// notify the caller that the item was deleted (while holding the shard mutex)
+						postfn(v)
+					}
+				}
+			}
+		}()
+	}
+}
+
+func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) {
+
+	s := &m.shards[m.hashfn(key)%numShards]
+	s.Lock()
+	defer s.Unlock()
+
+	v, ok := s.values[key]
+	if !ok {
+		return v, ok
+	}
+
+	delete(s.values, key)
+
+	if postfn != nil {
+		// notify the caller that the item was deleted (while holding the shard mutex)
+		postfn(v)
+	}
+
+	return v, ok
+}
diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go
new file mode 100644
index 0000000000000..da1695d58a88f
--- /dev/null
+++ b/pkg/fileservice/fifocache/shardmap_test.go
@@ -0,0 +1,45 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fifocache
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestShardMap(t *testing.T) {
+
+	m := NewShardMap[int, string](ShardInt[int])
+	ok := m.Set(1, "1", func(v string) {})
+	assert.Equal(t, ok, true)
+	ok = m.Set(1, "1", func(v string) {})
+	assert.Equal(t, ok, false)
+
+	v, ok := m.Get(1, func(v string) {})
+	assert.Equal(t, ok, true)
+	assert.Equal(t, v, "1")
+
+	_, ok = m.GetAndDelete(0, func(v string) {})
+	assert.Equal(t, ok, false)
+
+	_, ok = m.GetAndDelete(1, func(v string) {})
+	assert.Equal(t, ok, true)
+}
diff --git a/pkg/fileservice/mem_cache.go b/pkg/fileservice/mem_cache.go
index cbf8ed45f58d0..6044e224790e8 100644
--- a/pkg/fileservice/mem_cache.go
+++ b/pkg/fileservice/mem_cache.go
@@ -52,9 +52,6 @@ func NewMemCache(
 			LogEvent(ctx, str_memory_cache_post_set_begin)
 			defer LogEvent(ctx, str_memory_cache_post_set_end)
 
-			// retain
-			value.Retain()
-
 			// metrics
 			LogEvent(ctx, str_update_metrics_begin)
 			inuseBytes.Add(float64(size))
@@ -76,9 +73,6 @@ func NewMemCache(
 			LogEvent(ctx, str_memory_cache_post_get_begin)
 			defer LogEvent(ctx, str_memory_cache_post_get_end)
 
-			// retain
-			value.Retain()
-
 			// callbacks
 			if callbacks != nil {
 				LogEvent(ctx, str_memory_cache_callbacks_begin)
@@ -94,9 +88,6 @@ func NewMemCache(
 			LogEvent(ctx, str_memory_cache_post_evict_begin)
 			defer LogEvent(ctx, str_memory_cache_post_evict_end)
 
-			// relaese
-			value.Release()
-
 			// metrics
 			LogEvent(ctx, str_update_metrics_begin)
 			inuseBytes.Add(float64(-size))
@@ -209,6 +200,7 @@ func (m *MemCache) Update(
 	}
 
 	LogEvent(ctx, str_set_memory_cache_entry_begin)
+	// NOTE: if the data already exists in the hashtable, Set skips this cache data; as a result, the reference counter is not incremented.
 	m.cache.Set(ctx, key, entry.CachedData)
 	LogEvent(ctx, str_set_memory_cache_entry_end)
 }
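
---

For reviewers: below is a minimal usage sketch of the reworked cache, mirroring what the updated `fifo_test.go` exercises (`New`, `ShardInt`, `fscache.ConstCapacity`, and `Set`/`Get`/`Used`). It is illustrative only and not part of the patch; the eviction counter relies on the new guarantee that `MarkAsDeleted` invokes `postEvict` at most once per item.

```go
package main

import (
	"context"
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/fileservice/fifocache"
	"github.com/matrixorigin/matrixone/pkg/fileservice/fscache"
)

func main() {
	ctx := context.Background()

	// Count evictions through the postEvict hook; the per-item deleted flag
	// guarantees this callback fires at most once per cached item.
	var evicted int
	cache := fifocache.New[int, int](
		fscache.ConstCapacity(8), // total capacity; the small queue targets ~1/10 of it
		fifocache.ShardInt[int],  // hash function handed to the ShardMap
		nil,                      // postSet
		nil,                      // postGet
		func(_ context.Context, key, value int, size int64) {
			evicted++
		},
	)

	// Overfill the cache: Set runs evictAll before enqueueing, so Used()
	// stays within capacity after every call (as TestCacheEvict asserts).
	for i := 0; i < 64; i++ {
		cache.Set(ctx, i, i, 1)
	}
	fmt.Printf("used=%d evicted=%d\n", cache.Used(), evicted)

	// Get pins the value via Retain and bumps its S3-FIFO frequency,
	// making promotion from the small queue to the main queue more likely.
	if v, ok := cache.Get(ctx, 63); ok {
		fmt.Println("got", v)
	}
}
```

Since the value type here is plain `int`, the `fscache.Data` type assertion in `retainValue`/`releaseValue` is a no-op; with `fscache.Data` values (as in `DataCache`), the same calls drive the real reference counting.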