Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions src/main/java/com/surrealdb/LiveStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,41 @@
import java.util.Optional;

/**
* Blocking iterator over live query notifications. Call {@link #next()} in a
* loop and {@link #close()} when done. Implements {@link AutoCloseable} for
* use in try-with-resources.
* Blocking iterator over live query notifications returned by
* {@link Surreal#selectLive(String)}.
*
* <p>Typical usage:
* <pre>{@code
* try (LiveStream stream = surreal.selectLive("person")) {
* while (true) {
* Optional<LiveNotification> n = stream.next();
* if (!n.isPresent()) break; // stream closed
* process(n.get());
* }
* }
* }</pre>
*
* <p><b>Thread safety:</b> {@link #next()} may be called from one thread while
* {@link #close()} is called from another. The {@code close()} call will
* unblock any thread currently waiting inside {@code next()}. The native
* handle is declared {@code volatile} so that the zeroing performed by
* {@code close()} is immediately visible to concurrent {@code next()} callers.
* Concurrent calls to {@code next()} from multiple threads are serialized by
* a mutex in the native layer.
*/
public class LiveStream implements AutoCloseable {

static {
Loader.loadNative();
}

private long handle;
/**
* Pointer to the native {@code LiveStreamChannel}. Zeroed by
* {@link #close()} after the native resources have been released. Declared
* {@code volatile} so that a {@code close()} on one thread is visible to a
* concurrent {@code next()} on another thread.
*/
private volatile long handle;

LiveStream(long handle) {
this.handle = handle;
Expand All @@ -22,7 +46,14 @@ public class LiveStream implements AutoCloseable {
/**
* Blocks until the next notification is available, or the stream ends.
*
* <p>Returns {@link Optional#empty()} when the stream has been closed
* (either explicitly via {@link #close()} or because the server ended the
* live query). If the underlying live query encounters an error, a
* {@link SurrealException} is thrown.
*
* @return the next notification, or empty if the stream has ended
* @throws SurrealException
* if the live query encounters an error
*/
public Optional<LiveNotification> next() {
if (handle == 0) {
Expand All @@ -33,7 +64,11 @@ public Optional<LiveNotification> next() {
}

/**
* Releases the live query and stops receiving notifications. Idempotent.
* Releases the live query and stops receiving notifications.
*
* <p>If another thread is blocked inside {@link #next()}, it will be
* unblocked and will return {@link Optional#empty()}. This method is
* idempotent: calling it more than once has no effect.
*/
@Override
public void close() {
Expand Down
22 changes: 16 additions & 6 deletions src/main/java/com/surrealdb/Surreal.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,25 @@ public boolean importSql(String path) {
}

/**
* Starts a live query on the given table. Returns a blocking stream of
* notifications (CREATE, UPDATE, DELETE). Call {@link LiveStream#next()} in a
* loop and {@link LiveStream#close()} when done.
* Starts a live query on the given table and returns a blocking stream of
* notifications (CREATE, UPDATE, DELETE).
*
* <p>This method blocks until the live query subscription is fully
* established on the server. If the subscription fails (e.g. the table does
* not exist), a {@link SurrealException} is thrown immediately rather than
* being deferred to the first {@link LiveStream#next()} call.
*
* <p>Call {@link LiveStream#next()} in a loop to receive notifications and
* {@link LiveStream#close()} when done. The returned stream implements
* {@link AutoCloseable} for use in try-with-resources.
*
* @param table
* table name to watch
* @return a LiveStream; must call {@link LiveStream#close()} when done
* table name to watch (must already exist)
* @return a LiveStream; the caller must call {@link LiveStream#close()} when
* done
* @throws SurrealException
* if live queries are not supported or the request fails
* if live queries are not supported, the table does not exist,
* or the subscription fails
*/
public LiveStream selectLive(String table) {
return new LiveStream(selectLive(getPtr(), table));
Expand Down
20 changes: 5 additions & 15 deletions src/main/rust/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn details_value(error: &surrealdb::Error) -> Value {
if let Value::Object(ref map) = value {
let kind_matches = map
.get("kind")
.map(|v| matches!(v, Value::String(s) if s.to_string() == kind_str))
.map(|v| matches!(v, Value::String(s) if *s == kind_str))
.unwrap_or(false);
if kind_matches {
if let Some(inner) = map.get("details") {
Expand Down Expand Up @@ -58,7 +58,6 @@ fn value_to_jobject<'a>(env: &mut JNIEnv<'a>, value: &Value) -> Option<JObject<'
)
.ok()
.and_then(|v| v.l().ok())
.map(JObject::from)
}
Value::Object(map) => {
let class = env.find_class("java/util/LinkedHashMap").ok()?;
Expand Down Expand Up @@ -100,7 +99,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
env.call_static_method(class, "valueOf", "(J)Ljava/lang/Long;", &[JValue::Long(*i)])
.ok()
.and_then(|v| v.l().ok())
.map(JObject::from)
}
Number::Float(f) => {
let class = env.find_class("java/lang/Double").ok()?;
Expand All @@ -112,7 +110,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
)
.ok()
.and_then(|v| v.l().ok())
.map(JObject::from)
}
Number::Decimal(d) => {
let class = env.find_class("java/lang/Double").ok()?;
Expand All @@ -125,7 +122,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
)
.ok()
.and_then(|v| v.l().ok())
.map(JObject::from)
}
}
}
Expand All @@ -136,6 +132,7 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
/// Matches on the Rust SDK's ErrorDetails enum to choose the Java exception class and
/// ErrorKind enum. Base ServerException uses (ErrorKind, String rawKindIfUnknown, message, details, cause);
/// subclasses use (String message, Object details, ServerException cause).
#[allow(clippy::redundant_closure)] // JObject::null as fn ptr produces 'static, breaking the 'a lifetime
fn build_server_exception<'a>(
env: &mut JNIEnv<'a>,
error: &surrealdb::Error,
Expand Down Expand Up @@ -201,8 +198,7 @@ fn build_server_exception<'a>(
.get_static_field(&enum_class, enum_name, "Lcom/surrealdb/ErrorKind;")
.ok()?
.l()
.ok()
.map(JObject::from)?;
.ok()?;
let raw_kind_jstr = match raw_kind_for_unknown {
Some(s) => env
.new_string(s)
Expand All @@ -219,10 +215,7 @@ fn build_server_exception<'a>(
JValue::Object(&details_obj),
JValue::Object(&java_cause),
];
match env.new_object(class, sig, &args) {
Ok(obj) => Some(obj),
Err(_) => None,
}
env.new_object(class, sig, &args).ok()
} else {
// Subclass(String message, Object details, ServerException cause)
let sig = "(Ljava/lang/String;Ljava/lang/Object;Lcom/surrealdb/ServerException;)V";
Expand All @@ -231,10 +224,7 @@ fn build_server_exception<'a>(
JValue::Object(&details_obj),
JValue::Object(&java_cause),
];
match env.new_object(class, sig, &args) {
Ok(obj) => Some(obj),
Err(_) => None,
}
env.new_object(class, sig, &args).ok()
}
}

Expand Down
35 changes: 30 additions & 5 deletions src/main/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,40 @@ use surrealdb::method::Transaction;
use surrealdb::types::Value;
use surrealdb::{Connection, IndexedResults, Surreal};

/// Item type for the live query channel (Result<Notification<Value>>).
/// Item type for the live query notification channel.
pub(crate) type LiveNotificationResult =
std::result::Result<surrealdb::Notification<surrealdb::types::Value>, surrealdb::Error>;

/// Stored as handle for live streams. recv_mutex is held by nextNative during recv() so that
/// releaseNative can wait for no thread in recv() before taking and dropping the receiver.
/// join_handle, shutdown_tx and rx are in Mutex<Option<..>> so releaseNative can take/drop them via get_instance.
/// Native handle backing a Java `LiveStream` instance.
///
/// Created by `selectLive` (in surreal.rs) after the live query subscription
/// is confirmed, and freed by `releaseNative` (in live.rs) when the Java side
/// calls `close()`.
///
/// ## Fields (tuple elements)
///
/// 0. **`recv_mutex`** (`Arc<Mutex<()>>`) — held by `nextNative` for the
/// entire duration of the blocking `recv()` call. `releaseNative` acquires
/// it *after* the channel has been closed so it can be sure no thread is
/// still inside `recv()` before freeing the handle.
///
/// 1. **`join_handle`** (`Mutex<Option<JoinHandle>>`) — the background thread
/// that reads from the SurrealDB live-query stream and forwards
/// notifications into the async channel. Taken and joined by
/// `releaseNative` during shutdown.
///
/// 2. **`shutdown_tx`** (`Mutex<Option<Sender<()>>>`) — dropping this sender
/// signals the background thread (via `tokio::select!`) to exit.
///
/// 3. **`rx`** (`Mutex<Option<Receiver<LiveNotificationResult>>>`) — the
/// receiving end of the notification channel, read by `nextNative`.
///
/// ## Lock ordering
///
/// Both `nextNative` and `releaseNative` acquire `recv_mutex` **before**
/// `rx`, ensuring a consistent ordering and preventing deadlocks.
pub(crate) type LiveStreamChannel = (
std::sync::Arc<parking_lot::Mutex<()>>, // held during recv()
std::sync::Arc<parking_lot::Mutex<()>>,
parking_lot::Mutex<Option<std::thread::JoinHandle<()>>>,
parking_lot::Mutex<Option<async_channel::Sender<()>>>,
parking_lot::Mutex<Option<async_channel::Receiver<LiveNotificationResult>>>,
Expand Down
62 changes: 48 additions & 14 deletions src/main/rust/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ use jni::JNIEnv;
use crate::error::SurrealError;
use crate::{get_instance, new_string, take_instance, JniTypes, LiveStreamChannel, TOKIO_RUNTIME};

/// JNI implementation of `LiveStream.nextNative(long handle)`.
///
/// Blocks the calling thread until a live-query notification arrives or the
/// stream ends. Returns a `LiveNotification` jobject, or `null` when the
/// stream has been closed (by `releaseNative` or because the server ended the
/// live query).
///
/// ## Locking protocol
///
/// 1. Acquire `recv_mutex` — serializes concurrent `nextNative` calls and
/// lets `releaseNative` know when no thread is inside `recv()`.
/// 2. Acquire `rx_mux` — borrows the channel receiver for the blocking call.
/// 3. Call `block_on(rx_ref.recv())` — parks the thread until a message
/// arrives or all senders are dropped (channel closed).
///
/// Both guards are held for the duration of the `recv()`. This is safe
/// because `releaseNative` only acquires these locks *after* the channel has
/// been closed, guaranteeing `recv()` will have already returned.
#[no_mangle]
pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
mut env: JNIEnv<'local>,
Expand All @@ -16,23 +34,22 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
let (recv_mutex, _join_handle_mux, _shutdown_tx_mux, rx_mux) =
match get_instance::<LiveStreamChannel>(handle_ptr, JniTypes::LiveStream) {
Ok(r) => r,
Err(e) => return e.exception(&mut env, || std::ptr::null_mut()),
Err(e) => return e.exception(&mut env, std::ptr::null_mut),
};
let _recv_guard = recv_mutex.lock();
let rx_opt_guard = rx_mux.lock();
let rx_ref = match rx_opt_guard.as_ref() {
Some(rx) => rx,
None => return JObject::null().into_raw(), // already released
None => return JObject::null().into_raw(),
};
let item = match TOKIO_RUNTIME.block_on(rx_ref.recv()) {
Ok(item) => item,
Err(_) => return JObject::null().into_raw(), // channel closed
Err(_) => return JObject::null().into_raw(),
};
let notification = match item {
Ok(n) => n,
Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()),
Err(e) => return SurrealError::from(e).exception(&mut env, std::ptr::null_mut),
};
// Build Java LiveNotification(action, valuePtr, queryId)
let action_raw = new_string!(&mut env, notification.action.to_string(), || {
std::ptr::null_mut()
});
Expand All @@ -44,7 +61,7 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
let query_id_str = unsafe { JObject::from_raw(query_id_raw) };
let class = match env.find_class("com/surrealdb/LiveNotification") {
Ok(c) => c,
Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()),
Err(e) => return SurrealError::from(e).exception(&mut env, std::ptr::null_mut),
};
let args = [
JValue::Object(&action_str),
Expand All @@ -53,10 +70,31 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
];
match env.new_object(class, "(Ljava/lang/String;JLjava/lang/String;)V", &args) {
Ok(obj) => obj.into_raw(),
Err(e) => SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()),
Err(e) => SurrealError::from(e).exception(&mut env, std::ptr::null_mut),
}
}

