Skip to content

Commit e6910cc

Browse files
committed
feat(database): Add configuration for multiple database connections with environment variable support
refactor(AsyncMySQLi): Improve parameter handling and error logging in async queries fix(ConfigLoader): Update comments for clarity on configuration file paths refactor(test): Replace SSE example with DB benchmark tests for async operations
1 parent b529c32 commit e6910cc

File tree

5 files changed

+107
-76
lines changed

5 files changed

+107
-76
lines changed

config/database.php renamed to config/fiber-async/database.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,5 @@
8787
],
8888
],
8989

90-
'immutable_query_builder' => $_ENV['DB_IMMUTABLE_QUERY_BUILDER'] ?? true,
91-
9290
'pool_size' => (int) ($_ENV['DB_POOL_SIZE'] ?? 20),
9391
];

src/Api/AsyncMySQLi.php

Lines changed: 75 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ final class AsyncMySQLi
1515
{
1616
private static ?AsyncMySQLiPool $pool = null;
1717
private static bool $isInitialized = false;
18-
private const POOL_INTERVAL = 10; // microseconds
19-
private const POOL_MAX_INTERVAL = 100; // microseconds
18+
private const POOL_INTERVAL = 10;
19+
private const POOL_MAX_INTERVAL = 100;
2020

2121
public static function init(array $dbConfig, int $poolSize = 10): void
2222
{
@@ -54,22 +54,22 @@ public static function run(callable $callback): PromiseInterface
5454
})();
5555
}
5656

57-
public static function query(string $sql, array $params = [], string $types = ''): PromiseInterface
57+
public static function query(string $sql, array $params = [], ?string $types = null): PromiseInterface
5858
{
5959
return self::executeAsyncQuery($sql, $params, $types, 'fetchAll');
6060
}
6161

62-
public static function fetchOne(string $sql, array $params = [], string $types = ''): PromiseInterface
62+
public static function fetchOne(string $sql, array $params = [], ?string $types = null): PromiseInterface
6363
{
6464
return self::executeAsyncQuery($sql, $params, $types, 'fetchOne');
6565
}
6666

67-
public static function execute(string $sql, array $params = [], string $types = ''): PromiseInterface
67+
public static function execute(string $sql, array $params = [], ?string $types = null): PromiseInterface
6868
{
6969
return self::executeAsyncQuery($sql, $params, $types, 'execute');
7070
}
7171

72-
public static function fetchValue(string $sql, array $params = [], string $types = ''): PromiseInterface
72+
public static function fetchValue(string $sql, array $params = [], ?string $types = null): PromiseInterface
7373
{
7474
return self::executeAsyncQuery($sql, $params, $types, 'fetchValue');
7575
}
@@ -158,7 +158,7 @@ private static function startCancellableRacingTransaction(callable $transactionC
158158
$mysqli->autocommit(true);
159159
self::getPool()->release($mysqli);
160160
} catch (Throwable $e) {
161-
error_log("Failed to cancel transaction {$index}: ".$e->getMessage());
161+
error_log("Failed to cancel transaction {$index}: " . $e->getMessage());
162162
self::getPool()->release($mysqli);
163163
}
164164
}
@@ -197,7 +197,7 @@ private static function finalizeRacingTransactions(array $mysqliConnections, int
197197
echo "Transaction $winnerIndex: Winner committed!\n";
198198
self::getPool()->release($mysqli);
199199
} catch (Throwable $e) {
200-
error_log("Failed to commit winner transaction {$winnerIndex}: ".$e->getMessage());
200+
error_log("Failed to commit winner transaction {$winnerIndex}: " . $e->getMessage());
201201
$mysqli->rollback();
202202
$mysqli->autocommit(true);
203203
self::getPool()->release($mysqli);
@@ -220,7 +220,7 @@ private static function rollbackAllTransactions(array $mysqliConnections): Promi
220220
$mysqli->autocommit(true);
221221
self::getPool()->release($mysqli);
222222
} catch (Throwable $e) {
223-
error_log("Failed to rollback transaction {$index}: ".$e->getMessage());
223+
error_log("Failed to rollback transaction {$index}: " . $e->getMessage());
224224
self::getPool()->release($mysqli);
225225
}
226226
})();
@@ -231,40 +231,93 @@ private static function rollbackAllTransactions(array $mysqliConnections): Promi
231231
})();
232232
}
233233

