Skip to content

Commit bbbca20

Browse files
authored
feat: add support for storing logs in table (#17598)
* feat: add support for storing logs in table * chore: make clippy happy and refine structure * chore: add rentention time, fix unit test * chore: clippy * chore: add buffer limit to prevent log flooding * fix vacuum * chore: add level config * test: add more test for persistent log * fix: sender should use `blocking_send` * fix: unit test * chore: add `retention_frequency` to config * fixup * fixup * taplo * test: init add logic test * refactor: make append operation lightly * feat: add different target and change schema of table * fixup * fixup * fixup * chore: add a new column to resolve json problem and apply review suggestion * move retention logic to next PR * fixup
1 parent c3d5d79 commit bbbca20

File tree

25 files changed

+1242
-6
lines changed

25 files changed

+1242
-6
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ cidr = { version = "0.2.2" }
278278
clap = { version = "4.4.2", features = ["derive"] }
279279
codeq = { version = "0.5.2" }
280280
comfy-table = "7"
281+
concurrent-queue = "2.5.0"
281282
convert_case = "0.6.0"
282283
cookie = "0.18.1"
283284
crc32fast = "1.3.2"

scripts/ci/deploy/config/databend-query-node-otlp-logs.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ otlp_protocol = "http"
6565
pkey1 = "pvalue1"
6666
pkey2 = "pvalue2"
6767

68+
[log.persistentlog]
69+
on = true
70+
level = "INFO"
71+
6872
[meta]
6973
endpoints = ["0.0.0.0:9191"]
7074
username = "root"

src/binaries/query/entry.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use databend_common_storage::DataOperator;
3333
use databend_common_tracing::set_panic_hook;
3434
use databend_query::clusters::ClusterDiscovery;
3535
use databend_query::local;
36+
use databend_query::persistent_log::GlobalPersistentLog;
3637
use databend_query::servers::admin::AdminService;
3738
use databend_query::servers::flight::FlightService;
3839
use databend_query::servers::metrics::MetricService;
@@ -281,6 +282,10 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
281282
if conf.log.structlog.on {
282283
println!(" structlog: {}", conf.log.structlog);
283284
}
285+
if conf.log.persistentlog.on {
286+
GlobalPersistentLog::instance().initialized();
287+
println!(" persistentlog: {}", conf.log.persistentlog);
288+
}
284289

285290
println!();
286291
println!(

src/common/tracing/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,13 @@ publish = { workspace = true }
77
edition = { workspace = true }
88

99
[dependencies]
10+
anyhow = { workspace = true }
11+
arrow-array = { workspace = true }
12+
arrow-schema = { workspace = true }
13+
async-channel = { workspace = true }
1014
backtrace = { workspace = true, features = ["std", "serialize-serde"] }
1115
chrono = { workspace = true }
16+
concurrent-queue = { workspace = true }
1217
databend-common-base = { workspace = true }
1318
databend-common-exception = { workspace = true }
1419
defer = { workspace = true }
@@ -18,9 +23,11 @@ itertools = { workspace = true }
1823
libc = { workspace = true }
1924
log = { workspace = true }
2025
logforth = { workspace = true }
26+
opendal = { workspace = true }
2127
opentelemetry = { workspace = true }
2228
opentelemetry-otlp = { workspace = true, features = ["reqwest-client"] }
2329
opentelemetry_sdk = { workspace = true }
30+
parquet = { workspace = true }
2431
serde = { workspace = true }
2532
serde_json = { workspace = true }
2633
tonic = { workspace = true }

src/common/tracing/src/config.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub struct Config {
3131
pub profile: ProfileLogConfig,
3232
pub structlog: StructLogConfig,
3333
pub tracing: TracingConfig,
34+
pub persistentlog: PersistentLogConfig,
3435
}
3536

3637
impl Config {
@@ -340,3 +341,34 @@ impl Default for OTLPEndpointConfig {
340341
}
341342
}
342343
}
344+
345+
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
346+
pub struct PersistentLogConfig {
347+
pub on: bool,
348+
pub interval: usize,
349+
pub stage_name: String,
350+
pub level: String,
351+
pub retention: usize,
352+
}
353+
354+
impl Display for PersistentLogConfig {
355+
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
356+
write!(
357+
f,
358+
"enabled={}, interval={}, stage_name={}, level={}, retention={}",
359+
self.on, self.interval, self.stage_name, self.level, self.retention
360+
)
361+
}
362+
}
363+
364+
impl Default for PersistentLogConfig {
365+
fn default() -> Self {
366+
Self {
367+
on: false,
368+
interval: 2,
369+
stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
370+
retention: 72,
371+
level: "WARN".to_string(),
372+
}
373+
}
374+
}

src/common/tracing/src/init.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414

1515
use std::borrow::Cow;
1616
use std::collections::BTreeMap;
17+
use std::sync::Arc;
1718
use std::time::Duration;
1819

1920
use databend_common_base::base::tokio;
21+
use databend_common_base::base::tokio::sync::RwLock;
2022
use databend_common_base::base::GlobalInstance;
2123
use databend_common_base::runtime::Thread;
2224
use fastrace::prelude::*;
@@ -25,24 +27,54 @@ use logforth::filter::env::EnvFilterBuilder;
2527
use logforth::filter::EnvFilter;
2628
use logforth::Dispatch;
2729
use logforth::Logger;
30+
use opendal::Operator;
2831
use opentelemetry_otlp::WithExportConfig;
2932

