Skip to content

Commit b529c32

Browse files
committed
fix(sse): Update SSE handling to use fetch-style options and improve error logging
1 parent 84bbb3d commit b529c32

File tree

2 files changed

+40
-24
lines changed

2 files changed

+40
-24
lines changed

src/Http/Request.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,11 +471,15 @@ public function sse(
471471
?callable $onError = null,
472472
?SSEReconnectConfig $reconnectConfig = null
473473
): CancellablePromiseInterface {
474-
$curlOptions = $this->buildCurlOptions('GET', $url);
474+
// Use fetch-style options instead of raw cURL options
475+
$options = $this->buildFetchOptions('GET');
476+
477+
// Remove or modify options that interfere with SSE
478+
unset($options['timeout']); // Don't set a total timeout for SSE
475479

476480
$effectiveReconnectConfig = $reconnectConfig ?? $this->sseReconnectConfig;
477481

478-
return $this->handler->sse($url, $curlOptions, $onEvent, $onError, $effectiveReconnectConfig);
482+
return $this->handler->sse($url, $options, $onEvent, $onError, $effectiveReconnectConfig);
479483
}
480484

481485
/**

test.php

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,49 @@
22

33
use Rcalicdan\FiberAsync\Http\Handlers\HttpHandler;
44
use Rcalicdan\FiberAsync\Http\SSE\SSEEvent;
5-
use Rcalicdan\FiberAsync\Http\SSE\SSEReconnectConfig;
65

76
require "vendor/autoload.php";
87

98
run(function () {
109
$count = 0;
11-
$url = "https://stream.wikimedia.org/v2/stream/recentchange";
10+
$lastEventTime = time();
1211

13-
echo "Connecting to: $url\n";
12+
$promise = http()
13+
->sseReconnect(
14+
enabled: true,
15+
maxAttempts: 10,
16+
initialDelay: 1.0,
17+
maxDelay: 30.0,
18+
backoffMultiplier: 2.0,
19+
jitter: true,
20+
onReconnect: function (int $attempt, float $delay) {
21+
echo "[RECONNECT] Attempt #$attempt after {$delay}s delay\n";
22+
}
23+
)
24+
->sse(
25+
url: "https://stream.wikimedia.org/v2/stream/recentchange",
26+
onEvent: function (SSEEvent $event) use (&$count, &$lastEventTime) {
27+
$count++;
28+
$lastEventTime = time();
29+
$data = json_decode($event->data, true);
30+
echo "[EVENT] #$count at " . date('H:i:s') . " - Title: {$data['title']}\n";
31+
},
32+
onError: function (string $error) {
33+
echo "[ERROR] " . date('H:i:s') . " - Connection error: $error\n";
34+
}
35+
);
1436

15-
$http = new HttpHandler();
16-
17-
$promise = $http->sse(
18-
url: $url,
19-
options: [ // Changed from curlOptions to options
20-
CURLOPT_USERAGENT => 'Simple-Test/1.0',
21-
],
22-
onEvent: function (SSEEvent $event) use (&$count) {
23-
$count++;
24-
$data = json_decode($event->data, true);
25-
echo "Event: $count, Title: {$data['title']}\n";
26-
},
27-
onError: function (string $error) {
28-
echo "Error: $error\n";
29-
},
30-
reconnectConfig: new SSEReconnectConfig(true, 5)
31-
);
37+
// Also add a timer to detect when events stop coming
38+
$checkTimer = setInterval(function () use (&$lastEventTime, &$count) {
39+
$timeSinceLastEvent = time() - $lastEventTime;
40+
if ($timeSinceLastEvent > 10) { // If no events for 10+ seconds
41+
echo "[STATUS] No events received for {$timeSinceLastEvent}s (last count: $count)\n";
42+
}
43+
}, 5000); // Check every 5 seconds
3244

3345
try {
3446
await($promise);
35-
} catch (Exception $e) {
36-
echo "Exception: " . $e->getMessage() . "\n";
47+
} finally {
48+
clearInterval($checkTimer);
3749
}
3850
});

0 commit comments

Comments
 (0)