Skip to content

Commit 37e11d0

Browse files
authored
Remove (*Table)Write and introduce (*GenericTable[T]).Write (#626)
This commit removes `(*Table).Write` in favor of `(*GenericTable[T]).Write` `GenericTable` offers a lot of improvements from the previous API. One of the biggest advantage is there is no longer need to define schema before hand instead the schema is part of the struct that is written in the form of struct tags. Example code has been refactored to use `map[string]T` for dynamic columns, however slice struct dynamic columns still works, this is intentional to avoid the PR from blowing out of proportion. `GenericTable` embeds `Table` so from user perspective only few changes are needed to work with the new API.
1 parent 3a95f59 commit 37e11d0

File tree

8 files changed

+159
-389
lines changed

8 files changed

+159
-389
lines changed

aggregate_test.go

Lines changed: 9 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -256,48 +256,17 @@ func TestDurationAggregation(t *testing.T) {
256256
db, err := c.DB(context.Background(), "test")
257257
require.NoError(t, err)
258258

259-
schema := &schemapb.Schema{
260-
Name: "test",
261-
Columns: []*schemapb.Column{
262-
{
263-
Name: "timestamp",
264-
StorageLayout: &schemapb.StorageLayout{
265-
Type: schemapb.StorageLayout_TYPE_INT64,
266-
},
267-
Dynamic: false,
268-
},
269-
{
270-
Name: "stacktrace",
271-
StorageLayout: &schemapb.StorageLayout{
272-
Type: schemapb.StorageLayout_TYPE_STRING,
273-
},
274-
Dynamic: false,
275-
},
276-
{
277-
Name: "value",
278-
StorageLayout: &schemapb.StorageLayout{
279-
Type: schemapb.StorageLayout_TYPE_INT64,
280-
},
281-
Dynamic: false,
282-
},
283-
},
284-
SortingColumns: []*schemapb.SortingColumn{
285-
{
286-
Name: "timestamp",
287-
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
288-
},
289-
},
259+
type Record struct {
260+
Timestamp int64 `frostdb:",asc"`
261+
Stacktrace string
262+
Value int64
290263
}
291264

292-
config := NewTableConfig(schema)
293-
table, err := db.Table("test", config)
265+
table, err := NewGenericTable[Record](db, "test", memory.NewGoAllocator())
294266
require.NoError(t, err)
267+
defer table.Release()
295268

296-
records := []struct {
297-
Timestamp int64
298-
Stacktrace string
299-
Value int64
300-
}{
269+
records := []Record{
301270
{
302271
Timestamp: 1 * int64(time.Second),
303272
Stacktrace: "stack1",
@@ -324,11 +293,8 @@ func TestDurationAggregation(t *testing.T) {
324293
Value: 3,
325294
},
326295
}
327-
328-
for _, record := range records {
329-
_, err := table.Write(context.Background(), record)
330-
require.NoError(t, err)
331-
}
296+
err = table.Write(context.Background(), records...)
297+
require.NoError(t, err)
332298

333299
engine := query.NewEngine(memory.DefaultAllocator, db.TableProvider())
334300

db_test.go

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,37 +1062,12 @@ func Test_DB_Block_Optimization(t *testing.T) {
10621062

10631063
func Test_DB_TableWrite_FlatSchema(t *testing.T) {
10641064
ctx := context.Background()
1065-
flatDefinition := &schemapb.Schema{
1066-
Name: "test",
1067-
Columns: []*schemapb.Column{{
1068-
Name: "example_type",
1069-
StorageLayout: &schemapb.StorageLayout{
1070-
Type: schemapb.StorageLayout_TYPE_STRING,
1071-
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
1072-
},
1073-
Dynamic: false,
1074-
}, {
1075-
Name: "timestamp",
1076-
StorageLayout: &schemapb.StorageLayout{
1077-
Type: schemapb.StorageLayout_TYPE_INT64,
1078-
},
1079-
Dynamic: false,
1080-
}, {
1081-
Name: "value",
1082-
StorageLayout: &schemapb.StorageLayout{
1083-
Type: schemapb.StorageLayout_TYPE_INT64,
1084-
},
1085-
Dynamic: false,
1086-
}},
1087-
SortingColumns: []*schemapb.SortingColumn{{
1088-
Name: "example_type",
1089-
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
1090-
}, {
1091-
Name: "timestamp",
1092-
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
1093-
}},
1065+
1066+
type Flat struct {
1067+
ExampleType string `frostdb:",rle_dict,asc(0)"`
1068+
Timestamp int64 `frostdb:",asc(1)"`
1069+
Value int64
10941070
}
1095-
config := NewTableConfig(flatDefinition)
10961071

10971072
c, err := New(WithLogger(newTestLogger(t)))
10981073
require.NoError(t, err)
@@ -1101,20 +1076,15 @@ func Test_DB_TableWrite_FlatSchema(t *testing.T) {
11011076
db, err := c.DB(ctx, "flatschema")
11021077
require.NoError(t, err)
11031078

1104-
table, err := db.Table("test", config)
1079+
table, err := NewGenericTable[Flat](db, "test", memory.NewGoAllocator())
11051080
require.NoError(t, err)
1081+
defer table.Release()
11061082

1107-
s := struct {
1108-
ExampleType string
1109-
Timestamp int64
1110-
Value int64
1111-
}{
1083+
err = table.Write(ctx, Flat{
11121084
ExampleType: "hello-world",
11131085
Timestamp: 7,
11141086
Value: 8,
1115-
}
1116-
1117-
_, err = table.Write(ctx, s)
1087+
})
11181088
require.NoError(t, err)
11191089

11201090
engine := query.NewEngine(
@@ -1132,9 +1102,6 @@ func Test_DB_TableWrite_FlatSchema(t *testing.T) {
11321102

11331103
func Test_DB_TableWrite_DynamicSchema(t *testing.T) {
11341104
ctx := context.Background()
1135-
config := NewTableConfig(
1136-
dynparquet.SampleDefinition(),
1137-
)
11381105

11391106
c, err := New(WithLogger(newTestLogger(t)))
11401107
require.NoError(t, err)
@@ -1143,8 +1110,9 @@ func Test_DB_TableWrite_DynamicSchema(t *testing.T) {
11431110
db, err := c.DB(ctx, "sampleschema")
11441111
require.NoError(t, err)
11451112

1146-
table, err := db.Table("test", config)
1113+
table, err := NewGenericTable[dynparquet.Sample](db, "test", memory.NewGoAllocator())
11471114
require.NoError(t, err)
1115+
defer table.Release()
11481116

11491117
now := time.Now()
11501118
ts := now.UnixMilli()
@@ -1191,7 +1159,7 @@ func Test_DB_TableWrite_DynamicSchema(t *testing.T) {
11911159
},
11921160
}
11931161

1194-
_, err = table.Write(ctx, samples[0], samples[1], samples[2])
1162+
err = table.Write(ctx, samples[0], samples[1], samples[2])
11951163
require.NoError(t, err)
11961164

11971165
engine := query.NewEngine(

dynparquet/record_builder.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,9 @@ func newMapFieldBuilder(newField func(string) fieldBuilder) *mapFieldBuilder {
343343
var _ fieldBuilder = (*mapFieldBuilder)(nil)
344344

345345
func (m *mapFieldBuilder) Fields() (o []arrow.Field) {
346+
if len(m.columns) == 0 {
347+
return []arrow.Field{}
348+
}
346349
o = make([]arrow.Field, 0, len(m.columns))
347350
m.keys = slices.Grow(m.keys, len(m.columns))
348351
for k := range m.columns {
@@ -356,6 +359,9 @@ func (m *mapFieldBuilder) Fields() (o []arrow.Field) {
356359
}
357360

358361
func (m *mapFieldBuilder) NewArray(a []arrow.Array) []arrow.Array {
362+
if len(m.columns) == 0 {
363+
return a
364+
}
359365
m.keys = m.keys[:0]
360366
for k := range m.columns {
361367
m.keys = append(m.keys, k)
@@ -393,6 +399,12 @@ func (m *mapFieldBuilder) Append(v reflect.Value) error {
393399
}
394400

395401
func (m *mapFieldBuilder) appendMap(v reflect.Value) error {
402+
if v.IsNil() || v.Len() == 0 {
403+
for _, v := range m.columns {
404+
v.AppendNull()
405+
}
406+
return nil
407+
}
396408
clear(m.seen)
397409
keys := v.MapKeys()
398410
size := m.Len()
@@ -416,6 +428,12 @@ func (m *mapFieldBuilder) appendMap(v reflect.Value) error {
416428
}
417429

418430
func (m *mapFieldBuilder) appendSlice(v reflect.Value) error {
431+
if v.IsNil() || v.Len() == 0 {
432+
for _, v := range m.columns {
433+
v.AppendNull()
434+
}
435+
return nil
436+
}
419437
clear(m.seen)
420438
size := m.Len()
421439
for n := 0; n < v.Len(); n++ {

examples/aggregations/aggregations.go

Lines changed: 19 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/apache/arrow/go/v14/arrow/memory"
99

1010
"github.com/polarsignals/frostdb"
11-
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
1211
"github.com/polarsignals/frostdb/query"
1312
"github.com/polarsignals/frostdb/query/logicalplan"
1413
)
@@ -23,38 +22,34 @@ func main() {
2322
// Open up a database in the column store
2423
database, _ := columnstore.DB(context.Background(), "weather_db")
2524

26-
// Define our aggregation schema of labels and values
27-
schema := aggregationSchema()
28-
29-
// Create a table named snowfall_table in our database
30-
table, _ := database.Table(
31-
"snowfall_table",
32-
frostdb.NewTableConfig(schema),
33-
)
34-
3525
// Create values to insert into the database. We support a dynamic structure for city to
3626
// accommodate cities in different regions
3727
type WeatherRecord struct {
38-
City interface{}
39-
Day string
28+
City map[string]string `frostdb:",rle_dict,asc(0)"`
29+
Day string `frostdb:",rle_dict,asc(1)"`
4030
Snowfall float64
4131
}
4232

43-
type CityInProvince struct {
44-
Name string
45-
Province string
46-
}
33+
// Create a table named snowfall_table in our database
34+
table, _ := frostdb.NewGenericTable[WeatherRecord](
35+
database, "snowfall_table", memory.DefaultAllocator,
36+
)
37+
defer table.Release()
4738

48-
type CityInState struct {
49-
Name string
50-
State string
39+
montreal := map[string]string{
40+
"name": "Montreal",
41+
"province": "Quebec",
42+
}
43+
toronto := map[string]string{
44+
"name": "Toronto",
45+
"province": "Ontario",
46+
}
47+
minneapolis := map[string]string{
48+
"name": "Minneapolis",
49+
"state": "Minnesota",
5150
}
5251

53-
montreal := CityInProvince{Name: "Montreal", Province: "Quebec"}
54-
toronto := CityInProvince{Name: "Toronto", Province: "Ontario"}
55-
minneapolis := CityInState{Name: "Minneapolis", State: "Minnesota"}
56-
57-
_, _ = table.Write(context.Background(),
52+
_ = table.Write(context.Background(),
5853
WeatherRecord{Day: "Mon", Snowfall: 20, City: montreal},
5954
WeatherRecord{Day: "Tue", Snowfall: 00, City: montreal},
6055
WeatherRecord{Day: "Wed", Snowfall: 30, City: montreal},
@@ -105,45 +100,3 @@ func main() {
105100
return nil
106101
})
107102
}
108-
109-
func aggregationSchema() *schemapb.Schema {
110-
return &schemapb.Schema{
111-
Name: "snowfall_table",
112-
Columns: []*schemapb.Column{
113-
{
114-
Name: "city",
115-
StorageLayout: &schemapb.StorageLayout{
116-
Type: schemapb.StorageLayout_TYPE_STRING,
117-
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
118-
Nullable: false,
119-
},
120-
Dynamic: true,
121-
},
122-
{
123-
Name: "day",
124-
StorageLayout: &schemapb.StorageLayout{
125-
Type: schemapb.StorageLayout_TYPE_STRING,
126-
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
127-
Nullable: false,
128-
},
129-
},
130-
{
131-
Name: "snowfall",
132-
StorageLayout: &schemapb.StorageLayout{
133-
Type: schemapb.StorageLayout_TYPE_DOUBLE,
134-
},
135-
Dynamic: false,
136-
},
137-
},
138-
SortingColumns: []*schemapb.SortingColumn{
139-
{
140-
Name: "city",
141-
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
142-
},
143-
{
144-
Name: "day",
145-
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
146-
},
147-
},
148-
}
149-
}

0 commit comments

Comments
 (0)