Skip to content

Commit a3a9149

Browse files
authored
feature/issue-305-parallel-async-result-validation (#318)
* * Add Async.parallelMap2 * Add Async.parallelMap3 * Add Async.parallelZip * Add parallelAsyncResult * Add parallelAsyncValidation * Add tests * Revert paket ignore * Fix Fable tests * Fix never for JS * Add parallelAsyncResult docs * Add book for parallelAsyncValidation
1 parent 6fef0ed commit a3a9149

19 files changed

+2376
-0
lines changed

gitbook/parallelAsyncResult/ce.md

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
## ParallelAsyncResult Computation Expression
2+
3+
Namespace: `FsToolkit.ErrorHandling`
4+
5+
This CE operates on the same type as `asyncResult`, but it adds the `and!` operator for running workflows in parallel.
6+
7+
When running concurrent workflows, fail-fast semantics are used. If any sub-task returns an `Error`, then all other tasks are cancelled and only that error is returned. To instead collect all errors, use `parallelAsyncValidation`.
8+
9+
10+
## Examples
11+
12+
### Example 1
13+
14+
Suppose we want to download 3 files.
15+
16+
Here is our simulated download function:
17+
18+
```fsharp
19+
// string -> Async<Result<string, string>>
20+
let downloadAsync stuff : Async<Result<string, string>> = async {
21+
do! Async.Sleep 3_000
22+
return Ok stuff
23+
}
24+
```
25+
26+
This workflow will download each item in sequence:
27+
28+
```fsharp
29+
let downloadAllSequential = ayncResult {
30+
let! x = downloadAsync (Ok "We")
31+
let! y = downloadAsync (Ok "run")
32+
let! z = downloadAsync (Ok "sequentially :(")
33+
return sprintf "%s %s %s" x y z
34+
}
35+
```
36+
37+
It takes 9 seconds to complete.
38+
39+
However, using `parallelAsyncResult`, we can download all 3 concurrently:
40+
41+
```fsharp
42+
// Async<Result<string, string>>
43+
let downloadAll = parallelAsyncResult {
44+
let! x = downloadAsync (Ok "We")
45+
and! y = downloadAsync (Ok "run")
46+
and! z = downloadAsync (Ok "concurrently!")
47+
return sprintf "%s %s %s" x y z
48+
}
49+
```
50+
51+
This takes just 3 seconds.

gitbook/parallelAsyncResult/map2.md

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
## ParallelAsyncResult.map2
2+
3+
Namespace: `FsToolkit.ErrorHandling`
4+
5+
Function Signature:
6+
7+
```fsharp
8+
('a -> 'b -> 'c) -> Async<Result<'a, 'd>> -> Async<Result<'b, 'd>>
9+
-> Async<Result<'c, 'd>>
10+
```
11+
12+
## Examples
13+
14+
Note: Many use-cases requiring `map2` operations can also be solved using [the `parallelAsyncResult` computation expression](../parallelAsyncResult/ce.md).
15+
16+
### Example 1
17+
18+
Given the functions
19+
20+
```fsharp
21+
getFollowerIds : UserId -> Async<Result<UserId list, exn>>
22+
createPost : CreatePostRequest -> Async<Result<PostId, exn>>
23+
```
24+
25+
And the type
26+
27+
```fsharp
28+
type NotifyNewPostRequest =
29+
{ UserIds : UserId list
30+
NewPostId : PostId }
31+
static member Create userIds newPostsId =
32+
{UserIds = userIds; NewPostId = newPostsId}
33+
```
34+
35+
We can create a `NotifyNewPostRequest` using `ParallelAsyncResult.map2` as below:
36+
37+
```fsharp
38+
let createPostAndGetNotifyRequest (req : CreatePostRequest) =
39+
// Async<Result<UserId list, exn>>
40+
let getFollowersResult = getFollowerIds req.UserId
41+
42+
// Async<Result<PostId, exn>>
43+
let createPostResult = createPost req
44+
45+
// Async<Result<NotifyNewPostRequest, exn>>
46+
let newPostRequestResult =
47+
ParallelAsyncResult.map2
48+
NotifyNewPostRequest.Create getFollowersResult createPostResult
49+
50+
// ...
51+
```
52+
53+
This workflow will run the sub-tasks `getFollowersResult` and `createPostResult` concurrently, which can increase throughput.

gitbook/parallelAsyncResult/map3.md

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
## ParallelAsyncResult.map3
2+
3+
Namespace: `FsToolkit.ErrorHandling`
4+
5+
Function Signature:
6+
7+
```fsharp
8+
('a -> 'b -> 'c -> 'd)
9+
-> Async<Result<'a, 'e>>
10+
-> Async<Result<'b, 'e>>
11+
-> Async<Result<'c, 'e>>
12+
-> Async<Result<'d, 'e>>
13+
```
14+
15+
## Examples
16+
17+
Note: Many use-cases requiring `map3` operations can also be solved using [the `parallelAsyncResult` computation expression](../parallelAsyncResult/ce.md).

gitbook/parallelAsyncValidation/ce.md

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
## ParallelAsyncValidation Computation Expression
2+
3+
Namespace: `FsToolkit.ErrorHandling`
4+
5+
This CE operates in the same way as `asyncValidation`, except that the `and!` operator will run workflows in parallel.
6+
7+
Concurrent workflows are run with the same semantics as [`Async.Parallel`](https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel).
8+
9+
10+
## Examples
11+
12+
See [here](../validation/ce.md) for other validation-like examples
13+
14+
```fsharp
15+
// Result<string, string> -> Async<Result<string, string>>
16+
let downloadAsync stuff = async {
17+
return stuff
18+
}
19+
20+
// AsyncValidation<string, string>
21+
let addResult = parallelAsyncValidation {
22+
let! x = downloadAsync (Ok "I")
23+
and! y = downloadAsync (Ok "am")
24+
and! z = downloadAsync (Ok "concurrent!")
25+
return sprintf "%s %s %s" x y z
26+
}
27+
// async { return Ok "I am concurrent!" }
28+
29+
// AsyncValidation<string, string>
30+
let addResult = parallelAsyncValidation {
31+
let! x = downloadAsync (Error "Am")
32+
and! y = downloadAsync (Error "I")
33+
and! z = downloadAsync (Error "concurrent?")
34+
return sprintf "%s %s %s" x y z
35+
}
36+
// async { return Error [ "Am"; "I"; "concurrent?" ] }
37+
```
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
## ParallelAsyncValidation.map2
2+
3+
Namespace: `FsToolkit.ErrorHandling`
4+
5+
Function Signature:
6+
7+
```fsharp
8+
('a -> 'b -> 'c)
9+
-> Async<Result<'a, 'd list>>
10+
-> Async<Result<'b, 'd list>>
11+
-> Async<Result<'c, 'd list>>
12+
```
13+
14+
Like [ParallelAsyncResult.map2](../parallelAsyncResult/map2.md), but collects the errors from both arguments.
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
## ParallelAsyncValidation.map3
2+
3+
Namespace: `FsToolkit.ErrorHandling`
4+
5+
Function Signature:
6+
7+
```
8+
('a -> 'b -> 'c -> 'd)
9+
-> Async<Result<'a, 'e list>>
10+
-> Async<Result<'b, 'e list>>
11+
-> Async<Result<'c, 'e list>>
12+
-> Async<Result<'d, 'e list>>
13+
```
14+
15+
Like [ParallelAsyncResult.map3](../parallelAsyncResult/map3.md), but collects the errors from all arguments.
16+

src/FsToolkit.ErrorHandling/Async.fs

+89
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,95 @@ module Async =
114114
let inline zip (left: Async<'left>) (right: Async<'right>) : Async<'left * 'right> =
115115
bind (fun l -> bind (fun r -> singleton (l, r)) right) left
116116

117+
/// <summary>
118+
/// Executes two asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
119+
/// </summary>
120+
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
121+
/// <param name="input1">The first <c>Async</c> to execute</param>
122+
/// <param name="input2">The second <c>Async</c> to execute</param>
123+
/// <returns>The transformed <c>Async</c> value.</returns>
124+
let inline parallelMap2
125+
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'output)
126+
(input1: Async<'input1>)
127+
(input2: Async<'input2>)
128+
: Async<'output> =
129+
130+
#if FABLE_COMPILER && FABLE_COMPILER_PYTHON
131+
Async.Parallel(
132+
[|
133+
map box input1
134+
map box input2
135+
|]
136+
)
137+
#else
138+
Async.Parallel(
139+
[|
140+
map box input1
141+
map box input2
142+
|],
143+
maxDegreeOfParallelism = 2
144+
)
145+
#endif
146+
|> map (fun results ->
147+
let a =
148+
results[0]
149+
|> unbox<'input1>
150+
151+
let b =
152+
results[1]
153+
|> unbox<'input2>
154+
155+
mapper a b
156+
)
157+
158+
/// <summary>
159+
/// Executes three asyncs concurrently <see cref='M:Microsoft.FSharp.Control.FSharpAsync.Parallel``1'/> and returns a mapping of the values
160+
/// </summary>
161+
/// <param name="mapper">The function to apply to the values of the <c>Async</c> values.</param>
162+
/// <param name="input1">The first <c>Async</c> to execute</param>
163+
/// <param name="input2">The second <c>Async</c> to execute</param>
164+
/// <param name="input3">The third <c>Async</c> value to transform.</param>
165+
/// <returns>The transformed <c>Async</c> value.</returns>
166+
let inline parallelMap3
167+
([<InlineIfLambda>] mapper: 'input1 -> 'input2 -> 'input3 -> 'output)
168+
(input1: Async<'input1>)
169+
(input2: Async<'input2>)
170+
(input3: Async<'input3>)
171+
: Async<'output> =
172+
#if FABLE_COMPILER && FABLE_COMPILER_PYTHON
173+
Async.Parallel(
174+
[|
175+
map box input1
176+
map box input2
177+
map box input3
178+
|]
179+
)
180+
#else
181+
Async.Parallel(
182+
[|
183+
map box input1
184+
map box input2
185+
map box input3
186+
|],
187+
maxDegreeOfParallelism = 3
188+
)
189+
#endif
190+
|> map (fun results ->
191+
let a =
192+
results[0]
193+
|> unbox<'input1>
194+
195+
let b =
196+
results[1]
197+
|> unbox<'input2>
198+
199+
let c =
200+
results[2]
201+
|> unbox<'input3>
202+
203+
mapper a b c
204+
)
205+
117206
/// <summary>
118207
/// Operators for working with the <c>Async</c> type.
119208
/// </summary>

src/FsToolkit.ErrorHandling/FsToolkit.ErrorHandling.fsproj

+4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@
4848
<Compile Include="TaskResultOption.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
4949
<Compile Include="TaskResultOptionCE.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
5050
<Compile Include="TaskResultOptionOp.fs" Condition="'$(FABLE_COMPILER)' != 'true'" />
51+
<Compile Include="ParallelAsyncResult.fs" />
52+
<Compile Include="ParallelAsyncResultCE.fs" />
53+
<Compile Include="ParallelAsyncValidation.fs" />
54+
<Compile Include="ParallelAsyncValidationCE.fs" />
5155
<Compile Include="List.fs" />
5256
<Compile Include="Array.fs" />
5357
<Compile Include="Seq.fs" />
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
namespace FsToolkit.ErrorHandling
2+
3+
open System
4+
5+
[<RequireQualifiedAccess>]
6+
module ParallelAsyncResult =
7+
8+
[<AutoOpen>]
9+
module InternalHelpers =
10+
11+
type AsyncResultErrorException<'a>(value: 'a) =
12+
inherit Exception()
13+
member this.Value = value
14+
15+
let toBoxedAsync (input: Async<Result<'ok, 'error>>) : Async<obj> =
16+
async {
17+
match! input with
18+
| Ok x -> return box x
19+
| Error e -> return raise (AsyncResultErrorException<'error>(e))
20+
}
21+
22+
/// <summary>
23+
/// Transforms two AsyncResults in one that executes them concurrently and combines the results using the specified function.
24+
/// If either AsyncResult resolves to an error, then the other is cancelled and only the first error is returned.
25+
/// </summary>
26+
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
27+
/// <param name="input1">The first AsyncResult value to transform.</param>
28+
/// <param name="input2">The second AsyncResult value to transform.</param>
29+
/// <returns>The transformed AsyncResult value.</returns>
30+
let inline map2
31+
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c)
32+
(input1: Async<Result<'a, 'error>>)
33+
(input2: Async<Result<'b, 'error>>)
34+
: Async<Result<'c, 'error>> =
35+
async {
36+
try
37+
return!
38+
Async.parallelMap2
39+
(fun a b ->
40+
let a = unbox<'a> a
41+
let b = unbox<'b> b
42+
Ok(mapper a b)
43+
)
44+
(toBoxedAsync input1)
45+
(toBoxedAsync input2)
46+
47+
with :? AsyncResultErrorException<'error> as exn ->
48+
return Error exn.Value
49+
}
50+
51+
/// <summary>
52+
/// Transforms three AsyncResults in one that executes them concurrently and combines the results using the specified function.
53+
/// If any AsyncResult resolves to an error, then the others are cancelled and only the first error is returned.
54+
/// </summary>
55+
/// <param name="mapper">The function to apply to the values of the AsyncResult values.</param>
56+
/// <param name="input1">The first AsyncResult value to transform.</param>
57+
/// <param name="input2">The second AsyncResult value to transform.</param>
58+
/// <param name="input3">The third AsyncResult value to transform.</param>
59+
/// <returns>The transformed AsyncResult value.</returns>
60+
let inline map3
61+
([<InlineIfLambda>] mapper: 'a -> 'b -> 'c -> 'd)
62+
(input1: Async<Result<'a, 'error>>)
63+
(input2: Async<Result<'b, 'error>>)
64+
(input3: Async<Result<'c, 'error>>)
65+
: Async<Result<'d, 'error>> =
66+
async {
67+
try
68+
return!
69+
Async.parallelMap3
70+
(fun a b c ->
71+
let a = unbox<'a> a
72+
let b = unbox<'b> b
73+
let c = unbox<'c> c
74+
Ok(mapper a b c)
75+
)
76+
(toBoxedAsync input1)
77+
(toBoxedAsync input2)
78+
(toBoxedAsync input3)
79+
80+
with :? AsyncResultErrorException<'error> as exn ->
81+
return Error exn.Value
82+
}
83+
84+
let inline zip
85+
(a: Async<Result<'a, 'error>>)
86+
(b: Async<Result<'b, 'error>>)
87+
: Async<Result<'a * 'b, 'error>> =
88+
map2 (fun a b -> a, b) a b

0 commit comments

Comments
 (0)