Skip to content

Commit 8a7f38b

Browse files
committed
Shared page cache.
1 parent 83a2d77 commit 8a7f38b

File tree

3 files changed

+100
-8
lines changed

3 files changed

+100
-8
lines changed

litestream/api.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@ import (
1111
"github.com/ncruces/go-sqlite3/vfs"
1212
)
1313

14-
// The default poll interval.
15-
const DefaultPollInterval = 1 * time.Second
14+
const (
15+
// The default poll interval.
16+
DefaultPollInterval = 1 * time.Second
17+
18+
// The default cache size: 10 MiB.
19+
DefaultCacheSize = 10 * 1024 * 1024
20+
)
1621

1722
func init() {
1823
vfs.Register("litestream", liteVFS{})
@@ -28,11 +33,18 @@ var (
2833
type ReplicaOptions struct {
2934
// Where to log error messages. May be nil.
3035
Logger *slog.Logger
31-
// Minimum compaction level to track.
32-
MinLevel int
33-
// Replica poll interval. Must be less than the compaction interval
36+
37+
// Replica poll interval.
38+
// Should be less than the compaction interval
3439
// used by the replica at MinLevel+1.
3540
PollInterval time.Duration
41+
42+
// Minimum compaction level to track.
43+
MinLevel int
44+
45+
// CacheSize is the maximum size of the page cache in bytes.
46+
// Zero means DefaultCacheSize, negative disables caching.
47+
CacheSize int
3648
}
3749

3850
// NewReplica creates a read-replica from a Litestream client.
@@ -45,12 +57,16 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt
4557
if options.PollInterval <= 0 {
4658
options.PollInterval = DefaultPollInterval
4759
}
60+
if options.CacheSize == 0 {
61+
options.CacheSize = DefaultCacheSize
62+
}
4863

4964
liteMtx.Lock()
5065
defer liteMtx.Unlock()
5166
liteDBs[name] = &liteDB{
5267
client: client,
53-
opts: &options,
68+
opts: options,
69+
cache: pageCache{size: options.CacheSize},
5470
}
5571
}
5672

litestream/cache.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package litestream
2+
3+
import (
4+
"encoding/binary"
5+
"sync"
6+
7+
"golang.org/x/sync/singleflight"
8+
9+
"github.com/superfly/ltx"
10+
)
11+
12+
type pageCache struct {
13+
single singleflight.Group
14+
pages map[uint32]cachedPage // +checklocks:mtx
15+
size int
16+
mtx sync.Mutex
17+
}
18+
19+
type cachedPage struct {
20+
data []byte
21+
txid ltx.TXID
22+
}
23+
24+
func (c *pageCache) getOrFetch(pgno uint32, maxTXID ltx.TXID, fetch func() (any, error)) ([]byte, error) {
25+
if c.size >= 0 {
26+
c.mtx.Lock()
27+
if c.pages == nil {
28+
c.pages = map[uint32]cachedPage{}
29+
}
30+
page := c.pages[pgno]
31+
c.mtx.Unlock()
32+
33+
if page.txid == maxTXID {
34+
return page.data, nil
35+
}
36+
}
37+
38+
var key [12]byte
39+
binary.LittleEndian.PutUint32(key[0:], pgno)
40+
binary.LittleEndian.PutUint64(key[4:], uint64(maxTXID))
41+
v, err, _ := c.single.Do(string(key[:]), fetch)
42+
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
page := cachedPage{v.([]byte), maxTXID}
48+
if c.size >= 0 {
49+
c.mtx.Lock()
50+
c.evict(len(page.data))
51+
c.pages[pgno] = page
52+
c.mtx.Unlock()
53+
}
54+
return page.data, nil
55+
}
56+
57+
// +checklocks:c.mtx
58+
func (c *pageCache) evict(pageSize int) {
59+
// Evict random keys until we're under the maximum size.
60+
// SQLite has its own page cache, which it will use for each connection.
61+
// Since this is a second layer of shared cache,
62+
// random eviction is probably good enough.
63+
if pageSize*len(c.pages) < c.size {
64+
return
65+
}
66+
for key := range c.pages {
67+
delete(c.pages, key)
68+
if pageSize*len(c.pages) < c.size {
69+
return
70+
}
71+
}
72+
}

litestream/vfs.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
8787
return 0, io.EOF
8888
}
8989

90-
_, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
90+
data, err := f.db.cache.getOrFetch(pgno, elem.MaxTXID, func() (any, error) {
91+
_, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
92+
return data, err
93+
})
9194
if err != nil {
9295
f.db.opts.Logger.Error("fetch page", "error", err)
9396
return 0, err
@@ -170,7 +173,8 @@ func (f *liteFile) context() context.Context {
170173

171174
type liteDB struct {
172175
client litestream.ReplicaClient
173-
opts *ReplicaOptions
176+
opts ReplicaOptions
177+
cache pageCache
174178
pages *pageIndex // +checklocks:mtx
175179
lastPoll time.Time // +checklocks:mtx
176180
txids levelTXIDs // +checklocks:mtx

0 commit comments

Comments
 (0)