Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -473,25 +474,36 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
// Column is not case-sensitive on any platform, nor are column aliases.
// So we always match in lowercase.
// See also: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) {
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, error) {
// todo: optimize it
columnOffsets := make(map[string]int, len(ti.Columns))
columnOffsets := make(map[string]int)
virtualGeneratedColumn := make(map[string]struct{})
Comment on lines +479 to +480

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The previous implementation pre-allocated the `columnOffsets` map. This change removes the pre-allocation, which is a minor performance regression. It's good practice to pre-allocate maps with a known size to avoid reallocations. You can use `len(ti.Columns) - len(ti.VirtualColumnsOffset)` for `columnOffsets` and `len(ti.VirtualColumnsOffset)` for `virtualGeneratedColumn`.

Additionally, since this function can be on a hot path for column-based dispatching, consider caching these maps in the `TableInfo` struct (similar to `nameToColID`) to avoid rebuilding them on every call. The existing `// todo: optimize it` comment suggests this is a known area for improvement.

Suggested change
columnOffsets := make(map[string]int)
virtualGeneratedColumn := make(map[string]struct{})
columnOffsets := make(map[string]int, len(ti.Columns)-len(ti.VirtualColumnsOffset))
virtualGeneratedColumn := make(map[string]struct{}, len(ti.VirtualColumnsOffset))

for _, col := range ti.Columns {
if col != nil {
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID]
if IsColCDCVisible(col) {
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There seems to be a bug here. `ti.columnsOffset` provides offsets into `TableInfo.Columns`, but the primary consumer of this function, `ColumnsDispatcher`, requires offsets into `RowChangedEvent.Columns` (which excludes virtual columns). Using `ti.columnsOffset` can lead to incorrect column access and data hashing, especially now that virtual columns are being handled. You should use `ti.RowColumnsOffset` instead, which is designed for this purpose and provides the correct offsets into the event's column data.

Suggested change
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID]
columnOffsets[col.Name.L] = ti.RowColumnsOffset[col.ID]

} else {
virtualGeneratedColumn[col.Name.L] = struct{}{}
}
}
}

result := make([]int, 0, len(names))
for _, col := range names {
offset, ok := columnOffsets[strings.ToLower(col)]
name := strings.ToLower(col)
if _, ok := virtualGeneratedColumn[name]; ok {
return nil, errors.ErrDispatcherFailed.GenWithStack(
"found virtual generated columns when dispatch event, table: %v, columns: %v column: %v", ti.GetTableName(), names, name)
}
offset, ok := columnOffsets[name]
if !ok {
return nil, false
return nil, errors.ErrDispatcherFailed.GenWithStack(
"columns not found when dispatch event, table: %v, columns: %v, column: %v", ti.GetTableName(), names, name)
}
result = append(result, offset)
}

return result, true
return result, nil
}

// GetPrimaryKeyColumnNames returns the primary key column names
Expand Down
29 changes: 21 additions & 8 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
parser_types "github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -341,28 +342,40 @@ func TestColumnsByNames(t *testing.T) {
Name: timodel.NewCIStr("col3"),
ID: 2,
},
{
Name: pmodel.NewCIStr("col4"),
ID: 3,
GeneratedExprString: "generated",
},
},
})

names := []string{"col1", "col2", "col3"}
offsets, ok := tableInfo.OffsetsByNames(names)
require.True(t, ok)
offsets, err := tableInfo.OffsetsByNames(names)
require.NoError(t, err)
require.Equal(t, []int{1, 0, 2}, offsets)

names = []string{"col2"}
offsets, ok = tableInfo.OffsetsByNames(names)
require.True(t, ok)
offsets, err = tableInfo.OffsetsByNames(names)
require.NoError(t, err)
require.Equal(t, []int{0}, offsets)

names = []string{"col1", "col-not-found"}
offsets, ok = tableInfo.OffsetsByNames(names)
require.False(t, ok)
offsets, err = tableInfo.OffsetsByNames(names)
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
require.ErrorContains(t, err, "columns not found")
require.Nil(t, offsets)

