@@ -209,28 +209,40 @@ Long getThrottlingLimit(final String taskKey) {
209
209
return tasksThreshold .get (taskKey );
210
210
}
211
211
212
+ private void checkForClusterManagerThrottling (
213
+ final ThrottlingKey clusterManagerThrottlingKey ,
214
+ final String taskThrottlingKey ,
215
+ final long taskCount ,
216
+ final int tasksSize
217
+ ) {
218
+ if (clusterManagerThrottlingKey .isThrottlingEnabled ()) {
219
+ Long threshold = tasksThreshold .get (taskThrottlingKey );
220
+ if (threshold != null && shouldThrottle (threshold , taskCount , tasksSize )) {
221
+ clusterManagerTaskThrottlerListener .onThrottle (taskThrottlingKey , tasksSize );
222
+ logger .warn (
223
+ "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]" ,
224
+ taskThrottlingKey ,
225
+ tasksSize ,
226
+ threshold
227
+ );
228
+ throw new ClusterManagerThrottlingException ("Throttling Exception : Limit exceeded for " + taskThrottlingKey );
229
+ }
230
+ }
231
+ }
232
+
212
233
@ Override
213
234
public void onBeginSubmit (List <? extends TaskBatcher .BatchedTask > tasks ) {
214
- ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor <Object >) tasks .get (0 ).batchingKey )
235
+ final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor <Object >) tasks .get (0 ).batchingKey )
215
236
.getClusterManagerThrottlingKey ();
216
- tasksCount .putIfAbsent (clusterManagerThrottlingKey .getTaskThrottlingKey (), 0L );
217
- tasksCount .computeIfPresent (clusterManagerThrottlingKey .getTaskThrottlingKey (), (key , count ) -> {
237
+ final String taskThrottlingKey = clusterManagerThrottlingKey .getTaskThrottlingKey ();
238
+ tasksCount .putIfAbsent (taskThrottlingKey , 0L );
239
+
240
+ // Performing shallow check before taking lock, performing throttle check and computing new count
241
+ checkForClusterManagerThrottling (clusterManagerThrottlingKey , taskThrottlingKey , tasksCount .get (taskThrottlingKey ), tasks .size ());
242
+
243
+ tasksCount .computeIfPresent (taskThrottlingKey , (key , count ) -> {
218
244
int size = tasks .size ();
219
- if (clusterManagerThrottlingKey .isThrottlingEnabled ()) {
220
- Long threshold = tasksThreshold .get (clusterManagerThrottlingKey .getTaskThrottlingKey ());
221
- if (threshold != null && shouldThrottle (threshold , count , size )) {
222
- clusterManagerTaskThrottlerListener .onThrottle (clusterManagerThrottlingKey .getTaskThrottlingKey (), size );
223
- logger .warn (
224
- "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]" ,
225
- clusterManagerThrottlingKey .getTaskThrottlingKey (),
226
- tasks .size (),
227
- threshold
228
- );
229
- throw new ClusterManagerThrottlingException (
230
- "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey .getTaskThrottlingKey ()
231
- );
232
- }
233
- }
245
+ checkForClusterManagerThrottling (clusterManagerThrottlingKey , taskThrottlingKey , count , size );
234
246
return count + size ;
235
247
});
236
248
}
0 commit comments