Skip to content

fix(query): the stage utilized by the history table needs to be restricted #18380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use databend_common_sql::plans::RewriteKind;
use databend_common_sql::Planner;
use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN;
use databend_enterprise_resources_management::ResourcesManagement;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;

Expand Down Expand Up @@ -165,8 +166,9 @@ impl PrivilegeAccess {

fn access_system_history(
&self,
catalog_name: &str,
db_name: &str,
catalog_name: Option<&str>,
db_name: Option<&str>,
stage_name: Option<&str>,
privilege: UserPrivilegeType,
) -> Result<()> {
let cluster_id = GlobalConfig::instance().query.cluster_id.clone();
Expand All @@ -176,20 +178,50 @@ impl PrivilegeAccess {
{
return Ok(());
}
if catalog_name == CATALOG_DEFAULT
&& db_name.eq_ignore_ascii_case(SENSITIVE_SYSTEM_RESOURCE)
&& !matches!(
privilege,
UserPrivilegeType::Select | UserPrivilegeType::Drop
)
{
return Err(ErrorCode::PermissionDenied(
format!(
"Permission Denied: Operation '{:?}' on database 'default.system_history' is not allowed. This sensitive system resource only supports 'SELECT' and 'DROP'",
privilege
),
));
match (catalog_name, db_name, stage_name) {
(Some(catalog_name), Some(db_name), None) => {
if catalog_name == CATALOG_DEFAULT
&& db_name.eq_ignore_ascii_case(SENSITIVE_SYSTEM_RESOURCE)
&& !matches!(
privilege,
UserPrivilegeType::Select | UserPrivilegeType::Drop
)
{
return Err(ErrorCode::PermissionDenied(
format!(
"Permission Denied: Operation '{:?}' on database 'default.system_history' is not allowed. This sensitive system resource only supports 'SELECT' and 'DROP'",
privilege
),
));
}
}
(None, None, Some(stage_name)) => {
let config = GlobalConfig::instance();
let sensitive_system_stage = config.log.history.stage_name.clone();

return if stage_name.eq_ignore_ascii_case(&sensitive_system_stage) {
if let Some(current_role) = self.ctx.get_current_role() {
if current_role.name == BUILTIN_ROLE_ACCOUNT_ADMIN {
Ok(())
} else {
Err(ErrorCode::PermissionDenied(format!(
"Permission Denied: Operation '{:?}' on stage {sensitive_system_stage} is not allowed",
privilege
)))
}
} else {
Err(ErrorCode::PermissionDenied(format!(
"Permission Denied: Operation '{:?}' on stage {sensitive_system_stage} is not allowed",
privilege
)))
}
} else {
Ok(())
};
}
_ => unreachable!(),
}

Ok(())
}

Expand All @@ -200,7 +232,7 @@ impl PrivilegeAccess {
privileges: UserPrivilegeType,
if_exists: bool,
) -> Result<()> {
self.access_system_history(catalog_name, db_name, privileges)?;
self.access_system_history(Some(catalog_name), Some(db_name), None, privileges)?;
let tenant = self.ctx.get_tenant();
let check_current_role_only = match privileges {
// create table/stream need check db's Create Privilege
Expand Down Expand Up @@ -305,7 +337,7 @@ impl PrivilegeAccess {
return Ok(());
}

self.access_system_history(catalog_name, db_name, privilege)?;
self.access_system_history(Some(catalog_name), Some(db_name), None, privilege)?;

if self.ctx.is_temp_table(catalog_name, db_name, table_name) {
return Ok(());
Expand Down Expand Up @@ -625,6 +657,9 @@ impl PrivilegeAccess {
return Ok(());
}

// History Config has a inner stage, can not be operator by any user.
self.access_system_history(None, None, Some(&stage_info.stage_name), privilege)?;

// Note: validate_stage_access is not used for validate Create Stage privilege
self.validate_access(
&GrantObject::Stage(stage_info.stage_name.to_string()),
Expand Down
13 changes: 10 additions & 3 deletions src/query/service/src/interpreters/interpreter_privilege_grant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::GrantObject;
Expand All @@ -33,6 +34,7 @@ use log::info;

use crate::interpreters::common::validate_grant_object_exists;
use crate::interpreters::util::check_system_history;
use crate::interpreters::util::check_system_history_stage;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -110,9 +112,14 @@ impl GrantPrivilegeInterpreter {
db_id: *db_id,
})
},
GrantObject::Stage(name) => Ok(OwnershipObject::Stage {
name: name.to_string(),
}),
GrantObject::Stage(name) => {
let config = GlobalConfig::instance();
let sensitive_system_stage = config.log.history.stage_name.clone();
check_system_history_stage(name, &sensitive_system_stage)?;
Ok(OwnershipObject::Stage {
name: name.to_string(),
})
},
GrantObject::UDF(name) => Ok(OwnershipObject::UDF {
name: name.to_string(),
}),
Expand Down
12 changes: 12 additions & 0 deletions src/query/service/src/interpreters/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ pub fn check_system_history(
Ok(())
}

pub fn check_system_history_stage(
stage_name: &str,
sensitive_system_stage: &str,
) -> databend_common_exception::Result<()> {
if stage_name.eq_ignore_ascii_case(sensitive_system_stage) {
return Err(ErrorCode::IllegalGrant(
"Can not modify system history stage {sensitive_system_stage} ownership",
));
}
Ok(())
}

#[allow(clippy::type_complexity)]
pub fn generate_desc_schema(
schema: TableSchemaRef,
Expand Down
50 changes: 41 additions & 9 deletions tests/logging/check_logs_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-
# Wait for the query to be logged
sleep 13


# Test 1
check_query_log "basic-1" "$query_id" "SELECT count(*) FROM system_history.log_history WHERE target = 'databend::log::profile' and" "1"

Expand Down Expand Up @@ -106,6 +107,7 @@ response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "grant select , drop on system_history.* to role ra"}')
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "grant alter on system_history.* to role ra"}')
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "grant role ra to a"}')
response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "grant write, read on stage log_1f93b76af0bd4b1d8e018667865fbc65 to a"}')

execute_and_verify() {
local cmd_description="$1"
Expand All @@ -114,14 +116,16 @@ execute_and_verify() {
local jq_expression="$4"
local result

echo -n "Executing: $cmd_description ... "
echo "Executing: $cmd_description ... "

result=$(curl -s -u "${user_cred}" -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "${json_payload}" | jq -r "${jq_expression}")

if [[ "$result" != "true" ]]; then
echo "Failed! Expected result: true, actual result: $result."
echo "$cmd_description failed."
exit 1 # Exit script immediately if it fails
else
echo "Description: $cmd_description completed successfully."
fi
}

Expand All @@ -140,32 +144,60 @@ check_system_history_permissions() {
'{"sql": "alter table system_history.log_history add column id int"}' \
'.state == "Failed"'

# Command 3: User 'a:123' attempts to drop 'system_history.access_history' table (expected to succeed)
# Command 3: User 'a:123' drop 'system_history.access_history' table (expected to succeed)
execute_and_verify \
"User 'a:123' attempts to drop 'system_history.access_history' table" \
"User 'a:123' drop 'system_history.access_history' table" \
"a:123" \
'{"sql": "drop table system_history.access_history"}' \
'.state == "Succeeded"'

# Command 4: User 'root:' attempts to grant ownership on 'system_history.*' (expected to fail)
# Command 4: User 'root:' grant ownership on 'system_history.*' (expected to fail)
execute_and_verify \
"User 'root:' attempts to grant ownership on 'system_history.*'" \
"User 'root:' grant ownership on 'system_history.*'" \
"root:" \
'{"sql": "grant ownership on system_history.* to role ra"}' \
'.state == "Failed"'

# Command 5: User 'root:' attempts to grant ownership on 'system_history.query_history' (expected to fail)
# Command 5: User 'root:' grant ownership on 'system_history.query_history' (expected to fail)
execute_and_verify \
"User 'root:' attempts to grant ownership on 'system_history.query_history'" \
"User 'root:' grant ownership on 'system_history.query_history'" \
"root:" \
'{"sql": "grant ownership on system_history.query_history to role ra"}' \
'.state == "Failed"'
# Command 6: User 'a:123' attempts to select 'system_history.query_history' table (expected to succeed)
# Command 6: User 'a:123' select 'system_history.query_history' table (expected to succeed)
execute_and_verify \
"User 'a:123' attempts to query 'system_history.query_history' table" \
"User 'a:123' query 'system_history.query_history' table" \
"a:123" \
'{"sql": "select count() from system_history.query_history"}' \
'.state == "Succeeded"'

# Command 7: User 'a:123' drop stage 'log_1f93b76af0bd4b1d8e018667865fbc65' table (expected to failed)
execute_and_verify \
"User 'a:123' drop stage log_1f93b76af0bd4b1d8e018667865fbc65" \
"a:123" \
'{"sql": "drop stage log_1f93b76af0bd4b1d8e018667865fbc65"}' \
'.state == "Failed"'

# Command 8: User 'a:123' copy into @log_1f93b76af0bd4b1d8e018667865fbc65 from (select * from system.one) (expected to failed)
execute_and_verify \
"User 'a:123' copy into @log_1f93b76af0bd4b1d8e018667865fbc65 from (select * from system.one);" \
"a:123" \
'{"sql": "copy into @log_1f93b76af0bd4b1d8e018667865fbc65 from (select * from system.one);"}' \
'.state == "Failed"'

# Command 9: User 'a:123' copy into t from (select * from @log_1f93b76af0bd4b1d8e018667865fbc65) (expected to failed)
execute_and_verify \
"User 'a:123' copy into t from (select * from @log_1f93b76af0bd4b1d8e018667865fbc65);" \
"a:123" \
'{"sql": "copy into t from (select * from @log_1f93b76af0bd4b1d8e018667865fbc65);"}' \
'.state == "Failed"'

# Command 10: User 'root' grant stage 'log_1f93b76af0bd4b1d8e018667865fbc65' ownership (expected to failed)
execute_and_verify \
"User 'root' grant ownership on stage log_1f93b76af0bd4b1d8e018667865fbc65 to role ra" \
"root:" \
'{"sql": "grant ownership on stage log_1f93b76af0bd4b1d8e018667865fbc65 to role ra"}' \
'.state == "Failed"'
}

check_system_history_permissions
Expand Down
Loading