Skip to content

Commit 21420b5

Browse files
authored
fix(tidb): fix topology spread issue of tidb (#6161)
1 parent 7c21633 commit 21420b5

File tree

15 files changed

+583
-97
lines changed

15 files changed

+583
-97
lines changed

pkg/controllers/pdgroup/tasks/updater.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
6363
}
6464
}
6565

66+
updateRevision, _, _ := state.Revision()
67+
6668
pds := state.Slice()
67-
topoPolicy, err := policy.NewTopologyPolicy(topos, pds...)
69+
topoPolicy, err := policy.NewTopologyPolicy(topos, updateRevision, pds...)
6870
if err != nil {
6971
return task.Fail().With("invalid topo policy, it should be validated: %w", err)
7072
}
7173

72-
updateRevision, _, _ := state.Revision()
73-
7474
wait, err := updater.New[runtime.PDTuple]().
7575
WithInstances(pds...).
7676
WithDesired(int(state.Group().Replicas())).
@@ -81,6 +81,7 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
8181
WithNewFactory(PDNewer(pdg, updateRevision)).
8282
WithAddHooks(topoPolicy).
8383
WithDelHooks(topoPolicy).
84+
WithUpdateHooks(topoPolicy).
8485
WithScaleInPreferPolicy(
8586
NotLeaderPolicy(),
8687
topoPolicy,

pkg/controllers/ticdcgroup/tasks/updater.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
6161
}
6262
}
6363

64+
updateRevision, _, _ := state.Revision()
65+
6466
cdcs := state.Slice()
65-
topoPolicy, err := policy.NewTopologyPolicy(topos, cdcs...)
67+
topoPolicy, err := policy.NewTopologyPolicy(topos, updateRevision, cdcs...)
6668
if err != nil {
6769
return task.Fail().With("invalid topo policy, it should be validated: %w", err)
6870
}
6971

70-
updateRevision, _, _ := state.Revision()
71-
7272
// TODO: get the real time owner info from TiCDC and prefer non-owner when scaling in or updating
7373
wait, err := updater.New[runtime.TiCDCTuple]().
7474
WithInstances(cdcs...).
@@ -80,6 +80,7 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
8080
WithNewFactory(TiCDCNewer(cdcg, updateRevision)).
8181
WithAddHooks(topoPolicy).
8282
WithDelHooks(topoPolicy).
83+
WithUpdateHooks(topoPolicy).
8384
WithScaleInPreferPolicy(
8485
topoPolicy,
8586
).

pkg/controllers/tidbgroup/tasks/updater.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
6262
}
6363
}
6464

65+
updateRevision, _, _ := state.Revision()
66+
6567
dbs := state.Slice()
66-
topoPolicy, err := policy.NewTopologyPolicy(topos, dbs...)
68+
topoPolicy, err := policy.NewTopologyPolicy(topos, updateRevision, dbs...)
6769
if err != nil {
6870
return task.Fail().With("invalid topo policy, it should be validated: %w", err)
6971
}
7072

71-
updateRevision, _, _ := state.Revision()
72-
7373
needUpdate, needRestart := precheckInstances(dbg, runtime.ToTiDBSlice(dbs), updateRevision)
7474
if !needUpdate {
7575
return task.Complete().With("all instances are synced")
@@ -90,6 +90,7 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
9090
WithNewFactory(TiDBNewer(dbg, updateRevision)).
9191
WithAddHooks(topoPolicy).
9292
WithDelHooks(topoPolicy).
93+
WithUpdateHooks(topoPolicy).
9394
WithScaleInPreferPolicy(
9495
topoPolicy,
9596
).

pkg/controllers/tiflashgroup/tasks/updater.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
6161
}
6262
}
6363

64+
updateRevision, _, _ := state.Revision()
65+
6466
fs := state.Slice()
65-
topoPolicy, err := policy.NewTopologyPolicy(topos, fs...)
67+
topoPolicy, err := policy.NewTopologyPolicy(topos, updateRevision, fs...)
6668
if err != nil {
6769
return task.Fail().With("invalid topo policy, it should be validated: %w", err)
6870
}
6971

70-
updateRevision, _, _ := state.Revision()
71-
7272
wait, err := updater.New[runtime.TiFlashTuple]().
7373
WithInstances(fs...).
7474
WithDesired(int(state.Group().Replicas())).
@@ -79,6 +79,7 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
7979
WithNewFactory(TiFlashNewer(fg, updateRevision)).
8080
WithAddHooks(topoPolicy).
8181
WithDelHooks(topoPolicy).
82+
WithUpdateHooks(topoPolicy).
8283
WithScaleInPreferPolicy(
8384
topoPolicy,
8485
).

pkg/controllers/tikvgroup/tasks/updater.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,15 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
6161
}
6262
}
6363

64+
updateRevision, _, _ := state.Revision()
65+
6466
kvs := state.Slice()
6567

66-
topoPolicy, err := policy.NewTopologyPolicy(topos, kvs...)
68+
topoPolicy, err := policy.NewTopologyPolicy(topos, updateRevision, kvs...)
6769
if err != nil {
6870
return task.Fail().With("invalid topo policy, it should be validated: %w", err)
6971
}
7072