/// JNI implementation of `LiveStream.releaseNative(long handle)`.
///
/// Shuts down the live query: stops the background thread, waits for any
/// in-progress `nextNative` call to finish, then frees the native handle.
///
/// ## Shutdown sequence
///
/// 1. **Drop `shutdown_tx`** — the background thread's `tokio::select!` loop
/// detects the closed shutdown channel and breaks. This also causes the
/// background thread to drop its `tx_thread` sender, closing the
/// notification channel.
/// 2. **Join the background thread** — ensures `tx_thread` has been dropped
/// and the channel is fully closed before proceeding.
/// 3. **Acquire `recv_mutex`** — at this point the channel is closed, so any
/// `nextNative` call blocked on `recv()` has already returned and released
/// the mutex. Acquiring it here is a final safety barrier.
/// 4. **Take the receiver** — drops it while holding `recv_mutex`, preventing
/// any new `recv()` attempt.
/// 5. **`take_instance`** — reclaims the boxed `LiveStreamChannel`, freeing
/// the allocation. The Java side zeroes its `handle` field after this
/// call returns, preventing further native access.
#[no_mangle]
pub extern "system" fn Java_com_surrealdb_LiveStream_releaseNative<'local>(
_env: JNIEnv<'local>,
Expand All @@ -66,21 +104,17 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_releaseNative<'local>(
if handle_ptr == 0 {
return;
}
// Do NOT take_instance yet: another thread may be in nextNative (get_instance + recv).
// First close the channel via the stored sender, join the background thread, then acquire
// the recv mutex (so no thread is in recv()), then take_instance and drop the receiver.
let channel_ref = match get_instance::<LiveStreamChannel>(handle_ptr, JniTypes::LiveStream) {
Ok(r) => r,
Err(_) => return,
};
let (recv_mutex, join_handle_mux, shutdown_tx_mux, rx_mux) = channel_ref;
drop(shutdown_tx_mux.lock().take()); // drop sender so background thread exits and channel closes
drop(shutdown_tx_mux.lock().take());
if let Some(join_handle) = join_handle_mux.lock().take() {
let _ = join_handle.join();
}
let _recv_guard = recv_mutex.lock(); // wait until any thread in nextNative has left recv()
let _rx = rx_mux.lock().take(); // take and drop receiver while holding recv_guard
let _recv_guard = recv_mutex.lock();
let _rx = rx_mux.lock().take();
drop(_recv_guard);
let _ = take_instance::<LiveStreamChannel>(handle_ptr, JniTypes::LiveStream);
// free the box
}
Loading
Loading