3
3
import com .badlogic .gdx .box2d .Box2d ;
4
4
import com .badlogic .gdx .box2d .structs .b2WorldDef ;
5
5
import com .badlogic .gdx .jnigen .runtime .closure .ClosureObject ;
6
+ import com .badlogic .gdx .jnigen .runtime .closure .PointingPoolManager ;
6
7
import com .badlogic .gdx .jnigen .runtime .pointer .VoidPointer ;
7
8
import com .badlogic .gdx .utils .Disposable ;
8
9
9
10
import java .util .concurrent .ArrayBlockingQueue ;
10
11
import java .util .concurrent .BlockingQueue ;
11
- import java .util .concurrent .CountDownLatch ;
12
12
import java .util .concurrent .atomic .AtomicInteger ;
13
+ import java .util .concurrent .locks .AbstractQueuedSynchronizer ;
13
14
14
15
/**
15
16
* This class implements a basic TaskSystem in java, for multithreading of a box2d world.
@@ -28,9 +29,13 @@ public class Box2dWorldTaskSystem implements Disposable {
28
29
private final AtomicInteger aliveWorkers ;
29
30
private final Box2dWorldTaskSystemDeathHandler deathHandler ;
30
31
private final BlockingQueue <Box2dTaskChunk > taskChunks ;
31
- private final CountDownLatch [] countDownLatches = new CountDownLatch [MAX_TASKS ];
32
+ private final Box2dCountdown [] countDownLatches = new Box2dCountdown [MAX_TASKS ];
32
33
private final ClosureObject <Box2d .b2EnqueueTaskCallback > enqueueTaskCallback ;
33
34
private final ClosureObject <Box2d .b2FinishTaskCallback > finishTaskCallback ;
35
+ private final PointingPoolManager poolManager = new PointingPoolManager (2 );
36
+ private final ArrayBlockingQueue <Box2dTaskChunk > chunkPool ;
37
+ private final VoidPointer TASK_INDEX_WRAPPER = new VoidPointer (0L , false );
38
+ private final boolean poolingEnabled ;
34
39
35
40
private int taskCount = 0 ;
36
41
private boolean running = true ;
@@ -43,23 +48,31 @@ public class Box2dWorldTaskSystem implements Disposable {
43
48
* @param worldDef The world to configure
44
49
* @param numWorkers The amount of worker
45
50
* @param deathHandler Handler for when the task system dies. see {@link Box2dWorldTaskSystemDeathHandler}
51
+ * @param enablePooling Whether the TaskSystem should avoid per-frame allocation
46
52
* @return The created {@link Box2dWorldTaskSystem}
47
53
*/
48
- public static Box2dWorldTaskSystem createForWorld (b2WorldDef worldDef , int numWorkers , Box2dWorldTaskSystemDeathHandler deathHandler ) {
49
- Box2dWorldTaskSystem multiThreader = new Box2dWorldTaskSystem (numWorkers , deathHandler );
54
+ public static Box2dWorldTaskSystem createForWorld (b2WorldDef worldDef , int numWorkers , Box2dWorldTaskSystemDeathHandler deathHandler , boolean enablePooling ) {
55
+ Box2dWorldTaskSystem multiThreader = new Box2dWorldTaskSystem (numWorkers , deathHandler , enablePooling );
50
56
multiThreader .configureForWorld (worldDef );
51
57
return multiThreader ;
52
58
}
53
59
54
- private Box2dWorldTaskSystem (int numWorkers , Box2dWorldTaskSystemDeathHandler deathHandler ) {
60
+ private Box2dWorldTaskSystem (int numWorkers , Box2dWorldTaskSystemDeathHandler deathHandler , boolean enablePooling ) {
55
61
if (numWorkers <= 1 )
56
62
throw new IllegalArgumentException ("Number of workers must be greater than 1" );
63
+
64
+ this .poolingEnabled = enablePooling ;
57
65
this .numWorkers = numWorkers ;
58
66
this .workers = new Thread [numWorkers ];
59
67
this .taskChunks = new ArrayBlockingQueue <>(MAX_TASKS * numWorkers );
68
+ this .chunkPool = new ArrayBlockingQueue <>(MAX_TASKS * numWorkers );
60
69
this .aliveWorkers = new AtomicInteger (numWorkers );
61
70
this .deathHandler = deathHandler ;
62
71
72
+ for (int i = 0 ; i < MAX_TASKS ; i ++) {
73
+ countDownLatches [i ] = new Box2dCountdown ();
74
+ }
75
+
63
76
for (int i = 0 ; i < numWorkers ; i ++) {
64
77
final int workerId = i ;
65
78
workers [i ] = new Thread (() -> {
@@ -69,6 +82,8 @@ private Box2dWorldTaskSystem(int numWorkers, Box2dWorldTaskSystemDeathHandler de
69
82
Box2dTaskChunk task = taskChunks .take ();
70
83
task .execute (workerId );
71
84
countDownLatches [task .taskIndex ].countDown ();
85
+ if (poolingEnabled )
86
+ chunkPool .offer (task );
72
87
} catch (InterruptedException e ) {
73
88
break ;
74
89
}
@@ -90,22 +105,26 @@ private Box2dWorldTaskSystem(int numWorkers, Box2dWorldTaskSystemDeathHandler de
90
105
int baseItemsPerTask = itemCount / numTasks ;
91
106
int remainingItems = itemCount % numTasks ;
92
107
93
- countDownLatches [taskCount ] = new CountDownLatch (numTasks );
108
+ countDownLatches [taskCount ]. setCount (numTasks );
94
109
95
110
int start = 0 ;
96
111
for (int i = 0 ; i < numTasks ; i ++) {
97
112
int itemsInThisTask = baseItemsPerTask + (i < remainingItems ? 1 : 0 );
98
113
int end = start + itemsInThisTask ;
99
114
100
- Box2dTaskChunk box2DTaskChunk = new Box2dTaskChunk (taskCount , task , start , end , taskContext );
115
+ Box2dTaskChunk box2DTaskChunk = poolingEnabled ? chunkPool .poll () : new Box2dTaskChunk ();
116
+ if (box2DTaskChunk == null )
117
+ box2DTaskChunk = new Box2dTaskChunk ();
118
+
119
+ box2DTaskChunk .setData (taskCount , task , start , end , taskContext .getPointer ());
101
120
if (!taskChunks .offer (box2DTaskChunk ))
102
121
throw new IllegalStateException ("Task queue is full - impossible" );
103
122
start = end ;
104
123
}
105
124
106
- VoidPointer taskIndex = new VoidPointer (( long ) taskCount + 1 , false );
125
+ TASK_INDEX_WRAPPER . setPointer ( taskCount + 1 );
107
126
taskCount ++;
108
- return taskIndex ;
127
+ return TASK_INDEX_WRAPPER ;
109
128
} else {
110
129
task .getClosure ().b2TaskCallback_call (0 , itemCount , 0 , taskContext );
111
130
return VoidPointer .NULL ;
@@ -119,11 +138,16 @@ private Box2dWorldTaskSystem(int numWorkers, Box2dWorldTaskSystemDeathHandler de
119
138
int taskId = (int ) userTask .getPointer () - 1 ;
120
139
try {
121
140
countDownLatches [taskId ].await ();
122
- countDownLatches [taskId ] = null ;
123
141
} catch (InterruptedException e ) {
124
142
throw new Box2dWorldTaskSystemInterruptException (e );
125
143
}
126
144
});
145
+
146
+ if (poolingEnabled ) {
147
+ poolManager .addPool (VoidPointer ::new , 2 );
148
+ enqueueTaskCallback .setPoolManager (poolManager );
149
+ finishTaskCallback .setPoolManager (poolManager );
150
+ }
127
151
}
128
152
129
153
private void onThreadDies () {
@@ -166,18 +190,18 @@ public void dispose() {
166
190
}
167
191
168
192
private static class Box2dTaskChunk {
169
- private final int taskIndex ;
170
- private final ClosureObject <Box2d .b2TaskCallback > task ;
171
- private final int start ;
172
- private final int end ;
173
- private final VoidPointer taskContext ;
193
+ private int taskIndex ;
194
+ private ClosureObject <Box2d .b2TaskCallback > task ;
195
+ private int start ;
196
+ private int end ;
197
+ private final VoidPointer taskContext = new VoidPointer ( 0L , false ) ;
174
198
175
- public Box2dTaskChunk (int taskIndex , ClosureObject <Box2d .b2TaskCallback > task , int start , int end , VoidPointer taskContext ) {
199
+ public void setData (int taskIndex , ClosureObject <Box2d .b2TaskCallback > task , int start , int end , long taskContext ) {
176
200
this .taskIndex = taskIndex ;
177
201
this .task = task ;
178
202
this .start = start ;
179
203
this .end = end ;
180
- this .taskContext = taskContext ;
204
+ this .taskContext . setPointer ( taskContext ) ;
181
205
}
182
206
183
207
public void execute (int workerId ) {
@@ -205,4 +229,41 @@ public interface Box2dWorldTaskSystemDeathHandler {
205
229
*/
206
230
void taskSystemDied ();
207
231
}
232
+
233
+ private static class Box2dCountdown extends AbstractQueuedSynchronizer {
234
+
235
+ private Box2dCountdown () {
236
+ setState (0 );
237
+ }
238
+
239
+ public void setCount (int count ) {
240
+ setState (count );
241
+ }
242
+
243
+ public void await () throws InterruptedException {
244
+ acquireSharedInterruptibly (1 );
245
+ }
246
+
247
+ public void countDown () {
248
+ releaseShared (1 );
249
+ }
250
+
251
+ @ Override
252
+ protected int tryAcquireShared (int acquires ) {
253
+ return (getState () == 0 ) ? 1 : -1 ;
254
+ }
255
+
256
+ @ Override
257
+ protected boolean tryReleaseShared (int releases ) {
258
+ // Decrement count; signal when transition to zero
259
+ for (;;) {
260
+ int c = getState ();
261
+ if (c == 0 )
262
+ return false ;
263
+ int nextc = c - 1 ;
264
+ if (compareAndSetState (c , nextc ))
265
+ return nextc == 0 ;
266
+ }
267
+ }
268
+ }
208
269
}
0 commit comments