From eefe3f681abcea0c3be039e7cfb3828728a3cb76 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Wed, 15 Apr 2026 11:06:23 +0100 Subject: [PATCH 1/4] Add tests reproducing live query issues from #138 Add four new tests to LiveQueryTests covering the scenarios reported in issue #138 (live stream notifications and concurrent close): - liveStreamReceivesCreateNotification: blocks on next(), then CREATEs a record and asserts the notification arrives. - liveStreamReceivesUpdateNotification: same pattern for UPDATE. - closeUnblocksBlockedNext: verifies close() from another thread unblocks a thread blocked on next() without deadlocking. - concurrentNextAndCloseDoesNotCrash: stress test with multiple threads calling next() and close() concurrently. --- .../java/com/surrealdb/LiveQueryTests.java | 200 ++++++++++++++++++ 1 file changed, 200 insertions(+) diff --git a/src/test/java/com/surrealdb/LiveQueryTests.java b/src/test/java/com/surrealdb/LiveQueryTests.java index 0c31b5b4..ab83c8de 100644 --- a/src/test/java/com/surrealdb/LiveQueryTests.java +++ b/src/test/java/com/surrealdb/LiveQueryTests.java @@ -1,10 +1,18 @@ package com.surrealdb; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -70,6 +78,198 @@ void liveStreamTryWithResources() { } } + /** + * Reproduces issue #138: start a live query, block on next(), then CREATE + * a record — the notification must arrive. The bug report claims it never does. + */ + @Test + void liveStreamReceivesCreateNotification() throws Exception { + AtomicReference received = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch consuming = new CountDownLatch(1); + + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); + + try (LiveStream stream = surreal.selectLive("person")) { + Thread consumer = new Thread(() -> { + try { + consuming.countDown(); + Optional n = stream.next(); + n.ifPresent(received::set); + } catch (Throwable t) { + error.set(t); + } + }); + consumer.setDaemon(true); + consumer.start(); + + assertTrue(consuming.await(2, TimeUnit.SECONDS), + "Consumer thread did not start in time"); + Thread.sleep(500); + + surreal.create(new RecordId("person", 1), Helpers.tobie); + + consumer.join(5000); + assertFalse(consumer.isAlive(), + "Consumer thread still blocked — next() never returned"); + } + + if (error.get() != null) { + fail("next() threw an exception: " + error.get()); + } + assertNotNull(received.get(), + "No notification received after CREATE"); + assertEquals("CREATE", received.get().getAction().toUpperCase()); + assertNotNull(received.get().getValue(), + "Notification value should contain the created record"); + } + } + + /** + * Reproduces issue #138 variant: live query + UPDATE should deliver + * a notification to next(). + */ + @Test + void liveStreamReceivesUpdateNotification() throws Exception { + AtomicReference received = new AtomicReference<>(); + CountDownLatch consuming = new CountDownLatch(1); + + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + RecordId id = new RecordId("person", 1); + surreal.create(id, Helpers.tobie); + + try (LiveStream stream = surreal.selectLive("person")) { + Thread consumer = new Thread(() -> { + consuming.countDown(); + Optional n = stream.next(); + n.ifPresent(received::set); + }); + consumer.setDaemon(true); + consumer.start(); + + assertTrue(consuming.await(2, TimeUnit.SECONDS)); + Thread.sleep(200); + + surreal.update(id, UpType.MERGE, Helpers.jaime); + + consumer.join(5000); + assertFalse(consumer.isAlive(), + "Consumer thread still blocked — next() never returned after UPDATE"); + } + + assertNotNull(received.get(), + "No notification received after UPDATE"); + assertEquals("UPDATE", received.get().getAction().toUpperCase()); + } + } + + /** + * Tests issue #138 deadlock claim: close() from another thread must + * unblock a thread that is blocked on next(), without deadlocking. + */ + @Test + void closeUnblocksBlockedNext() throws Exception { + AtomicReference> result = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch consuming = new CountDownLatch(1); + + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); + + LiveStream stream = surreal.selectLive("person"); + Thread consumer = new Thread(() -> { + try { + consuming.countDown(); + result.set(stream.next()); + } catch (Throwable t) { + error.set(t); + } + }); + consumer.setDaemon(true); + consumer.start(); + + assertTrue(consuming.await(2, TimeUnit.SECONDS)); + Thread.sleep(500); + + stream.close(); + + consumer.join(5000); + assertFalse(consumer.isAlive(), + "DEADLOCK: consumer thread still alive after close() — next() never returned"); + if (error.get() != null) { + fail("next() threw instead of returning empty: " + error.get()); + } + assertNotNull(result.get(), "next() should have returned after close()"); + assertFalse(result.get().isPresent(), + "next() should return empty after stream is closed"); + } + } + + /** + * Stress test for thread safety: multiple threads call next() and close() + * concurrently. No thread should hang or crash (SEGV / use-after-free). + */ + @Test + void concurrentNextAndCloseDoesNotCrash() throws Exception { + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); + CountDownLatch ready = new CountDownLatch(3); + CountDownLatch go = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + + LiveStream stream = surreal.selectLive("person"); + + List threads = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + Thread t = new Thread(() -> { + try { + ready.countDown(); + go.await(); + stream.next(); + } catch (Throwable t1) { + error.compareAndSet(null, t1); + } + }); + t.setDaemon(true); + t.start(); + threads.add(t); + } + + Thread closer = new Thread(() -> { + try { + ready.countDown(); + go.await(); + Thread.sleep(100); + stream.close(); + } catch (Throwable t) { + error.compareAndSet(null, t); + } + }); + closer.setDaemon(true); + closer.start(); + threads.add(closer); + + assertTrue(ready.await(2, TimeUnit.SECONDS)); + go.countDown(); + + for (Thread t : threads) { + t.join(5000); + if (t.isAlive()) { + fail("Thread " + t.getName() + " is still alive — possible deadlock"); + } + } + + if (error.get() != null) { + fail("Concurrent access caused an exception: " + error.get()); + } + } + } + /** * Placeholder for future Surreal.kill(liveQueryId) support. The query ID is * available from {@link LiveNotification#getQueryId()}, but the Java client From 29b62885a6ca0ce28f7a30a1a6c1e8352124680a Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Wed, 15 Apr 2026 11:19:33 +0100 Subject: [PATCH 2/4] Fix LiveStream thread safety and surface selectLive() errors eagerly Two fixes for issues around live query reliability (#138): 1. Make LiveStream.handle volatile so that close() on one thread is visible to next() on another, preventing a stale-handle race (use-after-free). 2. Add a readiness handshake in selectLive(): the background thread now signals via a std::sync::mpsc channel once the live query subscription is established (or fails). selectLive() blocks until this signal arrives, surfacing errors like "table does not exist" immediately instead of deferring them to the first next() call. --- src/main/java/com/surrealdb/LiveStream.java | 2 +- src/main/rust/surreal.rs | 23 +++++++++++++++++-- .../java/com/surrealdb/LiveQueryTests.java | 17 +++++++++++++- 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/surrealdb/LiveStream.java b/src/main/java/com/surrealdb/LiveStream.java index c971007a..a0cc95a7 100644 --- a/src/main/java/com/surrealdb/LiveStream.java +++ b/src/main/java/com/surrealdb/LiveStream.java @@ -13,7 +13,7 @@ public class LiveStream implements AutoCloseable { Loader.loadNative(); } - private long handle; + private volatile long handle; LiveStream(long handle) { this.handle = handle; diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index e0579630..29794ef5 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -495,14 +495,19 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( let table = get_rust_string!(&mut env, &table, || 0); let (tx, rx) = async_channel::unbounded(); let (shutdown_tx, shutdown_rx) = async_channel::bounded::<()>(1); + let (ready_tx, ready_rx) = + std::sync::mpsc::channel::>(); let tx_thread = tx.clone(); let surreal_clone = surreal.clone(); let join_handle = std::thread::spawn(move || { TOKIO_RUNTIME.block_on(async move { let mut stream = match surreal_clone.select(table).live().await { - Ok(s) => s, + Ok(s) => { + let _ = ready_tx.send(Ok(())); + s + } Err(e) => { - let _ = tx_thread.send(Err(e)).await; + let _ = ready_tx.send(Err(e)); return; } }; @@ -519,6 +524,20 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( } }); }); + match ready_rx.recv() { + Ok(Ok(())) => {} + Ok(Err(e)) => { + let _ = join_handle.join(); + return SurrealError::from(e).exception(&mut env, || 0); + } + Err(_) => { + let _ = join_handle.join(); + return SurrealError::SurrealDBJni( + "Live query background thread exited unexpectedly".to_string(), + ) + .exception(&mut env, || 0); + } + } let recv_mutex = std::sync::Arc::new(parking_lot::Mutex::new(())); JniTypes::new_live_stream(( recv_mutex, diff --git a/src/test/java/com/surrealdb/LiveQueryTests.java b/src/test/java/com/surrealdb/LiveQueryTests.java index ab83c8de..b21e149b 100644 --- a/src/test/java/com/surrealdb/LiveQueryTests.java +++ b/src/test/java/com/surrealdb/LiveQueryTests.java @@ -5,12 +5,12 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import org.junit.jupiter.api.Disabled; @@ -28,6 +28,7 @@ public class LiveQueryTests { void selectLiveReturnsLiveStream() { try (Surreal surreal = new Surreal()) { surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); try (LiveStream stream = surreal.selectLive("person")) { assertNotNull(stream); } @@ -59,6 +60,7 @@ void liveStreamNextCanBlockAndClose() throws Exception { void liveStreamCloseReleases() { try (Surreal surreal = new Surreal()) { surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); try (LiveStream stream = surreal.selectLive("person")) { // No exception; closing again or using after close is undefined but we don't // crash @@ -71,6 +73,7 @@ void liveStreamCloseReleases() { void liveStreamTryWithResources() { try (Surreal surreal = new Surreal()) { surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); try (LiveStream stream = surreal.selectLive("person")) { assertNotNull(stream); } @@ -270,6 +273,18 @@ void concurrentNextAndCloseDoesNotCrash() throws Exception { } } + /** + * Verifies that selectLive() surfaces subscription errors immediately + * rather than deferring them to the first next() call. + */ + @Test + void selectLiveOnNonExistentTableThrowsImmediately() { + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + assertThrows(SurrealException.class, () -> surreal.selectLive("no_such_table")); + } + } + /** * Placeholder for future Surreal.kill(liveQueryId) support. The query ID is * available from {@link LiveNotification#getQueryId()}, but the Java client From 476b5e19b3dc732133614f7f24586a42279d9810 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Wed, 15 Apr 2026 11:27:18 +0100 Subject: [PATCH 3/4] Improve inline documentation for live query code Document the threading model, locking protocol, and shutdown sequence across LiveStream.java, Surreal.java, lib.rs, live.rs, and surreal.rs so the concurrency invariants are explicit and reviewable. --- src/main/java/com/surrealdb/LiveStream.java | 43 ++++++++++++++-- src/main/java/com/surrealdb/Surreal.java | 22 ++++++--- src/main/rust/lib.rs | 35 +++++++++++-- src/main/rust/live.rs | 54 +++++++++++++++++---- src/main/rust/surreal.rs | 41 ++++++++++++++++ 5 files changed, 170 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/surrealdb/LiveStream.java b/src/main/java/com/surrealdb/LiveStream.java index a0cc95a7..35b1f82b 100644 --- a/src/main/java/com/surrealdb/LiveStream.java +++ b/src/main/java/com/surrealdb/LiveStream.java @@ -3,9 +3,27 @@ import java.util.Optional; /** - * Blocking iterator over live query notifications. Call {@link #next()} in a - * loop and {@link #close()} when done. Implements {@link AutoCloseable} for - * use in try-with-resources. + * Blocking iterator over live query notifications returned by + * {@link Surreal#selectLive(String)}. + * + *

