Skip to content

Commit 96efd71

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Move logstreaming handlers to open source/self-hosted backend binary. (#38394)
Add handlers/code to the backend. Still need to update the embedded dashboard to include it too. GitOrigin-RevId: deacbd2a94b58934b6773b811b68fb2335d0102b
1 parent 95a7b92 commit 96efd71

File tree

12 files changed

+534
-25
lines changed

12 files changed

+534
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ testing = [
1717
"function_runner/testing",
1818
"isolate/testing",
1919
"keybroker/testing",
20+
"log_streaming/testing",
2021
"metrics/testing",
2122
"model/testing",
2223
"node_executor/testing",
@@ -67,6 +68,7 @@ humansize = { workspace = true }
6768
isolate = { path = "../isolate" }
6869
itertools = { workspace = true }
6970
keybroker = { path = "../keybroker" }
71+
log_streaming = { path = "../log_streaming" }
7072
lru = { workspace = true }
7173
maplit = { workspace = true }
7274
metrics = { path = "../metrics" }
@@ -121,6 +123,7 @@ events = { path = "../events", features = ["testing"] }
121123
function_runner = { path = "../function_runner", features = ["testing"] }
122124
isolate = { path = "../isolate", features = ["testing"] }
123125
keybroker = { path = "../keybroker", features = ["testing"] }
126+
log_streaming = { path = "../log_streaming", features = ["testing"] }
124127
metrics = { path = "../metrics", features = ["testing"] }
125128
model = { path = "../model", features = ["testing"] }
126129
must-let = { workspace = true }

crates/application/src/lib.rs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ use std::{
2121
},
2222
};
2323

24+
use ::log_streaming::{
25+
LogManager,
26+
LogManagerClient,
27+
};
2428
use airbyte_import::{
2529
AirbyteRecord,
2630
PrimaryKey,
@@ -74,7 +78,10 @@ use common::{
7478
report_error,
7579
JsError,
7680
},
77-
http::RequestDestination,
81+
http::{
82+
fetch::FetchClient,
83+
RequestDestination,
84+
},
7885
knobs::{
7986
APPLICATION_MAX_CONCURRENT_UPLOADS,
8087
MAX_JOBS_CANCEL_BATCH,
@@ -191,6 +198,7 @@ use keybroker::{
191198
Identity,
192199
KeyBroker,
193200
};
201+
use log_streaming::add_local_log_sink_on_startup;
194202
use maplit::{
195203
btreemap,
196204
btreeset,
@@ -201,6 +209,7 @@ use model::{
201209
AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR,
202210
},
203211
auth::AuthInfoModel,
212+
backend_info::BackendInfoModel,
204213
backend_state::BackendStateModel,
205214
canonical_urls::{
206215
types::CanonicalUrl,
@@ -389,6 +398,7 @@ pub mod cron_jobs;
389398
pub mod deploy_config;
390399
mod exports;
391400
pub mod function_log;
401+
mod log_streaming;
392402
pub mod log_visibility;
393403
mod metrics;
394404
mod module_cache;
@@ -548,11 +558,11 @@ pub struct Application<RT: Runtime> {
548558
export_worker: Arc<Mutex<Box<dyn SpawnHandle>>>,
549559
system_table_cleanup_worker: Arc<Mutex<Box<dyn SpawnHandle>>>,
550560
migration_worker: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>,
551-
log_sender: Arc<dyn LogSender>,
552561
log_visibility: Arc<dyn LogVisibility<RT>>,
553562
module_cache: ModuleCache<RT>,
554563
system_env_var_names: HashSet<EnvVarName>,
555564
app_auth: Arc<ApplicationAuth>,
565+
log_manager_client: LogManagerClient,
556566
}
557567

558568
impl<RT: Runtime> Clone for Application<RT> {
@@ -579,11 +589,11 @@ impl<RT: Runtime> Clone for Application<RT> {
579589
export_worker: self.export_worker.clone(),
580590
system_table_cleanup_worker: self.system_table_cleanup_worker.clone(),
581591
migration_worker: self.migration_worker.clone(),
582-
log_sender: self.log_sender.clone(),
583592
log_visibility: self.log_visibility.clone(),
584593
module_cache: self.module_cache.clone(),
585594
system_env_var_names: self.system_env_var_names.clone(),
586595
app_auth: self.app_auth.clone(),
596+
log_manager_client: self.log_manager_client.clone(),
587597
}
588598
}
589599
}
@@ -671,10 +681,11 @@ impl<RT: Runtime> Application<RT> {
671681
segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
672682
persistence: Arc<dyn Persistence>,
673683
node_actions: Actions<RT>,
674-
log_sender: Arc<dyn LogSender>,
675684
log_visibility: Arc<dyn LogVisibility<RT>>,
676685
app_auth: Arc<ApplicationAuth>,
677686
cache: QueryCache,
687+
fetch_client: Arc<dyn FetchClient>,
688+
local_log_sink: Option<String>,
678689
) -> anyhow::Result<Self> {
679690
let module_cache =
680691
ModuleCache::new(runtime.clone(), application_storage.modules_storage.clone()).await;
@@ -724,10 +735,32 @@ impl<RT: Runtime> Application<RT> {
724735
runtime.spawn("system_table_cleanup_worker", system_table_cleanup_worker),
725736
));
726737

738+
// If local_log_sink is passed in, this is a local instance, so we enable log
739+
// streaming by default. Otherwise, it's hard to grant the
740+
// entitlement in testing and in load generator. If not local, we
741+
// read the entitlement from the database.
742+
let mut tx = database.begin(Identity::system()).await?;
743+
let log_streaming_allowed = if let Some(path) = local_log_sink {
744+
add_local_log_sink_on_startup(database.clone(), path).await?;
745+
true
746+
} else {
747+
let mut bi = BackendInfoModel::new(&mut tx);
748+
bi.is_log_streaming_allowed().await?
749+
};
750+
751+
let log_manager_client = LogManager::start(
752+
runtime.clone(),
753+
database.clone(),
754+
fetch_client.clone(),
755+
instance_name.clone(),
756+
log_streaming_allowed,
757+
)
758+
.await;
759+
727760
let function_log = FunctionExecutionLog::new(
728761
runtime.clone(),
729762
database.usage_counter(),
730-
log_sender.clone(),
763+
Arc::new(log_manager_client.clone()),
731764
);
732765
let runner = Arc::new(ApplicationFunctionRunner::new(
733766
runtime.clone(),
@@ -816,11 +849,11 @@ impl<RT: Runtime> Application<RT> {
816849
snapshot_import_worker,
817850
system_table_cleanup_worker,
818851
migration_worker,
819-
log_sender,
820852
log_visibility,
821853
module_cache,
822854
system_env_var_names: default_system_env_vars.into_keys().collect(),
823855
app_auth,
856+
log_manager_client,
824857
})
825858
}
826859

@@ -848,6 +881,10 @@ impl<RT: Runtime> Application<RT> {
848881
self.function_log.clone()
849882
}
850883

884+
pub fn log_manager_client(&self) -> &LogManagerClient {
885+
&self.log_manager_client
886+
}
887+
851888
pub fn now_ts_for_reads(&self) -> RepeatableTimestamp {
852889
self.database.now_ts_for_reads()
853890
}
@@ -2990,7 +3027,7 @@ impl<RT: Runtime> Application<RT> {
29903027
})
29913028
.try_collect()?;
29923029

2993-
self.log_sender.send_logs(logs);
3030+
self.log_manager_client.send_logs(logs);
29943031
Ok(ts)
29953032
}
29963033

@@ -3088,7 +3125,7 @@ impl<RT: Runtime> Application<RT> {
30883125
})
30893126
.try_collect()?;
30903127

3091-
self.log_sender.send_logs(logs);
3128+
self.log_manager_client.send_logs(logs);
30923129
Ok((t, stats))
30933130
}
30943131