3033
use crate::config::OTLPProtocol;
3134
use crate::loggers::get_layout;
3235
use crate::loggers::new_rolling_file_appender;
36+
use crate::remote_log::RemoteLog;
3337
use crate::structlog::StructLogReporter;
3438
use crate::Config;
3539

3640
const HEADER_TRACE_PARENT: &str = "traceparent";
3741

3842
pub struct GlobalLogger {
3943
_drop_guards: Vec<Box<dyn Send + Sync + 'static>>,
44+
pub remote_log_operator: RwLock<Option<Operator>>,
4045
}
4146

4247
impl GlobalLogger {
4348
pub fn init(name: &str, cfg: &Config, labels: BTreeMap<String, String>) {
4449
let _drop_guards = init_logging(name, cfg, labels);
45-
GlobalInstance::set(Self { _drop_guards });
50+
51+
// GlobalLogger is initialized before DataOperator, so set the operator to None first
52+
let remote_log_operator = RwLock::new(None);
53+
54+
let instance = Arc::new(Self {
55+
_drop_guards,
56+
remote_log_operator,
57+
});
58+
GlobalInstance::set(instance);
59+
}
60+
61+
pub fn instance() -> Arc<GlobalLogger> {
62+
GlobalInstance::get()
63+
}
64+
65+
// Get the operator for remote log when it is ready.
66+
pub(crate) async fn get_operator(&self) -> Option<Operator> {
67+
let operator = self.remote_log_operator.read().await;
68+
if let Some(operator) = operator.as_ref() {
69+
return Some(operator.clone());
70+
}
71+
None
72+
}
73+
74+
// Set the operator for remote log, this should be only called once
75+
pub async fn set_operator(&self, operator: Operator) {
76+
let mut remote_log_operator = self.remote_log_operator.write().await;
77+
*remote_log_operator = Some(operator);
4678
}
4779
}
4880

@@ -100,7 +132,6 @@ pub fn init_logging(
100132
}
101133
),
102134
};
103-
104135
// initialize tracing a reporter
105136
if cfg.tracing.on {
106137
let endpoint = cfg.tracing.otlp.endpoint.clone();
@@ -345,6 +376,36 @@ pub fn init_logging(
345376
logger = logger.dispatch(dispatch);
346377
}
347378

379+
if cfg.persistentlog.on {
380+
let (remote_log, flush_guard) =
381+
RemoteLog::new(&labels, cfg).expect("initialize remote logger");
382+
383+
let mut filter_builder =
384+
EnvFilterBuilder::new().filter(Some("databend::log::structlog"), LevelFilter::Off);
385+
386+
if cfg.profile.on && !cfg.profile.dir.is_empty() {
387+
filter_builder =
388+
filter_builder.filter(Some("databend::log::profile"), LevelFilter::Trace);
389+
} else {
390+
filter_builder =
391+
filter_builder.filter(Some("databend::log::profile"), LevelFilter::Off);
392+
}
393+
if cfg.query.on && !cfg.query.dir.is_empty() {
394+
filter_builder =
395+
filter_builder.filter(Some("databend::log::query"), LevelFilter::Trace);
396+
} else {
397+
filter_builder = filter_builder.filter(Some("databend::log::query"), LevelFilter::Off);
398+
}
399+
let dispatch = Dispatch::new()
400+
.filter(EnvFilter::new(
401+
filter_builder.parse(&cfg.persistentlog.level),
402+
))
403+
.append(remote_log);
404+
405+
logger = logger.dispatch(dispatch);
406+
_drop_guards.push(flush_guard);
407+
}
408+
348409
// set global logger
349410
if logger.apply().is_err() {
350411
eprintln!("logger has already been set");

src/common/tracing/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod crash_hook;
2222
mod init;
2323
mod loggers;
2424
mod panic_hook;
25+
mod remote_log;
2526
mod structlog;
2627

2728
pub use crash_hook::pipe_file;
@@ -32,6 +33,7 @@ pub use crate::config::FileConfig;
3233
pub use crate::config::OTLPConfig;
3334
pub use crate::config::OTLPEndpointConfig;
3435
pub use crate::config::OTLPProtocol;
36+
pub use crate::config::PersistentLogConfig;
3537
pub use crate::config::ProfileLogConfig;
3638
pub use crate::config::QueryLogConfig;
3739
pub use crate::config::StderrConfig;
@@ -45,6 +47,11 @@ pub use crate::init::start_trace_for_remote_request;
4547
pub use crate::init::GlobalLogger;
4648
pub use crate::panic_hook::log_panic;
4749
pub use crate::panic_hook::set_panic_hook;
50+
pub use crate::remote_log::convert_to_batch;
51+
pub use crate::remote_log::LogBuffer as RemoteLogBuffer;
52+
pub use crate::remote_log::RemoteLog;
53+
pub use crate::remote_log::RemoteLogElement;
54+
pub use crate::remote_log::RemoteLogGuard;
4855
pub use crate::structlog::DummyReporter;
4956
pub use crate::structlog::StructLogReporter;
5057

0 commit comments

Comments
 (0)