71-
updateRevision, _, _ := state.Revision()
72-
7373
wait, err := updater.New[runtime.TiKVTuple]().
7474
WithInstances(kvs...).
7575
WithDesired(int(state.Group().Replicas())).
@@ -80,6 +80,7 @@ func TaskUpdater(state *ReconcileContext, c client.Client) task.Task {
8080
WithNewFactory(TiKVNewer(kvg, updateRevision)).
8181
WithAddHooks(topoPolicy).
8282
WithDelHooks(topoPolicy).
83+
WithUpdateHooks(topoPolicy).
8384
WithScaleInPreferPolicy(
8485
topoPolicy,
8586
).

pkg/updater/policy/topology.go

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,50 +15,86 @@
1515
package policy
1616

1717
import (
18+
"maps"
19+
1820
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
1921
"github.com/pingcap/tidb-operator/pkg/runtime"
2022
"github.com/pingcap/tidb-operator/pkg/updater"
2123
"github.com/pingcap/tidb-operator/pkg/utils/topology"
2224
)
2325

2426
type topologyPolicy[R runtime.Instance] struct {
25-
scheduler topology.Scheduler
27+
all topology.Scheduler
28+
updated topology.Scheduler
29+
30+
rev string
2631
}
2732

2833
// TopologyPolicy groups the updater extension points that the topology
// policy implements: the add, delete and update hooks, plus the prefer
// policy that the group controllers register for scale-in.
type TopologyPolicy[R runtime.Instance] interface {
2934
updater.AddHook[R]
3035
updater.DelHook[R]
36+
updater.UpdateHook[R]
3137
updater.PreferPolicy[R]
3238
}
3339

34-
func NewTopologyPolicy[R runtime.Instance](ts []v1alpha1.ScheduleTopology, rs ...R) (TopologyPolicy[R], error) {
35-
s, err := topology.New(ts)
40+
func NewTopologyPolicy[R runtime.Instance](ts []v1alpha1.ScheduleTopology, rev string, rs ...R) (TopologyPolicy[R], error) {
41+
all, err := topology.New(ts)
42+
if err != nil {
43+
return nil, err
44+
}
45+
updated, err := topology.New(ts)
3646
if err != nil {
3747
return nil, err
3848
}
3949
p := &topologyPolicy[R]{
40-
scheduler: s,
50+
all: all,
51+
updated: updated,
52+
rev: rev,
4153
}
4254
for _, r := range rs {
43-
p.scheduler.Add(r.GetName(), r.GetTopology())
55+
p.all.Add(r.GetName(), r.GetTopology())
56+
if r.GetUpdateRevision() == rev {
57+
p.updated.Add(r.GetName(), r.GetTopology())
58+
}
4459
}
4560
return p, nil
4661
}
4762

4863
func (p *topologyPolicy[R]) Add(update R) R {
49-
topo := p.scheduler.NextAdd()
64+
all := p.all.NextAdd()
65+
updated := p.updated.NextAdd()
66+
topo := choose(all, updated)
67+
5068
update.SetTopology(topo)
51-
p.scheduler.Add(update.GetName(), update.GetTopology())
69+
p.all.Add(update.GetName(), update.GetTopology())
70+
if update.GetUpdateRevision() == p.rev {
71+
p.updated.Add(update.GetName(), update.GetTopology())
72+
}
73+
74+
return update
75+
}
76+
77+
// Update copies the topology of outdated onto update, records update in the
// schedulers, and returns update.
func (p *topologyPolicy[R]) Update(update, outdated R) R {
78+
// The updated instance keeps the topology of the instance it is paired with.
update.SetTopology(outdated.GetTopology())
79+
80+
// Always record the instance in the scheduler that tracks all instances.
p.all.Add(update.GetName(), update.GetTopology())
81+
// Record it in the "updated" scheduler only when its update revision equals the policy's revision.
if update.GetUpdateRevision() == p.rev {
82+
p.updated.Add(update.GetName(), update.GetTopology())
83+
}
5284

5385
return update
5486
}
5587

5688
func (p *topologyPolicy[R]) Delete(name string) {
57-
p.scheduler.Del(name)
89+
p.all.Del(name)
90+
p.updated.Del(name)
5891
}
5992

6093
func (p *topologyPolicy[R]) Prefer(allowed []R) []R {
61-
names := p.scheduler.NextDel()
94+
if len(allowed) == 0 {
95+
return nil
96+
}
97+
names := p.all.NextDel()
6298
preferred := make([]R, 0, len(allowed))
6399
for _, item := range allowed {
64100
for _, name := range names {
@@ -70,3 +106,32 @@ func (p *topologyPolicy[R]) Prefer(allowed []R) []R {
70106

71107
return preferred
72108
}
109+
110+
// choose picks one topology from the candidates in all, using update as a tie-breaker
111+
// - prefer the first topology in all that also appears in update
112+
// - if no such topology exists, fall back to the first topology in all
113+
func choose(all, update []v1alpha1.Topology) v1alpha1.Topology {
114+
// No topology is preferred
115+
// Normally this is because no topology policy is specified
116+
if len(all) == 0 {
117+
return nil
118+
}
119+
// Only one topology can be chosen, so update does not need to be consulted
120+
if len(all) == 1 {
121+
return all[0]
122+
}
123+
124+
// More than one topology can be chosen
125+
// Try to find the first topology (in the order of all) which is in both all and update
126+
for _, at := range all {
127+
for _, bt := range update {
128+
if maps.Equal(at, bt) {
129+
return at
130+
}
131+
}
132+
}
133+
134+
// No intersection between the preferred topologies of all and update,
135+
// so just return the first preferred topology of all
136+
return all[0]
137+
}

0 commit comments

Comments
 (0)