Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum class AccessType : uint8_t
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PARTITION, "ALTER EXPORT PARTITION, EXPORT PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \
Expand Down
2 changes: 1 addition & 1 deletion src/Common/ZooKeeper/ZooKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ class ZooKeeper
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);

using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, false>;

using MultiTryGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, true>;

template <typename TIter>
MultiGetChildrenResponse
getChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6706,6 +6706,9 @@ Possible values:
)", 0) \
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
Overwrite file if it already exists when exporting a merge tree part
)", 0) \
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
\
/* ####################################################### */ \
Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::EXPORT_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION, command.to_database, command.to_table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::FETCH_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table);
Expand Down
74 changes: 74 additions & 0 deletions src/Interpreters/InterpreterKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,77 @@ BlockIO InterpreterKillQueryQuery::execute()

break;
}
case ASTKillQueryQuery::Type::ExportPartition:
{
Block exports_block = getSelectResult(
"source_database, source_table, transaction_id, destination_database, destination_table, partition_id",
"system.replicated_partition_exports");
if (!exports_block)
return res_io;

const ColumnString & src_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_database").column);
const ColumnString & src_tbl_col = typeid_cast<const ColumnString &>(*exports_block.getByName("source_table").column);
const ColumnString & dst_db_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_database").column);
const ColumnString & dst_tbl_col = typeid_cast<const ColumnString &>(*exports_block.getByName("destination_table").column);
const ColumnString & tx_col = typeid_cast<const ColumnString &>(*exports_block.getByName("transaction_id").column);

auto header = exports_block.cloneEmpty();
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});

MutableColumns res_columns = header.cloneEmptyColumns();
auto table_id = StorageID::createEmpty();
AccessRightsElements required_access_rights;
auto access = getContext()->getAccess();
bool access_denied = false;

for (size_t i = 0; i < exports_block.rows(); ++i)
{
const auto src_database = src_db_col.getDataAt(i).toString();
const auto src_table = src_tbl_col.getDataAt(i).toString();
const auto dst_database = dst_db_col.getDataAt(i).toString();
const auto dst_table = dst_tbl_col.getDataAt(i).toString();

table_id = StorageID{src_database, src_table};
auto transaction_id = tx_col.getDataAt(i).toString();

CancellationCode code = CancellationCode::Unknown;
if (!query.test)
{
auto storage = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (!storage)
code = CancellationCode::NotFound;
else
{
ASTAlterCommand alter_command{};
alter_command.type = ASTAlterCommand::EXPORT_PARTITION;
alter_command.move_destination_type = DataDestinationType::TABLE;
alter_command.from_database = src_database;
alter_command.from_table = src_table;
alter_command.to_database = dst_database;
alter_command.to_table = dst_table;

required_access_rights = InterpreterAlterQuery::getRequiredAccessForCommand(
alter_command, table_id.database_name, table_id.table_name);
if (!access->isGranted(required_access_rights))
{
access_denied = true;
continue;
}
code = storage->killExportPartition(transaction_id);
}
}

insertResultRow(i, code, exports_block, header, res_columns);
}

if (res_columns[0]->empty() && access_denied)
throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to kill export partition. "
"To execute this query, it's necessary to have the grant {}", required_access_rights.toString());

res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(header.cloneWithColumns(std::move(res_columns)))));

break;
}
case ASTKillQueryQuery::Type::Mutation:
{
Block mutations_block = getSelectResult("database, table, mutation_id, command", "system.mutations");
Expand Down Expand Up @@ -462,6 +533,9 @@ AccessRightsElements InterpreterKillQueryQuery::getRequiredAccessForDDLOnCluster
| AccessType::ALTER_MATERIALIZE_COLUMN
| AccessType::ALTER_MATERIALIZE_TTL
);
/// todo arthur think about this
else if (query.type == ASTKillQueryQuery::Type::ExportPartition)
required_access.emplace_back(AccessType::ALTER_EXPORT_PARTITION);
return required_access;
}

Expand Down
11 changes: 11 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,17 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
}

}
else if (type == ASTAlterCommand::EXPORT_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT PARTITION " << (settings.hilite ? hilite_none : "");
partition->format(ostr, settings, state, frame);
ostr << " TO TABLE ";
if (!to_database.empty())
{
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) << (settings.hilite ? hilite_none : "") << ".";
}
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) << (settings.hilite ? hilite_none : "");
}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ASTAlterCommand : public IAST
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PART,
EXPORT_PARTITION,

DELETE,
UPDATE,
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ void ASTKillQueryQuery::formatQueryImpl(WriteBuffer & ostr, const FormatSettings
case Type::Transaction:
ostr << "TRANSACTION";
break;
case Type::ExportPartition:
ostr << "EXPORT PARTITION";
break;
}

