Skip to content

Commit 690652b

Browse files
committed
Merge pull request #682 from Particular/adopt_batches
AdoptOrphanedBatches running more then necessary
2 parents 939bf58 + a821b36 commit 690652b

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

src/ServiceControl/Infrastructure/TimeKeeper.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,23 @@ public class TimeKeeper : IDisposable
1010
ConcurrentDictionary<Timer, object> timers = new ConcurrentDictionary<Timer, object>();
1111
private ILog log = LogManager.GetLogger<TimeKeeper>();
1212

13-
public Timer New(Action callback, TimeSpan dueTime, TimeSpan period)
13+
public Timer NewTimer(Func<bool> callback, TimeSpan dueTime, TimeSpan period)
1414
{
1515
Timer timer = null;
1616

1717
timer = new Timer(_ =>
1818
{
19+
var reschedule = false;
20+
1921
try
2022
{
21-
callback();
23+
reschedule = callback();
2224
}
2325
catch (Exception ex)
2426
{
2527
log.Error("Reoccurring timer task failed.", ex);
2628
}
27-
if (timers.ContainsKey(timer))
29+
if (reschedule && timers.ContainsKey(timer))
2830
{
2931
try
3032
{
@@ -41,6 +43,15 @@ public Timer New(Action callback, TimeSpan dueTime, TimeSpan period)
4143
return timer;
4244
}
4345

46+
public Timer New(Action callback, TimeSpan dueTime, TimeSpan period)
47+
{
48+
return NewTimer(() =>
49+
{
50+
callback();
51+
return true;
52+
}, dueTime, period);
53+
}
54+
4455
public void Release(Timer timer)
4556
{
4657
object _;

src/ServiceControl/Recoverability/Retries/FailedMessageRetries.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,23 @@ public AdoptOrphanBatchesFromPreviousSession(RetryDocumentManager retryDocumentM
7676
this.timeKeeper = timeKeeper;
7777
}
7878

79-
private void AdoptOrphanedBatches()
79+
private bool AdoptOrphanedBatches()
8080
{
81-
var allDone = retryDocumentManager.AdoptOrphanedBatches();
81+
bool hasMoreWorkToDo;
82+
retryDocumentManager.AdoptOrphanedBatches(out hasMoreWorkToDo);
8283

83-
if (allDone)
84+
if (!hasMoreWorkToDo)
8485
{
8586
//Disable timeout
8687
timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
8788
}
89+
90+
return hasMoreWorkToDo;
8891
}
8992

9093
protected override void OnStart()
9194
{
92-
timer = timeKeeper.New(AdoptOrphanedBatches, TimeSpan.Zero, TimeSpan.FromMinutes(2));
95+
timer = timeKeeper.NewTimer(AdoptOrphanedBatches, TimeSpan.Zero, TimeSpan.FromMinutes(2));
9396
}
9497

9598
protected override void OnStop()

src/ServiceControl/Recoverability/Retries/RetryDocumentManager.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void RemoveFailedMessageRetryDocument(string uniqueMessageId)
107107
Store.DatabaseCommands.Delete(FailedMessageRetry.MakeDocumentId(uniqueMessageId), null);
108108
}
109109

110-
internal bool AdoptOrphanedBatches()
110+
internal void AdoptOrphanedBatches(out bool hasMoreWorkToDo)
111111
{
112112
using (var session = Store.OpenSession())
113113
{
@@ -123,8 +123,13 @@ internal bool AdoptOrphanedBatches()
123123

124124
AdoptBatches(session, orphanedBatchIds);
125125

126-
var moreToDo = stats.IsStale || orphanedBatchIds.Any();
127-
return abort || !moreToDo;
126+
if (abort)
127+
{
128+
hasMoreWorkToDo = false;
129+
return;
130+
}
131+
132+
hasMoreWorkToDo = stats.IsStale || orphanedBatchIds.Any();
128133
}
129134
}
130135

0 commit comments

Comments
 (0)