diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index e5682c1a9ff..a27c9d1a931 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/types" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/util/rowcodec" + "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -473,25 +474,36 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) { // Column is not case-sensitive on any platform, nor are column aliases. // So we always match in lowercase. // See also: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html -func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) { +func (ti *TableInfo) OffsetsByNames(names []string) ([]int, error) { // todo: optimize it - columnOffsets := make(map[string]int, len(ti.Columns)) + columnOffsets := make(map[string]int) + virtualGeneratedColumn := make(map[string]struct{}) for _, col := range ti.Columns { if col != nil { - columnOffsets[col.Name.L] = ti.columnsOffset[col.ID] + if IsColCDCVisible(col) { + columnOffsets[col.Name.L] = ti.columnsOffset[col.ID] + } else { + virtualGeneratedColumn[col.Name.L] = struct{}{} + } } } result := make([]int, 0, len(names)) for _, col := range names { - offset, ok := columnOffsets[strings.ToLower(col)] + name := strings.ToLower(col) + if _, ok := virtualGeneratedColumn[name]; ok { + return nil, errors.ErrDispatcherFailed.GenWithStack( + "found virtual generated columns when dispatch event, table: %v, columns: %v column: %v", ti.GetTableName(), names, name) + } + offset, ok := columnOffsets[name] if !ok { - return nil, false + return nil, errors.ErrDispatcherFailed.GenWithStack( + "columns not found when dispatch event, table: %v, columns: %v, column: %v", ti.GetTableName(), names, name) } result = append(result, offset) } - return result, true + return result, nil } // GetPrimaryKeyColumnNames returns the primary key column names diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 7dd835e5cc9..ac22b1e4410 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -20,6 +20,7 @@ import ( timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" parser_types "github.com/pingcap/tidb/pkg/parser/types" + "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) @@ -341,28 +342,40 @@ func TestColumnsByNames(t *testing.T) { Name: timodel.NewCIStr("col3"), ID: 2, }, + { + Name: pmodel.NewCIStr("col4"), + ID: 3, + GeneratedExprString: "generated", + }, }, }) names := []string{"col1", "col2", "col3"} - offsets, ok := tableInfo.OffsetsByNames(names) - require.True(t, ok) + offsets, err := tableInfo.OffsetsByNames(names) + require.NoError(t, err) require.Equal(t, []int{1, 0, 2}, offsets) names = []string{"col2"} - offsets, ok = tableInfo.OffsetsByNames(names) - require.True(t, ok) + offsets, err = tableInfo.OffsetsByNames(names) + require.NoError(t, err) require.Equal(t, []int{0}, offsets) names = []string{"col1", "col-not-found"} - offsets, ok = tableInfo.OffsetsByNames(names) - require.False(t, ok) + offsets, err = tableInfo.OffsetsByNames(names) + require.ErrorIs(t, err, errors.ErrDispatcherFailed) + require.ErrorContains(t, err, "columns not found") require.Nil(t, offsets) names = []string{"Col1", "COL2", "CoL3"} - offsets, ok = tableInfo.OffsetsByNames(names) - require.True(t, ok) + offsets, err = tableInfo.OffsetsByNames(names) + require.NoError(t, err) require.Equal(t, []int{1, 0, 2}, offsets) + + names = []string{"Col4"} + offsets, err = tableInfo.OffsetsByNames(names) + require.ErrorIs(t, err, errors.ErrDispatcherFailed) + require.ErrorContains(t, err, "found virtual generated columns") + require.Nil(t, offsets) } func TestBuildTiDBTableInfoWithIntPrimaryKey(t *testing.T) { diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index 2d325b0c0d0..b15245a2860 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -150,10 +150,9 @@ func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error { } } case *partition.ColumnsDispatcher: - _, ok := table.OffsetsByNames(v.Columns) - if !ok { - return cerror.ErrDispatcherFailed.GenWithStack( - "columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns) + _, err := table.OffsetsByNames(v.Columns) + if err != nil { + return err } default: } diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go b/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go index abaab532749..ab2f4016a67 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" "go.uber.org/zap" ) @@ -55,13 +54,10 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, dispatchCols = row.PreColumns } - offsets, ok := row.TableInfo.OffsetsByNames(r.Columns) - if !ok { - log.Error("columns not found when dispatch event", - zap.Any("tableName", row.TableInfo.GetTableName()), - zap.Strings("columns", r.Columns)) - return 0, "", errors.ErrDispatcherFailed.GenWithStack( - "columns not found when dispatch event, table: %v, columns: %v", row.TableInfo.GetTableName(), r.Columns) + offsets, err := row.TableInfo.OffsetsByNames(r.Columns) + if err != nil { + log.Error("dispatch event failed", zap.Error(err)) + return 0, "", err } for idx := 0; idx < len(r.Columns); idx++ { diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go b/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go index 4510406c5ec..9684811b6ce 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go @@ -31,9 +31,16 @@ func TestColumnsDispatcher(t *testing.T) { ID: 100, Name: timodel.NewCIStr("t1"), Columns: []*timodel.ColumnInfo{ +<<<<<<< HEAD {ID: 1, Name: timodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)}, {ID: 2, Name: timodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)}, {ID: 3, Name: timodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)}, +======= + {ID: 1, Name: pmodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)}, + {ID: 2, Name: pmodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)}, + {ID: 3, Name: pmodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)}, + {ID: 4, Name: pmodel.NewCIStr("col4"), Offset: 3, FieldType: *types.NewFieldType(mysql.TypeLong), GeneratedExprString: "generated"}, +>>>>>>> 35ff0a22ec (sink(ticdc): check virtual columns in column dispatcher (#12254)) }, } tableInfo := model.WrapTableInfo(100, "test", 33, tidbTableInfo) @@ -46,9 +53,15 @@ func TestColumnsDispatcher(t *testing.T) { }, } - p := NewColumnsDispatcher([]string{"col-2", "col-not-found"}) + p := NewColumnsDispatcher([]string{"col2", "col-not-found"}) _, _, err := p.DispatchRowChangedEvent(event, 16) require.ErrorIs(t, err, errors.ErrDispatcherFailed) + require.ErrorContains(t, err, "columns not found") + + p = NewColumnsDispatcher([]string{"col4"}) + _, _, err = p.DispatchRowChangedEvent(event, 16) + require.ErrorIs(t, err, errors.ErrDispatcherFailed) + require.ErrorContains(t, err, "found virtual generated columns") p = NewColumnsDispatcher([]string{"col2", "col1"}) index, _, err := p.DispatchRowChangedEvent(event, 16)