Skip to content

Commit 9b6ae60

Browse files
authored
Merge pull request #28 from liquidata-inc/zachmu/insert-update-delete-perf
New interfaces for insert, update, delete, replace. The new interfaces allow batching of multiple rows for efficiency and allow implementors to implement atomic operations.
2 parents 4c6a7a7 + 7de007e commit 9b6ae60

File tree

6 files changed

+177
-50
lines changed

6 files changed

+177
-50
lines changed

engine_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3812,12 +3812,15 @@ func TestGenerators(t *testing.T) {
38123812
}
38133813
}
38143814

3815-
func insertRows(t *testing.T, table sql.Inserter, rows ...sql.Row) {
3815+
func insertRows(t *testing.T, table sql.InsertableTable, rows ...sql.Row) {
38163816
t.Helper()
38173817

3818+
ctx := newCtx()
3819+
inserter := table.Inserter(ctx)
38183820
for _, r := range rows {
3819-
require.NoError(t, table.Insert(newCtx(), r))
3821+
require.NoError(t, inserter.Insert(ctx, r))
38203822
}
3823+
require.NoError(t, inserter.Close(ctx))
38213824
}
38223825

38233826
var pid uint64

memory/table.go

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ type Table struct {
2828
}
2929

3030
var _ sql.Table = (*Table)(nil)
31-
var _ sql.Inserter = (*Table)(nil)
31+
var _ sql.InsertableTable = (*Table)(nil)
32+
var _ sql.UpdatableTable = (*Table)(nil)
33+
var _ sql.DeletableTable = (*Table)(nil)
34+
var _ sql.ReplaceableTable = (*Table)(nil)
3235
var _ sql.FilteredTable = (*Table)(nil)
3336
var _ sql.ProjectedTable = (*Table)(nil)
3437
var _ sql.IndexableTable = (*Table)(nil)
@@ -241,30 +244,70 @@ func encodeIndexValue(value *indexValue) ([]byte, error) {
241244
return buf.Bytes(), nil
242245
}
243246

244-
// Insert a new row into the table.
247+
type tableEditor struct {
248+
table *Table
249+
}
250+
251+
var _ sql.RowReplacer = (*tableEditor)(nil)
252+
var _ sql.RowUpdater = (*tableEditor)(nil)
253+
var _ sql.RowInserter = (*tableEditor)(nil)
254+
var _ sql.RowDeleter = (*tableEditor)(nil)
255+
256+
func (t tableEditor) Close(*sql.Context) error {
257+
// TODO: it would be nice to apply all pending updates here at once, rather than directly in the Insert / Update
258+
// / Delete methods.
259+
return nil
260+
}
261+
262+
func (t *Table) Inserter(*sql.Context) sql.RowInserter {
263+
return &tableEditor{t}
264+
}
265+
266+
func (t *Table) Updater(*sql.Context) sql.RowUpdater {
267+
return &tableEditor{t}
268+
}
269+
270+
func (t *Table) Replacer(*sql.Context) sql.RowReplacer {
271+
return &tableEditor{t}
272+
}
273+
274+
func (t *Table) Deleter(*sql.Context) sql.RowDeleter {
275+
return &tableEditor{t}
276+
}
277+
278+
// Convenience method to avoid having to create an inserter in test setup
245279
func (t *Table) Insert(ctx *sql.Context, row sql.Row) error {
246-
if err := checkRow(t.schema, row); err != nil {
280+
inserter := t.Inserter(ctx)
281+
if err := inserter.Insert(ctx, row); err != nil {
282+
return err
283+
}
284+
return inserter.Close(ctx)
285+
}
286+
287+
// Insert a new row into the table.
288+
func (t *tableEditor) Insert(ctx *sql.Context, row sql.Row) error {
289+
if err := checkRow(t.table.schema, row); err != nil {
247290
return err
248291
}
249292

250-
key := string(t.keys[t.insert])
251-
t.insert++
252-
if t.insert == len(t.keys) {
253-
t.insert = 0
293+
key := string(t.table.keys[t.table.insert])
294+
t.table.insert++
295+
if t.table.insert == len(t.table.keys) {
296+
t.table.insert = 0
254297
}
255298

256-
t.partitions[key] = append(t.partitions[key], row)
299+
t.table.partitions[key] = append(t.table.partitions[key], row)
257300
return nil
258301
}
259302

260303
// Delete the given row from the table.
261-
func (t *Table) Delete(ctx *sql.Context, row sql.Row) error {
262-
if err := checkRow(t.schema, row); err != nil {
304+
func (t *tableEditor) Delete(ctx *sql.Context, row sql.Row) error {
305+
if err := checkRow(t.table.schema, row); err != nil {
263306
return err
264307
}
265308

266309
matches := false
267-
for partitionIndex, partition := range t.partitions {
310+
for partitionIndex, partition := range t.table.partitions {
268311
for partitionRowIndex, partitionRow := range partition {
269312
matches = true
270313
for rIndex, val := range row {
@@ -274,7 +317,7 @@ func (t *Table) Delete(ctx *sql.Context, row sql.Row) error {
274317
}
275318
}
276319
if matches {
277-
t.partitions[partitionIndex] = append(partition[:partitionRowIndex], partition[partitionRowIndex+1:]...)
320+
t.table.partitions[partitionIndex] = append(partition[:partitionRowIndex], partition[partitionRowIndex+1:]...)
278321
break
279322
}
280323
}
@@ -290,16 +333,16 @@ func (t *Table) Delete(ctx *sql.Context, row sql.Row) error {
290333
return nil
291334
}
292335

293-
func (t *Table) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) error {
294-
if err := checkRow(t.schema, oldRow); err != nil {
336+
func (t *tableEditor) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) error {
337+
if err := checkRow(t.table.schema, oldRow); err != nil {
295338
return err
296339
}
297-
if err := checkRow(t.schema, newRow); err != nil {
340+
if err := checkRow(t.table.schema, newRow); err != nil {
298341
return err
299342
}
300343

301344
matches := false
302-
for partitionIndex, partition := range t.partitions {
345+
for partitionIndex, partition := range t.table.partitions {
303346
for partitionRowIndex, partitionRow := range partition {
304347
matches = true
305348
for rIndex, val := range oldRow {
@@ -309,7 +352,7 @@ func (t *Table) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) error {
309352
}
310353
}
311354
if matches {
312-
t.partitions[partitionIndex][partitionRowIndex] = newRow
355+
t.table.partitions[partitionIndex][partitionRowIndex] = newRow
313356
break
314357
}
315358
}

sql/core.go

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -205,28 +205,79 @@ type IndexableTable interface {
205205
IndexKeyValues(*Context, []string) (PartitionIndexKeyValueIter, error)
206206
}
207207

208-
// Inserter allow rows to be inserted in them.
209-
type Inserter interface {
210-
// Insert the given row.
208+
// InsertableTable is a table that can process insertion of new rows.
209+
type InsertableTable interface {
210+
// Inserter returns an Inserter for this table. The Inserter will get one call to Insert() for each row to be
211+
// inserted, and will end with a call to Close() to finalize the insert operation.
212+
Inserter(*Context) RowInserter
213+
}
214+
215+
// RowInserter is an insert cursor that can insert one or more values to a table.
216+
type RowInserter interface {
217+
// Insert inserts the row given, returning an error if it cannot. Insert will be called once for each row to process
218+
// for the insert operation, which may involve many rows. After all rows in an operation have been processed, Close
219+
// is called.
211220
Insert(*Context, Row) error
221+
// Close finalizes the insert operation, persisting its result.
222+
Closer
223+
}
224+
225+
// DeleteableTable is a table that can process the deletion of rows
226+
type DeletableTable interface {
227+
// Deleter returns a RowDeleter for this table. The RowDeleter will get one call to Delete for each row to be deleted,
228+
// and will end with a call to Close() to finalize the delete operation.
229+
Deleter(*Context) RowDeleter
230+
}
231+
232+
// RowDeleter is a delete cursor that can delete one or more rows from a table.
233+
type RowDeleter interface {
234+
// Delete deletes the given row. Returns ErrDeleteRowNotFound if the row was not found. Delete will be called once for
235+
// each row to process for the delete operation, which may involve many rows. After all rows have been processed,
236+
// Close is called.
237+
Delete(*Context, Row) error
238+
// Close finalizes the delete operation, persisting the result.
239+
Closer
240+
}
241+
242+
type Closer interface {
243+
Close(*Context) error
212244
}
213245

214-
// Deleter allow rows to be deleted from tables.
215-
type Deleter interface {
216-
// Delete the given row. Returns ErrDeleteRowNotFound if the row was not found.
246+
// RowReplacer is a combination of RowDeleter and RowInserter. We can't embed those interfaces because go doesn't allow
247+
// for overlapping interfaces (they both declare Close)
248+
type RowReplacer interface {
249+
// Insert inserts the row given, returning an error if it cannot. Insert will be called once for each row to process
250+
// for the replace operation, which may involve many rows. After all rows in an operation have been processed, Close
251+
// is called.
252+
Insert(*Context, Row) error
253+
// Delete deletes the given row. Returns ErrDeleteRowNotFound if the row was not found. Delete will be called once for
254+
// each row to process for the delete operation, which may involve many rows. After all rows have been processed,
255+
// Close is called.
217256
Delete(*Context, Row) error
257+
// Close finalizes the replace operation, persisting the result.
258+
Closer
218259
}
219260

220261
// Replacer allows rows to be replaced through a Delete (if applicable) then Insert.
221-
type Replacer interface {
222-
Deleter
223-
Inserter
262+
type ReplaceableTable interface {
263+
// Replacer returns a RowReplacer for this table. The RowReplacer will have Insert and optionally Delete called once
264+
// for each row, followed by a call to Close() when all rows have been processed.
265+
Replacer(ctx *Context) RowReplacer
266+
}
267+
268+
// UpdateableTable is a table that can process updates of existing rows via update statements.
269+
type UpdatableTable interface {
270+
// Updater returns a RowUpdater for this table. The RowUpdater will have Update called once for each row to be
271+
// updated, followed by a call to Close() when all rows have been processed.
272+
Updater(ctx *Context) RowUpdater
224273
}
225274

226-
// Updater allows rows to be updated.
227-
type Updater interface {
275+
// RowUpdater is an update cursor that can update one or more rows in a table.
276+
type RowUpdater interface {
228277
// Update the given row. Provides both the old and new rows.
229278
Update(ctx *Context, old Row, new Row) error
279+
// Close finalizes the update operation, persisting the result.
280+
Closer
230281
}
231282

232283
// Database represents the database.

sql/plan/delete.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ func (p *DeleteFrom) Children() []sql.Node {
3737
return []sql.Node{p.Node}
3838
}
3939

40-
func getDeletable(node sql.Node) (sql.Deleter, error) {
40+
func getDeletable(node sql.Node) (sql.DeletableTable, error) {
4141
switch node := node.(type) {
42-
case sql.Deleter:
42+
case sql.DeletableTable:
4343
return node, nil
4444
case *ResolvedTable:
4545
return getDeletableTable(node.Table)
@@ -53,9 +53,9 @@ func getDeletable(node sql.Node) (sql.Deleter, error) {
5353
return nil, ErrDeleteFromNotSupported.New()
5454
}
5555

56-
func getDeletableTable(t sql.Table) (sql.Deleter, error) {
56+
func getDeletableTable(t sql.Table) (sql.DeletableTable, error) {
5757
switch t := t.(type) {
58-
case sql.Deleter:
58+
case sql.DeletableTable:
5959
return t, nil
6060
case sql.TableWrapper:
6161
return getDeletableTable(t.Underlying())
@@ -76,6 +76,8 @@ func (p *DeleteFrom) Execute(ctx *sql.Context) (int, error) {
7676
return 0, err
7777
}
7878

79+
deleter := deletable.Deleter(ctx)
80+
7981
i := 0
8082
for {
8183
row, err := iter.Next()
@@ -88,14 +90,18 @@ func (p *DeleteFrom) Execute(ctx *sql.Context) (int, error) {
8890
return i, err
8991
}
9092

91-
if err := deletable.Delete(ctx, row); err != nil {
93+
if err := deleter.Delete(ctx, row); err != nil {
9294
_ = iter.Close()
9395
return i, err
9496
}
9597

9698
i++
9799
}
98100

101+
if err := deleter.Close(ctx); err != nil {
102+
return 0, err
103+
}
104+
99105
return i, nil
100106
}
101107

sql/plan/insert.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ func (p *InsertInto) Schema() sql.Schema {
4545
}}
4646
}
4747

48-
func getInsertable(node sql.Node) (sql.Inserter, error) {
48+
func getInsertable(node sql.Node) (sql.InsertableTable, error) {
4949
switch node := node.(type) {
50-
case sql.Inserter:
50+
case sql.InsertableTable:
5151
return node, nil
5252
case *ResolvedTable:
5353
return getInsertableTable(node.Table)
@@ -56,9 +56,9 @@ func getInsertable(node sql.Node) (sql.Inserter, error) {
5656
}
5757
}
5858

59-
func getInsertableTable(t sql.Table) (sql.Inserter, error) {
59+
func getInsertableTable(t sql.Table) (sql.InsertableTable, error) {
6060
switch t := t.(type) {
61-
case sql.Inserter:
61+
case sql.InsertableTable:
6262
return t, nil
6363
case sql.TableWrapper:
6464
return getInsertableTable(t.Underlying())
@@ -74,10 +74,10 @@ func (p *InsertInto) Execute(ctx *sql.Context) (int, error) {
7474
return 0, err
7575
}
7676

77-
var replaceable sql.Replacer
77+
var replaceable sql.ReplaceableTable
7878
if p.IsReplace {
7979
var ok bool
80-
replaceable, ok = insertable.(sql.Replacer)
80+
replaceable, ok = insertable.(sql.ReplaceableTable)
8181
if !ok {
8282
return 0, ErrReplaceIntoNotSupported.New()
8383
}
@@ -132,6 +132,14 @@ func (p *InsertInto) Execute(ctx *sql.Context) (int, error) {
132132
return 0, err
133133
}
134134

135+
var inserter sql.RowInserter
136+
var replacer sql.RowReplacer
137+
if replaceable != nil {
138+
replacer = replaceable.Replacer(ctx)
139+
} else {
140+
inserter = insertable.Inserter(ctx)
141+
}
142+
135143
i := 0
136144
for {
137145
row, err := iter.Next()
@@ -163,8 +171,8 @@ func (p *InsertInto) Execute(ctx *sql.Context) (int, error) {
163171
}
164172
}
165173

166-
if replaceable != nil {
167-
if err = replaceable.Delete(ctx, row); err != nil {
174+
if replacer != nil {
175+
if err = replacer.Delete(ctx, row); err != nil {
168176
if err != sql.ErrDeleteRowNotFound {
169177
_ = iter.Close()
170178
return i, err
@@ -173,19 +181,29 @@ func (p *InsertInto) Execute(ctx *sql.Context) (int, error) {
173181
i++
174182
}
175183

176-
if err = replaceable.Insert(ctx, row); err != nil {
184+
if err = replacer.Insert(ctx, row); err != nil {
177185
_ = iter.Close()
178186
return i, err
179187
}
180188
} else {
181-
if err := insertable.Insert(ctx, row); err != nil {
189+
if err := inserter.Insert(ctx, row); err != nil {
182190
_ = iter.Close()
183191
return i, err
184192
}
185193
}
186194
i++
187195
}
188196

197+
if replacer != nil {
198+
if err := replacer.Close(ctx); err != nil {
199+
return 0, err
200+
}
201+
} else {
202+
if err := inserter.Close(ctx); err != nil {
203+
return 0, err
204+
}
205+
}
206+
189207
return i, nil
190208
}
191209

0 commit comments

Comments
 (0)