From 0d95c83b396e0a3ec43e1815232b3eba80381dfb Mon Sep 17 00:00:00 2001 From: Richard Burkhardt Date: Mon, 18 Aug 2025 12:46:48 +0200 Subject: [PATCH 1/5] added platform-specific `NetworkConnectivity` implementations for IO, Web, and Stub with respective unit tests for io --- lib/src/network/network_connectivity_io.dart | 67 +++++++++++++++++ .../network/network_connectivity_stub.dart | 19 +++++ lib/src/network/network_connectivity_web.dart | 51 +++++++++++++ .../network/network_connectivity_io_test.dart | 74 +++++++++++++++++++ .../network_connectivity_web_test.dart | 45 +++++++++++ 5 files changed, 256 insertions(+) create mode 100644 lib/src/network/network_connectivity_io.dart create mode 100644 lib/src/network/network_connectivity_stub.dart create mode 100644 lib/src/network/network_connectivity_web.dart create mode 100644 test/network/network_connectivity_io_test.dart create mode 100644 test/network/network_connectivity_web_test.dart diff --git a/lib/src/network/network_connectivity_io.dart b/lib/src/network/network_connectivity_io.dart new file mode 100644 index 0000000..27eb295 --- /dev/null +++ b/lib/src/network/network_connectivity_io.dart @@ -0,0 +1,67 @@ +import 'dart:async'; +import 'dart:io'; + +class NetworkConnectivity { + static final NetworkConnectivity instance = NetworkConnectivity._internal(); + NetworkConnectivity._internal(); + + static const Duration _defaultTimeout = Duration(seconds: 2); + + Future isOnline({String? testAddress}) async { + final target = testAddress ?? 'example.com:80'; + String host = target; + int port = 80; + try { + if (target.contains(':')) { + final lastColon = target.lastIndexOf(':'); + host = target.substring(0, lastColon); + port = int.tryParse(target.substring(lastColon + 1)) ?? 80; + } + final socket = await Socket.connect(host, port, timeout: _defaultTimeout); + socket.destroy(); + return true; + } on SocketException { + return false; + } on TimeoutException { + return false; + } catch (_) { + return false; + } + } + + Future waitUntilOnline({ + Duration pollInterval = const Duration(seconds: 2), + Duration? timeout, + String? testAddress, + }) async { + if (await isOnline(testAddress: testAddress)) return; + + final Completer completer = Completer(); + final DateTime? deadline = + timeout != null ? DateTime.now().add(timeout) : null; + + Timer? ticker; + + ticker = Timer.periodic(pollInterval, (t) async { + if (deadline != null && DateTime.now().isAfter(deadline)) { + if (!completer.isCompleted) completer.complete(); + t.cancel(); + return; + } + final online = await isOnline(testAddress: testAddress); + if (online) { + if (!completer.isCompleted) completer.complete(); + t.cancel(); + } + }); + + if (deadline != null) { + Timer(timeout!, () { + if (!completer.isCompleted) completer.complete(); + ticker?.cancel(); + }); + } + + await completer.future; + } +} diff --git a/lib/src/network/network_connectivity_stub.dart b/lib/src/network/network_connectivity_stub.dart new file mode 100644 index 0000000..74b28dd --- /dev/null +++ b/lib/src/network/network_connectivity_stub.dart @@ -0,0 +1,19 @@ +import 'dart:async'; + +/// Fallback connectivity for platforms where neither web nor io is available. +/// Always assumes online to avoid blocking behavior. +class NetworkConnectivity { + static final NetworkConnectivity instance = NetworkConnectivity._internal(); + NetworkConnectivity._internal(); + + Future isOnline({String? testAddress}) async => true; + + Future waitUntilOnline({ + Duration pollInterval = const Duration(seconds: 2), + Duration? timeout, + String? testAddress, + }) async { + // Immediately resolve since we assume online. + return; + } +} diff --git a/lib/src/network/network_connectivity_web.dart b/lib/src/network/network_connectivity_web.dart new file mode 100644 index 0000000..ed2b5a3 --- /dev/null +++ b/lib/src/network/network_connectivity_web.dart @@ -0,0 +1,51 @@ +import 'dart:async'; +import 'package:web/web.dart'; + +class NetworkConnectivity { + static final NetworkConnectivity instance = NetworkConnectivity._internal(); + NetworkConnectivity._internal(); + + Future isOnline({String? testAddress}) async { + // Browser-provided online status + try { + return window.navigator.onLine ?? true; + } catch (_) { + return true; + } + } + + Future waitUntilOnline({ + Duration pollInterval = const Duration(seconds: 2), + Duration? timeout, + String? testAddress, + }) async { + // If already online, return immediately + if (await isOnline()) return; + + final completer = Completer(); + void onlineListener(Event _) { + if (!completer.isCompleted) completer.complete(); + window.removeEventListener('online', onlineListener); + window.removeEventListener('offline', offlineListener); + } + + void offlineListener(Event _) { + // no-op, but keep symmetry and potential future logging + } + + window.addEventListener('online', onlineListener); + window.addEventListener('offline', offlineListener); + + Timer? timeoutTimer; + if (timeout != null) { + timeoutTimer = Timer(timeout, () { + if (!completer.isCompleted) completer.complete(); + window.removeEventListener('online', onlineListener); + window.removeEventListener('offline', offlineListener); + }); + } + + await completer.future; + timeoutTimer?.cancel(); + } +} diff --git a/test/network/network_connectivity_io_test.dart b/test/network/network_connectivity_io_test.dart new file mode 100644 index 0000000..0c7f61d --- /dev/null +++ b/test/network/network_connectivity_io_test.dart @@ -0,0 +1,74 @@ +@TestOn('vm') +import 'dart:async'; +import 'dart:io'; + +import 'package:test/test.dart'; +import 'package:connectanum/src/network/network_connectivity_io.dart'; + +void main() { + group('NetworkConnectivity IO', () { + test('isOnline returns true when local server is available, false otherwise', () async { + final server = await HttpServer.bind('127.0.0.1', 0); + final port = server.port; + + final online = await NetworkConnectivity.instance + .isOnline(testAddress: '127.0.0.1:$port'); + expect(online, isTrue); + + await server.close(force: true); + + final offline = await NetworkConnectivity.instance + .isOnline(testAddress: '127.0.0.1:$port'); + expect(offline, isFalse); + }); + + test('waitUntilOnline completes once server becomes available', () async { + // Find a free port by binding and immediately releasing it. + final temp = await ServerSocket.bind('127.0.0.1', 0); + final port = temp.port; + await temp.close(); + + final sw = Stopwatch()..start(); + + final wait = NetworkConnectivity.instance.waitUntilOnline( + pollInterval: const Duration(milliseconds: 50), + timeout: const Duration(seconds: 2), + testAddress: '127.0.0.1:$port', + ); + + // Bring server online after a short delay + await Future.delayed(const Duration(milliseconds: 200)); + final server = await HttpServer.bind('127.0.0.1', port); + + await wait; + sw.stop(); + + // It should have waited at least ~150ms before becoming online + expect(sw.elapsedMilliseconds, greaterThanOrEqualTo(150)); + // And should have completed well before timeout (2s) + expect(sw.elapsedMilliseconds, lessThan(2000)); + + await server.close(force: true); + }); + + test('waitUntilOnline completes after timeout if server never appears', () async { + // Use an unused port that we do not bind to during the test + final temp = await ServerSocket.bind('127.0.0.1', 0); + final port = temp.port; + await temp.close(); + + final sw = Stopwatch()..start(); + await NetworkConnectivity.instance.waitUntilOnline( + pollInterval: const Duration(milliseconds: 50), + timeout: const Duration(milliseconds: 300), + testAddress: '127.0.0.1:$port', + ); + sw.stop(); + + // Should not resolve immediately + expect(sw.elapsedMilliseconds, greaterThanOrEqualTo(250)); + // And should not exceed a generous 2s upper bound + expect(sw.elapsedMilliseconds, lessThan(2000)); + }); + }); +} diff --git a/test/network/network_connectivity_web_test.dart b/test/network/network_connectivity_web_test.dart new file mode 100644 index 0000000..feed5ed --- /dev/null +++ b/test/network/network_connectivity_web_test.dart @@ -0,0 +1,45 @@ +@TestOn('chrome') +import 'dart:async'; + +import 'package:test/test.dart'; +import 'package:connectanum/src/network/network_connectivity_web.dart'; +import 'package:web/web.dart'; + +void main() { + group('NetworkConnectivity Web', () { + test('isOnline returns a boolean', () async { + final online = await NetworkConnectivity.instance.isOnline(); + expect(online, anyOf(isTrue, isFalse)); + }); + + test('waitUntilOnline completes within timeout', () async { + final sw = Stopwatch()..start(); + await NetworkConnectivity.instance.waitUntilOnline( + pollInterval: const Duration(milliseconds: 50), + timeout: const Duration(seconds: 2), + ); + sw.stop(); + // Should complete within the given timeout window (allowing some jitter) + expect(sw.elapsedMilliseconds, lessThan(2500)); + }); + + test('waitUntilOnline completes on online event dispatch', () async { + // Simulate an online event to ensure the event-path resolves promptly + final wait = NetworkConnectivity.instance.waitUntilOnline( + pollInterval: const Duration(milliseconds: 50), + timeout: const Duration(seconds: 2), + ); + + // Fire an online event shortly after starting to wait + Timer(const Duration(milliseconds: 50), () { + window.dispatchEvent(Event('online')); + }); + + final sw = Stopwatch()..start(); + await wait; + sw.stop(); + + expect(sw.elapsedMilliseconds, lessThan(1000)); + }); + }); +} From 4f0a7f5317ba3e10986d799e64d28ae025aad8fb Mon Sep 17 00:00:00 2001 From: Richard Burkhardt Date: Mon, 18 Aug 2025 12:47:56 +0200 Subject: [PATCH 2/5] enable optional network-aware reconnect with platform-specific connectivity detection; bump version to 2.3.0 --- .gitignore | 1 + CHANGELOG.md | 14 ++++++++++++++ lib/src/client.dart | 33 +++++++++++++++++++++++++++++++++ pubspec.yaml | 2 +- 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f0ba458..ee1efd4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ connectanum-dart.iml .packages pubspec.lock coverage +out \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index fd76d8d..9440c7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +## 2.3.0 +- Feature: Optional network-aware reconnect across all platforms. + - Added cross-platform connectivity detection service: + - Web: uses browser online/offline events via `package:web` and `Navigator.onLine`. + - IO: probes TCP connectivity (host:port) with periodic polling and timeout. + - Stub: defaults to online for unsupported platforms. + - New ClientConnectOptions: + - waitForNetwork (default: false) to enable waiting for connectivity before reconnect. + - networkCheckInterval (default: 2s) polling interval (IO). + - networkWaitTimeout (optional) max time to wait for network to return. + - connectivityTestAddress (optional, e.g. "example.com:80") probe target on IO. + - Client will, when enabled, wait until the network is back before applying the configured reconnect delay. +- Bump SDK constraints unchanged; package version bumped to 2.3.0. + ### 2.2.6 - fixed abort reason to match a value of the listed `_abortReasons` in the client for local transport diff --git a/lib/src/client.dart b/lib/src/client.dart index 0772d9b..055fb13 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -10,6 +10,9 @@ import 'authentication/abstract_authentication.dart'; import 'transport/abstract_transport.dart'; import 'message/uri_pattern.dart'; import 'protocol/session.dart'; +import 'network/network_connectivity_stub.dart' + if (dart.library.io) 'network/network_connectivity_io.dart' + if (dart.library.js_interop) 'network/network_connectivity_web.dart' as connectivity; enum _ClientState { /// Client is idle and not connected @@ -146,6 +149,24 @@ class Client { _changeState(_ClientState.waiting); + // Optionally wait for network to be online before attempting reconnect + if (options.waitForNetwork) { + try { + final online = await connectivity.NetworkConnectivity.instance + .isOnline(testAddress: options.connectivityTestAddress); + if (!online) { + _logger.info('Network offline detected. Waiting until online...'); + await connectivity.NetworkConnectivity.instance.waitUntilOnline( + pollInterval: options.networkCheckInterval, + timeout: options.networkWaitTimeout, + testAddress: options.connectivityTestAddress, + ); + } + } catch (e) { + _logger.fine('Connectivity check failed: $e'); + } + } + if (duration != null) { _logger.info('Waiting for (overridden) $duration before reconnecting'); await Future.delayed(duration); @@ -253,10 +274,22 @@ class ClientConnectOptions { Duration? reconnectTime; Duration? pingInterval; + // New options for network-aware reconnect + bool waitForNetwork; + Duration networkCheckInterval; + Duration? networkWaitTimeout; + /// Host:port to probe when checking connectivity on IO (e.g., 'example.com:80'). + /// If null, a reasonable default will be used by the platform implementation. + String? connectivityTestAddress; + ClientConnectOptions({ this.reconnectCount = 3, this.reconnectTime, this.pingInterval, + this.waitForNetwork = false, + this.networkCheckInterval = const Duration(seconds: 2), + this.networkWaitTimeout, + this.connectivityTestAddress, }); ClientConnectOptions minusReconnectRetry() { diff --git a/pubspec.yaml b/pubspec.yaml index 5a2e273..5183675 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: connectanum homepage: https://github.com/konsultaner/connectanum-dart -version: 2.2.6 +version: 2.3.0 description: >- This is a WAMP client (Web Application Messaging Protocol) implementation for the dart language and flutter projects. false_secrets: From da82fc281e0286566f9d505dd22e15daba53033c Mon Sep 17 00:00:00 2001 From: Richard Burkhardt Date: Sun, 7 Sep 2025 12:53:25 +0200 Subject: [PATCH 3/5] add `onOnlineState` stream for connectivity updates during reconnect; extend tests and docs; bump version to 2.3.1 --- CHANGELOG.md | 10 ++ README.md | 81 ++++++++++++++ lib/src/client.dart | 91 +++++++++++----- lib/src/network/network_connectivity_io.dart | 37 +++++++ .../network/network_connectivity_stub.dart | 26 +++++ lib/src/network/network_connectivity_web.dart | 75 +++++++++++-- pubspec.yaml | 2 +- test/client_online_state_io_test.dart | 103 ++++++++++++++++++ test/client_online_state_web_test.dart | 69 ++++++++++++ .../network/network_connectivity_io_test.dart | 2 + .../network_connectivity_web_test.dart | 2 + 11 files changed, 465 insertions(+), 33 deletions(-) create mode 100644 test/client_online_state_io_test.dart create mode 100644 test/client_online_state_web_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 9440c7e..3786768 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 2.3.1 +- Feature: Client exposes onOnlineState stream (broadcast Stream) to observe connectivity while waiting to reconnect. + - Emits immediately and then periodically at ClientConnectOptions.networkCheckInterval during waiting phases. + - Works across platforms leveraging the previously added NetworkConnectivity service. + - Properly cleaned up on disconnect or when leaving waiting state. +- Refactor: Client now subscribes to NetworkConnectivity.watch (per-platform) instead of maintaining its own polling Timer; simplifies behavior and relies on connectivity internals like waitUntilOnline. +- Tests: Added VM tests validating periodic emissions and online transition. +- Tests: Added Web test for onOnlineState using browser online/offline events. +- Docs: README includes example usage of network-aware reconnect, onOnlineState, and IO guidance for connectivityTestAddress. + ## 2.3.0 - Feature: Optional network-aware reconnect across all platforms. - Added cross-platform connectivity detection service: diff --git a/README.md b/README.md index 6077d66..88058de 100644 --- a/README.md +++ b/README.md @@ -158,3 +158,84 @@ await for (final result in session.call("my.procedure")) { // do something with the result } ``` + + +## Network‑aware reconnect (example) + +Enable optional network‑aware reconnect and observe connectivity while the client waits to reconnect. On IO platforms, set a probe target via `connectivityTestAddress` to avoid false negatives. + +```dart +import 'package:connectanum/src/client.dart'; +import 'package:connectanum/src/transport/websocket/websocket_transport_io.dart'; + +final transport = + WebSocketTransport.withJsonSerializer('ws://wamp.example.com:8080/ws'); +final client = Client(realm: 'com.my.realm', transport: transport); + +final options = ClientConnectOptions( + reconnectTime: const Duration(seconds: 2), + reconnectCount: -1, // infinite retries + waitForNetwork: true, // wait until network is back before retrying + networkCheckInterval: const Duration(seconds: 1), + networkWaitTimeout: const Duration(seconds: 30), + // IO only: probe target for connectivity checks (host:port) + connectivityTestAddress: 'wamp.example.com:8080', +); + +// Observe online/offline while the client is waiting to reconnect +client.onOnlineState.listen((online) { + print('Network online: $online'); +}); + +client.connect(options: options).listen( + (session) { + // connected + }, + onError: (e) { + // out of retries or unrecoverable error + }, +); +``` + +### IO defaults and `connectivityTestAddress` + +- On IO platforms, the connectivity check performs a TCP connect to a target (host:port). +- If you don’t specify `connectivityTestAddress`, a generic host is used, which can be blocked or unreliable on some networks. +- Recommended: + - Set `connectivityTestAddress` to your WAMP server’s host:port for the most relevant signal. + - If that’s not possible, choose a highly available TCP endpoint (e.g., an HTTPS port on a reliable host in your environment). + - Keep the `networkCheckInterval` modest (e.g., 1–2s) to balance responsiveness and network load. +- Note: The current implementation accepts a single probe target. If you need higher resilience, prefer using your own backend endpoint or handle multi‑target probing at the application layer. + +## Running tests (VM and Web) + +This project ships tests for both the Dart VM and the browser (Chrome). + +- Run the full suite on VM and Chrome (Chrome will be launched headlessly by the test runner): + - dart test + +- Run only Chrome tests: + - dart test -p chrome + +If Chrome is not detected automatically, set the CHROME_EXECUTABLE environment variable to the absolute path of your Chrome/Chromium binary. + +Typical paths: +- macOS: /Applications/Google Chrome.app/Contents/MacOS/Google Chrome +- Linux (Debian/Ubuntu): /usr/bin/google-chrome or /usr/bin/chromium +- Windows (PowerShell): C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe + +Examples: +- macOS/Linux (bash/zsh): + - export CHROME_EXECUTABLE="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" + - dart test -p chrome test/network/network_connectivity_web_test.dart +- Linux (Chromium): + - export CHROME_EXECUTABLE=/usr/bin/chromium + - dart test -p chrome +- Windows (PowerShell): + - $env:CHROME_EXECUTABLE = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe" + - dart test -p chrome + +Notes: +- The test runner is configured for Chrome in dart_test.yaml (platforms: [vm, chrome]). +- Browser tests compile with the dart2wasm compiler by default; ensure you’re on Dart >= 3.4. +- If tests appear flaky due to network status, try re-running them or ensuring your system is online. diff --git a/lib/src/client.dart b/lib/src/client.dart index 055fb13..5582ee5 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -54,6 +54,11 @@ class Client { final StreamController _controller = StreamController(); + // Broadcast stream emitting current online state during waiting periods + final StreamController _onlineStateController = + StreamController.broadcast(); + StreamSubscription? _onlineStreamSub; + StreamSubscription? _connectStreamSubscription; /// The client connects to the wamp server by using the given [transport] and @@ -99,6 +104,10 @@ class Client { Stream get onNextTryToReconnect => _connectStreamController.stream; + /// Broadcast stream emitting current online state while the client is waiting + /// to reconnect (e.g., during network wait or reconnect delay). + Stream get onOnlineState => _onlineStateController.stream; + /// Calling this method will start the authentication process and result into /// a [Session] object on success. If a [ClientConnectOptions.pingInterval] is /// given and the underlying transport supports sending of ping messages. the @@ -129,6 +138,7 @@ class Client { Future disconnect() async { _logger.shout('Disconnecting'); _changeState(_ClientState.done); + _stopOnlineTicker(); await _connectStreamSubscription?.cancel(); if (!_connectStreamController.isClosed) { await _connectStreamController.close(); @@ -136,6 +146,9 @@ class Client { if (!_controller.isClosed) { await _controller.close(); } + if (!_onlineStateController.isClosed) { + await _onlineStateController.close(); + } if (transport.isOpen) { await transport.close(); } @@ -149,35 +162,43 @@ class Client { _changeState(_ClientState.waiting); - // Optionally wait for network to be online before attempting reconnect - if (options.waitForNetwork) { - try { - final online = await connectivity.NetworkConnectivity.instance - .isOnline(testAddress: options.connectivityTestAddress); - if (!online) { - _logger.info('Network offline detected. Waiting until online...'); - await connectivity.NetworkConnectivity.instance.waitUntilOnline( - pollInterval: options.networkCheckInterval, - timeout: options.networkWaitTimeout, - testAddress: options.connectivityTestAddress, - ); + // Start online-state ticker during waiting + _startOnlineTicker(options.networkCheckInterval, options.connectivityTestAddress); + + try { + // Optionally wait for network to be online before attempting reconnect + if (options.waitForNetwork) { + try { + final online = await connectivity.NetworkConnectivity.instance + .isOnline(testAddress: options.connectivityTestAddress); + if (!online) { + _logger.info('Network offline detected. Waiting until online...'); + await connectivity.NetworkConnectivity.instance.waitUntilOnline( + pollInterval: options.networkCheckInterval, + timeout: options.networkWaitTimeout, + testAddress: options.connectivityTestAddress, + ); + } + } catch (e) { + _logger.fine('Connectivity check failed: $e'); } - } catch (e) { - _logger.fine('Connectivity check failed: $e'); } - } - if (duration != null) { - _logger.info('Waiting for (overridden) $duration before reconnecting'); - await Future.delayed(duration); - } else { - _logger.info('Waiting for ${options.reconnectTime!} before reconnecting'); - await Future.delayed(options.reconnectTime!); - } + if (duration != null) { + _logger.info('Waiting for (overridden) $duration before reconnecting'); + await Future.delayed(duration); + } else { + _logger.info('Waiting for ${options.reconnectTime!} before reconnecting'); + await Future.delayed(options.reconnectTime!); + } - // Check in case the client has been closed while we were waiting; - if (_state == _ClientState.done) return; - return _connectStreamController.add(options); + // Check in case the client has been closed while we were waiting; + if (_state == _ClientState.done) return; + return _connectStreamController.add(options); + } finally { + // Stop ticker when leaving waiting phase + _stopOnlineTicker(); + } } void _changeState(_ClientState newState) { @@ -187,6 +208,26 @@ class Client { _state = newState; } + void _startOnlineTicker(Duration interval, String? testAddress) { + _stopOnlineTicker(); + _onlineStreamSub = connectivity.NetworkConnectivity.instance + .watch(interval: interval, testAddress: testAddress) + .listen((online) { + if (!_onlineStateController.isClosed) { + _onlineStateController.add(online); + } + }, onError: (_) { + if (!_onlineStateController.isClosed) { + _onlineStateController.add(false); + } + }); + } + + void _stopOnlineTicker() { + _onlineStreamSub?.cancel(); + _onlineStreamSub = null; + } + Future _connect(ClientConnectOptions options) async { _logger.info('Connecting, attempts remaining: ${options.reconnectCount}'); await transport.open(pingInterval: options.pingInterval); diff --git a/lib/src/network/network_connectivity_io.dart b/lib/src/network/network_connectivity_io.dart index 27eb295..fbf2fbf 100644 --- a/lib/src/network/network_connectivity_io.dart +++ b/lib/src/network/network_connectivity_io.dart @@ -29,6 +29,43 @@ class NetworkConnectivity { } } + /// Returns a stream of online state. Emits immediately, then polls at [interval]. + Stream watch({ + Duration interval = const Duration(seconds: 2), + String? testAddress, + }) { + final controller = StreamController.broadcast(); + Timer? timer; + bool closed = false; + + Future emit() async { + try { + final online = await isOnline(testAddress: testAddress); + if (!controller.isClosed) controller.add(online); + } catch (_) { + if (!controller.isClosed) controller.add(false); + } + } + + controller.onListen = () { + // immediate emission + emit(); + // periodic polling + timer = Timer.periodic(interval, (_) => emit()); + }; + controller.onCancel = () { + if (controller.hasListener && !closed) { + // another listener still active; keep timer + return; + } + timer?.cancel(); + timer = null; + closed = true; + }; + + return controller.stream; + } + Future waitUntilOnline({ Duration pollInterval = const Duration(seconds: 2), Duration? timeout, diff --git a/lib/src/network/network_connectivity_stub.dart b/lib/src/network/network_connectivity_stub.dart index 74b28dd..a62e370 100644 --- a/lib/src/network/network_connectivity_stub.dart +++ b/lib/src/network/network_connectivity_stub.dart @@ -8,6 +8,32 @@ class NetworkConnectivity { Future isOnline({String? testAddress}) async => true; + /// Stream of online state; on stub we emit `true` immediately and periodically. + Stream watch({ + Duration interval = const Duration(seconds: 2), + String? testAddress, + }) { + final controller = StreamController.broadcast(); + Timer? timer; + + void emit() { + if (!controller.isClosed) controller.add(true); + } + + controller.onListen = () { + emit(); + timer = Timer.periodic(interval, (_) => emit()); + }; + controller.onCancel = () { + if (!controller.hasListener) { + timer?.cancel(); + timer = null; + } + }; + + return controller.stream; + } + Future waitUntilOnline({ Duration pollInterval = const Duration(seconds: 2), Duration? timeout, diff --git a/lib/src/network/network_connectivity_web.dart b/lib/src/network/network_connectivity_web.dart index ed2b5a3..850806b 100644 --- a/lib/src/network/network_connectivity_web.dart +++ b/lib/src/network/network_connectivity_web.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:js_interop'; import 'package:web/web.dart'; class NetworkConnectivity { @@ -6,32 +7,92 @@ class NetworkConnectivity { NetworkConnectivity._internal(); Future isOnline({String? testAddress}) async { - // Browser-provided online status try { - return window.navigator.onLine ?? true; + return window.navigator.onLine; } catch (_) { return true; } } + /// Stream of online state for the web. Emits immediately, then updates on browser events. + Stream watch({ + Duration interval = const Duration(seconds: 2), // unused on web + String? testAddress, + }) { + final controller = StreamController.broadcast(); + int listenerCount = 0; + + EventListener? onlineListener; + EventListener? offlineListener; + + void addListeners() { + if (onlineListener != null) return; + onlineListener = (((Event _) { + if (!controller.isClosed) controller.add(true); + }).toJS) as EventListener; + offlineListener = (((Event _) { + if (!controller.isClosed) controller.add(false); + }).toJS) as EventListener; + window.addEventListener('online', onlineListener); + window.addEventListener('offline', offlineListener); + } + + void removeListeners() { + if (onlineListener != null) { + window.removeEventListener('online', onlineListener); + onlineListener = null; + } + if (offlineListener != null) { + window.removeEventListener('offline', offlineListener); + offlineListener = null; + } + } + + controller.onListen = () { + listenerCount++; + // emit current state immediately + () async { + try { + final online = await isOnline(); + if (!controller.isClosed) controller.add(online); + } catch (_) { + if (!controller.isClosed) controller.add(true); + } + }(); + addListeners(); + }; + + controller.onCancel = () { + listenerCount--; + if (listenerCount <= 0) { + removeListeners(); + } + }; + + return controller.stream; + } + Future waitUntilOnline({ Duration pollInterval = const Duration(seconds: 2), Duration? timeout, String? testAddress, }) async { - // If already online, return immediately if (await isOnline()) return; final completer = Completer(); - void onlineListener(Event _) { + + EventListener? onlineListener; + EventListener? offlineListener; + + onlineListener = (((Event _) { if (!completer.isCompleted) completer.complete(); window.removeEventListener('online', onlineListener); window.removeEventListener('offline', offlineListener); - } + }).toJS) as EventListener; - void offlineListener(Event _) { + offlineListener = (((Event _) { // no-op, but keep symmetry and potential future logging - } + }).toJS) as EventListener; window.addEventListener('online', onlineListener); window.addEventListener('offline', offlineListener); diff --git a/pubspec.yaml b/pubspec.yaml index 5183675..af8f741 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: connectanum homepage: https://github.com/konsultaner/connectanum-dart -version: 2.3.0 +version: 2.3.1 description: >- This is a WAMP client (Web Application Messaging Protocol) implementation for the dart language and flutter projects. false_secrets: diff --git a/test/client_online_state_io_test.dart b/test/client_online_state_io_test.dart new file mode 100644 index 0000000..c85a040 --- /dev/null +++ b/test/client_online_state_io_test.dart @@ -0,0 +1,103 @@ +@TestOn('vm') +library; + +import 'dart:async'; +import 'dart:io'; + +import 'package:connectanum/src/client.dart'; +import 'package:connectanum/src/transport/websocket/websocket_transport_io.dart'; +import 'package:test/test.dart'; + +void main() { + group('Client onOnlineState (VM)', () { + test('emits periodically while waiting and flips to true when network appears', () async { + // Prepare a probe port that starts as offline + final temp = await ServerSocket.bind('127.0.0.1', 0); + final probePort = temp.port; + await temp.close(); + + // Use an unreachable transport URL to force reconnect + final transport = WebSocketTransport.withJsonSerializer('ws://127.0.0.1:1/wamp'); + final client = Client(realm: 'com.connectanum', transport: transport); + + final options = ClientConnectOptions( + reconnectTime: const Duration(milliseconds: 300), + reconnectCount: 2, + waitForNetwork: true, + networkCheckInterval: const Duration(milliseconds: 50), + networkWaitTimeout: const Duration(seconds: 3), + connectivityTestAddress: '127.0.0.1:$probePort', + ); + + final emissions = []; + final sub = client.onOnlineState.listen(emissions.add); + + // Start connecting; should fail and enter reconnect waiting, starting the ticker + final completer = Completer(); + client.connect(options: options).listen((_) {}, onError: (_) { + // We expect eventual failure after retries, but we only care about the stream + completer.complete(); + }); + + // Wait for a few offline emissions + final sw = Stopwatch()..start(); + while (emissions.length < 2 && sw.elapsedMilliseconds < 1000) { + await Future.delayed(const Duration(milliseconds: 20)); + } + expect(emissions, isNotEmpty, reason: 'Should emit at least once while waiting'); + expect(emissions.last, isFalse, reason: 'Expected offline initially'); + + // Bring the probe online; online should be detected on next ticker tick + final server = await HttpServer.bind('127.0.0.1', probePort); + + // Await a true emission within a reasonable time + bool sawTrue = false; + final trueWaitSw = Stopwatch()..start(); + while (!sawTrue && trueWaitSw.elapsedMilliseconds < 2000) { + await Future.delayed(const Duration(milliseconds: 25)); + if (emissions.contains(true)) { + sawTrue = true; + } + } + expect(sawTrue, isTrue, reason: 'Expected to see true after server appears'); + + await server.close(force: true); + await sub.cancel(); + await client.disconnect(); + }); + + test('emits multiple times while offline (periodic refresh)', () async { + // Prepare an unused port for probe (offline) + final temp = await ServerSocket.bind('127.0.0.1', 0); + final probePort = temp.port; + await temp.close(); + + final transport = WebSocketTransport.withJsonSerializer('ws://127.0.0.1:1/wamp'); + final client = Client(realm: 'com.connectanum', transport: transport); + + final options = ClientConnectOptions( + reconnectTime: const Duration(milliseconds: 400), + reconnectCount: 1, + waitForNetwork: true, + networkCheckInterval: const Duration(milliseconds: 50), + networkWaitTimeout: const Duration(milliseconds: 500), + connectivityTestAddress: '127.0.0.1:$probePort', + ); + + var count = 0; + final sub = client.onOnlineState.listen((_) => count++); + + final done = Completer(); + client.connect(options: options).listen((_) {}, onError: (_) => done.complete()); + + // Wait for timeout/retry cycle to finish + await done.future.timeout(const Duration(seconds: 3), onTimeout: () {}); + + // With 50ms interval over ~500-800ms waiting windows, expect multiple emissions + expect(count, greaterThanOrEqualTo(2)); + + await sub.cancel(); + await client.disconnect(); + }); + }); +} diff --git a/test/client_online_state_web_test.dart b/test/client_online_state_web_test.dart new file mode 100644 index 0000000..8d647c9 --- /dev/null +++ b/test/client_online_state_web_test.dart @@ -0,0 +1,69 @@ +@TestOn('chrome') +library; + +import 'dart:async'; + +import 'package:connectanum/src/client.dart' as connectanum_client; +import 'package:connectanum/src/transport/websocket/websocket_transport_web.dart'; +import 'package:test/test.dart'; +import 'package:web/web.dart'; + +void main() { + group('Client onOnlineState (Web)', () { + test('emits and reacts to online/offline events during waiting', () async { + // Use an unreachable URL to force reconnect/waiting + final transport = + WebSocketTransport.withJsonSerializer('ws://127.0.0.1:1/wamp'); + final client = connectanum_client.Client(realm: 'com.connectanum', transport: transport); + + final options = connectanum_client.ClientConnectOptions( + reconnectTime: const Duration(milliseconds: 500), + reconnectCount: 1, + waitForNetwork: true, + networkCheckInterval: const Duration(milliseconds: 100), + ); + + final emissions = []; + final sub = client.onOnlineState.listen(emissions.add); + + final done = Completer(); + client + .connect(options: options) + .listen((_) {}, onError: (_) => done.complete()); + + // Wait until the ticker starts and at least one emission arrives + final startSw = Stopwatch()..start(); + while (emissions.isEmpty && startSw.elapsedMilliseconds < 3000) { + await Future.delayed(const Duration(milliseconds: 25)); + } + expect(emissions, isNotEmpty, reason: 'Should emit while waiting'); + + // Simulate going offline -> expect a false emission + window.dispatchEvent(Event('offline')); + bool sawFalse = false; + final offlineSw = Stopwatch()..start(); + while (!sawFalse && offlineSw.elapsedMilliseconds < 2000) { + await Future.delayed(const Duration(milliseconds: 25)); + if (emissions.contains(false)) sawFalse = true; + } + expect(sawFalse, isTrue, reason: 'Expected false after offline event'); + + // Simulate going back online -> expect a true emission + window.dispatchEvent(Event('online')); + bool sawTrue = false; + final onlineSw = Stopwatch()..start(); + while (!sawTrue && onlineSw.elapsedMilliseconds < 2000) { + await Future.delayed(const Duration(milliseconds: 25)); + if (emissions.contains(true) && emissions.last == true) { + sawTrue = true; + } + } + expect(sawTrue, isTrue, reason: 'Expected true after online event'); + + await sub.cancel(); + await client.disconnect(); + await done.future.timeout(const Duration(seconds: 3), onTimeout: () {}); + }); + }); +} + diff --git a/test/network/network_connectivity_io_test.dart b/test/network/network_connectivity_io_test.dart index 0c7f61d..caba7c8 100644 --- a/test/network/network_connectivity_io_test.dart +++ b/test/network/network_connectivity_io_test.dart @@ -1,4 +1,6 @@ @TestOn('vm') +library; + import 'dart:async'; import 'dart:io'; diff --git a/test/network/network_connectivity_web_test.dart b/test/network/network_connectivity_web_test.dart index feed5ed..05d701b 100644 --- a/test/network/network_connectivity_web_test.dart +++ b/test/network/network_connectivity_web_test.dart @@ -1,4 +1,6 @@ @TestOn('chrome') +library; + import 'dart:async'; import 'package:test/test.dart'; From 50fe719f3154e26641f26cdbe0fc43046c734a0c Mon Sep 17 00:00:00 2001 From: Richard Burkhardt Date: Tue, 16 Sep 2025 08:46:07 +0200 Subject: [PATCH 4/5] add GitLab CI pipeline for analysis, testing, and coverage generation --- .gitlab-ci.yml | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 .gitlab-ci.yml diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..4d6df30 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,55 @@ +variables: + HOME: "$CI_PROJECT_DIR" + PUB_CACHE: "$CI_PROJECT_DIR/.pub-cache" + +cache: + key: "pub-${CI_COMMIT_REF_SLUG}" + paths: + - .pub-cache/ + - .dart_tool/ + +default: + image: + name: konsultaner/flutter-ci-base:android-35.0.0-flutter-3.35.2 + entrypoint: [""] + before_script: + - git config --global --add safe.directory /flutter || true + - mkdir -p "$HOME" "$PUB_CACHE" + - export PATH="$PUB_CACHE/bin:$PATH" + - dart --version + +stages: + - analyze + - test + +analyze: + stage: analyze + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event"' + script: + - dart pub get + - dart analyze + +test:vm: + stage: test + needs: ["analyze"] + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event"' + script: + - dart pub get + - dart run test -p vm --coverage=coverage/raw + - dart pub global activate coverage + - format_coverage -l -v -i coverage/raw -o coverage/lcov.info --packages=.dart_tool/package_config.json --report-on=lib + - dart pub global activate cobertura + - cobertura convert -i coverage/lcov.info -o coverage/cobertura.xml -p pubspec.yaml + artifacts: + when: always + name: 'coverage' + expire_in: 1 day + reports: + coverage_report: + coverage_format: cobertura + path: coverage/cobertura.xml + paths: + - coverage/ + coverage: '/\s*lines\.*:\s*([\d\.]+%)/' \ No newline at end of file From a0ed5dbb6c3a08c2681aa700a5726fcbb2e47eb6 Mon Sep 17 00:00:00 2001 From: Richard Burkhardt Date: Tue, 16 Sep 2025 19:19:44 +0200 Subject: [PATCH 5/5] update GitLab CI: bump Flutter image version, remove unnecessary HOME variable, and add Chrome test job --- .gitlab-ci.yml | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4d6df30..b9f042d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,5 +1,4 @@ variables: - HOME: "$CI_PROJECT_DIR" PUB_CACHE: "$CI_PROJECT_DIR/.pub-cache" cache: @@ -10,11 +9,11 @@ cache: default: image: - name: konsultaner/flutter-ci-base:android-35.0.0-flutter-3.35.2 + name: konsultaner/flutter-ci-base:android-35.0.0-flutter-3.35.3 entrypoint: [""] before_script: - git config --global --add safe.directory /flutter || true - - mkdir -p "$HOME" "$PUB_CACHE" + - mkdir -p "$PUB_CACHE" - export PATH="$PUB_CACHE/bin:$PATH" - dart --version @@ -52,4 +51,13 @@ test:vm: path: coverage/cobertura.xml paths: - coverage/ - coverage: '/\s*lines\.*:\s*([\d\.]+%)/' \ No newline at end of file + coverage: '/\s*lines\.*:\s*([\d\.]+%)/' + +test:chrome: + stage: test + needs: ["analyze"] + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event"' + script: + - dart pub get + - dart run test -p chrome --timeout=5m --concurrency=1