Skip to content

Commit 560e5a3

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Clean shutdown the event logger for usage tracking (#24947)
GitOrigin-RevId: a86742f2a39d415bf12d08c35cafa093e14308ac
1 parent 2fdc09b commit 560e5a3

File tree

6 files changed

+21
-13
lines changed

6 files changed

+21
-13
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/events/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = "2021"
66
license = "LicenseRef-FSL-1.1-Apache-2.0"
77

88
[dependencies]
9+
anyhow = { workspace = true }
910
async-trait = { workspace = true }
1011
proptest = { workspace = true, optional = true }
1112
proptest-derive = { workspace = true, optional = true }

crates/events/src/usage.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ pub trait UsageEventLogger: Send + Sync + std::fmt::Debug {
134134

135135
/// Dump events into a buffer, waiting for the buffer to empty if it's full.
136136
async fn record_async(&self, events: Vec<UsageEvent>);
137+
138+
/// Cleanly shutdown, flushing events
139+
async fn shutdown(&self) -> anyhow::Result<()>;
137140
}
138141

139142
#[derive(Debug, Clone)]
@@ -144,4 +147,8 @@ impl UsageEventLogger for NoOpUsageEventLogger {
144147
fn record(&self, _events: Vec<UsageEvent>) {}
145148

146149
async fn record_async(&self, _events: Vec<UsageEvent>) {}
150+
151+
async fn shutdown(&self) -> anyhow::Result<()> {
152+
Ok(())
153+
}
147154
}

crates/local_backend/src/lib.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ pub struct LocalAppState {
102102
pub application: Application<ProdRuntime>,
103103
// Number of sync protocol workers.
104104
pub live_ws_count: Arc<AtomicU64>,
105-
pub shutdown_rx: async_broadcast::Receiver<()>,
106-
pub shutdown_tx: ShutdownSignal,
105+
pub zombify_rx: async_broadcast::Receiver<()>,
107106
}
108107

109108
impl LocalAppState {
@@ -122,8 +121,7 @@ impl Clone for LocalAppState {
122121
instance_name: self.instance_name.clone(),
123122
application: self.application.clone(),
124123
live_ws_count: self.live_ws_count.clone(),
125-
shutdown_rx: self.shutdown_rx.clone(),
126-
shutdown_tx: self.shutdown_tx.clone(),
124+
zombify_rx: self.zombify_rx.clone(),
127125
}
128126
}
129127
}
@@ -135,16 +133,16 @@ pub async fn make_app(
135133
runtime: ProdRuntime,
136134
config: LocalConfig,
137135
persistence: Arc<dyn Persistence>,
138-
shutdown_rx: async_broadcast::Receiver<()>,
139-
shutdown_tx: ShutdownSignal,
136+
zombify_rx: async_broadcast::Receiver<()>,
137+
preempt_tx: ShutdownSignal,
140138
) -> anyhow::Result<LocalAppState> {
141139
let key_broker = config.key_broker()?;
142140
let searcher: Arc<dyn Searcher> = Arc::new(InProcessSearcher::new(runtime.clone()).await?);
143141
let database = Database::load(
144142
persistence.clone(),
145143
runtime.clone(),
146144
searcher.clone(),
147-
shutdown_tx.clone(),
145+
preempt_tx,
148146
virtual_system_mapping(),
149147
Arc::new(NoOpUsageEventLogger),
150148
)
@@ -257,8 +255,7 @@ pub async fn make_app(
257255
instance_name,
258256
application,
259257
live_ws_count: Arc::new(AtomicU64::new(0)),
260-
shutdown_rx,
261-
shutdown_tx,
258+
zombify_rx,
262259
};
263260

264261
Ok(app_state)

crates/local_backend/src/logs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub async fn stream_udf_execution(
8282
let entries_future = st
8383
.application
8484
.stream_udf_execution(identity, query_args.cursor);
85-
let mut shutdown_rx = st.shutdown_rx.clone();
85+
let mut zombify_rx = st.zombify_rx.clone();
8686
futures::select_biased! {
8787
entries_future_r = entries_future.fuse() => {
8888
let (log_entries, new_cursor) = entries_future_r?;
@@ -103,7 +103,7 @@ pub async fn stream_udf_execution(
103103
};
104104
Ok(Json(response))
105105
},
106-
_ = shutdown_rx.recv().fuse() => {
106+
_ = zombify_rx.recv().fuse() => {
107107
// Return an error so the client reconnects after we come back up.
108108
Err(anyhow::anyhow!(ErrorMetadata::operational_internal_server_error()).context("Shutting down long poll request").into())
109109
},
@@ -139,7 +139,7 @@ pub async fn stream_function_logs(
139139
let entries_future = st
140140
.application
141141
.stream_function_logs(identity, query_args.cursor);
142-
let mut shutdown_rx = st.shutdown_rx.clone();
142+
let mut zombify_rx = st.zombify_rx.clone();
143143
let request_id = match (query_args.session_id, query_args.client_request_counter) {
144144
(Some(session_id), Some(client_request_counter)) => Some(RequestId::new_for_ws_session(
145145
session_id.parse().context("Invalid session ID")?,
@@ -218,7 +218,7 @@ pub async fn stream_function_logs(
218218
};
219219
Ok(Json(response))
220220
},
221-
_ = shutdown_rx.recv().fuse() => {
221+
_ = zombify_rx.recv().fuse() => {
222222
// Return an error so the client reconnects after we come back up.
223223
Err(anyhow::anyhow!(ErrorMetadata::operational_internal_server_error()).context("Shutting down long poll request").into())
224224
},

crates/usage_tracking/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ parking_lot = { workspace = true, features = ["hardware-lock-elision"] }
1717
pb = { path = "../pb" }
1818
proptest = { workspace = true, optional = true }
1919
proptest-derive = { workspace = true, optional = true }
20+
tracing = { workspace = true }
2021
value = { path = "../value" }
2122

2223
[dev-dependencies]

0 commit comments

Comments
 (0)