Skip to content

Commit 0b67e53

Browse files
committed
more refactoring. getting better.
1 parent 12bceda commit 0b67e53

File tree

7 files changed

+59
-67
lines changed

7 files changed

+59
-67
lines changed

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,13 @@ protected override void BeginProcessing()
6666
InitialSessionState = iss
6767
};
6868

69-
WorkerSettings workerSettings = new()
69+
TaskSettings workerSettings = new()
7070
{
71-
Token = _cts.Token,
71+
Script = ScriptBlock.ToString(),
7272
UsingStatements = ScriptBlock.GetUsingParameters(this)
7373
};
7474

75-
_worker = new Worker(poolSettings, workerSettings);
75+
_worker = new Worker(poolSettings, workerSettings, _cts.Token);
7676
_worker.Run();
7777
}
7878

@@ -83,7 +83,7 @@ protected override void ProcessRecord()
8383

8484
try
8585
{
86-
_worker.Enqueue(InputObject, ScriptBlock);
86+
_worker.Enqueue(InputObject);
8787
while (_worker.TryTake(out PSOutputData data))
8888
{
8989
ProcessOutput(data);

src/PSParallelPipeline/PSTask.cs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,17 @@ private PSTask(RunspacePool pool)
3030
_pool = pool;
3131
}
3232

33-
static internal PSTask Create(RunspacePool runspacePool)
33+
static internal async Task<PSTask> CreateAsync(
34+
object? input,
35+
RunspacePool runspacePool,
36+
TaskSettings settings)
3437
{
3538
PSTask ps = new(runspacePool);
3639
HookStreams(ps._internalStreams, runspacePool.Streams);
40+
ps.Runspace = await runspacePool.GetRunspaceAsync();
41+
ps.AddInput(input);
42+
ps.AddScript(settings.Script);
43+
ps.AddUsingStatements(settings.UsingStatements);
3744
return ps;
3845
}
3946

@@ -56,7 +63,7 @@ private static Task InvokePowerShellAsync(
5663
powerShell.BeginInvoke<PSObject, PSObject>(null, output),
5764
powerShell.EndInvoke);
5865

59-
internal PSTask AddInput(object? inputObject)
66+
private void AddInput(object? inputObject)
6067
{
6168
if (inputObject is not null)
6269
{
@@ -65,17 +72,12 @@ internal PSTask AddInput(object? inputObject)
6572
.AddArgument("_")
6673
.AddArgument(inputObject);
6774
}
68-
69-
return this;
7075
}
7176

72-
internal PSTask AddScript(ScriptBlock script)
73-
{
74-
_powershell.AddScript(script.ToString(), useLocalScope: true);
75-
return this;
76-
}
77+
private void AddScript(string script) =>
78+
_powershell.AddScript(script, useLocalScope: true);
7779

78-
internal PSTask AddUsingStatements(Dictionary<string, object?> usingParams)
80+
private void AddUsingStatements(Dictionary<string, object?> usingParams)
7981
{
8082
if (usingParams.Count > 0)
8183
{
@@ -84,18 +86,17 @@ internal PSTask AddUsingStatements(Dictionary<string, object?> usingParams)
8486
["--%"] = usingParams
8587
});
8688
}
87-
88-
return this;
8989
}
9090