234-
private static function executeAsyncQuery(string $sql, array $params, string $types, string $resultType): PromiseInterface
234+
private static function detectParameterTypes(array $params): string
235+
{
236+
$types = '';
237+
238+
foreach ($params as $param) {
239+
$types .= match (true) {
240+
$param === null => 's',
241+
is_bool($param) => 'i',
242+
is_int($param) => 'i',
243+
is_float($param) => 'd',
244+
is_resource($param) => 'b',
245+
is_string($param) && str_contains($param, "\0") => 'b',
246+
is_string($param) => 's',
247+
is_array($param) => 's',
248+
is_object($param) => 's',
249+
default => 's',
250+
};
251+
}
252+
253+
return $types;
254+
}
255+
256+
private static function preprocessParameters(array $params): array
257+
{
258+
$processedParams = [];
259+
260+
foreach ($params as $param) {
261+
$processedParams[] = match (true) {
262+
$param === null => null,
263+
is_bool($param) => $param ? 1 : 0,
264+
is_int($param) || is_float($param) => $param,
265+
is_resource($param) => $param, // Keep resource as-is for blob
266+
is_string($param) => $param,
267+
is_array($param) => json_encode($param),
268+
is_object($param) && method_exists($param, '__toString') => (string) $param,
269+
is_object($param) => json_encode($param),
270+
default => (string) $param,
271+
};
272+
}
273+
274+
return $processedParams;
275+
}
276+
277+
private static function executeAsyncQuery(string $sql, array $params, ?string $types, string $resultType): PromiseInterface
235278
{
236279
return Async::async(function () use ($sql, $params, $types, $resultType) {
237280
$mysqli = await(self::getPool()->get());
238281

239282
try {
240-
if (! empty($params)) {
283+
if (count($params) > 0) {
241284
$stmt = $mysqli->prepare($sql);
242-
if (! $stmt) {
243-
throw new \RuntimeException('Prepare failed: '.$mysqli->error);
285+
if (!$stmt) {
286+
throw new \RuntimeException('Prepare failed: ' . $mysqli->error);
244287
}
245288

246-
if (empty($types)) {
289+
if ($types === null) {
290+
$types = self::detectParameterTypes($params);
291+
}
292+
293+
if ($types === '') {
247294
$types = str_repeat('s', count($params));
248295
}
249296

250-
if (! $stmt->bind_param($types, ...$params)) {
251-
throw new \RuntimeException('Bind param failed: '.$stmt->error);
297+
$processedParams = self::preprocessParameters($params);
298+
299+
if (!$stmt->bind_param($types, ...$processedParams)) {
300+
throw new \RuntimeException('Bind param failed: ' . $stmt->error);
252301
}
253302

254-
if (! $stmt->execute()) {
255-
throw new \RuntimeException('Execute failed: '.$stmt->error);
303+
if (!$stmt->execute()) {
304+
throw new \RuntimeException('Execute failed: ' . $stmt->error);
256305
}
257306

258-
if (stripos(trim($sql), 'SELECT') === 0 || stripos(trim($sql), 'SHOW') === 0 || stripos(trim($sql), 'DESCRIBE') === 0) {
307+
if (
308+
stripos(trim($sql), 'SELECT') === 0 ||
309+
stripos(trim($sql), 'SHOW') === 0 ||
310+
stripos(trim($sql), 'DESCRIBE') === 0
311+
) {
259312
$result = $stmt->get_result();
260313
} else {
261314
$result = true;
262315
}
263316

264317
return self::processResult($result, $resultType, $stmt, $mysqli);
265318
} else {
266-
if (! $mysqli->query($sql, MYSQLI_ASYNC)) {
267-
throw new \RuntimeException('Query failed: '.$mysqli->error);
319+
if (!$mysqli->query($sql, MYSQLI_ASYNC)) {
320+
throw new \RuntimeException('Query failed: ' . $mysqli->error);
268321
}
269322

270323
$result = await(self::waitForAsyncCompletion($mysqli));
@@ -320,7 +373,7 @@ private static function processResult($result, string $resultType, ?mysqli_stmt
320373
if ($result === false) {
321374
$error = $stmt?->error ?? $mysqli?->error ?? 'Unknown error';
322375

323-
throw new \RuntimeException('Query execution failed: '.$error);
376+
throw new \RuntimeException('Query execution failed: ' . $error);
324377
}
325378

326379
return match ($resultType) {

src/Api/DB.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use Rcalicdan\FiberAsync\QueryBuilder\AsyncQueryBuilder;
88

99
/**
10-
* DB API - Main entry point for auto-configured async database operations
10+
* DB API - Main entry point for auto-configured async database operations using AsyncPDO under the hood
1111
* with asynchonous query builder support.
1212
*
1313
* This API automatically loads configuration from .env and config/database.php

src/Config/ConfigLoader.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* A singleton configuration loader that automatically finds the project root.
1010
*
1111
* It searches upwards from its directory to locate the project root (identified
12-
* by a 'vendor' folder), loads the .env and config files,
12+
* by a 'vendor' folder), loads the .env and config files from config/fiber-async,
1313
* and then caches the results to ensure the expensive search happens only once.
1414
*/
1515
final class ConfigLoader
@@ -56,7 +56,7 @@ public static function reset(): void
5656

5757
/**
5858
* Retrieves a configuration array by its key (the filename).
59-
* e.g., get('database') loads and returns config/database.php
59+
* e.g., get('database') loads and returns config/fiber-async/database.php
6060
*
6161
* @param mixed $default
6262
* @return mixed
@@ -75,7 +75,7 @@ private function findProjectRoot(): ?string
7575
{
7676
$dir = __DIR__;
7777
for ($i = 0; $i < 10; $i++) {
78-
if (is_dir($dir.'/vendor')) {
78+
if (is_dir($dir . '/vendor')) {
7979
return $dir;
8080
}
8181

@@ -95,7 +95,7 @@ private function loadDotEnv(): void
9595
throw new Exception('Root path not found, cannot load .env file');
9696
}
9797

98-
$envFile = $this->rootPath.'/.env';
98+
$envFile = $this->rootPath . '/.env';
9999

100100
if (file_exists($envFile)) {
101101
file_get_contents($envFile);
@@ -112,17 +112,17 @@ private function loadDotEnv(): void
112112
}
113113

114114
/**
115-
* Loads all .php files from the project root's /config directory.
115+
* Loads all .php files from the project root's /config/fiber-async directory.
116116
*/
117117
private function loadConfigFiles(): void
118118
{
119119
if ($this->rootPath === null) {
120120
throw new Exception('Root path not found, cannot load config files');
121121
}
122122

123-
$configDir = $this->rootPath.'/config';
123+
$configDir = $this->rootPath . '/config/fiber-async';
124124
if (is_dir($configDir)) {
125-
$files = glob($configDir.'/*.php');
125+
$files = glob($configDir . '/*.php');
126126
if ($files !== false) {
127127
foreach ($files as $file) {
128128
$key = basename($file, '.php');

test.php

Lines changed: 24 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,30 @@
11
<?php
22

3-
use Rcalicdan\FiberAsync\Http\Handlers\HttpHandler;
4-
use Rcalicdan\FiberAsync\Http\SSE\SSEEvent;
3+
use Rcalicdan\FiberAsync\Api\DB;
4+
use Rcalicdan\FiberAsync\Benchmark\BenchmarkConfig;
5+
use Rcalicdan\FiberAsync\Benchmark\BenchmarkRunner;
56

67
require "vendor/autoload.php";
78

8-
run(function () {
9-
$count = 0;
10-
$lastEventTime = time();
11-
12-
$promise = http()
13-
->sseReconnect(
14-
enabled: true,
15-
maxAttempts: 10,
16-
initialDelay: 1.0,
17-
maxDelay: 30.0,
18-
backoffMultiplier: 2.0,
19-
jitter: true,
20-
onReconnect: function (int $attempt, float $delay) {
21-
echo "[RECONNECT] Attempt #$attempt after {$delay}s delay\n";
22-
}
23-
)
24-
->sse(
25-
url: "https://stream.wikimedia.org/v2/stream/recentchange",
26-
onEvent: function (SSEEvent $event) use (&$count, &$lastEventTime) {
27-
$count++;
28-
$lastEventTime = time();
29-
$data = json_decode($event->data, true);
30-
echo "[EVENT] #$count at " . date('H:i:s') . " - Title: {$data['title']}\n";
31-
},
32-
onError: function (string $error) {
33-
echo "[ERROR] " . date('H:i:s') . " - Connection error: $error\n";
34-
}
35-
);
9+
$config = BenchmarkConfig::create()->warmup(0)->runs(5);
3610

37-
// Also add a timer to detect when events stop coming
38-
$checkTimer = setInterval(function () use (&$lastEventTime, &$count) {
39-
$timeSinceLastEvent = time() - $lastEventTime;
40-
if ($timeSinceLastEvent > 10) { // If no events for 10+ seconds
41-
echo "[STATUS] No events received for {$timeSinceLastEvent}s (last count: $count)\n";
42-
}
43-
}, 5000); // Check every 5 seconds
44-
45-
try {
46-
await($promise);
47-
} finally {
48-
clearInterval($checkTimer);
49-
}
50-
});
11+
BenchmarkRunner::create("Measure true DB asynchonosity", $config)
12+
->callback(function () {
13+
$startTime = microtime(true);
14+
run_all([
15+
DB::table('users')->get()
16+
->then(fn() => print "Query 1:" . microtime(true) - $startTime . PHP_EOL),
17+
DB::table('users')->get()
18+
->then(fn() => print "Query 2:" . microtime(true) - $startTime . PHP_EOL),
19+
DB::table('users')->get()
20+
->then(fn() => print "Query 3:" . microtime(true) - $startTime . PHP_EOL),
21+
DB::table('users')->get()
22+
->then(fn() => print "Query 4:" . microtime(true) - $startTime . PHP_EOL),
23+
DB::table('users')->get()
24+
->then(fn() => print "Query 5:" . microtime(true) - $startTime . PHP_EOL),
25+
]);
26+
$endTime = microtime(true);
27+
$executionTime = $endTime - $startTime;
28+
echo "Execution time: " . $executionTime . " seconds\n\n";
29+
})
30+
->run();

0 commit comments

Comments
 (0)