Skip to content

Commit fe65d3d

Browse files
committed
Fix issue timeplus-io#912: Kafka external stream fix
1 parent 36ef0d9 commit fe65d3d

10 files changed

+169
-2
lines changed

docker-compose.yml

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Local single-broker Kafka + ZooKeeper stack for exercising Kafka external streams.
version: '3.8'

services:
  zookeeper:
    # Pinned: the broker below is configured in ZooKeeper mode, so both images
    # must stay on a Confluent Platform 7.x release (8.x brokers are KRaft-only).
    image: confluentinc/cp-zookeeper:7.6.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    # Keep this tag in lockstep with the zookeeper image above.
    image: confluentinc/cp-kafka:7.6.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      # Advertised as localhost, so the broker is reachable from the host only.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single broker: the internal offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # NOTE(review): this looks like a Bitnami-image variable; the Confluent
      # image presumably ignores it -- confirm before removing.
      ALLOW_PLAINTEXT_LISTENER: "yes"
26+

my_schema.json

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"type": "record",
3+
"name": "Data",
4+
"fields": [
5+
{"name": "id", "type": "int"},
6+
{"name": "name", "type": "string"},
7+
{"name": "score", "type": "float"}
8+
]
9+
}

src/Formats/FormatSchemaInfo.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,33 @@
22
#include <Interpreters/Context.h>
33
#include <Common/Exception.h>
44
#include <filesystem>
5+
#include <unordered_map>
56

67

78
namespace DB
89
{
10+
/// Maps an Avro type name to the column type name used when deriving a stream's columns
/// from an Avro schema.
///
/// @param avro_type  Type name as written in the "type" field of an Avro schema field
///                   (e.g. "int", "string", "array").
/// @return The mapped column type name, or "string" when `avro_type` is not in the table.
///
/// Only the bare type name is available here, so complex Avro types ("array", "map",
/// "fixed") are mapped to fixed defaults rather than to their real element type or size.
/// All type names use the same lowercase snake_case spelling as the primitive entries.
String avroTypeToClickHouseType(const String & avro_type)
{
    static const std::unordered_map<String, String> avro_to_clickhouse = {
        {"null", "nullable(string)"},
        {"boolean", "bool"},
        {"int", "int32"},
        {"long", "int64"},
        {"float", "float32"},
        {"double", "float64"},
        {"bytes", "string"},
        {"string", "string"},
        {"array", "array(string)"},
        {"map", "map(string, string)"},
        {"fixed", "fixed_string(16)"}
    };

    if (const auto it = avro_to_clickhouse.find(avro_type); it != avro_to_clickhouse.end())
        return it->second;

    /// Unknown or unsupported Avro types fall back to a plain string column.
    return "string";
}
932
namespace ErrorCodes
1033
{
1134
extern const int BAD_ARGUMENTS;
@@ -32,6 +55,7 @@ namespace
3255
}
3356

3457

58+
3559
FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path)
3660
{
3761
if (format_schema.empty())
@@ -100,6 +124,19 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String &
100124
schema_path = path.filename();
101125
schema_directory = path.parent_path() / "";
102126
}
127+
else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
128+
{
129+
if (is_server)
130+
throw Exception(
131+
ErrorCodes::BAD_ARGUMENTS,
132+
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
133+
default_schema_directory(),
134+
path.string(),
135+
default_schema_directory());
136+
path = default_schema_directory_path / path;
137+
schema_path = path.filename();
138+
schema_directory = path.parent_path() / "";
139+
}
103140
else
104141
{
105142
schema_path = path;

src/Formats/FormatSchemaInfo.h

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
namespace DB
77
{
8+
String avroTypeToClickHouseType(const String & avro_type);
9+
810
class Context;
911

1012
/// Extracts information about where the format schema file is from passed context and keep it.

src/Interpreters/InterpreterCreateQuery.cpp

+73-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
#include <Columns/ColumnString.h>
1515

16+
#include <Formats/FormatSchemaInfo.h>
17+
18+
#include <IO/ReadBufferFromFile.h>
19+
#include <IO/ReadHelpers.h>
1620
#include <IO/WriteBufferFromFile.h>
1721
#include <IO/WriteHelpers.h>
1822

@@ -26,6 +30,11 @@
2630
#include <Parsers/parseQuery.h>
2731
#include <Parsers/queryToString.h>
2832

33+
#include <Poco/JSON/Parser.h>
34+
#include <Poco/Dynamic/Var.h>
35+
#include <Poco/JSON/Object.h>
36+
#include <Poco/JSON/Array.h>
37+
2938
#include <Storages/StorageFactory.h>
3039
#include <Storages/StorageInMemoryMetadata.h>
3140
#include <Storages/Streaming/StorageStream.h>
@@ -1454,8 +1463,70 @@ BlockIO InterpreterCreateQuery::execute()
14541463
/// CREATE|ATTACH DATABASE
14551464
if (create.database && !create.table)
14561465
return createDatabase(create);
1457-
else
1458-
return createTable(create);
1466+
// else
1467+
// return createTable(create);
1468+
1469+
/// CREATE EXTERNAL STREAM
1470+
if (create.is_external && create.storage && create.storage->settings)
1471+
{
1472+
// Extract data_schema from SETTINGS
1473+
auto * set_query = create.storage->settings;
1474+
for (const auto & change : set_query->changes)
1475+
{
1476+
if (change.name == "data_schema")
1477+
create.data_schema = change.value.safeGet<String>();
1478+
}
1479+
// Automatically derive schema
1480+
if (!create.columns_list && create.data_schema)
1481+
{
1482+
// FORMAT SCHEMA JSON
1483+
FormatSchemaInfo schema_info(*create.data_schema, "Avro", false, getContext()->getApplicationType() == Context::ApplicationType::SERVER, getContext()->getFormatSchemaPath());
1484+
String schema_path = schema_info.absoluteSchemaPath();
1485+
1486+
std::string avro_schema_json;
1487+
{
1488+
ReadBufferFromFile in(schema_path);
1489+
readStringUntilEOF(avro_schema_json, in);
1490+
}
1491+
1492+
Poco::JSON::Parser parser;
1493+
Poco::Dynamic::Var parsed_result = parser.parse(avro_schema_json);
1494+
Poco::JSON::Object::Ptr schema_obj = parsed_result.extract<Poco::JSON::Object::Ptr>();
1495+
1496+
if (!schema_obj->has("fields"))
1497+
{
1498+
throw Exception("Invalid Avro schema: 'fields' not found in schema: " + *create.data_schema,
1499+
ErrorCodes::BAD_ARGUMENTS);
1500+
}
1501+
1502+
Poco::JSON::Array::Ptr fields = schema_obj->getArray("fields");
1503+
auto columns_list = std::make_shared<ASTColumns>();
1504+
auto expr_list = std::make_shared<ASTExpressionList>();
1505+
columns_list->columns = expr_list.get();
1506+
columns_list->children.push_back(expr_list);
1507+
1508+
for (size_t i = 0; i < fields->size(); ++i)
1509+
{
1510+
Poco::JSON::Object::Ptr field = fields->getObject(i);
1511+
String column_name = field->getValue<String>("name");
1512+
String column_type = field->getValue<String>("type");
1513+
1514+
// Create ASTColumnDeclaration
1515+
auto column_decl = std::make_shared<ASTColumnDeclaration>();
1516+
column_decl->name = column_name;
1517+
1518+
// Change Avro type into ClickHouse Type
1519+
String clickhouse_type = DB::avroTypeToClickHouseType(column_type);
1520+
auto type_ast = std::make_shared<ASTIdentifier>(clickhouse_type);
1521+
column_decl->type = type_ast;
1522+
1523+
expr_list->children.push_back(column_decl);
1524+
}
1525+
1526+
create.set(create.columns_list, columns_list);
1527+
}
1528+
}
1529+
return createTable(create);
14591530
}
14601531

14611532

src/Parsers/ASTCreateQuery.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ ASTPtr ASTCreateQuery::clone() const
213213
if (comment)
214214
res->set(res->comment, comment->clone());
215215

216+
res->data_schema = data_schema;
217+
216218
cloneOutputOptions(*res);
217219
cloneTableOptions(*res);
218220

@@ -254,6 +256,13 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
254256
comment->formatImpl(settings, state, frame);
255257
}
256258

