From 4d2add2240bc4c7f8b3181191b720a91edde574e Mon Sep 17 00:00:00 2001 From: kalan Date: Sun, 12 Oct 2025 20:47:27 +0700 Subject: [PATCH 1/5] feat: add `FromChannelWithTimeout()` --- from.go | 29 +++++++++++++++++++++++++++++ from_test.go | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/from.go b/from.go index 3b94fdf..35cf1c8 100644 --- a/from.go +++ b/from.go @@ -4,6 +4,7 @@ import ( "fmt" "iter" "reflect" + "time" ) // Query is the type returned from query functions. It can be iterated manually @@ -68,6 +69,34 @@ func FromChannel[T any](source <-chan T) Query { } } +// FromChannelWithTimeout initializes a linq query with a passed channel, +// but stops iterating either when the channel is closed or when the timeout elapses. +func FromChannelWithTimeout[T any](source <-chan T, timeout time.Duration) Query { + return Query{ + Iterate: func(yield func(any) bool) { + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case item, ok := <-source: + if !ok { + // channel closed + return + } + if !yield(item) { + // consumer stopped early + return + } + case <-timer.C: + // timeout elapsed + return + } + } + }, + } +} + // FromString initializes a query from a string, iterating over its runes. func FromString[S ~string](source S) Query { return Query{ diff --git a/from_test.go b/from_test.go index 0e02f74..4278ff3 100644 --- a/from_test.go +++ b/from_test.go @@ -1,6 +1,9 @@ package linq -import "testing" +import ( + "testing" + "time" +) func TestFromSlice(t *testing.T) { s := [3]int{1, 2, 3} @@ -34,6 +37,34 @@ func TestFromChannel(t *testing.T) { } } +func TestFromChannelWithTimeout_Timeout(t *testing.T) { + c := make(chan int, 3) + defer close(c) + c <- 10 + c <- 15 + c <- -3 + + w := []any{10, 15, -3} + + if q := FromChannelWithTimeout(c, time.Second); !validateQuery(q, w) { + t.Errorf("FromChannelWithTimeout() failed expected %v", w) + } +} + +func TestFromChannelWithTimeout_Closed(t *testing.T) { + c := make(chan int, 3) + c <- 10 + c <- 15 + c <- -3 + close(c) + + w := []any{10, 15, -3} + + if q := FromChannelWithTimeout(c, time.Hour); !validateQuery(q, w) { + t.Errorf("FromChannelWithTimeout() failed expected %v", w) + } +} + func TestFromString(t *testing.T) { s := "string" w := []any{'s', 't', 'r', 'i', 'n', 'g'} From ffd937157d26d8af4720c596d1cf754eaafcbfc4 Mon Sep 17 00:00:00 2001 From: kalan Date: Mon, 13 Oct 2025 07:52:12 +0700 Subject: [PATCH 2/5] feat: add `FromChannelWithContext` --- from.go | 33 ++++++++++++++++++++++----------- from_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 11 deletions(-) diff --git a/from.go b/from.go index 35cf1c8..77bf6cf 100644 --- a/from.go +++ b/from.go @@ -1,6 +1,7 @@ package linq import ( + "context" "fmt" "iter" "reflect" @@ -69,34 +70,44 @@ func FromChannel[T any](source <-chan T) Query { } } -// FromChannelWithTimeout initializes a linq query with a passed channel, -// but stops iterating either when the channel is closed or when the timeout elapses. -func FromChannelWithTimeout[T any](source <-chan T, timeout time.Duration) Query { +// FromChannelWithContext initializes a linq query with a passed channel +// and stops iterating either when the channel is closed or when the context is canceled. +func FromChannelWithContext[T any](source <-chan T, ctx context.Context) Query { return Query{ Iterate: func(yield func(any) bool) { - timer := time.NewTimer(timeout) - defer timer.Stop() - for { select { + case <-ctx.Done(): + // Context canceled or deadline exceeded + return case item, ok := <-source: if !ok { - // channel closed + // Channel closed return } if !yield(item) { - // consumer stopped early + // Consumer stopped early return } - case <-timer.C: - // timeout elapsed - return } } }, } } +// FromChannelWithTimeout is a convenience wrapper over FromChannelWithContext. +// It stops iterating either when the channel is closed or when the timeout elapses. +func FromChannelWithTimeout[T any](source <-chan T, timeout time.Duration) Query { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + return Query{ + Iterate: func(yield func(any) bool) { + // cancel once iteration finishes (or is stopped) + defer cancel() + FromChannelWithContext(source, ctx).Iterate(yield) + }, + } +} + // FromString initializes a query from a string, iterating over its runes. func FromString[S ~string](source S) Query { return Query{ diff --git a/from_test.go b/from_test.go index 4278ff3..bdb0826 100644 --- a/from_test.go +++ b/from_test.go @@ -1,6 +1,7 @@ package linq import ( + "context" "testing" "time" ) @@ -37,6 +38,39 @@ func TestFromChannel(t *testing.T) { } } +func TestFromChannelWithContext_Cancel(t *testing.T) { + c := make(chan int, 3) + defer close(c) + c <- 10 + c <- 15 + c <- -3 + + w := []any{10, 15, -3} + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if q := FromChannelWithContext(c, ctx); !validateQuery(q, w) { + t.Errorf("FromChannelWithContext() failed expected %v", w) + } +} + +func TestFromChannelWithContext_Closed(t *testing.T) { + c := make(chan int, 3) + c <- 10 + c <- 15 + c <- -3 + close(c) + + w := []any{10, 15, -3} + + ctx := context.Background() + + if q := FromChannelWithContext(c, ctx); !validateQuery(q, w) { + t.Errorf("FromChannelWithContext() failed expected %v", w) + } +} + func TestFromChannelWithTimeout_Timeout(t *testing.T) { c := make(chan int, 3) defer close(c) From f29e914f6d6e6afa19e24b641fe89a8e289ef44c Mon Sep 17 00:00:00 2001 From: kalan Date: Mon, 13 Oct 2025 09:24:31 +0700 Subject: [PATCH 3/5] chore: update README.md --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 979a7c0..8239712 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,7 @@ This will make your code free of `any` and type assertions. ```go var results []string -From(sentences). +FromSlice(sentences). // split sentences to words SelectManyT(func(sentence string) Query { return From(strings.Split(sentence, " ")) @@ -183,6 +183,8 @@ Available constructors: - `FromSlice` - creates a query from a slice - `FromMap` - creates a query from a map - `FromChannel` - creates a query from a channel +- `FromChannelWithContext` - creates a query from a channel with `Context` support +- `FromChannelWithTimeout` - creates a query from a channel with timeout support - `FromString` - creates a query from a string (iterating over runes) - `FromIterable` - creates a query from a custom collection implementing the `Iterable` interface @@ -194,7 +196,8 @@ significantly less efficient. For all new code, it’s recommended to use the ex ```text v4.0.0 (2025-10-12) * Breaking change: Migrated to standard Go iterator pattern. (thanks @kalaninja!) -* Added typed constructors: FromSlice(), FromMap(), FromChannel(), FromString(). +* Added typed constructors: FromSlice(), FromMap(), FromChannel(), + FromChannelWithContext(), FromChannelWithTimeout(), FromString(). * Breaking change: Removed FromChannelT() in favor of FromChannel(). v3.2.0 (2020-12-29) From 771eb7c5c375f21dff8269636b4a110c2e8f8f3f Mon Sep 17 00:00:00 2001 From: kalan Date: Mon, 13 Oct 2025 11:28:16 +0700 Subject: [PATCH 4/5] fix: FromChannelWithContext args order --- from.go | 4 ++-- from_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/from.go b/from.go index 77bf6cf..0c14f74 100644 --- a/from.go +++ b/from.go @@ -72,7 +72,7 @@ func FromChannel[T any](source <-chan T) Query { // FromChannelWithContext initializes a linq query with a passed channel // and stops iterating either when the channel is closed or when the context is canceled. -func FromChannelWithContext[T any](source <-chan T, ctx context.Context) Query { +func FromChannelWithContext[T any](ctx context.Context, source <-chan T) Query { return Query{ Iterate: func(yield func(any) bool) { for { @@ -103,7 +103,7 @@ func FromChannelWithTimeout[T any](source <-chan T, timeout time.Duration) Query Iterate: func(yield func(any) bool) { // cancel once iteration finishes (or is stopped) defer cancel() - FromChannelWithContext(source, ctx).Iterate(yield) + FromChannelWithContext(ctx, source).Iterate(yield) }, } } diff --git a/from_test.go b/from_test.go index bdb0826..c628fe8 100644 --- a/from_test.go +++ b/from_test.go @@ -50,7 +50,7 @@ func TestFromChannelWithContext_Cancel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - if q := FromChannelWithContext(c, ctx); !validateQuery(q, w) { + if q := FromChannelWithContext(ctx, c); !validateQuery(q, w) { t.Errorf("FromChannelWithContext() failed expected %v", w) } } @@ -66,7 +66,7 @@ func TestFromChannelWithContext_Closed(t *testing.T) { ctx := context.Background() - if q := FromChannelWithContext(c, ctx); !validateQuery(q, w) { + if q := FromChannelWithContext(ctx, c); !validateQuery(q, w) { t.Errorf("FromChannelWithContext() failed expected %v", w) } } From e74749b81f9e62e34bfead90a365183ce281ae1c Mon Sep 17 00:00:00 2001 From: kalan Date: Wed, 15 Oct 2025 12:03:23 +0700 Subject: [PATCH 5/5] feat: remove FromChannelWithTimeout in favor of FromChannelWithContext --- README.md | 3 +-- from.go | 14 -------------- from_test.go | 28 ---------------------------- 3 files changed, 1 insertion(+), 44 deletions(-) diff --git a/README.md b/README.md index 8239712..758742c 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,6 @@ Available constructors: - `FromMap` - creates a query from a map - `FromChannel` - creates a query from a channel - `FromChannelWithContext` - creates a query from a channel with `Context` support -- `FromChannelWithTimeout` - creates a query from a channel with timeout support - `FromString` - creates a query from a string (iterating over runes) - `FromIterable` - creates a query from a custom collection implementing the `Iterable` interface @@ -197,7 +196,7 @@ significantly less efficient. For all new code, it’s recommended to use the ex v4.0.0 (2025-10-12) * Breaking change: Migrated to standard Go iterator pattern. (thanks @kalaninja!) * Added typed constructors: FromSlice(), FromMap(), FromChannel(), - FromChannelWithContext(), FromChannelWithTimeout(), FromString(). + FromChannelWithContext(), FromString(). * Breaking change: Removed FromChannelT() in favor of FromChannel(). v3.2.0 (2020-12-29) diff --git a/from.go b/from.go index 0c14f74..1dcc7eb 100644 --- a/from.go +++ b/from.go @@ -5,7 +5,6 @@ import ( "fmt" "iter" "reflect" - "time" ) // Query is the type returned from query functions. It can be iterated manually @@ -95,19 +94,6 @@ func FromChannelWithContext[T any](ctx context.Context, source <-chan T) Query { } } -// FromChannelWithTimeout is a convenience wrapper over FromChannelWithContext. -// It stops iterating either when the channel is closed or when the timeout elapses. -func FromChannelWithTimeout[T any](source <-chan T, timeout time.Duration) Query { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - return Query{ - Iterate: func(yield func(any) bool) { - // cancel once iteration finishes (or is stopped) - defer cancel() - FromChannelWithContext(ctx, source).Iterate(yield) - }, - } -} - // FromString initializes a query from a string, iterating over its runes. func FromString[S ~string](source S) Query { return Query{ diff --git a/from_test.go b/from_test.go index c628fe8..beffabd 100644 --- a/from_test.go +++ b/from_test.go @@ -71,34 +71,6 @@ func TestFromChannelWithContext_Closed(t *testing.T) { } } -func TestFromChannelWithTimeout_Timeout(t *testing.T) { - c := make(chan int, 3) - defer close(c) - c <- 10 - c <- 15 - c <- -3 - - w := []any{10, 15, -3} - - if q := FromChannelWithTimeout(c, time.Second); !validateQuery(q, w) { - t.Errorf("FromChannelWithTimeout() failed expected %v", w) - } -} - -func TestFromChannelWithTimeout_Closed(t *testing.T) { - c := make(chan int, 3) - c <- 10 - c <- 15 - c <- -3 - close(c) - - w := []any{10, 15, -3} - - if q := FromChannelWithTimeout(c, time.Hour); !validateQuery(q, w) { - t.Errorf("FromChannelWithTimeout() failed expected %v", w) - } -} - func TestFromString(t *testing.T) { s := "string" w := []any{'s', 't', 'r', 'i', 'n', 'g'}