-
Notifications
You must be signed in to change notification settings - Fork 293
sink(ticdc): check virtual columns in column dispatcher (#12254) #12321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release-8.1
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -23,6 +23,7 @@ import ( | |||||
"github.com/pingcap/tidb/pkg/parser/types" | ||||||
"github.com/pingcap/tidb/pkg/table/tables" | ||||||
"github.com/pingcap/tidb/pkg/util/rowcodec" | ||||||
"github.com/pingcap/tiflow/pkg/errors" | ||||||
"go.uber.org/zap" | ||||||
) | ||||||
|
||||||
|
@@ -473,25 +474,36 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) { | |||||
// Column is not case-sensitive on any platform, nor are column aliases. | ||||||
// So we always match in lowercase. | ||||||
// See also: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html | ||||||
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) { | ||||||
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, error) { | ||||||
// todo: optimize it | ||||||
columnOffsets := make(map[string]int, len(ti.Columns)) | ||||||
columnOffsets := make(map[string]int) | ||||||
virtualGeneratedColumn := make(map[string]struct{}) | ||||||
for _, col := range ti.Columns { | ||||||
if col != nil { | ||||||
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID] | ||||||
if IsColCDCVisible(col) { | ||||||
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There seems to be a bug here.
Suggested change
|
||||||
} else { | ||||||
virtualGeneratedColumn[col.Name.L] = struct{}{} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
result := make([]int, 0, len(names)) | ||||||
for _, col := range names { | ||||||
offset, ok := columnOffsets[strings.ToLower(col)] | ||||||
name := strings.ToLower(col) | ||||||
if _, ok := virtualGeneratedColumn[name]; ok { | ||||||
return nil, errors.ErrDispatcherFailed.GenWithStack( | ||||||
"found virtual generated columns when dispatch event, table: %v, columns: %v column: %v", ti.GetTableName(), names, name) | ||||||
} | ||||||
offset, ok := columnOffsets[name] | ||||||
if !ok { | ||||||
return nil, false | ||||||
return nil, errors.ErrDispatcherFailed.GenWithStack( | ||||||
"columns not found when dispatch event, table: %v, columns: %v, column: %v", ti.GetTableName(), names, name) | ||||||
} | ||||||
result = append(result, offset) | ||||||
} | ||||||
|
||||||
return result, true | ||||||
return result, nil | ||||||
} | ||||||
|
||||||
// GetPrimaryKeyColumnNames returns the primary key column names | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,9 +31,16 @@ func TestColumnsDispatcher(t *testing.T) { | |
ID: 100, | ||
Name: timodel.NewCIStr("t1"), | ||
Columns: []*timodel.ColumnInfo{ | ||
<<<<<<< HEAD | ||
{ID: 1, Name: timodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 2, Name: timodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 3, Name: timodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)}, | ||
======= | ||
{ID: 1, Name: pmodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 2, Name: pmodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 3, Name: pmodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 4, Name: pmodel.NewCIStr("col4"), Offset: 3, FieldType: *types.NewFieldType(mysql.TypeLong), GeneratedExprString: "generated"}, | ||
>>>>>>> 35ff0a22ec (sink(ticdc): check virtual columns in column dispatcher (#12254)) | ||
Comment on lines
+34
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file contains unresolved merge conflict markers which will cause the build to fail. Please resolve the conflict. It appears the intent was to add a virtual column {ID: 1, Name: timodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: timodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 3, Name: timodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 4, Name: timodel.NewCIStr("col4"), Offset: 3, FieldType: *types.NewFieldType(mysql.TypeLong), GeneratedExprString: "generated"}, |
||
}, | ||
} | ||
tableInfo := model.WrapTableInfo(100, "test", 33, tidbTableInfo) | ||
|
@@ -46,9 +53,15 @@ func TestColumnsDispatcher(t *testing.T) { | |
}, | ||
} | ||
|
||
p := NewColumnsDispatcher([]string{"col-2", "col-not-found"}) | ||
p := NewColumnsDispatcher([]string{"col2", "col-not-found"}) | ||
_, _, err := p.DispatchRowChangedEvent(event, 16) | ||
require.ErrorIs(t, err, errors.ErrDispatcherFailed) | ||
require.ErrorContains(t, err, "columns not found") | ||
|
||
p = NewColumnsDispatcher([]string{"col4"}) | ||
_, _, err = p.DispatchRowChangedEvent(event, 16) | ||
require.ErrorIs(t, err, errors.ErrDispatcherFailed) | ||
require.ErrorContains(t, err, "found virtual generated columns") | ||
|
||
p = NewColumnsDispatcher([]string{"col2", "col1"}) | ||
index, _, err := p.DispatchRowChangedEvent(event, 16) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous implementation pre-allocated the
columnOffsets
map. This change removes the pre-allocation, which is a minor performance regression. It's good practice to pre-allocate maps with a known size to avoid reallocations. You can uselen(ti.Columns) - len(ti.VirtualColumnsOffset)
forcolumnOffsets
andlen(ti.VirtualColumnsOffset)
forvirtualGeneratedColumn
.Additionally, since this function can be on a hot path for column-based dispatching, consider caching these maps in the
TableInfo
struct (similar tonameToColID
) to avoid rebuilding them on every call. The// todo: optimize it
comment suggests this is a known area for improvement.