9191
internal async Task InvokeAsync()
9292
{
9393
try
9494
{
95-
using CancellationTokenRegistration _ = _pool.RegisterCancellation(_powershell.Dispose);
96-
Runspace = await _pool.GetRunspaceAsync();
95+
using CancellationTokenRegistration _ = _pool.RegisterCancellation(_powershell.Stop);
9796
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
9897
}
98+
catch (PipelineStoppedException)
99+
{ }
99100
catch (Exception exception)
100101
{
101102
OutputStreams.AddOutput(exception.CreateProcessingTaskError(this));

src/PSParallelPipeline/RunspacePool.cs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
43
using System.Management.Automation.Runspaces;
54
using System.Threading;
65
using System.Threading.Tasks;
@@ -15,7 +14,9 @@ internal sealed class RunspacePool : IDisposable
1514

1615
private readonly ConcurrentQueue<Runspace> _pool = [];
1716

18-
private readonly List<Runspace> _created;
17+
// private readonly List<Runspace> _created;
18+
19+
private readonly ConcurrentDictionary<Guid, Runspace> _created;
1920

2021
private readonly bool _useNew;
2122

@@ -33,21 +34,18 @@ internal RunspacePool(
3334
(MaxRunspaces, _useNew, _iss) = settings;
3435
Streams = streams;
3536
Token = token;
36-
_created = new List<Runspace>(MaxRunspaces);
3737
_semaphore = new SemaphoreSlim(MaxRunspaces, MaxRunspaces);
38+
_created = new ConcurrentDictionary<Guid, Runspace>(
39+
Environment.ProcessorCount,
40+
MaxRunspaces);
3841
}
3942
internal void PushRunspace(Runspace runspace)
4043
{
4144
if (_useNew)
4245
{
4346
runspace.Dispose();
44-
45-
lock (_created)
46-
{
47-
_created.Remove(runspace);
48-
runspace = CreateRunspace();
49-
_created.Add(runspace);
50-
}
47+
_created.TryRemove(runspace.InstanceId, out _);
48+
runspace = CreateRunspace();
5149
}
5250

5351
_pool.Enqueue(runspace);
@@ -60,6 +58,7 @@ internal CancellationTokenRegistration RegisterCancellation(Action callback) =>
6058
private Runspace CreateRunspace()
6159
{
6260
Runspace rs = RunspaceFactory.CreateRunspace(_iss);
61+
_created[rs.InstanceId] = rs;
6362
rs.Open();
6463
return rs;
6564
}
@@ -72,18 +71,12 @@ internal async Task<Runspace> GetRunspaceAsync()
7271
return runspace;
7372
}
7473

75-
lock (_created)
76-
{
77-
runspace = CreateRunspace();
78-
_created.Add(runspace);
79-
}
80-
81-
return runspace;
74+
return CreateRunspace();
8275
}
8376

8477
public void Dispose()
8578
{
86-
foreach (Runspace runspace in _created)
79+
foreach (Runspace runspace in _created.Values)
8780
{
8881
runspace.Dispose();
8982
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
using System.Collections.Generic;
2+
3+
namespace PSParallelPipeline;
4+
5+
internal record struct TaskSettings(
6+
string Script,
7+
Dictionary<string, object?> UsingStatements);

src/PSParallelPipeline/Worker.cs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,39 @@
33
using System.Threading.Tasks;
44
using System.Collections.Concurrent;
55
using System.Collections.Generic;
6-
using System.Management.Automation;
76

87
namespace PSParallelPipeline;
98

109
internal sealed class Worker
1110
{
1211
private Task? _worker;
1312

14-
private readonly BlockingCollection<PSTask> _input = [];
13+
private readonly TaskSettings _taskSettings;
14+
15+
private readonly BlockingCollection<object?> _input = [];
1516

1617
private readonly BlockingCollection<PSOutputData> _output = [];
1718

1819
private readonly RunspacePool _pool;
1920

2021
private readonly CancellationToken _token;
2122

22-
private readonly Dictionary<string, object?> _usingParams;
23-
2423
private readonly PSOutputStreams _streams;
2524

2625
internal Worker(
2726
PoolSettings poolSettings,
28-
WorkerSettings workerSettings)
27+
TaskSettings taskSettings,
28+
CancellationToken token)
2929
{
30-
(_usingParams, _token) = workerSettings;
30+
_token = token;
31+
_taskSettings = taskSettings;
3132
_streams = new PSOutputStreams(_output);
3233
_pool = new RunspacePool(poolSettings, _streams, _token);
3334
}
3435

3536
internal void Wait() => _worker?.GetAwaiter().GetResult();
3637

37-
internal void Enqueue(object? input, ScriptBlock script)
38-
{
39-
_input.Add(
40-
item: PSTask
41-
.Create(_pool)
42-
.AddInput(input)
43-
.AddScript(script)
44-
.AddUsingStatements(_usingParams),
45-
cancellationToken: _token);
46-
}
38+
internal void Enqueue(object? input) => _input.Add(input, _token);
4739

4840
internal bool TryTake(out PSOutputData output) =>
4941
_output.TryTake(out output, 0, _token);
@@ -68,9 +60,14 @@ private async Task Start()
6860
await ProcessAnyAsync(tasks);
6961
}
7062

71-
if (_input.TryTake(out PSTask ps, 0, _token))
63+
if (_input.TryTake(out object? input, 0, _token))
7264
{
73-
tasks.Add(ps.InvokeAsync());
65+
PSTask task = await PSTask.CreateAsync(
66+
input: input,
67+
runspacePool: _pool,
68+
settings: _taskSettings);
69+
70+
tasks.Add(task.InvokeAsync());
7471
}
7572
}
7673

src/PSParallelPipeline/WorkerSettings.cs

Lines changed: 0 additions & 8 deletions
This file was deleted.

tests/PSParallelPipeline.tests.ps1

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ Describe PSParallelPipeline {
9292

9393
Context 'UseNewRunspace Parameter' {
9494
It 'Should reuse runspaces by default' {
95-
$runspaces = 0..10 | Invoke-Parallel { [runspace]::DefaultRunspace } |
96-
Select-Object -ExpandProperty InstanceId -Unique
97-
$runspaces.Count | Should -BeLessOrEqual 5
95+
0..10 | Invoke-Parallel { [runspace]::DefaultRunspace } |
96+
Select-Object -ExpandProperty InstanceId -Unique |
97+
Should -HaveCount 5
9898
}
9999

100100
It 'Should use a new runspace when the -UseNewRunspace is used' {
@@ -120,7 +120,9 @@ Describe PSParallelPipeline {
120120
0..10 | Invoke-Parallel { Test-Function $_ } -Functions Test-Function |
121121
Sort-Object |
122122
Should -BeExactly @(0..10 | ForEach-Object { Test-Function $_ })
123+
}
123124

125+
It 'Should throw if a function could not be found' {
124126
{ Invoke-Parallel -Functions Test-NotExist { } } |
125127
Should -Throw -ExceptionType ([CommandNotFoundException])
126128
}
@@ -346,14 +348,14 @@ Describe PSParallelPipeline {
346348

347349
Assert-RunspaceCount {
348350
{ 0..1000 | Invoke-Parallel @invokeParallelSplat } |
349-
Should -Throw
351+
Should -Throw -ExceptionType ([TimeoutException])
350352
} -TestCount 50 # -WaitSeconds 1
351353

352354
Assert-RunspaceCount {
353355
$invokeParallelSplat['UseNewRunspace'] = $true
354356
$invokeParallelSplat['ThrottleLimit'] = 1001
355357
{ 0..1000 | Invoke-Parallel @invokeParallelSplat } |
356-
Should -Throw
358+
Should -Throw -ExceptionType ([TimeoutException])
357359
} -TestCount 50 # -WaitSeconds 1
358360
}
359361
}

0 commit comments

Comments
 (0)