Skip to content

Commit 133a852

Browse files
committed
feat(sse): Implement SSE handler with reconnection logic and add test for reconnection behavior
1 parent 6dca0e8 commit 133a852

File tree

7 files changed

+241
-213
lines changed

7 files changed

+241
-213
lines changed

controlled_sse.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
header("Content-Type: text/event-stream");
3+
header("Cache-Control: no-cache");
4+
header("Access-Control-Allow-Origin: *");
5+
header("Connection: keep-alive");
6+
7+
function sendSSE($data, $event = null, $id = null) {
8+
if ($id !== null) echo "id: $id\n";
9+
if ($event) echo "event: $event\n";
10+
echo "data: $data\n\n";
11+
if (ob_get_level()) ob_flush();
12+
flush();
13+
}
14+
15+
// Send exactly 5 events with delays to simulate real streaming
16+
for ($i = 1; $i <= 5; $i++) {
17+
$timestamp = date("H:i:s");
18+
sendSSE("Controlled test message #{$i} sent at $timestamp", "message", $i);
19+
20+
if ($i < 5) {
21+
sleep(1); // 1 second delay between events
22+
}
23+
}
24+
25+
// Send final event to indicate completion
26+
sendSSE("Stream completed successfully", "done", "finished");
27+
?>

src/Http/Handlers/SSEHandler.php

Lines changed: 55 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<?php
2+
// File: src/Http/Handlers/SSEHandler.php (FINAL, CORRECTED VERSION)
23

34
namespace Rcalicdan\FiberAsync\Http\Handlers;
45