ostr << (settings.hilite ? hilite_none : "");
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTKillQueryQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ASTKillQueryQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluste
{
Query, /// KILL QUERY
Mutation, /// KILL MUTATION
ExportPartition, /// KILL EXPORT_PARTITION
PartMoveToShard, /// KILL PART_MOVE_TO_SHARD
Transaction, /// KILL TRANSACTION
};
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ namespace DB
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
17 changes: 17 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
ParserKeyword s_move_part(Keyword::MOVE_PART);
ParserKeyword s_export_part(Keyword::EXPORT_PART);
ParserKeyword s_export_partition(Keyword::EXPORT_PARTITION);
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
Expand Down Expand Up @@ -553,6 +554,22 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_export_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
return false;

command->type = ASTAlterCommand::EXPORT_PARTITION;

if (!s_to_table.ignore(pos, expected))
{
return false;
}

if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_move_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ParserKillQueryQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ParserKeyword p_kill{Keyword::KILL};
ParserKeyword p_query{Keyword::QUERY};
ParserKeyword p_mutation{Keyword::MUTATION};
ParserKeyword p_export_partition{Keyword::EXPORT_PARTITION};
ParserKeyword p_part_move_to_shard{Keyword::PART_MOVE_TO_SHARD};
ParserKeyword p_transaction{Keyword::TRANSACTION};
ParserKeyword p_on{Keyword::ON};
Expand All @@ -33,6 +34,8 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
query->type = ASTKillQueryQuery::Type::Query;
else if (p_mutation.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::Mutation;
else if (p_export_partition.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::ExportPartition;
else if (p_part_move_to_shard.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::PartMoveToShard;
else if (p_transaction.ignore(pos, expected))
Expand Down
68 changes: 68 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#pragma once

#include <base/types.h>
#include <Interpreters/StorageID.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Parser.h>

namespace DB
{

struct ExportReplicatedMergeTreePartitionManifest
{
String transaction_id;
String partition_id;
String destination_database;
String destination_table;
String source_replica;
size_t number_of_parts;
std::vector<String> parts;
time_t create_time;

std::string toJsonString() const
{
Poco::JSON::Object json;
json.set("transaction_id", transaction_id);
json.set("partition_id", partition_id);
json.set("destination_database", destination_database);
json.set("destination_table", destination_table);
json.set("source_replica", source_replica);
json.set("number_of_parts", number_of_parts);

Poco::JSON::Array::Ptr parts_array = new Poco::JSON::Array();
for (const auto & part : parts)
parts_array->add(part);
json.set("parts", parts_array);

json.set("create_time", create_time);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

static ExportReplicatedMergeTreePartitionManifest fromJsonString(const std::string & json_string)
{
Poco::JSON::Parser parser;
auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();
chassert(json);

ExportReplicatedMergeTreePartitionManifest manifest;
manifest.transaction_id = json->getValue<String>("transaction_id");
manifest.partition_id = json->getValue<String>("partition_id");
manifest.destination_database = json->getValue<String>("destination_database");
manifest.destination_table = json->getValue<String>("destination_table");
manifest.source_replica = json->getValue<String>("source_replica");
manifest.number_of_parts = json->getValue<size_t>("number_of_parts");

auto parts_array = json->getArray("parts");
for (size_t i = 0; i < parts_array->size(); ++i)
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));

manifest.create_time = json->getValue<time_t>("create_time");
return manifest;
}
};

}
21 changes: 21 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <Storages/ExportReplicatedMergeTreePartitionManifest.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>

namespace DB
{
struct ExportReplicatedMergeTreePartitionTaskEntry
{
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
ExportReplicatedMergeTreePartitionManifest manifest;

std::size_t parts_to_do;
/// References to the parts that should be exported
/// This is used to prevent the parts from being deleted before finishing the export operation
/// It does not mean this replica will export all the parts
/// There is also a chance this replica does not contain a given part and it is totally ok.
std::vector<DataPartPtr> part_references;
};

}
5 changes: 5 additions & 0 deletions src/Storages/IStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ CancellationCode IStorage::killPartMoveToShard(const UUID & /*task_uuid*/)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part moves between shards are not supported by storage {}", getName());
}

CancellationCode IStorage::killExportPartition(const String & /*transaction_id*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Export partition is not supported by storage {}", getName());
}

StorageID IStorage::getStorageID() const
{
std::lock_guard lock(id_mutex);
Expand Down
12 changes: 12 additions & 0 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,15 @@ It is currently only implemented in StorageObjectStorage.
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
}

virtual void commitExportPartitionTransaction(
const String & /* transaction_id */,
const String & /* partition_id */,
const Strings & /* exported_paths */,
ContextPtr /* local_context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "commitExportPartitionTransaction is not implemented for storage type {}", getName());
}


/** Writes the data to a table in distributed manner.
Expand Down Expand Up @@ -565,6 +574,9 @@ It is currently only implemented in StorageObjectStorage.

virtual void setMutationCSN(const String & /*mutation_id*/, UInt64 /*csn*/);

/// Cancel a replicated partition export by transaction id.
virtual CancellationCode killExportPartition(const String & /*transaction_id*/);

/// Cancel a part move to shard.
virtual CancellationCode killPartMoveToShard(const UUID & /*task_uuid*/);

Expand Down
Loading
Loading