@@ -32,23 +32,22 @@ class StreamAdapter extends AbstractAdapter
3232{
3333 use HeaderParserTrait;
3434
35- /**
36- * @inheritDoc
37- */
35+ public function __construct (
36+ protected ?int $ timeout = 2 ,
37+ protected ?float $ connectTimeout = null ,
38+ ) {
39+ }
40+
3841 public function getName (): string
3942 {
4043 return 'stream ' ;
4144 }
4245
43- /**
44- * @inheritDoc
45- */
4646 public function sendRequest (RequestInterface $ request , ?HttpContext $ context = null ): ResponseInterface
4747 {
4848 $ dateTime = new DateTimeImmutable ();
4949 $ initTime = microtime (true );
5050
51- // Create socket
5251 $ fp = $ this ->createSocketClient ($ request , $ context );
5352
5453 try {
@@ -59,8 +58,7 @@ public function sendRequest(RequestInterface $request, ?HttpContext $context = n
5958
6059 $ requestTime = microtime (true ) - $ connectTime ;
6160
62- // Read response
63- $ response = $ this ->readResponse ($ fp , $ request ->getMethod (), $ headersTime );
61+ $ response = $ this ->readResponse ($ fp , $ request ->getMethod (), $ headersTime , $ request );
6462
6563 $ waitTime = $ headersTime - $ requestTime ;
6664 $ totalTime = $ initTime - microtime (true );
@@ -76,7 +74,7 @@ public function sendRequest(RequestInterface $request, ?HttpContext $context = n
7674 return $ response ;
7775 } finally {
7876 // Close socket
79- fclose ($ fp );
77+ is_resource ( $ fp ) && fclose ($ fp );
8078 }
8179 }
8280
@@ -153,6 +151,7 @@ protected function createSocketClient(RequestInterface $request, ?HttpContext $c
153151 address: $ address = sprintf ('%s://%s:%d ' , $ wrapper , $ request ->getUri ()->getHost (), $ port ),
154152 error_code: $ errno ,
155153 error_message: $ errstr ,
154+ timeout: $ this ->connectTimeout ,
156155 context: $ this ->createContext ($ context ),
157156 );
158157
@@ -163,6 +162,11 @@ protected function createSocketClient(RequestInterface $request, ?HttpContext $c
163162 );
164163 }
165164
165+ stream_set_blocking ($ fp , true );
166+ if ($ this ->timeout !== null ) {
167+ stream_set_timeout ($ fp , $ this ->timeout );
168+ }
169+
166170 return $ fp ;
167171 }
168172
@@ -186,16 +190,17 @@ protected function writeRequest($fp, RequestInterface $request): void
186190 (!empty ($ request ->getUri ()->getQuery ()) ? '? ' . $ request ->getUri ()->getQuery () : '' ),
187191 $ request ->getProtocolVersion ()
188192 )
189- ) ?: throw new NetworkException ('Unable to write request headers ' , $ request );
193+ ) ?: $ this -> throwIfTimedOut ( $ fp , new NetworkException ('Unable to write request headers ' , $ request) );
190194
191195 // Headers
192196 foreach ($ this ->getHeadersLines ($ request ) as $ headerLine ) {
193- fwrite ($ fp , $ headerLine . "\r\n" ) ?: throw new NetworkException ( ' Unable to write request headers ' ,
194- $ request );
197+ fwrite ($ fp , $ headerLine . "\r\n" ) ?:
198+ $ this -> throwIfTimedOut ( $ fp , new NetworkException ( ' Unable to write request headers ' , $ request ) );
195199 }
196200
197201 // Separator for body
198- fwrite ($ fp , "\r\n" ) ?? throw new NetworkException ('Unable to write request separator ' , $ request );
202+ fwrite ($ fp , "\r\n" ) ?:
203+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Unable to write request separator ' , $ request ));
199204
200205 // Write body per packets 8K by 8K
201206 $ stream = $ request ->getBody ();
@@ -218,15 +223,21 @@ protected function writeRequest($fp, RequestInterface $request): void
218223 * @param $fp
219224 * @param string $method
220225 * @param float|null $headersTime
226+ * @param RequestInterface $request
221227 *
222228 * @return ResponseInterface
229+ * @throws NetworkException
223230 */
224- private function readResponse ($ fp , string $ method , ?float &$ headersTime ): ResponseInterface
225- {
231+ private function readResponse (
232+ $ fp ,
233+ string $ method ,
234+ ?float &$ headersTime ,
235+ RequestInterface $ request
236+ ): ResponseInterface {
226237 // Headers
227238 $ protocolVersion = $ statusCode = $ reasonPhrase = null ;
228239 $ headers = $ this ->parseHeaders (
229- $ this ->readHeaders ($ fp ),
240+ $ this ->readHeaders ($ fp, $ request ),
230241 $ protocolVersion ,
231242 $ statusCode ,
232243 $ reasonPhrase
@@ -260,11 +271,24 @@ private function readResponse($fp, string $method, ?float &$headersTime): Respon
260271 return $ response ;
261272 }
262273
274+ $ read = 0 ;
275+ $ buf = '' ;
276+ while ($ read < $ contentLength ) {
277+ $ chunk = fread ($ fp , $ contentLength - $ read );
278+
279+ if ($ chunk === false || $ chunk === '' ) {
280+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Read failed (fixed length) ' , $ request ));
281+ if (feof ($ fp )) {
282+ break ;
283+ }
284+ continue ;
285+ }
286+ $ buf .= $ chunk ;
287+ $ read += strlen ($ chunk );
288+ }
289+
263290 return $ response ->withBody (
264- $ this ->createStream (
265- fread ($ fp , (int )$ contentLength ),
266- $ encodingHeader
267- )
291+ $ this ->createStream ($ buf , $ encodingHeader )
268292 );
269293 }
270294
@@ -273,13 +297,30 @@ private function readResponse($fp, string $method, ?float &$headersTime): Respon
273297 // Chunked
274298 if (true === in_array ('chunked ' , $ encodingHeader )) {
275299 while (false !== ($ hex = fgets ($ fp ))) {
300+ if ($ hex === '' ) {
301+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Read timed out (chunk size) ' , $ request ));
302+ }
276303 $ length = (int )hexdec ($ hex );
277-
278304 if (0 === $ length ) {
279305 continue ;
280306 }
281307
282- $ content .= fread ($ fp , $ length );
308+ $ part = '' ;
309+ $ remain = $ length ;
310+ while ($ remain > 0 ) {
311+ $ chunk = fread ($ fp , $ remain );
312+ if ($ chunk === false || $ chunk === '' ) {
313+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Read timed out (chunk data) ' , $ request ));
314+ if (feof ($ fp )) {
315+ break 2 ;
316+ }
317+ continue ;
318+ }
319+ $ part .= $ chunk ;
320+ $ remain -= strlen ($ chunk );
321+ }
322+
323+ $ content .= $ part ;
283324 }
284325
285326 return $ response ->withBody (
@@ -290,9 +331,14 @@ private function readResponse($fp, string $method, ?float &$headersTime): Respon
290331 );
291332 }
292333
293- // Get all content
294334 while (false === feof ($ fp )) {
295- $ content .= fread ($ fp , 1024 );
335+ $ chunk = fread ($ fp , 1024 );
336+ if ($ chunk === false || $ chunk === '' ) {
337+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Read timed out (body) ' , $ request ));
338+ continue ;
339+ }
340+
341+ $ content .= $ chunk ;
296342 }
297343
298344 return $ response ->withBody (
@@ -307,14 +353,19 @@ private function readResponse($fp, string $method, ?float &$headersTime): Respon
307353 * Read headers.
308354 *
309355 * @param resource $fp
356+ * @param RequestInterface $request
310357 *
311358 * @return string
359+ * @throws NetworkException
312360 */
313- protected function readHeaders ($ fp ): string
361+ protected function readHeaders ($ fp, RequestInterface $ request ): string
314362 {
315363 $ headers = '' ;
316364
317365 while (false !== ($ buffer = fgets ($ fp ))) {
366+ if ($ buffer === '' ) {
367+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Read timed out (headers) ' , $ request ));
368+ }
318369 $ headers .= $ buffer ;
319370
320371 // First new line separator of headers and content
@@ -323,6 +374,25 @@ protected function readHeaders($fp): string
323374 }
324375 }
325376
377+ $ this ->throwIfTimedOut ($ fp , new NetworkException ('Failed to read headers ' , $ request ));
378+
326379 return $ headers ;
327380 }
328- }
381+
382+ /**
383+ * Throw if timeout.
384+ *
385+ * @param resource $fp
386+ * @param NetworkException $throwable
387+ *
388+ * @return void
389+ * @throws NetworkException
390+ */
391+ private function throwIfTimedOut ($ fp , NetworkException $ throwable ): void
392+ {
393+ $ meta = stream_get_meta_data ($ fp );
394+ if (!empty ($ meta ['timed_out ' ])) {
395+ throw $ throwable ;
396+ }
397+ }
398+ }
0 commit comments