names = []string{"Col1", "COL2", "CoL3"}
offsets, ok = tableInfo.OffsetsByNames(names)
require.True(t, ok)
offsets, err = tableInfo.OffsetsByNames(names)
require.NoError(t, err)
require.Equal(t, []int{1, 0, 2}, offsets)

names = []string{"Col4"}
offsets, err = tableInfo.OffsetsByNames(names)
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
require.ErrorContains(t, err, "found virtual generated columns")
require.Nil(t, offsets)
}

func TestBuildTiDBTableInfoWithIntPrimaryKey(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,9 @@ func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error {
}
}
case *partition.ColumnsDispatcher:
_, ok := table.OffsetsByNames(v.Columns)
if !ok {
return cerror.ErrDispatcherFailed.GenWithStack(
"columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns)
_, err := table.OffsetsByNames(v.Columns)
if err != nil {
return err
}
default:
}
Expand Down
12 changes: 4 additions & 8 deletions cdc/sink/dmlsink/mq/dispatcher/partition/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/hash"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -55,13 +54,10 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
dispatchCols = row.PreColumns
}

offsets, ok := row.TableInfo.OffsetsByNames(r.Columns)
if !ok {
log.Error("columns not found when dispatch event",
zap.Any("tableName", row.TableInfo.GetTableName()),
zap.Strings("columns", r.Columns))
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
"columns not found when dispatch event, table: %v, columns: %v", row.TableInfo.GetTableName(), r.Columns)
offsets, err := row.TableInfo.OffsetsByNames(r.Columns)
if err != nil {
log.Error("dispatch event failed", zap.Error(err))
return 0, "", err
}

for idx := 0; idx < len(r.Columns); idx++ {
Expand Down
15 changes: 14 additions & 1 deletion cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ func TestColumnsDispatcher(t *testing.T) {
ID: 100,
Name: timodel.NewCIStr("t1"),
Columns: []*timodel.ColumnInfo{
<<<<<<< HEAD
{ID: 1, Name: timodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: timodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 3, Name: timodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)},
=======
{ID: 1, Name: pmodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: pmodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 3, Name: pmodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 4, Name: pmodel.NewCIStr("col4"), Offset: 3, FieldType: *types.NewFieldType(mysql.TypeLong), GeneratedExprString: "generated"},
>>>>>>> 35ff0a22ec (sink(ticdc): check virtual columns in column dispatcher (#12254))
Comment on lines +34 to +43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This file contains unresolved merge conflict markers (`<<<<<<< HEAD`, `=======`, `>>>>>>>`), which will cause the build to fail. Please resolve the conflict. It appears the intent was to add a virtual column `col4` for testing, and the alias `pmodel` should be `timodel`, as used elsewhere in this file.

			{ID: 1, Name: timodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)},
			{ID: 2, Name: timodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)},
			{ID: 3, Name: timodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)},
			{ID: 4, Name: timodel.NewCIStr("col4"), Offset: 3, FieldType: *types.NewFieldType(mysql.TypeLong), GeneratedExprString: "generated"},

},
}
tableInfo := model.WrapTableInfo(100, "test", 33, tidbTableInfo)
Expand All @@ -46,9 +53,15 @@ func TestColumnsDispatcher(t *testing.T) {
},
}

p := NewColumnsDispatcher([]string{"col-2", "col-not-found"})
p := NewColumnsDispatcher([]string{"col2", "col-not-found"})
_, _, err := p.DispatchRowChangedEvent(event, 16)
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
require.ErrorContains(t, err, "columns not found")

p = NewColumnsDispatcher([]string{"col4"})
_, _, err = p.DispatchRowChangedEvent(event, 16)
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
require.ErrorContains(t, err, "found virtual generated columns")

p = NewColumnsDispatcher([]string{"col2", "col1"})
index, _, err := p.DispatchRowChangedEvent(event, 16)
Expand Down
Loading