Skip to content

Commit 8a6f439

Browse files
haoheipiadrianbrad
authored andcommitted
fix cond.wait() not in for condition loop
1 parent 7301728 commit 8a6f439

File tree

2 files changed

+145
-2
lines changed

2 files changed

+145
-2
lines changed

blocking.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (bq *Blocking[T]) OfferWait(elem T) {
7171
bq.lock.Lock()
7272
defer bq.lock.Unlock()
7373

74-
if bq.isFull() {
74+
for bq.isFull() {
7575
bq.notFullCond.Wait()
7676
}
7777

@@ -209,7 +209,7 @@ func (bq *Blocking[T]) PeekWait() T {
209209
bq.lock.Lock()
210210
defer bq.lock.Unlock()
211211

212-
if bq.isEmpty() {
212+
for bq.isEmpty() {
213213
bq.notEmptyCond.Wait()
214214
}
215215

blocking_test.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,149 @@ func TestBlocking(t *testing.T) {
554554
t.Fatalf("expected elem to be %d, got %d", 4, e)
555555
}
556556
})
557+
558+
t.Run("CondWaitWithCapacity", func(t *testing.T) {
559+
t.Parallel()
560+
561+
t.Run("OfferWait", func(t *testing.T) {
562+
t.Parallel()
563+
564+
elems := []int{1, 2, 3}
565+
initialSize := len(elems)
566+
567+
blockingQueue := queue.NewBlocking(
568+
elems,
569+
queue.WithCapacity(initialSize),
570+
)
571+
572+
added := make(chan struct{}, initialSize+1)
573+
for i := 1; i <= initialSize+1; i++ {
574+
go func(i int) {
575+
blockingQueue.OfferWait(i)
576+
added <- struct{}{}
577+
}(i)
578+
}
579+
580+
time.Sleep(time.Millisecond)
581+
_ = blockingQueue.Clear()
582+
583+
// one groutine block, and three are added
584+
for i := 1; i <= initialSize; i++ {
585+
<-added
586+
}
587+
588+
time.Sleep(time.Millisecond)
589+
if blockingQueue.Size() != initialSize {
590+
t.Fatalf("expected size to be %d, got %d", initialSize, blockingQueue.Size())
591+
}
592+
593+
_ = blockingQueue.GetWait()
594+
time.Sleep(time.Millisecond)
595+
if blockingQueue.Size() != initialSize {
596+
t.Fatalf("expected size to be %d, got %d", initialSize, blockingQueue.Size())
597+
}
598+
})
599+
600+
t.Run("GetWait", func(t *testing.T) {
601+
t.Parallel()
602+
603+
elems := []int{1, 2, 3}
604+
initialSize := len(elems)
605+
606+
blockingQueue := queue.NewBlocking(
607+
elems,
608+
queue.WithCapacity(initialSize),
609+
)
610+
611+
for i := 1; i <= initialSize; i++ {
612+
_ = blockingQueue.GetWait()
613+
}
614+
615+
if blockingQueue.Size() != 0 {
616+
t.Fatalf("expected size to be %d, got %d", 0, blockingQueue.Size())
617+
}
618+
619+
retrievedElem := make(chan int, initialSize+1)
620+
for i := 1; i <= initialSize+1; i++ {
621+
go func() {
622+
retrievedElem <- blockingQueue.GetWait()
623+
}()
624+
}
625+
626+
time.Sleep(time.Millisecond)
627+
blockingQueue.Reset()
628+
629+
// one groutine block, and three are retrieved
630+
for i := 1; i <= initialSize; i++ {
631+
<-retrievedElem
632+
}
633+
634+
if blockingQueue.Size() != 0 {
635+
t.Fatalf("expected size to be %d, got %d", initialSize, blockingQueue.Size())
636+
}
637+
638+
blockingQueue.OfferWait(4)
639+
if e := <-retrievedElem; e != 4 {
640+
t.Fatalf("expected elem to be %d, got %d", 4, e)
641+
}
642+
})
643+
644+
t.Run("PeekWait", func(t *testing.T) {
645+
t.Parallel()
646+
647+
elems := []int{1}
648+
initialSize := len(elems)
649+
650+
blockingQueue := queue.NewBlocking(
651+
elems,
652+
queue.WithCapacity(initialSize),
653+
)
654+
655+
for i := 1; i <= initialSize; i++ {
656+
_ = blockingQueue.GetWait()
657+
}
658+
659+
if blockingQueue.Size() != 0 {
660+
t.Fatalf("expected size to be %d, got %d", 0, blockingQueue.Size())
661+
}
662+
663+
getCh := make(chan int, 1)
664+
go func() {
665+
getCh <- blockingQueue.GetWait()
666+
}()
667+
668+
peekCh := make(chan int, 1)
669+
go func() {
670+
peekCh <- blockingQueue.PeekWait()
671+
}()
672+
673+
time.Sleep(time.Millisecond)
674+
blockingQueue.Reset()
675+
// If GetWait is called before PeekWait, PeekWait will block
676+
// If PeekWait is called before GetWait, PeekWait will not block
677+
select {
678+
case <-getCh:
679+
select {
680+
case <-peekCh:
681+
case <-time.After(time.Millisecond):
682+
t.Logf("GetWait is called before PeekWait")
683+
}
684+
case <-peekCh:
685+
select {
686+
case <-getCh:
687+
t.Logf("PeekWait is called before GetWait")
688+
case <-time.After(time.Millisecond):
689+
t.Fatalf("expected GetWait to not block")
690+
}
691+
case <-time.After(time.Millisecond):
692+
t.Fatalf("expected GetWait or PeekWait not block")
693+
}
694+
695+
if blockingQueue.Size() != 0 {
696+
t.Fatalf("expected size to be %d, got %d", 0, blockingQueue.Size())
697+
}
698+
})
699+
})
557700
}
558701

559702
func testResetOnMultipleRoutinesFunc[T comparable](

0 commit comments

Comments
 (0)