Typical usage: + *

{@code
+ * try (LiveStream stream = surreal.selectLive("person")) {
+ *     while (true) {
+ *         Optional n = stream.next();
+ *         if (!n.isPresent()) break;   // stream closed
+ *         process(n.get());
+ *     }
+ * }
+ * }
+ * + *

Thread safety: {@link #next()} may be called from one thread while + * {@link #close()} is called from another. The {@code close()} call will + * unblock any thread currently waiting inside {@code next()}. The native + * handle is declared {@code volatile} so that the zeroing performed by + * {@code close()} is immediately visible to concurrent {@code next()} callers. + * Concurrent calls to {@code next()} from multiple threads are serialized by + * a mutex in the native layer. */ public class LiveStream implements AutoCloseable { @@ -13,6 +31,12 @@ public class LiveStream implements AutoCloseable { Loader.loadNative(); } + /** + * Pointer to the native {@code LiveStreamChannel}. Zeroed by + * {@link #close()} after the native resources have been released. Declared + * {@code volatile} so that a {@code close()} on one thread is visible to a + * concurrent {@code next()} on another thread. + */ private volatile long handle; LiveStream(long handle) { @@ -22,7 +46,14 @@ public class LiveStream implements AutoCloseable { /** * Blocks until the next notification is available, or the stream ends. * + *

Returns {@link Optional#empty()} when the stream has been closed + * (either explicitly via {@link #close()} or because the server ended the + * live query). If the underlying live query encounters an error, a + * {@link SurrealException} is thrown. + * * @return the next notification, or empty if the stream has ended + * @throws SurrealException + * if the live query encounters an error */ public Optional next() { if (handle == 0) { @@ -33,7 +64,11 @@ public Optional next() { } /** - * Releases the live query and stops receiving notifications. Idempotent. + * Releases the live query and stops receiving notifications. + * + *

If another thread is blocked inside {@link #next()}, it will be + * unblocked and will return {@link Optional#empty()}. This method is + * idempotent: calling it more than once has no effect. */ @Override public void close() { diff --git a/src/main/java/com/surrealdb/Surreal.java b/src/main/java/com/surrealdb/Surreal.java index b6c2eb7b..48de41dc 100644 --- a/src/main/java/com/surrealdb/Surreal.java +++ b/src/main/java/com/surrealdb/Surreal.java @@ -265,15 +265,25 @@ public boolean importSql(String path) { } /** - * Starts a live query on the given table. Returns a blocking stream of - * notifications (CREATE, UPDATE, DELETE). Call {@link LiveStream#next()} in a - * loop and {@link LiveStream#close()} when done. + * Starts a live query on the given table and returns a blocking stream of + * notifications (CREATE, UPDATE, DELETE). + * + *

This method blocks until the live query subscription is fully + * established on the server. If the subscription fails (e.g. the table does + * not exist), a {@link SurrealException} is thrown immediately rather than + * being deferred to the first {@link LiveStream#next()} call. + * + *

Call {@link LiveStream#next()} in a loop to receive notifications and + * {@link LiveStream#close()} when done. The returned stream implements + * {@link AutoCloseable} for use in try-with-resources. * * @param table - * table name to watch - * @return a LiveStream; must call {@link LiveStream#close()} when done + * table name to watch (must already exist) + * @return a LiveStream; the caller must call {@link LiveStream#close()} when + * done * @throws SurrealException - * if live queries are not supported or the request fails + * if live queries are not supported, the table does not exist, + * or the subscription fails */ public LiveStream selectLive(String table) { return new LiveStream(selectLive(getPtr(), table)); diff --git a/src/main/rust/lib.rs b/src/main/rust/lib.rs index 159ff1d2..051f5704 100644 --- a/src/main/rust/lib.rs +++ b/src/main/rust/lib.rs @@ -15,15 +15,40 @@ use surrealdb::method::Transaction; use surrealdb::types::Value; use surrealdb::{Connection, IndexedResults, Surreal}; -/// Item type for the live query channel (Result>). +/// Item type for the live query notification channel. pub(crate) type LiveNotificationResult = std::result::Result, surrealdb::Error>; -/// Stored as handle for live streams. recv_mutex is held by nextNative during recv() so that -/// releaseNative can wait for no thread in recv() before taking and dropping the receiver. -/// join_handle, shutdown_tx and rx are in Mutex> so releaseNative can take/drop them via get_instance. +/// Native handle backing a Java `LiveStream` instance. +/// +/// Created by `selectLive` (in surreal.rs) after the live query subscription +/// is confirmed, and freed by `releaseNative` (in live.rs) when the Java side +/// calls `close()`. +/// +/// ## Fields (tuple elements) +/// +/// 0. **`recv_mutex`** (`Arc>`) — held by `nextNative` for the +/// entire duration of the blocking `recv()` call. `releaseNative` acquires +/// it *after* the channel has been closed so it can be sure no thread is +/// still inside `recv()` before freeing the handle. +/// +/// 1. **`join_handle`** (`Mutex>`) — the background thread +/// that reads from the SurrealDB live-query stream and forwards +/// notifications into the async channel. Taken and joined by +/// `releaseNative` during shutdown. +/// +/// 2. **`shutdown_tx`** (`Mutex>>`) — dropping this sender +/// signals the background thread (via `tokio::select!`) to exit. +/// +/// 3. **`rx`** (`Mutex>>`) — the +/// receiving end of the notification channel, read by `nextNative`. +/// +/// ## Lock ordering +/// +/// Both `nextNative` and `releaseNative` acquire `recv_mutex` **before** +/// `rx`, ensuring a consistent ordering and preventing deadlocks. pub(crate) type LiveStreamChannel = ( - std::sync::Arc>, // held during recv() + std::sync::Arc>, parking_lot::Mutex>>, parking_lot::Mutex>>, parking_lot::Mutex>>, diff --git a/src/main/rust/live.rs b/src/main/rust/live.rs index f3f888df..a3455a4e 100644 --- a/src/main/rust/live.rs +++ b/src/main/rust/live.rs @@ -7,6 +7,24 @@ use jni::JNIEnv; use crate::error::SurrealError; use crate::{get_instance, new_string, take_instance, JniTypes, LiveStreamChannel, TOKIO_RUNTIME}; +/// JNI implementation of `LiveStream.nextNative(long handle)`. +/// +/// Blocks the calling thread until a live-query notification arrives or the +/// stream ends. Returns a `LiveNotification` jobject, or `null` when the +/// stream has been closed (by `releaseNative` or because the server ended the +/// live query). +/// +/// ## Locking protocol +/// +/// 1. Acquire `recv_mutex` — serializes concurrent `nextNative` calls and +/// lets `releaseNative` know when no thread is inside `recv()`. +/// 2. Acquire `rx_mux` — borrows the channel receiver for the blocking call. +/// 3. Call `block_on(rx_ref.recv())` — parks the thread until a message +/// arrives or all senders are dropped (channel closed). +/// +/// Both guards are held for the duration of the `recv()`. This is safe +/// because `releaseNative` only acquires these locks *after* the channel has +/// been closed, guaranteeing `recv()` will have already returned. #[no_mangle] pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( mut env: JNIEnv<'local>, @@ -22,17 +40,16 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( let rx_opt_guard = rx_mux.lock(); let rx_ref = match rx_opt_guard.as_ref() { Some(rx) => rx, - None => return JObject::null().into_raw(), // already released + None => return JObject::null().into_raw(), }; let item = match TOKIO_RUNTIME.block_on(rx_ref.recv()) { Ok(item) => item, - Err(_) => return JObject::null().into_raw(), // channel closed + Err(_) => return JObject::null().into_raw(), }; let notification = match item { Ok(n) => n, Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()), }; - // Build Java LiveNotification(action, valuePtr, queryId) let action_raw = new_string!(&mut env, notification.action.to_string(), || { std::ptr::null_mut() }); @@ -57,6 +74,27 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( } } +/// JNI implementation of `LiveStream.releaseNative(long handle)`. +/// +/// Shuts down the live query: stops the background thread, waits for any +/// in-progress `nextNative` call to finish, then frees the native handle. +/// +/// ## Shutdown sequence +/// +/// 1. **Drop `shutdown_tx`** — the background thread's `tokio::select!` loop +/// detects the closed shutdown channel and breaks. This also causes the +/// background thread to drop its `tx_thread` sender, closing the +/// notification channel. +/// 2. **Join the background thread** — ensures `tx_thread` has been dropped +/// and the channel is fully closed before proceeding. +/// 3. **Acquire `recv_mutex`** — at this point the channel is closed, so any +/// `nextNative` call blocked on `recv()` has already returned and released +/// the mutex. Acquiring it here is a final safety barrier. +/// 4. **Take the receiver** — drops it while holding `recv_mutex`, preventing +/// any new `recv()` attempt. +/// 5. **`take_instance`** — reclaims the boxed `LiveStreamChannel`, freeing +/// the allocation. The Java side zeroes its `handle` field after this +/// call returns, preventing further native access. #[no_mangle] pub extern "system" fn Java_com_surrealdb_LiveStream_releaseNative<'local>( _env: JNIEnv<'local>, @@ -66,21 +104,17 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_releaseNative<'local>( if handle_ptr == 0 { return; } - // Do NOT take_instance yet: another thread may be in nextNative (get_instance + recv). - // First close the channel via the stored sender, join the background thread, then acquire - // the recv mutex (so no thread is in recv()), then take_instance and drop the receiver. let channel_ref = match get_instance::(handle_ptr, JniTypes::LiveStream) { Ok(r) => r, Err(_) => return, }; let (recv_mutex, join_handle_mux, shutdown_tx_mux, rx_mux) = channel_ref; - drop(shutdown_tx_mux.lock().take()); // drop sender so background thread exits and channel closes + drop(shutdown_tx_mux.lock().take()); if let Some(join_handle) = join_handle_mux.lock().take() { let _ = join_handle.join(); } - let _recv_guard = recv_mutex.lock(); // wait until any thread in nextNative has left recv() - let _rx = rx_mux.lock().take(); // take and drop receiver while holding recv_guard + let _recv_guard = recv_mutex.lock(); + let _rx = rx_mux.lock().take(); drop(_recv_guard); let _ = take_instance::(handle_ptr, JniTypes::LiveStream); - // free the box } diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index 29794ef5..c255f429 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -484,6 +484,39 @@ pub extern "system" fn Java_com_surrealdb_Surreal_run<'local>( JniTypes::new_value(Arc::new(result)) } +/// JNI implementation of `Surreal.selectLive(long ptr, String table)`. +/// +/// Sets up a live query on `table` and returns a `LiveStreamChannel` handle +/// that the Java `LiveStream` wrapper reads from. +/// +/// ## Architecture +/// +/// ```text +/// Java thread Background thread SurrealDB engine +/// ─────────── ───────────────── ──────────────── +/// selectLive() +/// ├─ spawn ──────▶ .select(table).live().await ──▶ subscribe +/// │ │ +/// │ ready_rx.recv() │ ready_tx.send(Ok/Err) +/// │◀──────────────────┤ +/// │ │ +/// ▼ ▼ +/// return handle loop { select! { notification ──▶ tx_thread.send() } } +/// │ +/// nextNative() │ +/// rx.recv() ◀─────────────────┘ +/// ``` +/// +/// A dedicated OS thread runs `block_on` on the shared tokio runtime to drive +/// the live-query stream. Notifications are forwarded through an unbounded +/// `async_channel` to the Java side, which reads them via `nextNative`. +/// +/// ## Readiness handshake +/// +/// The background thread signals via a `std::sync::mpsc` channel once the +/// subscription is established (`Ok(())`) or has failed (`Err(e)`). +/// `selectLive` blocks on this signal so that errors (e.g. table does not +/// exist) are thrown at call site instead of being deferred to `next()`. #[no_mangle] pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( mut env: JNIEnv<'local>, @@ -493,10 +526,15 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( ) -> jlong { let surreal = get_surreal_ref!(&mut env, ptr, || 0); let table = get_rust_string!(&mut env, &table, || 0); + + // Notification channel: background thread produces, nextNative consumes. let (tx, rx) = async_channel::unbounded(); + // Shutdown channel: dropping shutdown_tx signals the background thread to exit. let (shutdown_tx, shutdown_rx) = async_channel::bounded::<()>(1); + // Readiness channel: background thread confirms subscription before we return. let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); + let tx_thread = tx.clone(); let surreal_clone = surreal.clone(); let join_handle = std::thread::spawn(move || { @@ -524,6 +562,8 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( } }); }); + + // Block until the subscription is confirmed or fails. match ready_rx.recv() { Ok(Ok(())) => {} Ok(Err(e)) => { @@ -538,6 +578,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( .exception(&mut env, || 0); } } + let recv_mutex = std::sync::Arc::new(parking_lot::Mutex::new(())); JniTypes::new_live_stream(( recv_mutex, From 4a7e1413a11163c37add01ac5c162d78488dbdbe Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Wed, 15 Apr 2026 11:33:29 +0100 Subject: [PATCH 4/4] Fix all clippy warnings in Rust native layer Apply 43 auto-fixes and 2 targeted suppressions: - Remove redundant closures (|| std::ptr::null_mut -> std::ptr::null_mut) - Remove needless borrows (&surreal -> surreal) - Remove useless conversions (SurrealError::from(e) where e is already SurrealError, .map(JObject::from) after .l().ok()) - Replace manual match-to-Option with .ok() - Suppress redundant_closure on build_server_exception where the fix breaks JObject lifetime inference - Suppress too_many_arguments on up_record_id_range_value (JNI bridge functions mirror Java signatures) --- src/main/rust/error.rs | 20 ++++--------- src/main/rust/live.rs | 8 ++--- src/main/rust/surreal.rs | 63 ++++++++++++++++++++-------------------- 3 files changed, 41 insertions(+), 50 deletions(-) diff --git a/src/main/rust/error.rs b/src/main/rust/error.rs index 3146062b..68d3ab86 100644 --- a/src/main/rust/error.rs +++ b/src/main/rust/error.rs @@ -29,7 +29,7 @@ fn details_value(error: &surrealdb::Error) -> Value { if let Value::Object(ref map) = value { let kind_matches = map .get("kind") - .map(|v| matches!(v, Value::String(s) if s.to_string() == kind_str)) + .map(|v| matches!(v, Value::String(s) if *s == kind_str)) .unwrap_or(false); if kind_matches { if let Some(inner) = map.get("details") { @@ -58,7 +58,6 @@ fn value_to_jobject<'a>(env: &mut JNIEnv<'a>, value: &Value) -> Option { let class = env.find_class("java/util/LinkedHashMap").ok()?; @@ -100,7 +99,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option env.call_static_method(class, "valueOf", "(J)Ljava/lang/Long;", &[JValue::Long(*i)]) .ok() .and_then(|v| v.l().ok()) - .map(JObject::from) } Number::Float(f) => { let class = env.find_class("java/lang/Double").ok()?; @@ -112,7 +110,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option ) .ok() .and_then(|v| v.l().ok()) - .map(JObject::from) } Number::Decimal(d) => { let class = env.find_class("java/lang/Double").ok()?; @@ -125,7 +122,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option ) .ok() .and_then(|v| v.l().ok()) - .map(JObject::from) } } } @@ -136,6 +132,7 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option /// Matches on the Rust SDK's ErrorDetails enum to choose the Java exception class and /// ErrorKind enum. Base ServerException uses (ErrorKind, String rawKindIfUnknown, message, details, cause); /// subclasses use (String message, Object details, ServerException cause). +#[allow(clippy::redundant_closure)] // JObject::null as fn ptr produces 'static, breaking the 'a lifetime fn build_server_exception<'a>( env: &mut JNIEnv<'a>, error: &surrealdb::Error, @@ -201,8 +198,7 @@ fn build_server_exception<'a>( .get_static_field(&enum_class, enum_name, "Lcom/surrealdb/ErrorKind;") .ok()? .l() - .ok() - .map(JObject::from)?; + .ok()?; let raw_kind_jstr = match raw_kind_for_unknown { Some(s) => env .new_string(s) @@ -219,10 +215,7 @@ fn build_server_exception<'a>( JValue::Object(&details_obj), JValue::Object(&java_cause), ]; - match env.new_object(class, sig, &args) { - Ok(obj) => Some(obj), - Err(_) => None, - } + env.new_object(class, sig, &args).ok() } else { // Subclass(String message, Object details, ServerException cause) let sig = "(Ljava/lang/String;Ljava/lang/Object;Lcom/surrealdb/ServerException;)V"; @@ -231,10 +224,7 @@ fn build_server_exception<'a>( JValue::Object(&details_obj), JValue::Object(&java_cause), ]; - match env.new_object(class, sig, &args) { - Ok(obj) => Some(obj), - Err(_) => None, - } + env.new_object(class, sig, &args).ok() } } diff --git a/src/main/rust/live.rs b/src/main/rust/live.rs index a3455a4e..4260ac19 100644 --- a/src/main/rust/live.rs +++ b/src/main/rust/live.rs @@ -34,7 +34,7 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( let (recv_mutex, _join_handle_mux, _shutdown_tx_mux, rx_mux) = match get_instance::(handle_ptr, JniTypes::LiveStream) { Ok(r) => r, - Err(e) => return e.exception(&mut env, || std::ptr::null_mut()), + Err(e) => return e.exception(&mut env, std::ptr::null_mut), }; let _recv_guard = recv_mutex.lock(); let rx_opt_guard = rx_mux.lock(); @@ -48,7 +48,7 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( }; let notification = match item { Ok(n) => n, - Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()), + Err(e) => return SurrealError::from(e).exception(&mut env, std::ptr::null_mut), }; let action_raw = new_string!(&mut env, notification.action.to_string(), || { std::ptr::null_mut() @@ -61,7 +61,7 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( let query_id_str = unsafe { JObject::from_raw(query_id_raw) }; let class = match env.find_class("com/surrealdb/LiveNotification") { Ok(c) => c, - Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()), + Err(e) => return SurrealError::from(e).exception(&mut env, std::ptr::null_mut), }; let args = [ JValue::Object(&action_str), @@ -70,7 +70,7 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>( ]; match env.new_object(class, "(Ljava/lang/String;JLjava/lang/String;)V", &args) { Ok(obj) => obj.into_raw(), - Err(e) => SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()), + Err(e) => SurrealError::from(e).exception(&mut env, std::ptr::null_mut), } } diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index c255f429..88802017 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -183,7 +183,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_signinRoot<'local>( let refresh = token.refresh.map(|r| r.into_insecure_token()); match new_token_object(&mut env, access, refresh) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), } } Err(err) => SurrealError::from(err).exception(&mut env, null_mut), @@ -217,7 +217,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_signinNamespace<'local>( let refresh = token.refresh.map(|r| r.into_insecure_token()); match new_token_object(&mut env, access, refresh) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), } } Err(err) => SurrealError::from(err).exception(&mut env, null_mut), @@ -254,7 +254,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_signinDatabase<'local>( let refresh = token.refresh.map(|r| r.into_insecure_token()); match new_token_object(&mut env, access, refresh) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), } } Err(err) => SurrealError::from(err).exception(&mut env, null_mut), @@ -288,7 +288,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_signup<'local>( let refresh = token.refresh.map(|r| r.into_insecure_token()); match new_token_object(&mut env, access_str, refresh) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), } } Err(err) => SurrealError::from(err).exception(&mut env, null_mut), @@ -322,7 +322,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_signinRecord<'local>( let refresh = token.refresh.map(|r| r.into_insecure_token()); match new_token_object(&mut env, access_str, refresh) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), } } Err(err) => SurrealError::from(err).exception(&mut env, null_mut), @@ -369,7 +369,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_useNs<'local>( match TOKIO_RUNTIME.block_on(async { surreal.use_ns(ns).await }) { Ok((namespace, database)) => match new_ns_db_object(&mut env, namespace, database) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), }, Err(err) => SurrealError::from(err).exception(&mut env, null_mut), } @@ -387,7 +387,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_useDb<'local>( match TOKIO_RUNTIME.block_on(async { surreal.use_db(db).await }) { Ok((namespace, database)) => match new_ns_db_object(&mut env, namespace, database) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), }, Err(err) => SurrealError::from(err).exception(&mut env, null_mut), } @@ -403,7 +403,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_useDefaults<'local>( match TOKIO_RUNTIME.block_on(async { surreal.use_defaults().await }) { Ok((namespace, database)) => match new_ns_db_object(&mut env, namespace, database) { Ok(obj) => obj, - Err(e) => SurrealError::from(e).exception(&mut env, null_mut), + Err(e) => e.exception(&mut env, null_mut), }, Err(err) => SurrealError::from(err).exception(&mut env, null_mut), } @@ -421,7 +421,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_query<'local>( // Retrieve the query let query = get_rust_string!(&mut env, &query, || 0); // Execute the query - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let res = check_query_result!(&mut env, res, || 0); // Build a response instance @@ -449,7 +449,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_queryBind<'local>( params_map.insert(key, value.clone()); } - let res = surrealdb_query::(&surreal, &query, Some(params_map)); + let res = surrealdb_query::(surreal, &query, Some(params_map)); let res = check_query_result!(&mut env, res, || 0); // Build a response instance JniTypes::new_response(Arc::new(Mutex::new(res))) @@ -476,7 +476,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_run<'local>( } let args_list = placeholders.join(", "); let query = format!("RETURN {}({})", name, args_list); - let res = surrealdb_query::(&surreal, &query, Some(params_map)); + let res = surrealdb_query::(surreal, &query, Some(params_map)); let mut response = check_query_result!(&mut env, res, || 0); let mut result = take_one_result!(&mut env, response, || 0); return_value_array_first!(result); @@ -655,7 +655,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_createRecordIdValue<'local>( // Execute the query let query = format!("CREATE {} CONTENT $val", record_id.to_sql()); let params = BTreeMap::from([("val".to_string(), value.clone())]); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -690,7 +690,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_createTargetValues<'local>( } let query = queries.join(";\n"); // Execute the query - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut res = check_query_result!(&mut env, res, null_mut); // Prepare the result @@ -737,7 +737,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_insertTargetValues<'local>( } let query = format!("INSERT INTO {} [ {} ]", target, records.join(" , ")); // Execute the query - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut response = check_query_result!(&mut env, res, null_mut); // There is only one statement @@ -772,7 +772,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_insertRelationTargetValue<'loc let value = get_value_mut_instance!(&mut env, value_ptr, || 0); // Execute the query let query = format!("INSERT RELATION INTO {} {}", target, value.to_sql()); - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -809,7 +809,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_insertRelationTargetValues<'lo records.join(" , ") ); // Execute the query - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut response = check_query_result!(&mut env, res, null_mut); // There is only one statement @@ -850,7 +850,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_relate<'local>( ("from".to_string(), from_value), ("to".to_string(), to_value), ]); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -889,7 +889,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_relateContent<'local>( ("from".to_string(), from_value), ("to".to_string(), to_value), ]); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -913,7 +913,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectRecordId<'local>( let record_id = get_value_instance!(&mut env, record_id_ptr, || 0); // Execute the query let query = format!("SELECT * FROM {}", record_id.to_sql()); - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut res = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -949,7 +949,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectRecordIds<'local>( } // Execute the query let query = format!("SELECT * FROM {}", record_ids.join(",")); - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut res = check_query_result!(&mut env, res, null_mut); // There is only one statement @@ -983,7 +983,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectTargetsValues<'local>( // Prepare the query let query = format!("SELECT * FROM {}", targets.join(",")); // Execute the query - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -1008,7 +1008,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectTargetsValuesSync<'local // Prepare the query let query = format!("SELECT * FROM {}", targets.join(",")); // Execute the query - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -1033,7 +1033,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_deleteRecordId<'local>( // Prepare the params let params = BTreeMap::from([("t".to_string(), record_id)]); // Execute the query - let res = surrealdb_query(&surreal, "DELETE $t", Some(params)); + let res = surrealdb_query(surreal, "DELETE $t", Some(params)); // Check the result check_query_result!(&mut env, res, || false as jboolean); true as jboolean @@ -1061,7 +1061,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_deleteRecordIds<'local>( // Prepare the query let query = format!("DELETE {}", targets.join(",")); // Execute the query - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result check_query_result!(&mut env, res, || 0); true as jboolean @@ -1083,7 +1083,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectRecordIdRange<'local>( Err(e) => return e.exception(&mut env, null_mut), }; let params = BTreeMap::from([("_range".to_string(), range_value)]); - let res = surrealdb_query(&surreal, "SELECT * FROM $_range", Some(params)); + let res = surrealdb_query(surreal, "SELECT * FROM $_range", Some(params)); let mut res = check_query_result!(&mut env, res, null_mut); let res = take_one_result!(&mut env, res, null_mut); if let Value::Array(a) = res { @@ -1115,7 +1115,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_deleteRecordIdRange<'local>( Err(e) => return e.exception(&mut env, || false as jboolean), }; let params = BTreeMap::from([("_range".to_string(), range_value)]); - let res = surrealdb_query(&surreal, "DELETE $_range", Some(params)); + let res = surrealdb_query(surreal, "DELETE $_range", Some(params)); check_query_result!(&mut env, res, || false as jboolean); true as jboolean } @@ -1134,7 +1134,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_deleteTarget<'local>( // Prepare the query let query = format!("DELETE FROM {}", target); // Execute the query - let res = surrealdb_query::<()>(&surreal, &query, None); + let res = surrealdb_query::<()>(surreal, &query, None); // Check the result check_query_result!(&mut env, res, || false as jboolean); true as jboolean @@ -1198,7 +1198,7 @@ fn up_record_id_value( // Execute the query let query = format!("{up} {} {up_type} $val", record_id.to_sql()); let params = BTreeMap::from([("val".to_string(), value.clone())]); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -1247,6 +1247,7 @@ pub extern "system" fn Java_com_surrealdb_Surreal_upsertRecordIdValue<'local>( ) } +#[allow(clippy::too_many_arguments)] fn up_record_id_range_value( mut env: JNIEnv, surreal_ptr: jlong, @@ -1269,7 +1270,7 @@ fn up_record_id_range_value( params.insert("_range".to_string(), range_value); params.insert("val".to_string(), value.clone()); let query = format!("{up} $_range {up_type} $val"); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); let mut response = check_query_result!(&mut env, res, || 0); let mut result: Value = take_one_result!(&mut env, response, || 0); return_value_array_first!(result); @@ -1348,7 +1349,7 @@ fn up_target_value( // Execute the query let query = format!("{up} {} {up_type} $val", target); let params = BTreeMap::from([("val".to_string(), value.clone())]); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement @@ -1402,7 +1403,7 @@ fn up_target_value_sync<'local>( // Execute the query let query = format!("{up} {} {up_type} $val", target); let params = BTreeMap::from([("val".to_string(), value.clone())]); - let res = surrealdb_query(&surreal, &query, Some(params)); + let res = surrealdb_query(surreal, &query, Some(params)); // Check the result let mut response = check_query_result!(&mut env, res, || 0); // There is only one statement