3
3
namespace Rcalicdan \FiberAsync \Api ;
4
4
5
5
use PDO ;
6
- use Rcalicdan \FiberAsync \Async \Handlers \PromiseCollectionHandler ;
7
6
use Rcalicdan \FiberAsync \PDO \AsyncPdoPool ;
8
7
use Rcalicdan \FiberAsync \Promise \CancellablePromise ;
9
8
use Rcalicdan \FiberAsync \Promise \Interfaces \PromiseInterface ;
@@ -30,7 +29,7 @@ final class AsyncPDO
30
29
* This is the single point of configuration and must be called before
31
30
* using any other AsyncPDO methods. Multiple calls are ignored.
32
31
*
33
- * @param array $dbConfig Database configuration array containing:
32
+ * @param array<string, mixed> $dbConfig Database configuration array containing:
34
33
* - dsn: Database connection string
35
34
* - username: Database username
36
35
* - password: Database password
@@ -55,7 +54,7 @@ public static function init(array $dbConfig, int $poolSize = 10): void
55
54
*/
56
55
public static function reset (): void
57
56
{
58
- if (self ::$ pool ) {
57
+ if (self ::$ pool !== null ) {
59
58
self ::$ pool ->close ();
60
59
}
61
60
self ::$ pool = null ;
@@ -68,23 +67,23 @@ public static function reset(): void
68
67
* Automatically handles connection acquisition and release. The callback
69
68
* receives a PDO instance and can perform any database operations.
70
69
*
71
- * @param callable $callback Function that receives PDO instance
72
- * Signature: function( PDO $pdo): mixed
73
- * @return PromiseInterface Promise resolving to callback's return value
70
+ * @template TResult
71
+ * @param callable(PDO): TResult $callback Function that receives PDO instance
72
+ * @return PromiseInterface<TResult> Promise resolving to callback's return value
74
73
*
75
74
* @throws \RuntimeException If AsyncPDO is not initialized
76
75
*/
77
76
public static function run (callable $ callback ): PromiseInterface
78
77
{
79
- return Async::async (function () use ($ callback ) {
78
+ return Async::async (function () use ($ callback ): mixed {
80
79
$ pdo = null ;
81
80
82
81
try {
83
82
$ pdo = await (self ::getPool ()->get ());
84
83
85
84
return $ callback ($ pdo );
86
85
} finally {
87
- if ($ pdo ) {
86
+ if ($ pdo !== null ) {
88
87
self ::getPool ()->release ($ pdo );
89
88
}
90
89
}
@@ -95,15 +94,15 @@ public static function run(callable $callback): PromiseInterface
95
94
* Executes a SELECT query and returns all matching rows.
96
95
*
97
96
* @param string $sql SQL query with optional parameter placeholders
98
- * @param array $params Parameter values for prepared statement
99
- * @return PromiseInterface Promise resolving to array of associative arrays
97
+ * @param array<string|int, mixed> $params Parameter values for prepared statement
98
+ * @return PromiseInterface<array<int, array<string, mixed>>> Promise resolving to array of associative arrays
100
99
*
101
100
* @throws \RuntimeException If AsyncPDO is not initialized
102
101
* @throws \PDOException If query execution fails
103
102
*/
104
103
public static function query (string $ sql , array $ params = []): PromiseInterface
105
104
{
106
- return self ::run (function (PDO $ pdo ) use ($ sql , $ params ) {
105
+ return self ::run (function (PDO $ pdo ) use ($ sql , $ params ): array {
107
106
$ stmt = $ pdo ->prepare ($ sql );
108
107
$ stmt ->execute ($ params );
109
108
@@ -115,15 +114,15 @@ public static function query(string $sql, array $params = []): PromiseInterface
115
114
* Executes a SELECT query and returns the first matching row.
116
115
*
117
116
* @param string $sql SQL query with optional parameter placeholders
118
- * @param array $params Parameter values for prepared statement
119
- * @return PromiseInterface Promise resolving to associative array or false if no rows
117
+ * @param array<string|int, mixed> $params Parameter values for prepared statement
118
+ * @return PromiseInterface<array<string, mixed>|false> Promise resolving to associative array or false if no rows
120
119
*
121
120
* @throws \RuntimeException If AsyncPDO is not initialized
122
121
* @throws \PDOException If query execution fails
123
122
*/
124
123
public static function fetchOne (string $ sql , array $ params = []): PromiseInterface
125
124
{
126
- return self ::run (function (PDO $ pdo ) use ($ sql , $ params ) {
125
+ return self ::run (function (PDO $ pdo ) use ($ sql , $ params ): array | false {
127
126
$ stmt = $ pdo ->prepare ($ sql );
128
127
$ stmt ->execute ($ params );
129
128
@@ -135,31 +134,52 @@ public static function fetchOne(string $sql, array $params = []): PromiseInterfa
135
134
* Executes an INSERT, UPDATE, or DELETE statement and returns affected row count.
136
135
*
137
136
* @param string $sql SQL statement with optional parameter placeholders
138
- * @param array $params Parameter values for prepared statement
139
- * @return PromiseInterface Promise resolving to number of affected rows
137
+ * @param array<string|int, mixed> $params Parameter values for prepared statement
138
+ * @return PromiseInterface<int> Promise resolving to number of affected rows
140
139
*
141
140
* @throws \RuntimeException If AsyncPDO is not initialized
142
141
* @throws \PDOException If statement execution fails
143
142
*/
144
143
public static function execute (string $ sql , array $ params = []): PromiseInterface
145
144
{
146
- return self ::run (function (PDO $ pdo ) use ($ sql , $ params ) {
145
+ return self ::run (function (PDO $ pdo ) use ($ sql , $ params ): int {
147
146
$ stmt = $ pdo ->prepare ($ sql );
148
147
$ stmt ->execute ($ params );
149
148
150
149
return $ stmt ->rowCount ();
151
150
});
152
151
}
153
152
153
+ /**
154
+ * Executes a query and returns a single column value from the first row.
155
+ *
156
+ * Useful for queries that return a single scalar value like COUNT, MAX, etc.
157
+ *
158
+ * @param string $sql SQL query with optional parameter placeholders
159
+ * @param array<string|int, mixed> $params Parameter values for prepared statement
160
+ * @return PromiseInterface<mixed> Promise resolving to scalar value or false if no rows
161
+ *
162
+ * @throws \RuntimeException If AsyncPDO is not initialized
163
+ * @throws \PDOException If query execution fails
164
+ */
165
+ public static function fetchValue (string $ sql , array $ params = []): PromiseInterface
166
+ {
167
+ return self ::run (function (PDO $ pdo ) use ($ sql , $ params ): mixed {
168
+ $ stmt = $ pdo ->prepare ($ sql );
169
+ $ stmt ->execute ($ params );
170
+
171
+ return $ stmt ->fetch (PDO ::FETCH_COLUMN );
172
+ });
173
+ }
174
+
154
175
/**
155
176
* Executes multiple operations within a database transaction.
156
177
*
157
178
* Automatically handles transaction begin/commit/rollback. If the callback
158
179
* throws an exception, the transaction is rolled back automatically.
159
180
*
160
- * @param callable $callback Transaction callback receiving PDO instance
161
- * Signature: function(PDO $pdo): mixed
162
- * @return PromiseInterface Promise resolving to callback's return value
181
+ * @param callable(PDO): mixed $callback Transaction callback receiving PDO instance
182
+ * @return PromiseInterface<mixed> Promise resolving to callback's return value
163
183
*
164
184
* @throws \RuntimeException If AsyncPDO is not initialized
165
185
* @throws \PDOException If transaction operations fail
@@ -183,47 +203,27 @@ public static function transaction(callable $callback): PromiseInterface
183
203
});
184
204
}
185
205
186
- /**
187
- * Executes a query and returns a single column value from the first row.
188
- *
189
- * Useful for queries that return a single scalar value like COUNT, MAX, etc.
190
- *
191
- * @param string $sql SQL query with optional parameter placeholders
192
- * @param array $params Parameter values for prepared statement
193
- * @return PromiseInterface Promise resolving to scalar value or false if no rows
194
- *
195
- * @throws \RuntimeException If AsyncPDO is not initialized
196
- * @throws \PDOException If query execution fails
197
- */
198
- public static function fetchValue (string $ sql , array $ params = []): PromiseInterface
199
- {
200
- return self ::run (function (PDO $ pdo ) use ($ sql , $ params ) {
201
- $ stmt = $ pdo ->prepare ($ sql );
202
- $ stmt ->execute ($ params );
203
-
204
- return $ stmt ->fetch (PDO ::FETCH_COLUMN );
205
- });
206
- }
207
-
208
206
/**
209
207
* Race multiple transactions and commit only the winner, rolling back all others.
210
208
*
211
209
* Executes multiple transactions concurrently and commits the first one to complete
212
210
* successfully while cancelling and rolling back all others. Useful for scenarios
213
211
* like inventory reservation where only one transaction should succeed.
214
212
*
215
- * @param array $transactions Array of transaction callbacks
216
- * Each callback signature: function(PDO $pdo): mixed
217
- * @return PromiseInterface Promise that resolves with the winner's result
213
+ * @param array<int, callable(PDO): mixed> $transactions Array of transaction callbacks
214
+ * @return PromiseInterface<mixed> Promise that resolves with the winner's result
218
215
*
219
216
* @throws \RuntimeException If AsyncPDO is not initialized
220
217
* @throws Throwable If all transactions fail or system error occurs
221
218
*/
222
219
public static function raceTransactions (array $ transactions ): PromiseInterface
223
220
{
224
- return Async::async (function () use ($ transactions ) {
221
+ return Async::async (function () use ($ transactions ): mixed {
222
+ /** @var array<int, CancellablePromise<array{result: mixed, winner_index: int, success: bool}>> $transactionPromises */
225
223
$ transactionPromises = [];
224
+ /** @var array<int, PDO> $pdoConnections */
226
225
$ pdoConnections = [];
226
+ /** @var array<int, CancellablePromise<array{result: mixed, winner_index: int, success: bool}>> $cancellablePromises */
227
227
$ cancellablePromises = [];
228
228
229
229
foreach ($ transactions as $ index => $ transactionCallback ) {
@@ -232,10 +232,9 @@ public static function raceTransactions(array $transactions): PromiseInterface
232
232
$ cancellablePromises [$ index ] = $ cancellablePromise ;
233
233
}
234
234
235
- $ collectionHandler = new PromiseCollectionHandler ;
236
-
237
235
try {
238
- $ winnerResult = await ($ collectionHandler ->race ($ transactionPromises ));
236
+ /** @var array{result: mixed, winner_index: int, success: bool} $winnerResult */
237
+ $ winnerResult = await (race ($ transactionPromises ));
239
238
240
239
self ::cancelLosingTransactions ($ cancellablePromises , $ winnerResult ['winner_index ' ]);
241
240
@@ -257,16 +256,16 @@ public static function raceTransactions(array $transactions): PromiseInterface
257
256
* Creates a cancellable promise that executes a transaction callback and
258
257
* stores the PDO connection for later cleanup operations.
259
258
*
260
- * @param callable $transactionCallback Transaction function to execute
259
+ * @param callable(PDO): mixed $transactionCallback Transaction function to execute
261
260
* @param int $index Transaction index for identification
262
- * @param array & $pdoConnections Reference to array storing PDO connections
263
- * @return CancellablePromise Promise that can be cancelled mid-execution
261
+ * @param array<int, PDO> $pdoConnections Reference to array storing PDO connections
262
+ * @return CancellablePromise<array{result: mixed, winner_index: int, success: bool}> Promise that can be cancelled mid-execution
264
263
*
265
264
* @internal This method is for internal use by raceTransactions()
266
265
*/
267
266
private static function startCancellableRacingTransaction (callable $ transactionCallback , int $ index , array &$ pdoConnections ): CancellablePromise
268
267
{
269
- $ cancellablePromise = new CancellablePromise (function ($ resolve , $ reject ) use ($ transactionCallback , $ index , &$ pdoConnections ) {
268
+ $ cancellablePromise = new CancellablePromise (function (callable $ resolve , callable $ reject ) use ($ transactionCallback , $ index , &$ pdoConnections ) {
270
269
$ pdo = await (self ::getPool ()->get ());
271
270
$ pdoConnections [$ index ] = $ pdo ;
272
271
@@ -295,7 +294,7 @@ private static function startCancellableRacingTransaction(callable $transactionC
295
294
}
296
295
self ::getPool ()->release ($ pdo );
297
296
} catch (Throwable $ e ) {
298
- error_log ("Failed to cancel transaction {$ index }: " . $ e ->getMessage ());
297
+ error_log ("Failed to cancel transaction {$ index }: " . $ e ->getMessage ());
299
298
self ::getPool ()->release ($ pdo );
300
299
}
301
300
}
@@ -310,7 +309,7 @@ private static function startCancellableRacingTransaction(callable $transactionC
310
309
* Iterates through all racing transactions and cancels those that didn't win,
311
310
* triggering their rollback handlers.
312
311
*
313
- * @param array $cancellablePromises Array of CancellablePromise instances
312
+ * @param array<int, CancellablePromise<array{result: mixed, winner_index: int, success: bool}>> $cancellablePromises Array of CancellablePromise instances
314
313
* @param int $winnerIndex Index of the winning transaction to preserve
315
314
*
316
315
* @internal This method is for internal use by raceTransactions()
@@ -329,7 +328,7 @@ private static function cancelLosingTransactions(array $cancellablePromises, int
329
328
*
330
329
* Emergency cancellation of all racing transactions when a system error occurs.
331
330
*
332
- * @param array $cancellablePromises Array of CancellablePromise instances
331
+ * @param array<int, CancellablePromise<array{result: mixed, winner_index: int, success: bool}>> $cancellablePromises Array of CancellablePromise instances
333
332
*
334
333
* @internal This method is for internal use by raceTransactions()
335
334
*/
@@ -348,17 +347,17 @@ private static function cancelAllTransactions(array $cancellablePromises): void
348
347
* Commits the winning transaction and releases its connection back to the pool.
349
348
* Losing transactions should already be cancelled by this point.
350
349
*
351
- * @param array $pdoConnections Array of PDO connections indexed by transaction
350
+ * @param array<int, PDO> $pdoConnections Array of PDO connections indexed by transaction
352
351
* @param int $winnerIndex Index of the winning transaction
353
- * @return PromiseInterface Promise that resolves when finalization is complete
352
+ * @return PromiseInterface<void> Promise that resolves when finalization is complete
354
353
*
355
354
* @throws Throwable If commit fails
356
355
*
357
356
* @internal This method is for internal use by raceTransactions()
358
357
*/
359
358
private static function finalizeRacingTransactions (array $ pdoConnections , int $ winnerIndex ): PromiseInterface
360
359
{
361
- return Async::async (function () use ($ pdoConnections , $ winnerIndex ) {
360
+ return Async::async (function () use ($ pdoConnections , $ winnerIndex ): void {
362
361
if (isset ($ pdoConnections [$ winnerIndex ])) {
363
362
$ pdo = $ pdoConnections [$ winnerIndex ];
364
363
@@ -369,7 +368,7 @@ private static function finalizeRacingTransactions(array $pdoConnections, int $w
369
368
}
370
369
self ::getPool ()->release ($ pdo );
371
370
} catch (Throwable $ e ) {
372
- error_log ("Failed to commit winner transaction {$ winnerIndex }: " . $ e ->getMessage ());
371
+ error_log ("Failed to commit winner transaction {$ winnerIndex }: " . $ e ->getMessage ());
373
372
$ pdo ->rollBack ();
374
373
self ::getPool ()->release ($ pdo );
375
374
@@ -385,32 +384,32 @@ private static function finalizeRacingTransactions(array $pdoConnections, int $w
385
384
* Emergency cleanup that rolls back all racing transactions when a system
386
385
* error occurs before a winner can be determined.
387
386
*
388
- * @param array $pdoConnections Array of PDO connections to rollback
389
- * @return PromiseInterface Promise that resolves when all rollbacks complete
387
+ * @param array<int, PDO> $pdoConnections Array of PDO connections to rollback
388
+ * @return PromiseInterface<void> Promise that resolves when all rollbacks complete
390
389
*
391
390
* @internal This method is for internal use by raceTransactions()
392
391
*/
393
392
private static function rollbackAllTransactions (array $ pdoConnections ): PromiseInterface
394
393
{
395
- return Async::async (function () use ($ pdoConnections ) {
394
+ return Async::async (function () use ($ pdoConnections ): void {
395
+ /** @var array<int, PromiseInterface<void>> $rollbackPromises */
396
396
$ rollbackPromises = [];
397
397
398
398
foreach ($ pdoConnections as $ index => $ pdo ) {
399
- $ rollbackPromises [] = Async::async (function () use ($ pdo , $ index ) {
399
+ $ rollbackPromises [] = Async::async (function () use ($ pdo , $ index ): void {
400
400
try {
401
401
if ($ pdo ->inTransaction ()) {
402
402
$ pdo ->rollBack ();
403
403
}
404
404
self ::getPool ()->release ($ pdo );
405
405
} catch (Throwable $ e ) {
406
- error_log ("Failed to rollback transaction {$ index }: " . $ e ->getMessage ());
406
+ error_log ("Failed to rollback transaction {$ index }: " . $ e ->getMessage ());
407
407
self ::getPool ()->release ($ pdo );
408
408
}
409
409
})();
410
410
}
411
411
412
- $ collectionHandler = new PromiseCollectionHandler ;
413
- await ($ collectionHandler ->all ($ rollbackPromises ));
412
+ await (all ($ rollbackPromises ));
414
413
})();
415
414
}
416
415
@@ -425,7 +424,7 @@ private static function rollbackAllTransactions(array $pdoConnections): PromiseI
425
424
*/
426
425
private static function getPool (): AsyncPdoPool
427
426
{
428
- if (! self ::$ isInitialized ) {
427
+ if (! self ::$ isInitialized || self :: $ pool === null ) {
429
428
throw new \RuntimeException (
430
429
'AsyncPDO has not been initialized. Please call AsyncPDO::init() at application startup. '
431
430
);
0 commit comments