Skip to content

Commit 2bbff68

Browse files
authored
Support repeated columns for Generic Record Builder (#624)
* Support repeated columns for Generic Record Builder * fix lint: handle unchecked error
1 parent 1afca81 commit 2bbff68

File tree

2 files changed

+228
-3
lines changed

2 files changed

+228
-3
lines changed

dynparquet/record_builder.go

Lines changed: 139 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ const (
2929
//
3030
// - Dynamic columns are supported but only for maps and struct slices
3131
//
32-
// - Repeated Columns are not supported
33-
//
3432
// - Nested Columns are not supported
3533
//
3634
// # Tags
@@ -87,6 +85,15 @@ const (
8785
// // Use supported tags to customize the column value
8886
// Labels []Label `frostdb:"labels,dyn"`
8987
// }
88+
//
89+
// # Repeated columns
90+
//
91+
// Fields of type []int64, []float64, []bool, and []string are supported. These
92+
// are represented as arrow.LIST.
93+
//
94+
// Generated schema for the repeated columns applies all supported tags. By
95+
// default repeated fields are nullable. You can safely pass nil slices for
96+
// repeated columns.
9097
type Build[T any] struct {
9198
fields []*fieldRecord
9299
buffer []arrow.Array
@@ -208,7 +215,13 @@ func NewBuild[T any](mem memory.Allocator) *Build[T] {
208215
fr.nullable = true
209216
fr.build = newMapFieldBuilder(newFieldFunc(typ, mem, name))
210217
default:
211-
panic("frostdb/dynschema: repeated columns not supported ")
218+
typ, styp = baseType(fty.Elem(), dictionary)
219+
fr.typ = styp
220+
fr.repeated = true
221+
// Repeated columns are always nullable
222+
fr.nullable = true
223+
typ = arrow.ListOf(typ)
224+
fr.build = newFieldBuild(typ, mem, name, true)
212225
}
213226
case reflect.Int64, reflect.Float64, reflect.Bool, reflect.String:
214227
typ, styp = baseType(fty, dictionary)
@@ -270,6 +283,7 @@ func (b Build[T]) Schema(name string) (s *schemapb.Schema) {
270283
Encoding: f.encoding,
271284
Compression: f.compression,
272285
Nullable: f.nullable,
286+
Repeated: f.repeated,
273287
},
274288
})
275289
if f.sort {
@@ -530,12 +544,133 @@ func newFieldBuild(dt arrow.DataType, mem memory.Allocator, name string, nullabl
530544
f.buildFunc = func(v reflect.Value) error {
531545
return e.AppendString(v.Interface().(string))
532546
}
547+
case *array.ListBuilder:
548+
switch build := e.ValueBuilder().(type) {
549+
case *array.Int64Builder:
550+
f.buildFunc = func(v reflect.Value) error {
551+
if v.IsNil() {
552+
e.AppendNull()
553+
return nil
554+
}
555+
e.Append(true)
556+
build.Reserve(v.Len())
557+
return applyInt(v, func(i int64) error {
558+
build.Append(i)
559+
return nil
560+
})
561+
}
562+
case *array.Int64DictionaryBuilder:
563+
f.buildFunc = func(v reflect.Value) error {
564+
if v.IsNil() {
565+
e.AppendNull()
566+
return nil
567+
}
568+
e.Append(true)
569+
build.Reserve(v.Len())
570+
return applyInt(v, build.Append)
571+
}
572+
573+
case *array.Float64Builder:
574+
f.buildFunc = func(v reflect.Value) error {
575+
if v.IsNil() {
576+
e.AppendNull()
577+
return nil
578+
}
579+
e.Append(true)
580+
build.Reserve(v.Len())
581+
return applyFloat64(v, func(i float64) error {
582+
build.Append(i)
583+
return nil
584+
})
585+
}
586+
case *array.Float64DictionaryBuilder:
587+
f.buildFunc = func(v reflect.Value) error {
588+
if v.IsNil() {
589+
e.AppendNull()
590+
return nil
591+
}
592+
e.Append(true)
593+
build.Reserve(v.Len())
594+
return applyFloat64(v, build.Append)
595+
}
596+
597+
case *array.StringBuilder:
598+
f.buildFunc = func(v reflect.Value) error {
599+
if v.IsNil() {
600+
e.AppendNull()
601+
return nil
602+
}
603+
e.Append(true)
604+
build.Reserve(v.Len())
605+
return applyString(v, func(i string) error {
606+
build.Append(i)
607+
return nil
608+
})
609+
}
610+
case *array.BinaryDictionaryBuilder:
611+
f.buildFunc = func(v reflect.Value) error {
612+
if v.Len() == 0 {
613+
e.AppendNull()
614+
return nil
615+
}
616+
e.Append(true)
617+
build.Reserve(v.Len())
618+
return applyString(v, build.AppendString)
619+
}
620+
case *array.BooleanBuilder:
621+
f.buildFunc = func(v reflect.Value) error {
622+
if v.IsNil() {
623+
e.AppendNull()
624+
return nil
625+
}
626+
e.Append(true)
627+
build.Reserve(v.Len())
628+
return applyBool(v, func(i bool) error {
629+
build.Append(i)
630+
return nil
631+
})
632+
}
633+
}
533634
default:
534635
panic("frostdb:dynschema: unsupported array builder " + b.Type().String())
535636
}
536637
return
537638
}
538639

640+
func applyString(v reflect.Value, apply func(string) error) error {
641+
return listApply[string](v, func(v reflect.Value) string {
642+
return v.Interface().(string)
643+
}, apply)
644+
}
645+
646+
func applyFloat64(v reflect.Value, apply func(float64) error) error {
647+
return listApply[float64](v, func(v reflect.Value) float64 {
648+
return v.Float()
649+
}, apply)
650+
}
651+
652+
func applyBool(v reflect.Value, apply func(bool) error) error {
653+
return listApply[bool](v, func(v reflect.Value) bool {
654+
return v.Bool()
655+
}, apply)
656+
}
657+
658+
func applyInt(v reflect.Value, apply func(int64) error) error {
659+
return listApply[int64](v, func(v reflect.Value) int64 {
660+
return v.Int()
661+
}, apply)
662+
}
663+
664+
func listApply[T any](v reflect.Value, fn func(reflect.Value) T, apply func(T) error) error {
665+
for i := 0; i < v.Len(); i++ {
666+
err := apply(fn(v.Index(i)))
667+
if err != nil {
668+
return err
669+
}
670+
}
671+
return nil
672+
}
673+
539674
func newUUIDSliceField(mem memory.Allocator, name string) (f *fieldBuilderFunc) {
540675
dt := &arrow.DictionaryType{
541676
IndexType: &arrow.Int32Type{},
@@ -584,6 +719,7 @@ type fieldRecord struct {
584719
dynamic bool
585720
preHash bool
586721
nullable bool
722+
repeated bool
587723
sort bool
588724
nullFirst bool
589725
sortOrder int

dynparquet/record_builder_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/apache/arrow/go/v14/arrow/memory"
77
"github.com/stretchr/testify/require"
8+
"google.golang.org/protobuf/encoding/protojson"
89
"google.golang.org/protobuf/proto"
910
)
1011

@@ -41,6 +42,94 @@ func TestBuild(t *testing.T) {
4142
require.Nil(t, err)
4243
require.JSONEq(t, want, string(got))
4344
})
45+
46+
t.Run("Repeated", func(t *testing.T) {
47+
type Repeated struct {
48+
Int []int64
49+
Float []float64
50+
Bool []bool
51+
String []string
52+
StringDict []string `frostdb:",rle_dict"`
53+
}
54+
b := NewBuild[Repeated](memory.DefaultAllocator)
55+
defer b.Release()
56+
57+
wantSchema := `{
58+
"name": "repeated",
59+
"columns": [
60+
{
61+
"name": "int",
62+
"storageLayout": {
63+
"type": "TYPE_INT64",
64+
"nullable": true,
65+
"repeated": true
66+
}
67+
},
68+
{
69+
"name": "float",
70+
"storageLayout": {
71+
"type": "TYPE_DOUBLE",
72+
"nullable": true,
73+
"repeated": true
74+
}
75+
},
76+
{
77+
"name": "bool",
78+
"storageLayout": {
79+
"type": "TYPE_BOOL",
80+
"nullable": true,
81+
"repeated": true
82+
}
83+
},
84+
{
85+
"name": "string",
86+
"storageLayout": {
87+
"type": "TYPE_STRING",
88+
"nullable": true,
89+
"repeated": true
90+
}
91+
},
92+
{
93+
"name": "string_dict",
94+
"storageLayout": {
95+
"type": "TYPE_STRING",
96+
"encoding": "ENCODING_RLE_DICTIONARY",
97+
"nullable": true,
98+
"repeated": true
99+
}
100+
}
101+
]
102+
}`
103+
m := protojson.MarshalOptions{Multiline: true}
104+
d, _ := m.Marshal(b.Schema("repeated"))
105+
require.JSONEq(t, wantSchema, string(d))
106+
107+
err := b.Append(
108+
Repeated{}, // nulls
109+
Repeated{
110+
Int: []int64{1, 2},
111+
Float: []float64{1, 2},
112+
Bool: []bool{true, true},
113+
String: []string{"a", "b"},
114+
StringDict: []string{"a", "b"},
115+
},
116+
Repeated{
117+
Int: []int64{1, 2},
118+
Float: []float64{1, 2},
119+
Bool: []bool{true, true},
120+
String: []string{"a", "b"},
121+
StringDict: []string{"c", "d"},
122+
},
123+
)
124+
require.Nil(t, err)
125+
want := `[{"bool":null,"float":null,"int":null,"string":null,"string_dict":null}
126+
,{"bool":[true,true],"float":[1,2],"int":[1,2],"string":["a","b"],"string_dict":["a","b"]}
127+
,{"bool":[true,true],"float":[1,2],"int":[1,2],"string":["a","b"],"string_dict":["c","d"]}
128+
]`
129+
r := b.NewRecord()
130+
data, _ := r.MarshalJSON()
131+
require.JSONEq(t, want, string(data))
132+
})
44133
}
45134

46135
func BenchmarkBuild_Append_Then_NewRecord(b *testing.B) {

0 commit comments

Comments
 (0)