Commit 3a95f59 (1 parent: 2bbff68)

Parquet file compaction (#621)

Adds support for file-based compaction: Parquet file compaction appends the Parquet files to an OS temporary file. During compaction of the file part, the file is truncated after the parts are released so that new parts can be appended to it.
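The lifecycle the message describes can be pictured in a few lines. This is a minimal sketch under stated assumptions, not the commit's implementation: the fileCompaction type, its method names, and the reference counting are illustrative; only the append-to-a-temp-file-and-truncate-after-release behavior comes from the description above.

package compaction

import (
	"io"
	"os"
	"sync"
)

// fileCompaction sketches the described lifecycle: serialized Parquet parts
// are appended to a single OS temporary file, and the file is truncated once
// every part referencing its current contents has been released.
type fileCompaction struct {
	mu    sync.Mutex
	f     *os.File
	inUse int // parts still backed by the file's current contents
}

func newFileCompaction() (*fileCompaction, error) {
	f, err := os.CreateTemp("", "compaction-*.parquet")
	if err != nil {
		return nil, err
	}
	return &fileCompaction{f: f}, nil
}

// append writes a serialized Parquet part at the end of the file and returns
// the offset it was written at, so the part can be read back later.
func (c *fileCompaction) append(serialized []byte) (int64, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	off, err := c.f.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, err
	}
	if _, err := c.f.Write(serialized); err != nil {
		return 0, err
	}
	c.inUse++
	return off, nil
}

// release is the hook handed to each file-backed part. When the last such
// part is released, the file is truncated so new parts can be appended from
// offset zero again.
func (c *fileCompaction) release() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inUse--
	if c.inUse == 0 {
		_ = c.f.Truncate(0)
		_, _ = c.f.Seek(0, io.SeekStart)
	}
}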

File tree

8 files changed: +251, −53 lines

db.go
Lines changed: 2 additions & 2 deletions

@@ -65,7 +65,7 @@ type ColumnStore struct {
 	// splitSize is the number of new granules that are created when granules are split (default =2)
 	splitSize int
 	// indexConfig is the configuration settings for the lsm index
-	indexConfig []*index.LevelConfig
+	indexConfig []*IndexConfig

 	sources []DataSource
 	sinks   []DataSink
@@ -227,7 +227,7 @@ func WithStoragePath(path string) Option {
 	}
 }

-func WithIndexConfig(indexConfig []*index.LevelConfig) Option {
+func WithIndexConfig(indexConfig []*IndexConfig) Option {
	return func(s *ColumnStore) error {
		s.indexConfig = indexConfig
		return nil
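Note that WithIndexConfig now takes the package's own []*IndexConfig rather than []*index.LevelConfig, so callers configure the LSM index without importing the index package; the new test below shows it used with DefaultIndexConfig.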

db_test.go
Lines changed: 53 additions & 0 deletions

@@ -2250,3 +2250,56 @@ func TestDBConcurrentOpen(t *testing.T) {
 	}
 	require.NoError(t, errg.Wait())
 }
+
+func Test_DB_WithParquetDiskCompaction(t *testing.T) {
+	config := NewTableConfig(
+		dynparquet.SampleDefinition(),
+	)
+
+	logger := newTestLogger(t)
+
+	cfg := DefaultIndexConfig()
+	cfg[0].Type = CompactionTypeParquetDisk // Create disk compaction
+	cfg[1].Type = CompactionTypeParquetDisk
+	c, err := New(
+		WithLogger(logger),
+		WithIndexConfig(cfg),
+	)
+	t.Cleanup(func() {
+		require.NoError(t, c.Close())
+	})
+	require.NoError(t, err)
+	db, err := c.DB(context.Background(), "test")
+	require.NoError(t, err)
+	table, err := db.Table("test", config)
+	require.NoError(t, err)
+
+	samples := dynparquet.NewTestSamples()
+
+	ctx := context.Background()
+	for i := 0; i < 100; i++ {
+		r, err := samples.ToRecord()
+		require.NoError(t, err)
+		_, err = table.InsertRecord(ctx, r)
+		require.NoError(t, err)
+	}
+	require.NoError(t, table.EnsureCompaction())
+
+	// Ensure that disk compacted data can be recovered
+	pool := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer pool.AssertSize(t, 0)
+	rows := int64(0)
+	err = table.View(ctx, func(ctx context.Context, tx uint64) error {
+		return table.Iterator(
+			ctx,
+			tx,
+			pool,
+			[]logicalplan.Callback{func(ctx context.Context, ar arrow.Record) error {
+				rows += ar.NumRows()
+				return nil
+			}},
+		)
+	})
+	require.NoError(t, err)
+	require.Equal(t, int64(300), rows)
+}
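Each inserted record carries three sample rows (hence the asserted 300 rows from 100 inserts), and the CheckedAllocator paired with pool.AssertSize(t, 0) verifies that iteration releases every Arrow allocation it makes.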

index/lsm.go
Lines changed: 1 addition & 3 deletions

@@ -421,9 +421,7 @@ func (l *LSM) merge(level SentinelType, externalWriter func([]parts.Part) (parts
 	l.Lock()
 	defer l.Unlock()
 	for _, part := range mergeList {
-		if r := part.Record(); r != nil {
-			r.Release()
-		}
+		part.Release()
 	}

 	return nil
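Previously merge could only release parts that wrapped an Arrow record; delegating to the new Part.Release method lets each implementation free whatever resource backs it, which is what allows the file-backed parquet parts below to flow through compaction.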

parts/arrow.go
Lines changed: 2 additions & 0 deletions

@@ -41,6 +41,8 @@ func (p *arrowPart) Record() arrow.Record {
 	return p.record
 }

+func (p *arrowPart) Release() { p.record.Release() }
+
 func (p *arrowPart) SerializeBuffer(schema *dynparquet.Schema, w dynparquet.ParquetWriter) error {
 	return pqarrow.RecordToFile(schema, w, p.record)
 }

parts/parquet.go
Lines changed: 51 additions & 28 deletions

@@ -20,6 +20,12 @@ func (p *parquetPart) Record() arrow.Record {
 	return nil
 }

+func (p *parquetPart) Release() {
+	if p.release != nil {
+		p.release()
+	}
+}
+
 func (p *parquetPart) SerializeBuffer(_ *dynparquet.Schema, _ dynparquet.ParquetWriter) error {
 	return fmt.Errorf("not a record part")
 }
@@ -57,19 +63,12 @@ func (p *parquetPart) Least() (*dynparquet.DynamicRow, error) {
 		return p.minRow, nil
 	}

-	rowBuf := &dynparquet.DynamicRows{Rows: make([]parquet.Row, 1)}
-	reader := p.buf.DynamicRowGroup(0).DynamicRows()
-	defer reader.Close()
-
-	if n, err := reader.ReadRows(rowBuf); err != nil {
-		return nil, fmt.Errorf("read first row of part: %w", err)
-	} else if n != 1 {
-		return nil, fmt.Errorf("expected to read exactly 1 row, but read %d", n)
+	minRow, err := min(p.buf)
+	if err != nil {
+		return nil, err
 	}

-	// Copy here so that this reference does not prevent the decompressed page
-	// from being GCed.
-	p.minRow = rowBuf.GetCopy(0)
+	p.minRow = minRow
 	return p.minRow, nil
 }

@@ -78,24 +77,11 @@ func (p *parquetPart) Most() (*dynparquet.DynamicRow, error) {
 		return p.maxRow, nil
 	}

-	rowBuf := &dynparquet.DynamicRows{Rows: make([]parquet.Row, 1)}
-	rg := p.buf.DynamicRowGroup(p.buf.NumRowGroups() - 1)
-	reader := rg.DynamicRows()
-	defer reader.Close()
-
-	if err := reader.SeekToRow(rg.NumRows() - 1); err != nil {
-		return nil, fmt.Errorf("seek to last row of part: %w", err)
-	}
-
-	if n, err := reader.ReadRows(rowBuf); err != nil {
-		return nil, fmt.Errorf("read last row of part: %w", err)
-	} else if n != 1 {
-		return nil, fmt.Errorf("expected to read exactly 1 row, but read %d", n)
+	maxRow, err := max(p.buf)
+	if err != nil {
+		return nil, err
 	}
-
-	// Copy here so that this reference does not prevent the decompressed page
-	// from being GCed.
-	p.maxRow = rowBuf.GetCopy(0)
+	p.maxRow = maxRow
 	return p.maxRow, nil
 }

@@ -119,3 +105,40 @@ func (p *parquetPart) OverlapsWith(schema *dynparquet.Schema, otherPart Part) (b

 	return schema.Cmp(a, d) <= 0 && schema.Cmp(c, b) <= 0, nil
 }
+
+func max(buf *dynparquet.SerializedBuffer) (*dynparquet.DynamicRow, error) {
+	rowBuf := &dynparquet.DynamicRows{Rows: make([]parquet.Row, 1)}
+	rg := buf.DynamicRowGroup(buf.NumRowGroups() - 1)
+	reader := rg.DynamicRows()
+	defer reader.Close()
+
+	if err := reader.SeekToRow(rg.NumRows() - 1); err != nil {
+		return nil, fmt.Errorf("seek to last row of part: %w", err)
+	}
+
+	if n, err := reader.ReadRows(rowBuf); err != nil {
+		return nil, fmt.Errorf("read last row of part: %w", err)
+	} else if n != 1 {
+		return nil, fmt.Errorf("expected to read exactly 1 row, but read %d", n)
+	}
+
+	// Copy here so that this reference does not prevent the decompressed page
+	// from being GCed.
+	return rowBuf.GetCopy(0), nil
+}
+
+func min(buf *dynparquet.SerializedBuffer) (*dynparquet.DynamicRow, error) {
+	rowBuf := &dynparquet.DynamicRows{Rows: make([]parquet.Row, 1)}
+	reader := buf.DynamicRowGroup(0).DynamicRows()
+	defer reader.Close()
+
+	if n, err := reader.ReadRows(rowBuf); err != nil {
+		return nil, fmt.Errorf("read first row of part: %w", err)
+	} else if n != 1 {
+		return nil, fmt.Errorf("expected to read exactly 1 row, but read %d", n)
+	}
+
+	// Copy here so that this reference does not prevent the decompressed page
+	// from being GCed.
+	return rowBuf.GetCopy(0), nil
+}
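The Least and Most bodies are behaviorally unchanged: their row-reading logic is simply factored into the min and max helpers, both of which keep the copy-before-return that avoids pinning decompressed pages in memory.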

parts/part.go
Lines changed: 8 additions & 0 deletions

@@ -12,6 +12,7 @@ type Part interface {
 	// Record returns the Arrow record for the part. If the part is not an Arrow
 	// record part, nil is returned.
 	Record() arrow.Record
+	Release()
 	SerializeBuffer(schema *dynparquet.Schema, w dynparquet.ParquetWriter) error
 	AsSerializedBuffer(schema *dynparquet.Schema) (*dynparquet.SerializedBuffer, error)
 	NumRows() int64
@@ -28,6 +29,7 @@ type basePart struct {
 	compactionLevel int
 	minRow          *dynparquet.DynamicRow
 	maxRow          *dynparquet.DynamicRow
+	release         func()
 }

 func (p *basePart) CompactionLevel() int {
@@ -44,6 +46,12 @@ func WithCompactionLevel(level int) Option {
 	}
 }

+func WithRelease(release func()) Option {
+	return func(p *basePart) {
+		p.release = release
+	}
+}
+
 type PartSorter struct {
 	schema *dynparquet.Schema
 	parts  []Part
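To show where the new option plugs in, here is a hedged sketch of constructing a file-backed part whose release hook decrements the temp file's reference count (see the fileCompaction sketch near the top). WithRelease and the release field are from this commit; the newParquetPart constructor name and signature are assumptions for illustration only.

// Sketch: wiring a file-backed parquet part to the temp file's release hook.
// newParquetPart is hypothetical; WithRelease is the option added above.
func newFileBackedPart(tx uint64, buf *dynparquet.SerializedBuffer, fc *fileCompaction) Part {
	return newParquetPart(
		tx,
		buf,
		WithCompactionLevel(1),  // illustrative level
		WithRelease(fc.release), // truncate the temp file once all parts are released
	)
}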
