Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ connectanum-dart.iml
.packages
pubspec.lock
coverage
out
63 changes: 63 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
variables:
PUB_CACHE: "$CI_PROJECT_DIR/.pub-cache"

cache:
key: "pub-${CI_COMMIT_REF_SLUG}"
paths:
- .pub-cache/
- .dart_tool/

default:
image:
name: konsultaner/flutter-ci-base:android-35.0.0-flutter-3.35.3
entrypoint: [""]
before_script:
- git config --global --add safe.directory /flutter || true
- mkdir -p "$PUB_CACHE"
- export PATH="$PUB_CACHE/bin:$PATH"
- dart --version

stages:
- analyze
- test

analyze:
stage: analyze
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
script:
- dart pub get
- dart analyze

test:vm:
stage: test
needs: ["analyze"]
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
script:
- dart pub get
- dart run test -p vm --coverage=coverage/raw
- dart pub global activate coverage
- format_coverage -l -v -i coverage/raw -o coverage/lcov.info --packages=.dart_tool/package_config.json --report-on=lib
- dart pub global activate cobertura
- cobertura convert -i coverage/lcov.info -o coverage/cobertura.xml -p pubspec.yaml
artifacts:
when: always
name: 'coverage'
expire_in: 1 day
reports:
coverage_report:
coverage_format: cobertura
path: coverage/cobertura.xml
paths:
- coverage/
coverage: '/\s*lines\.*:\s*([\d\.]+%)/'

test:chrome:
stage: test
needs: ["analyze"]
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
script:
- dart pub get
- dart run test -p chrome --timeout=5m --concurrency=1
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
## 2.3.1
- Feature: Client exposes onOnlineState stream (broadcast Stream<bool>) to observe connectivity while waiting to reconnect.
- Emits immediately and then periodically at ClientConnectOptions.networkCheckInterval during waiting phases.
- Works across platforms leveraging the previously added NetworkConnectivity service.
- Properly cleaned up on disconnect or when leaving waiting state.
- Refactor: Client now subscribes to NetworkConnectivity.watch (per-platform) instead of maintaining its own polling Timer; simplifies behavior and relies on connectivity internals like waitUntilOnline.
- Tests: Added VM tests validating periodic emissions and online transition.
- Tests: Added Web test for onOnlineState using browser online/offline events.
- Docs: README includes example usage of network-aware reconnect, onOnlineState, and IO guidance for connectivityTestAddress.

## 2.3.0
- Feature: Optional network-aware reconnect across all platforms.
- Added cross-platform connectivity detection service:
- Web: uses browser online/offline events via `package:web` and `Navigator.onLine`.
- IO: probes TCP connectivity (host:port) with periodic polling and timeout.
- Stub: defaults to online for unsupported platforms.
- New ClientConnectOptions:
- waitForNetwork (default: false) to enable waiting for connectivity before reconnect.
- networkCheckInterval (default: 2s) polling interval (IO).
- networkWaitTimeout (optional) max time to wait for network to return.
- connectivityTestAddress (optional, e.g. "example.com:80") probe target on IO.
- Client will, when enabled, wait until the network is back before applying the configured reconnect delay.
- Bump SDK constraints unchanged; package version bumped to 2.3.0.

### 2.2.6

- fixed abort reason to match a value of the listed `_abortReasons` in the client for local transport
Expand Down
81 changes: 81 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,84 @@ await for (final result in session.call("my.procedure")) {
// do something with the result
}
```


## Network‑aware reconnect (example)

Enable optional network‑aware reconnect and observe connectivity while the client waits to reconnect. On IO platforms, set a probe target via `connectivityTestAddress` to avoid false negatives.

```dart
import 'package:connectanum/src/client.dart';
import 'package:connectanum/src/transport/websocket/websocket_transport_io.dart';

final transport =
WebSocketTransport.withJsonSerializer('ws://wamp.example.com:8080/ws');
final client = Client(realm: 'com.my.realm', transport: transport);