259+
if (data_schema)
260+
{
261+
settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws
262+
<< "data_schema = " << (settings.hilite ? hilite_none : "")
263+
<< quoteString(*data_schema);
264+
}
265+
257266
return;
258267
}
259268

src/Parsers/ASTCreateQuery.h

+3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ class ASTCreateQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnC
7676
/// CREATE EXTERNAL STREAM
7777
bool is_external = false;
7878

79+
/// Value of the 'data_schema' setting, if one was given (used to derive the column list of an external stream).
80+
std::optional<String> data_schema;
81+
7982
ASTColumns * columns_list = nullptr;
8083
ASTExpressionList * tables = nullptr;
8184

src/Storages/ExecutableSettings.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ void ExecutableSettings::loadFromQuery(ASTStorage & storage_def)
2222
{
2323
try
2424
{
25+
for (const auto & change : storage_def.settings->changes)
26+
{
27+
if (change.name == "data_schema")
28+
{
29+
this->data_schema = change.value.safeGet<String>();
30+
}
31+
}
32+
2533
applyChanges(storage_def.settings->changes);
2634
}
2735
catch (Exception & e)

src/Storages/ExecutableSettings.h

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace DB
99
class ASTStorage;
1010

1111
#define LIST_OF_EXECUTABLE_SETTINGS(M) \
12+
M(String, data_schema, "", "Avro schema identifier for the stream", 0) \
1213
M(Bool, send_chunk_header, false, "Send number_of_rows\n before sending chunk to process.", 0) \
1314
M(UInt64, pool_size, 16, "Processes pool size. If size == 0, then there is no size restrictions.", 0) \
1415
M(UInt64, max_command_execution_time, 10, "Max command execution time in seconds.", 0) \

src/Storages/ExternalStream/ExternalStreamSettings.h

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class ASTStorage;
5858

5959
#define ALL_EXTERNAL_STREAM_SETTINGS(M) \
6060
M(String, type, "", "External stream type", 0) \
61+
M(String, data_schema, "", "Avro schema identifier for the stream", 0) \
6162
KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
6263
LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \
6364
PULSAR_EXTERNAL_STREAM_SETTINGS(M) \

0 commit comments

Comments
 (0)