Skip to content

Commit d702547

Browse files
committed
fix: hold _isProcessing when dependencies not met to prevent queue orphaning in Watch.ProcessChange
1 parent 7970a6d commit d702547

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

src/Configuration/Watch.cs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,17 @@ public override void Run(ChangeInfo change, TriggerType trigger)
207207
// Start processing on thread pool
208208
_processingTask = Task.Run(() =>
209209
{
210+
bool releaseProcessing = true;
210211
try
211212
{
212-
ProcessChange();
213+
releaseProcessing = ProcessChange();
213214
}
214215
finally
215216
{
216-
Interlocked.Exchange(ref _isProcessing, 0);
217+
if (releaseProcessing)
218+
{
219+
Interlocked.Exchange(ref _isProcessing, 0);
220+
}
217221
}
218222
});
219223
}
@@ -413,20 +417,24 @@ private void CreateTimer()
413417
/// <summary>
414418
/// Process the changes in the queue.
415419
/// </summary>
416-
public void ProcessChange()
420+
/// <returns>
421+
/// <c>true</c> if the <see cref="_isProcessing"/> flag should be released
422+
/// by the caller; <c>false</c> if the flag must remain held because
423+
/// processing is pending a dependency completing.
424+
/// </returns>
425+
public bool ProcessChange()
417426
{
418427
if (string.IsNullOrWhiteSpace(Path))
419428
{
420-
return;
429+
return true;
421430
}
422431

423432
_queue ??= new ConcurrentQueue<ChangeInfo>();
424433

425434
if (_queue.IsEmpty)
426435
{
427-
Initialize();
428436
Thread.Sleep(EMPTY_QUEUE_SLEEP_MS);
429-
return;
437+
return true;
430438
}
431439

432440
// Peek at first item for correlation ID instead of copying entire queue
@@ -470,7 +478,10 @@ public void ProcessChange()
470478
$"{correlationPrefix}{IdLogString}: Watch blocked from running. Reason: {reason}. Queue will be processed when watch becomes available. (Watch.ProcessChange)",
471479
LogLevel.DEBUG);
472480
}
473-
return;
481+
// Return false to keep _isProcessing held. OnNeedsCompleted will call
482+
// ProcessChange() directly once the dependency completes, at which point
483+
// it will release _isProcessing itself.
484+
return false;
474485
}
475486

476487
var startTime = DateTime.Now;
@@ -543,6 +554,7 @@ public void ProcessChange()
543554
? $"[{firstCorrelationId.Value}] {IdLogString}: Tasks completed for watch."
544555
: $"{IdLogString}: Tasks completed for watch.";
545556
OnCompleted(this, new TaskEventArgs(true, IdLogString, completionMessage));
557+
return true;
546558
}
547559

548560
/// <summary>
@@ -1076,7 +1088,17 @@ public override void OnNeedsCompleted(object? sender, TaskEventArgs e)
10761088

10771089
base.OnNeedsCompleted(sender, e);
10781090

1079-
ProcessChange();
1091+
// _isProcessing is still held from the blocked Task.Run (ProcessChange
1092+
// returned false). Process the queued items now that the dependency has
1093+
// completed, then release the flag.
1094+
try
1095+
{
1096+
ProcessChange();
1097+
}
1098+
finally
1099+
{
1100+
Interlocked.Exchange(ref _isProcessing, 0);
1101+
}
10801102
}
10811103

10821104
}

0 commit comments

Comments
 (0)