@@ -107,6 +107,7 @@ public function __construct(Observable $client)
107
107
));
108
108
}
109
109
110
+ // If this event represents the connection_established event set the timeout
110
111
if ($ event ->getEvent () === 'pusher:connection_established ' ) {
111
112
$ this ->setActivityTimeout ($ event );
112
113
}
@@ -167,6 +168,7 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
167
168
*/
168
169
public function channel (string $ channel ): Observable
169
170
{
171
+ // Only join a channel once
170
172
if (isset ($ this ->channels [$ channel ])) {
171
173
return $ this ->channels [$ channel ];
172
174
}
@@ -176,12 +178,14 @@ public function channel(string $channel): Observable
176
178
return $ event ->getChannel () !== '' && $ event ->getChannel () === $ channel ;
177
179
});
178
180
181
+ // Observable representing channel events
179
182
$ events = Observable::create (function (
180
183
ObserverInterface $ observer
181
184
) use (
182
185
$ channel ,
183
186
$ channelMessages
184
187
) {
188
+ // Subscribe to channel messages but filter out internal events
185
189
$ subscription = $ channelMessages
186
190
->filter (function (Event $ event ) {
187
191
return $ event ->getEvent () !== 'pusher_internal:subscription_succeeded ' ;
@@ -231,6 +235,7 @@ public function send(array $message): bool
231
235
*/
232
236
private function handleLowLevelError (Throwable $ throwable )
233
237
{
238
+ // Only allow certain, relevant, exceptions
234
239
if (!($ throwable instanceof WebsocketErrorException) &&
235
240
!($ throwable instanceof RuntimeException) &&
236
241
!($ throwable instanceof PusherErrorException)
0 commit comments