final options = ClientConnectOptions(
reconnectTime: const Duration(seconds: 2),
reconnectCount: -1, // infinite retries
waitForNetwork: true, // wait until network is back before retrying
networkCheckInterval: const Duration(seconds: 1),
networkWaitTimeout: const Duration(seconds: 30),
// IO only: probe target for connectivity checks (host:port)
connectivityTestAddress: 'wamp.example.com:8080',
);

// Observe online/offline while the client is waiting to reconnect
client.onOnlineState.listen((online) {
print('Network online: $online');
});

client.connect(options: options).listen(
(session) {
// connected
},
onError: (e) {
// out of retries or unrecoverable error
},
);
```

### IO defaults and `connectivityTestAddress`

- On IO platforms, the connectivity check performs a TCP connect to a target (host:port).
- If you don’t specify `connectivityTestAddress`, a generic host is used, which can be blocked or unreliable on some networks.
- Recommended:
- Set `connectivityTestAddress` to your WAMP server’s host:port for the most relevant signal.
- If that’s not possible, choose a highly available TCP endpoint (e.g., an HTTPS port on a reliable host in your environment).
- Keep the `networkCheckInterval` modest (e.g., 1–2s) to balance responsiveness and network load.
- Note: The current implementation accepts a single probe target. If you need higher resilience, prefer using your own backend endpoint or handle multi‑target probing at the application layer.

## Running tests (VM and Web)

This project ships tests for both the Dart VM and the browser (Chrome).

- Run the full suite on VM and Chrome (Chrome will be launched headlessly by the test runner):
- dart test

- Run only Chrome tests:
- dart test -p chrome

If Chrome is not detected automatically, set the CHROME_EXECUTABLE environment variable to the absolute path of your Chrome/Chromium binary.

Typical paths:
- macOS: /Applications/Google Chrome.app/Contents/MacOS/Google Chrome
- Linux (Debian/Ubuntu): /usr/bin/google-chrome or /usr/bin/chromium
- Windows (PowerShell): C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe

Examples:
- macOS/Linux (bash/zsh):
- export CHROME_EXECUTABLE="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
- dart test -p chrome test/network/network_connectivity_web_test.dart
- Linux (Chromium):
- export CHROME_EXECUTABLE=/usr/bin/chromium
- dart test -p chrome
- Windows (PowerShell):
- $env:CHROME_EXECUTABLE = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe"
- dart test -p chrome

Notes:
- The test runner is configured for Chrome in dart_test.yaml (platforms: [vm, chrome]).
- Browser tests compile with the dart2wasm compiler by default; ensure you’re on Dart >= 3.4.
- If tests appear flaky due to network status, try re-running them or ensuring your system is online.
94 changes: 84 additions & 10 deletions lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import 'authentication/abstract_authentication.dart';
import 'transport/abstract_transport.dart';
import 'message/uri_pattern.dart';
import 'protocol/session.dart';
import 'network/network_connectivity_stub.dart'
if (dart.library.io) 'network/network_connectivity_io.dart'
if (dart.library.js_interop) 'network/network_connectivity_web.dart' as connectivity;

enum _ClientState {
/// Client is idle and not connected
Expand Down Expand Up @@ -51,6 +54,11 @@ class Client {

final StreamController<Session> _controller = StreamController<Session>();

// Broadcast stream emitting current online state during waiting periods
final StreamController<bool> _onlineStateController =
StreamController<bool>.broadcast();
StreamSubscription<bool>? _onlineStreamSub;

StreamSubscription<ClientConnectOptions>? _connectStreamSubscription;

/// The client connects to the wamp server by using the given [transport] and
Expand Down Expand Up @@ -96,6 +104,10 @@ class Client {
Stream<ClientConnectOptions> get onNextTryToReconnect =>
_connectStreamController.stream;

/// Broadcast stream emitting current online state while the client is waiting
/// to reconnect (e.g., during network wait or reconnect delay).
Stream<bool> get onOnlineState => _onlineStateController.stream;

/// Calling this method will start the authentication process and result into
/// a [Session] object on success. If a [ClientConnectOptions.pingInterval] is
/// given and the underlying transport supports sending of ping messages. the
Expand Down Expand Up @@ -126,13 +138,17 @@ class Client {
Future<void> disconnect() async {
_logger.shout('Disconnecting');
_changeState(_ClientState.done);
_stopOnlineTicker();
await _connectStreamSubscription?.cancel();
if (!_connectStreamController.isClosed) {
await _connectStreamController.close();
}
if (!_controller.isClosed) {
await _controller.close();
}
if (!_onlineStateController.isClosed) {
await _onlineStateController.close();
}
if (transport.isOpen) {
await transport.close();
}
Expand All @@ -146,17 +162,43 @@ class Client {

_changeState(_ClientState.waiting);

if (duration != null) {
_logger.info('Waiting for (overridden) $duration before reconnecting');
await Future.delayed(duration);
} else {
_logger.info('Waiting for ${options.reconnectTime!} before reconnecting');
await Future.delayed(options.reconnectTime!);
}
// Start online-state ticker during waiting
_startOnlineTicker(options.networkCheckInterval, options.connectivityTestAddress);

try {
// Optionally wait for network to be online before attempting reconnect
if (options.waitForNetwork) {
try {
final online = await connectivity.NetworkConnectivity.instance
.isOnline(testAddress: options.connectivityTestAddress);
if (!online) {
_logger.info('Network offline detected. Waiting until online...');
await connectivity.NetworkConnectivity.instance.waitUntilOnline(
pollInterval: options.networkCheckInterval,
timeout: options.networkWaitTimeout,
testAddress: options.connectivityTestAddress,
);
}
} catch (e) {
_logger.fine('Connectivity check failed: $e');
}
}

// Check in case the client has been closed while we were waiting;
if (_state == _ClientState.done) return;
return _connectStreamController.add(options);
if (duration != null) {
_logger.info('Waiting for (overridden) $duration before reconnecting');
await Future.delayed(duration);
} else {
_logger.info('Waiting for ${options.reconnectTime!} before reconnecting');
await Future.delayed(options.reconnectTime!);
}

// Check in case the client has been closed while we were waiting;
if (_state == _ClientState.done) return;
return _connectStreamController.add(options);
} finally {
// Stop ticker when leaving waiting phase
_stopOnlineTicker();
}
}

void _changeState(_ClientState newState) {
Expand All @@ -166,6 +208,26 @@ class Client {
_state = newState;
}

void _startOnlineTicker(Duration interval, String? testAddress) {
_stopOnlineTicker();
_onlineStreamSub = connectivity.NetworkConnectivity.instance
.watch(interval: interval, testAddress: testAddress)
.listen((online) {
if (!_onlineStateController.isClosed) {
_onlineStateController.add(online);
}
}, onError: (_) {
if (!_onlineStateController.isClosed) {
_onlineStateController.add(false);
}
});
}

void _stopOnlineTicker() {
_onlineStreamSub?.cancel();
_onlineStreamSub = null;
}

Future<void> _connect(ClientConnectOptions options) async {
_logger.info('Connecting, attempts remaining: ${options.reconnectCount}');
await transport.open(pingInterval: options.pingInterval);
Expand Down Expand Up @@ -253,10 +315,22 @@ class ClientConnectOptions {
Duration? reconnectTime;
Duration? pingInterval;

// New options for network-aware reconnect
bool waitForNetwork;
Duration networkCheckInterval;
Duration? networkWaitTimeout;
/// Host:port to probe when checking connectivity on IO (e.g., 'example.com:80').
/// If null, a reasonable default will be used by the platform implementation.
String? connectivityTestAddress;

ClientConnectOptions({
this.reconnectCount = 3,
this.reconnectTime,
this.pingInterval,
this.waitForNetwork = false,
this.networkCheckInterval = const Duration(seconds: 2),
this.networkWaitTimeout,
this.connectivityTestAddress,
});

ClientConnectOptions minusReconnectRetry() {
Expand Down
Loading