Skip to content

Commit afc54df

Browse files
authored
feat: optimize graph based on sync.Map (#3152)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 6099a91 commit afc54df

File tree

11 files changed

+235
-246
lines changed

11 files changed

+235
-246
lines changed

pkg/container/set/safe_set.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ func (s *safeSet[T]) Values() []T {
4747
s.mu.RLock()
4848
defer s.mu.RUnlock()
4949

50-
var result []T
50+
if len(s.data) == 0 {
51+
return nil
52+
}
53+
54+
result := make([]T, 0, len(s.data))
5155
for k := range s.data {
5256
result = append(result, k)
5357
}

pkg/container/set/set.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ func New[T comparable]() Set[T] {
3434
}
3535

3636
func (s *set[T]) Values() []T {
37-
var result []T
37+
if len(*s) == 0 {
38+
return nil
39+
}
40+
41+
result := make([]T, 0, len(*s))
3842
for k := range *s {
3943
result = append(result, k)
4044
}

pkg/graph/dag/dag.go

Lines changed: 84 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@ package dag
2020

2121
import (
2222
"errors"
23-
"math/rand"
2423
"sync"
25-
"time"
2624

27-
cmap "github.com/orcaman/concurrent-map/v2"
25+
"go.uber.org/atomic"
2826

2927
"d7y.io/dragonfly/v2/pkg/container/set"
3028
)
@@ -33,6 +31,9 @@ var (
3331
// ErrVertexNotFound represents vertex not found.
3432
ErrVertexNotFound = errors.New("vertex not found")
3533

34+
// ErrVertexInvalid represents vertex invalid.
35+
ErrVertexInvalid = errors.New("vertex invalid")
36+
3637
// ErrVertexAlreadyExists represents vertex already exists.
3738
ErrVertexAlreadyExists = errors.New("vertex already exists")
3839

@@ -63,17 +64,14 @@ type DAG[T comparable] interface {
6364
// GetRandomVertices returns random map of vertices.
6465
GetRandomVertices(n uint) []*Vertex[T]
6566

66-
// GetVertexKeys returns keys of vertices.
67-
GetVertexKeys() []string
68-
6967
// GetSourceVertices returns source vertices.
7068
GetSourceVertices() []*Vertex[T]
7169

7270
// GetSinkVertices returns sink vertices.
7371
GetSinkVertices() []*Vertex[T]
7472

7573
// VertexCount returns count of vertices.
76-
VertexCount() int
74+
VertexCount() uint64
7775

7876
// AddEdge adds edge between two vertices.
7977
AddEdge(fromVertexID, toVertexID string) error
@@ -93,14 +91,17 @@ type DAG[T comparable] interface {
9391

9492
// dag provides directed acyclic graph function.
9593
type dag[T comparable] struct {
94+
vertices *sync.Map
95+
count *atomic.Uint64
9696
mu sync.RWMutex
97-
vertices cmap.ConcurrentMap[string, *Vertex[T]]
9897
}
9998

10099
// New returns a new DAG interface.
101100
func NewDAG[T comparable]() DAG[T] {
102101
return &dag[T]{
103-
vertices: cmap.New[*Vertex[T]](),
102+
vertices: &sync.Map{},
103+
count: atomic.NewUint64(0),
104+
mu: sync.RWMutex{},
104105
}
105106
}
106107

@@ -109,11 +110,11 @@ func (d *dag[T]) AddVertex(id string, value T) error {
109110
d.mu.Lock()
110111
defer d.mu.Unlock()
111112

112-
if _, ok := d.vertices.Get(id); ok {
113+
if _, loaded := d.vertices.LoadOrStore(id, NewVertex(id, value)); loaded {
113114
return ErrVertexAlreadyExists
114115
}
115116

116-
d.vertices.Set(id, NewVertex(id, value))
117+
d.count.Inc()
117118
return nil
118119
}
119120

@@ -122,7 +123,12 @@ func (d *dag[T]) DeleteVertex(id string) {
122123
d.mu.Lock()
123124
defer d.mu.Unlock()
124125

125-
vertex, ok := d.vertices.Get(id)
126+
rawVertex, loaded := d.vertices.Load(id)
127+
if !loaded {
128+
return
129+
}
130+
131+
vertex, ok := rawVertex.(*Vertex[T])
126132
if !ok {
127133
return
128134
}
@@ -136,55 +142,75 @@ func (d *dag[T]) DeleteVertex(id string) {
136142
continue
137143
}
138144

139-
d.vertices.Remove(id)
145+
d.vertices.Delete(id)
146+
d.count.Dec()
140147
}
141148

142149
// GetVertex gets vertex from graph.
143150
func (d *dag[T]) GetVertex(id string) (*Vertex[T], error) {
144-
vertex, ok := d.vertices.Get(id)
145-
if !ok {
151+
rawVertex, loaded := d.vertices.Load(id)
152+
if !loaded {
146153
return nil, ErrVertexNotFound
147154
}
148155

156+
vertex, ok := rawVertex.(*Vertex[T])
157+
if !ok {
158+
return nil, ErrVertexInvalid
159+
}
160+
149161
return vertex, nil
150162
}
151163

152164
// GetVertices returns map of vertices.
153165
func (d *dag[T]) GetVertices() map[string]*Vertex[T] {
154-
return d.vertices.Items()
166+
d.mu.RLock()
167+
defer d.mu.RUnlock()
168+
169+
vertices := make(map[string]*Vertex[T], d.count.Load())
170+
d.vertices.Range(func(key, value interface{}) bool {
171+
vertex, ok := value.(*Vertex[T])
172+
if !ok {
173+
return true
174+
}
175+
176+
id, ok := key.(string)
177+
if !ok {
178+
return true
179+
}
180+
181+
vertices[id] = vertex
182+
return true
183+
})
184+
185+
return vertices
155186
}
156187

157188
// GetRandomVertices returns random map of vertices.
158189
func (d *dag[T]) GetRandomVertices(n uint) []*Vertex[T] {
159190
d.mu.RLock()
160191
defer d.mu.RUnlock()
161192

162-
keys := d.GetVertexKeys()
163-
if int(n) >= len(keys) {
164-
n = uint(len(keys))
193+
if n == 0 {
194+
return nil
165195
}
166196

167-
r := rand.New(rand.NewSource(time.Now().UnixNano()))
168-
permutation := r.Perm(len(keys))[:n]
169197
randomVertices := make([]*Vertex[T], 0, n)
170-
for _, v := range permutation {
171-
key := keys[v]
172-
if vertex, err := d.GetVertex(key); err == nil {
173-
randomVertices = append(randomVertices, vertex)
198+
d.vertices.Range(func(key, value interface{}) bool {
199+
vertex, ok := value.(*Vertex[T])
200+
if !ok {
201+
return true
174202
}
175-
}
176203

177-
return randomVertices
178-
}
204+
randomVertices = append(randomVertices, vertex)
205+
return uint(len(randomVertices)) < n
206+
})
179207

180-
// GetVertexKeys returns keys of vertices.
181-
func (d *dag[T]) GetVertexKeys() []string {
182-
return d.vertices.Keys()
208+
return randomVertices
183209
}
184210

185211
// VertexCount returns count of vertices.
186-
func (d *dag[T]) VertexCount() int {
187-
return d.vertices.Count()
212+
func (d *dag[T]) VertexCount() uint64 {
213+
return d.count.Load()
188214
}
189215

190216
// AddEdge adds edge between two vertices.
@@ -196,14 +222,14 @@ func (d *dag[T]) AddEdge(fromVertexID, toVertexID string) error {
196222
return ErrCycleBetweenVertices
197223
}
198224

199-
fromVertex, ok := d.vertices.Get(fromVertexID)
200-
if !ok {
201-
return ErrVertexNotFound
225+
fromVertex, err := d.GetVertex(fromVertexID)
226+
if err != nil {
227+
return err
202228
}
203229

204-
toVertex, ok := d.vertices.Get(toVertexID)
205-
if !ok {
206-
return ErrVertexNotFound
230+
toVertex, err := d.GetVertex(toVertexID)
231+
if err != nil {
232+
return err
207233
}
208234

209235
for _, child := range fromVertex.Children.Values() {
@@ -232,14 +258,14 @@ func (d *dag[T]) DeleteEdge(fromVertexID, toVertexID string) error {
232258
d.mu.Lock()
233259
defer d.mu.Unlock()
234260

235-
fromVertex, ok := d.vertices.Get(fromVertexID)
236-
if !ok {
237-
return ErrVertexNotFound
261+
fromVertex, err := d.GetVertex(fromVertexID)
262+
if err != nil {
263+
return err
238264
}
239265

240-
toVertex, ok := d.vertices.Get(toVertexID)
241-
if !ok {
242-
return ErrVertexNotFound
266+
toVertex, err := d.GetVertex(toVertexID)
267+
if err != nil {
268+
return err
243269
}
244270

245271
fromVertex.Children.Delete(toVertex)
@@ -256,12 +282,12 @@ func (d *dag[T]) CanAddEdge(fromVertexID, toVertexID string) bool {
256282
return false
257283
}
258284

259-
fromVertex, ok := d.vertices.Get(fromVertexID)
260-
if !ok {
285+
fromVertex, err := d.GetVertex(fromVertexID)
286+
if err != nil {
261287
return false
262288
}
263289

264-
if _, ok := d.vertices.Get(toVertexID); !ok {
290+
if _, err := d.GetVertex(toVertexID); err != nil {
265291
return false
266292
}
267293

@@ -283,9 +309,9 @@ func (d *dag[T]) DeleteVertexInEdges(id string) error {
283309
d.mu.Lock()
284310
defer d.mu.Unlock()
285311

286-
vertex, ok := d.vertices.Get(id)
287-
if !ok {
288-
return ErrVertexNotFound
312+
vertex, err := d.GetVertex(id)
313+
if err != nil {
314+
return err
289315
}
290316

291317
for _, parent := range vertex.Parents.Values() {
@@ -301,9 +327,9 @@ func (d *dag[T]) DeleteVertexOutEdges(id string) error {
301327
d.mu.Lock()
302328
defer d.mu.Unlock()
303329

304-
vertex, ok := d.vertices.Get(id)
305-
if !ok {
306-
return ErrVertexNotFound
330+
vertex, err := d.GetVertex(id)
331+
if err != nil {
332+
return err
307333
}
308334

309335
for _, child := range vertex.Children.Values() {
@@ -320,7 +346,7 @@ func (d *dag[T]) GetSourceVertices() []*Vertex[T] {
320346
defer d.mu.RUnlock()
321347

322348
var sourceVertices []*Vertex[T]
323-
for _, vertex := range d.vertices.Items() {
349+
for _, vertex := range d.GetVertices() {
324350
if vertex.InDegree() == 0 {
325351
sourceVertices = append(sourceVertices, vertex)
326352
}
@@ -335,7 +361,7 @@ func (d *dag[T]) GetSinkVertices() []*Vertex[T] {
335361
defer d.mu.RUnlock()
336362

337363
var sinkVertices []*Vertex[T]
338-
for _, vertex := range d.vertices.Items() {
364+
for _, vertex := range d.GetVertices() {
339365
if vertex.OutDegree() == 0 {
340366
sinkVertices = append(sinkVertices, vertex)
341367
}
@@ -354,8 +380,8 @@ func (d *dag[T]) depthFirstSearch(fromVertexID, toVertexID string) bool {
354380

355381
// search finds successors of vertex.
356382
func (d *dag[T]) search(vertexID string, successors map[string]struct{}) {
357-
vertex, ok := d.vertices.Get(vertexID)
358-
if !ok {
383+
vertex, err := d.GetVertex(vertexID)
384+
if err != nil {
359385
return
360386
}
361387

0 commit comments

Comments
 (0)