@@ -42,7 +43,7 @@ public function connect(
4243
?callable $onError = null,
4344
?SSEReconnectConfig $reconnectConfig = null
4445
): CancellablePromiseInterface {
45-
if ($reconnectConfig !== null) {
46+
if ($reconnectConfig !== null && $reconnectConfig->enabled) {
4647
return $this->connectWithReconnection($url, $options, $onEvent, $onError, $reconnectConfig);
4748
}
4849

@@ -61,16 +62,15 @@ private function connectWithReconnection(
6162
): CancellablePromiseInterface {
6263
/** @var CancellablePromise<SSEResponse> $mainPromise */
6364
$mainPromise = new CancellablePromise;
64-
6565
$connectionState = new SSEConnectionState($url, $options, $reconnectConfig);
6666

67-
// Wrap callbacks to handle reconnection
6867
$wrappedOnEvent = $this->wrapEventCallback($onEvent, $connectionState);
69-
$wrappedOnError = $this->wrapErrorCallback($onError, $connectionState, $mainPromise);
68+
$wrappedOnError = $this->wrapErrorCallback($onError, $connectionState);
7069

71-
// Start initial connection
70+
// Start the first connection attempt
7271
$this->attemptConnection($connectionState, $wrappedOnEvent, $wrappedOnError, $mainPromise);
7372

73+
// The main promise's cancellation now controls the entire state machine.
7474
$mainPromise->setCancelHandler(function () use ($connectionState) {
7575
$connectionState->cancel();
7676
});
@@ -87,57 +87,62 @@ private function attemptConnection(
8787
?callable $onError,
8888
CancellablePromise $mainPromise
8989
): void {
90+
// Guard against starting a new attempt if the session has been cancelled.
9091
if ($connectionState->isCancelled()) {
92+
if (!$mainPromise->isSettled()) {
93+
$mainPromise->reject(new Exception('SSE connection cancelled before attempt.'));
94+
}
9195
return;
9296
}
9397

9498
$connectionState->incrementAttempt();
9599

96-
// Add Last-Event-ID header if we have one
97100
$options = $connectionState->getOptions();
98101
if ($connectionState->getLastEventId() !== null) {
99102
$headers = $options[CURLOPT_HTTPHEADER] ?? [];
103+
// Remove previous Last-Event-ID header if it exists to avoid duplicates
104+
$headers = array_filter($headers, fn($h) => !str_starts_with(strtolower($h), 'last-event-id:'));
100105
$headers[] = 'Last-Event-ID: ' . $connectionState->getLastEventId();
101106
$options[CURLOPT_HTTPHEADER] = $headers;
102107
}
103108

104-
$connectionPromise = $this->createSSEConnection(
105-
$connectionState->getUrl(),
106-
$options,
107-
$onEvent,
108-
$onError
109-
);
110-
109+
$connectionPromise = $this->createSSEConnection($connectionState->getUrl(), $options, $onEvent, $onError);
111110
$connectionState->setCurrentConnection($connectionPromise);
112111

113112
$connectionPromise->then(
114113
function (SSEResponse $response) use ($mainPromise, $connectionState) {
115-
if (!$mainPromise->isResolved()) {
114+
if ($connectionState->isCancelled()) return;
115+
116+
if (!$mainPromise->isSettled()) {
116117
$mainPromise->resolve($response);
117118
}
118119
$connectionState->onConnected();
119120
},
120121
function (Exception $error) use ($mainPromise, $connectionState, $onEvent, $onError) {
122+
// When a connection fails, check the master cancellation flag first.
121123
if ($connectionState->isCancelled()) {
124+
if (!$mainPromise->isSettled()) {
125+
$mainPromise->reject(new Exception('SSE connection cancelled during failure handling.'));
126+
}
122127
return;
123128
}
124129

125-
$shouldReconnect = $connectionState->shouldReconnect($error);
126-
127-
if (!$shouldReconnect) {
128-
if (!$mainPromise->isResolved()) {
130+
if (!$connectionState->shouldReconnect($error)) {
131+
if (!$mainPromise->isSettled()) {
129132
$mainPromise->reject($error);
130133
}
131134
return;
132135
}
133136

134-
// Schedule reconnection
135137
$delay = $connectionState->getReconnectDelay();
136138
$connectionState->getConfig()->onReconnect?->call($this, $connectionState->getAttemptCount(), $delay, $error);
137139

138-
EventLoop::getInstance()->addTimer($delay, function () use ($connectionState, $onEvent, $onError, $mainPromise) {
140+
// When we schedule the timer, we get its ID and store it in the state object.
141+
$timerId = EventLoop::getInstance()->addTimer($delay, function () use ($connectionState, $onEvent, $onError, $mainPromise) {
142+
$connectionState->setReconnectTimerId(null); // Timer is firing, so clear the ID.
139143
$this->attemptConnection($connectionState, $onEvent, $onError, $mainPromise);
140144
});
145+
$connectionState->setReconnectTimerId($timerId);
141146
}
142147
);
143148
}
@@ -153,53 +158,41 @@ private function createSSEConnection(
153158
): CancellablePromiseInterface {
154159
/** @var CancellablePromise<SSEResponse> $promise */
155160
$promise = new CancellablePromise;
156-
157-
$responseStream = fopen('php://temp', 'w+b');
158-
if ($responseStream === false) {
159-
$promise->reject(new HttpStreamException('Failed to create SSE response stream'));
160-
return $promise;
161-
}
162-
163-
/** @var list<string> $headerAccumulator */
164-
$headerAccumulator = [];
165161
$sseResponse = null;
162+
$headersProcessed = false;
166163

167-
// Filter to only include valid CURLOPT_* integer keys
168164
$curlOnlyOptions = array_filter($options, 'is_int', ARRAY_FILTER_USE_KEY);
169-
170-
// Set up SSE-specific headers and options
171165
$sseOptions = array_replace($curlOnlyOptions, [
172166
CURLOPT_HEADER => false,
173167
CURLOPT_HTTPHEADER => array_merge(
174-
$this->extractHttpHeaders($curlOnlyOptions),
175-
[
176-
'Accept: text/event-stream',
177-
'Cache-Control: no-cache',
178-
'Connection: keep-alive',
179-
]
168+
$curlOnlyOptions[CURLOPT_HTTPHEADER] ?? [],
169+
['Accept: text/event-stream', 'Cache-Control: no-cache', 'Connection: keep-alive']
180170
),
181-
CURLOPT_WRITEFUNCTION => function ($ch, string $data) use ($responseStream, &$sseResponse, $onEvent): int {
182-
fwrite($responseStream, $data);
183-
184-
// If we have an SSE response and event callback, parse events in real-time
171+
CURLOPT_WRITEFUNCTION => function ($ch, string $data) use ($onEvent, &$sseResponse) {
185172
if ($sseResponse !== null && $onEvent !== null) {
186173
try {
187174
$events = $sseResponse->parseEvents($data);
188175
foreach ($events as $event) {
189176
$onEvent($event);
190177
}
191178
} catch (Exception $e) {
192-
// Continue processing even if event parsing fails
193179
error_log("SSE event parsing error: " . $e->getMessage());
194180
}
195181
}
196-
197182
return strlen($data);
198183
},
199-
CURLOPT_HEADERFUNCTION => function ($ch, string $header) use (&$headerAccumulator): int {
200-
$trimmedHeader = trim($header);
201-
if ($trimmedHeader !== '') {
202-
$headerAccumulator[] = $trimmedHeader;
184+
CURLOPT_HEADERFUNCTION => function ($ch, string $header) use ($promise, &$sseResponse, &$headersProcessed) {
185+
if ($promise->isSettled()) return strlen($header);
186+
187+
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
188+
if (!$headersProcessed && $httpCode > 0) {
189+
if ($httpCode >= 200 && $httpCode < 300) {
190+
$sseResponse = new SSEResponse(new Stream(fopen('php://temp', 'r+')), $httpCode, []);
191+
$promise->resolve($sseResponse);
192+
} else {
193+
$promise->reject(new HttpStreamException("SSE connection failed with status: {$httpCode}"));
194+
}
195+
$headersProcessed = true;
203196
}
204197
return strlen($header);
205198
},
@@ -208,43 +201,18 @@ private function createSSEConnection(
208201
$requestId = EventLoop::getInstance()->addHttpRequest(
209202
$url,
210203
$sseOptions,
211-
function (?string $error, $response, ?int $httpCode, array $headers = [], ?string $httpVersion = null) use ($promise, $responseStream, &$headerAccumulator, &$sseResponse, $onError): void {
212-
if ($promise->isCancelled()) {
213-
fclose($responseStream);
214-
return;
215-
}
216-
217-
if ($error !== null) {
218-
fclose($responseStream);
219-
if ($onError !== null) {
204+
function (?string $error) use ($promise, $onError) {
205+
if ($promise->isSettled()) {
206+
if ($onError !== null && $error !== null) {
220207
$onError($error);
221208
}
222-
$promise->reject(new HttpStreamException("SSE connection failed: {$error}"));
223-
} else {
224-
rewind($responseStream);
225-
$stream = new Stream($responseStream);
226-
227-
$formattedHeaders = $this->formatHeaders($headerAccumulator);
228-
$sseResponse = new SSEResponse($stream, $httpCode ?? 200, $formattedHeaders);
229-
230-
if ($httpVersion !== null) {
231-
$sseResponse->setHttpVersion($httpVersion);
232-
}
233-
234-
$promise->resolve($sseResponse);
209+
return;
235210
}
211+
$promise->reject(new HttpStreamException("SSE connection failed: {$error}"));
236212
}
237213
);
238-
239-
// Initialize SSE response early for real-time event parsing
240-
$sseResponse = new SSEResponse(new Stream($responseStream), 200, []);
241-
242-
$promise->setCancelHandler(function () use ($requestId, $responseStream): void {
243-
EventLoop::getInstance()->cancelHttpRequest($requestId);
244-
if (is_resource($responseStream)) {
245-
fclose($responseStream);
246-
}
247-
});
214+
215+
$promise->setCancelHandler(fn() => EventLoop::getInstance()->cancelHttpRequest($requestId));
248216

249217
return $promise;
250218
}
@@ -254,75 +222,22 @@ function (?string $error, $response, ?int $httpCode, array $headers = [], ?strin
254222
*/
255223
private function wrapEventCallback(?callable $onEvent, SSEConnectionState $state): ?callable
256224
{
257-
if ($onEvent === null) {
258-
return null;
259-
}
260-
225+
if ($onEvent === null) return null;
261226
return function (SSEEvent $event) use ($onEvent, $state) {
262-
// Track last event ID for reconnection
263-
if ($event->id !== null) {
264-
$state->setLastEventId($event->id);
265-
}
266-
267-
// Handle retry directive
268-
if ($event->retry !== null) {
269-
$state->setRetryInterval($event->retry);
270-
}
271-
272-
// Call the original callback
227+
if ($event->id !== null) $state->setLastEventId($event->id);
228+
if ($event->retry !== null) $state->setRetryInterval($event->retry);
273229
$onEvent($event);
274230
};
275231
}
276232

277233
/**
278234
* Wraps the error callback to handle reconnection logic.
279235
*/
280-
private function wrapErrorCallback(
281-
?callable $onError,
282-
SSEConnectionState $state,
283-
CancellablePromise $mainPromise
284-
): ?callable {
285-
return function (string $error) use ($onError, $state, $mainPromise) {
286-
// Call the original error callback
287-
if ($onError !== null) {
288-
$onError($error);
289-
}
290-
291-
// Mark connection as failed for reconnection logic
236+
private function wrapErrorCallback(?callable $onError, SSEConnectionState $state): ?callable
237+
{
238+
return function (string $error) use ($onError, $state) {
239+
if ($onError !== null) $onError($error);
292240
$state->onConnectionFailed(new Exception($error));
293241
};
294242
}
295-
296-
/**
297-
* Extracts HTTP headers from cURL options.
298-
*/
299-
private function extractHttpHeaders(array $curlOptions): array
300-
{
301-
return $curlOptions[CURLOPT_HTTPHEADER] ?? [];
302-
}
303-
304-
/**
305-
* Formats raw headers into structured array.
306-
*/
307-
private function formatHeaders(array $headerAccumulator): array
308-
{
309-
$formattedHeaders = [];
310-
foreach ($headerAccumulator as $header) {
311-
if (str_contains($header, ':')) {
312-
[$key, $value] = explode(':', $header, 2);
313-
$key = trim($key);
314-
$value = trim($value);
315-
if (isset($formattedHeaders[$key])) {
316-
if (is_array($formattedHeaders[$key])) {
317-
$formattedHeaders[$key][] = $value;
318-
} else {
319-
$formattedHeaders[$key] = [$formattedHeaders[$key], $value];
320-
}
321-
} else {
322-
$formattedHeaders[$key] = $value;
323-
}
324-
}
325-
}
326-
return $formattedHeaders;
327-
}
328243
}

0 commit comments

Comments
 (0)