/// Translates an Avro type name into the name of the column type used to store it.
///
/// Only the bare type name is looked at: the function receives a single string, so the
/// element type of an Avro "array"/"map" and the size of an Avro "fixed" are not available
/// here and those entries map to a fixed choice (string elements, 16 bytes).
///
/// @param avro_type  Avro type name, e.g. the value of a record field's "type" attribute.
/// @return The mapped column type name. Names that are not in the table (including the
///         empty string) fall back to "string" instead of raising an error.
///
/// NOTE(review): every mapped name is spelled in lowercase snake_case so the table is
/// internally consistent with its "int32"/"float32"/"string" entries — confirm the
/// spellings against the names registered in the data type factory.
String avroTypeToClickHouseType(const String & avro_type)
{
    /// Built once on first call; the key/value types are spelled out because they
    /// cannot be deduced from a nested braced initializer list.
    static const std::unordered_map<String, String> avro_to_clickhouse = {
        {"null", "nullable(string)"},
        {"boolean", "bool"},
        {"int", "int32"},
        {"long", "int64"},
        {"float", "float32"},
        {"double", "float64"},
        {"bytes", "string"},
        {"string", "string"},
        {"array", "array(string)"},
        {"map", "map(string, string)"},
        {"fixed", "fixed_string(16)"},
    };

    if (const auto it = avro_to_clickhouse.find(avro_type); it != avro_to_clickhouse.end())
        return it->second;

    /// Unknown or unsupported Avro type: degrade to text.
    return "string";
}
format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path) { if (format_schema.empty()) @@ -100,6 +124,19 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String & schema_path = path.filename(); schema_directory = path.parent_path() / ""; } + else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string())) + { + if (is_server) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})", + default_schema_directory(), + path.string(), + default_schema_directory()); + path = default_schema_directory_path / path; + schema_path = path.filename(); + schema_directory = path.parent_path() / ""; + } else { schema_path = path; diff --git a/src/Formats/FormatSchemaInfo.h b/src/Formats/FormatSchemaInfo.h index 8c430218af..d5a7592842 100644 --- a/src/Formats/FormatSchemaInfo.h +++ b/src/Formats/FormatSchemaInfo.h @@ -5,6 +5,8 @@ namespace DB { +String avroTypeToClickHouseType(const String & avro_type); + class Context; /// Extracts information about where the format schema file is from passed context and keep it. 
diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 98ae4f4827..a7c594bfe6 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -13,6 +13,10 @@ #include +#include + +#include +#include #include #include @@ -26,6 +30,11 @@ #include #include +#include +#include +#include +#include + #include #include #include @@ -1458,8 +1467,70 @@ BlockIO InterpreterCreateQuery::execute() /// CREATE|ATTACH DATABASE if (create.database && !create.table) return createDatabase(create); - else - return createTable(create); +// else +// return createTable(create); + + /// CREATE EXTERNAL STREAM + if (create.is_external && create.storage && create.storage->settings) + { + // Extract data_schema from SETTINGS + auto * set_query = create.storage->settings; + for (const auto & change : set_query->changes) + { + if (change.name == "data_schema") + create.data_schema = change.value.safeGet(); + } + // Automatically derive schema + if (!create.columns_list && create.data_schema) + { + // FORMAT SCHEMA JSON + FormatSchemaInfo schema_info(*create.data_schema, "Avro", false, getContext()->getApplicationType() == Context::ApplicationType::SERVER, getContext()->getFormatSchemaPath()); + String schema_path = schema_info.absoluteSchemaPath(); + + std::string avro_schema_json; + { + ReadBufferFromFile in(schema_path); + readStringUntilEOF(avro_schema_json, in); + } + + Poco::JSON::Parser parser; + Poco::Dynamic::Var parsed_result = parser.parse(avro_schema_json); + Poco::JSON::Object::Ptr schema_obj = parsed_result.extract(); + + if (!schema_obj->has("fields")) + { + throw Exception("Invalid Avro schema: 'fields' not found in schema: " + *create.data_schema, + ErrorCodes::BAD_ARGUMENTS); + } + + Poco::JSON::Array::Ptr fields = schema_obj->getArray("fields"); + auto columns_list = std::make_shared(); + auto expr_list = std::make_shared(); + columns_list->columns = expr_list.get(); + 
columns_list->children.push_back(expr_list); + + for (size_t i = 0; i < fields->size(); ++i) + { + Poco::JSON::Object::Ptr field = fields->getObject(i); + String column_name = field->getValue("name"); + String column_type = field->getValue("type"); + + // Create ASTColumnDeclaration + auto column_decl = std::make_shared(); + column_decl->name = column_name; + + // Change Avro type into ClickHouse Type + String clickhouse_type = DB::avroTypeToClickHouseType(column_type); + auto type_ast = std::make_shared(clickhouse_type); + column_decl->type = type_ast; + + expr_list->children.push_back(column_decl); + } + + create.set(create.columns_list, columns_list); + } + } + return createTable(create); } diff --git a/src/Parsers/ASTCreateQuery.cpp b/src/Parsers/ASTCreateQuery.cpp index 608349d77e..659b29de51 100644 --- a/src/Parsers/ASTCreateQuery.cpp +++ b/src/Parsers/ASTCreateQuery.cpp @@ -213,6 +213,8 @@ ASTPtr ASTCreateQuery::clone() const if (comment) res->set(res->comment, comment->clone()); + res->data_schema = data_schema; + cloneOutputOptions(*res); cloneTableOptions(*res); @@ -254,6 +256,13 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat comment->formatImpl(settings, state, frame); } + if (data_schema) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws + << "data_schema = " << (settings.hilite ? 
hilite_none : "") + << quoteString(*data_schema); + } + return; } diff --git a/src/Parsers/ASTCreateQuery.h b/src/Parsers/ASTCreateQuery.h index 231fd0863f..2fabeea886 100644 --- a/src/Parsers/ASTCreateQuery.h +++ b/src/Parsers/ASTCreateQuery.h @@ -76,6 +76,9 @@ class ASTCreateQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnC /// CREATE EXTERNAL STREAM bool is_external = false; + /// For store data_schema + std::optional data_schema; + ASTColumns * columns_list = nullptr; ASTExpressionList * tables = nullptr; diff --git a/src/Storages/ExecutableSettings.cpp b/src/Storages/ExecutableSettings.cpp index 136357eb6f..4419610be2 100644 --- a/src/Storages/ExecutableSettings.cpp +++ b/src/Storages/ExecutableSettings.cpp @@ -22,6 +22,14 @@ void ExecutableSettings::loadFromQuery(ASTStorage & storage_def) { try { + for (const auto & change : storage_def.settings->changes) + { + if (change.name == "data_schema") + { + this->data_schema = change.value.safeGet(); + } + } + applyChanges(storage_def.settings->changes); } catch (Exception & e) diff --git a/src/Storages/ExecutableSettings.h b/src/Storages/ExecutableSettings.h index c6c1f0b9eb..5e014c915e 100644 --- a/src/Storages/ExecutableSettings.h +++ b/src/Storages/ExecutableSettings.h @@ -9,6 +9,7 @@ namespace DB class ASTStorage; #define LIST_OF_EXECUTABLE_SETTINGS(M) \ + M(String, data_schema, "", "Avro schema identifier for the stream", 0) \ M(Bool, send_chunk_header, false, "Send number_of_rows\n before sending chunk to process.", 0) \ M(UInt64, pool_size, 16, "Processes pool size. 
If size == 0, then there is no size restrictions.", 0) \ M(UInt64, max_command_execution_time, 10, "Max command execution time in seconds.", 0) \ diff --git a/src/Storages/ExternalStream/ExternalStreamSettings.h b/src/Storages/ExternalStream/ExternalStreamSettings.h index 6975b80d36..27fad3e51d 100644 --- a/src/Storages/ExternalStream/ExternalStreamSettings.h +++ b/src/Storages/ExternalStream/ExternalStreamSettings.h @@ -58,6 +58,7 @@ class ASTStorage; #define ALL_EXTERNAL_STREAM_SETTINGS(M) \ M(String, type, "", "External stream type", 0) \ + M(String, data_schema, "", "Avro schema identifier for the stream", 0) \ KAFKA_EXTERNAL_STREAM_SETTINGS(M) \ LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \ PULSAR_EXTERNAL_STREAM_SETTINGS(M) \