@@ -3439,7 +3476,7 @@ impl<RT: Runtime> Application<RT> {
34393476
}
34403477

34413478
pub async fn shutdown(&self) -> anyhow::Result<()> {
3442-
self.log_sender.shutdown()?;
3479+
self.log_manager_client.shutdown()?;
34433480
self.table_summary_worker.shutdown().await?;
34443481
self.system_table_cleanup_worker.lock().shutdown();
34453482
self.schema_worker.lock().shutdown();
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use common::runtime::Runtime;
2+
use database::Database;
3+
use errors::ErrorMetadata;
4+
use keybroker::Identity;
5+
use model::{
6+
backend_info::BackendInfoModel,
7+
log_sinks::{
8+
types::{
9+
SinkConfig,
10+
SinkType,
11+
},
12+
LogSinksModel,
13+
},
14+
};
15+
16+
use crate::Application;
17+
18+
pub async fn add_local_log_sink_on_startup<RT: Runtime>(
19+
db: Database<RT>,
20+
path: String,
21+
) -> anyhow::Result<()> {
22+
let mut tx = db.begin(Identity::system()).await?;
23+
let mut log_sink_model = LogSinksModel::new(&mut tx);
24+
log_sink_model
25+
.add_on_startup(SinkConfig::Local(path.clone()))
26+
.await?;
27+
db.commit_with_write_source(tx, "add_local_log_sink_startup")
28+
.await?;
29+
tracing::info!("Local log sink configured at {path}.");
30+
Ok(())
31+
}
32+
33+
impl<RT: Runtime> Application<RT> {
34+
pub async fn add_log_sink(&self, config: SinkConfig) -> anyhow::Result<()> {
35+
let mut tx = self.begin(Identity::system()).await?;
36+
let mut model = LogSinksModel::new(&mut tx);
37+
model.add_or_update(config).await?;
38+
self.commit(tx, "add_log_sink").await?;
39+
Ok(())
40+
}
41+
42+
pub async fn remove_log_sink(&self, sink_type: SinkType) -> anyhow::Result<()> {
43+
let mut tx = self.begin(Identity::system()).await?;
44+
let mut model = LogSinksModel::new(&mut tx);
45+
46+
let Some(row) = model.get_by_provider(sink_type.clone()).await? else {
47+
return Err(ErrorMetadata::bad_request(
48+
"SinkDoesntExist",
49+
"Cannot remove a sink that is not configured for this project.",
50+
)
51+
.into());
52+
};
53+
54+
model.mark_for_removal(row.id()).await?;
55+
self.commit(tx, "remove_log_sink").await?;
56+
57+
Ok(())
58+
}
59+
60+
pub async fn ensure_log_streaming_allowed(&self, identity: Identity) -> anyhow::Result<()> {
61+
let mut tx = self.begin(identity).await?;
62+
BackendInfoModel::new(&mut tx)
63+
.ensure_log_streaming_allowed()
64+
.await
65+
}
66+
}

crates/application/src/test_helpers.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use common::{
2929
ACTION_USER_TIMEOUT,
3030
UDF_CACHE_MAX_SIZE,
3131
},
32-
log_streaming::NoopLogSender,
3332
persistence::Persistence,
3433
runtime::Runtime,
3534
shutdown::ShutdownSignal,
@@ -227,7 +226,7 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
227226
modules_storage: application_storage.modules_storage.clone(),
228227
},
229228
database.clone(),
230-
fetch_client,
229+
fetch_client.clone(),
231230
)
232231
.await?,
233232
);
@@ -266,13 +265,14 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
266265
segment_term_metadata_fetcher,
267266
Arc::new(persistence.clone()),
268267
actions,
269-
Arc::new(NoopLogSender),
270268
Arc::new(RedactLogsToClient::new(false)),
271269
Arc::new(ApplicationAuth::new(
272270
kb.clone(),
273271
Arc::new(NullAccessTokenAuth),
274272
)),
275273
QueryCache::new(*UDF_CACHE_MAX_SIZE),
274+
fetch_client,
275+
None, // local_log_sink
276276
)
277277
.await?;
278278

0 commit comments

Comments
 (0)