Skip to content

Commit 0860399

Browse files
authored
feat: timeplus external stream (#894)
1 parent 051b614 commit 0860399

14 files changed

+510
-8
lines changed

src/Databases/DDLDependencyVisitor.cpp

+41
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
#include <Parsers/ASTLiteral.h>
88
#include <Parsers/ASTSelectWithUnionQuery.h>
99
#include <Poco/String.h>
10+
/// proton: starts
11+
#include <Common/parseRemoteDescription.h>
12+
#include <Storages/ExternalStream/ExternalStreamSettings.h>
13+
/// proton: ends
1014

1115
namespace DB
1216
{
@@ -83,10 +87,47 @@ void DDLDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & dict_s
8387
data.dependencies.emplace(std::move(info->table_name));
8488
}
8589

90+
/// proton: starts
91+
namespace
92+
{
93+
bool hasLocalAddress(const String & hosts, bool secure)
94+
{
95+
auto global_context = Context::getGlobalContextInstance();
96+
UInt16 default_port = secure ? global_context->getTCPPortSecure().value_or(0) : global_context->getTCPPort();
97+
auto addresses = parseRemoteDescriptionForExternalDatabase(hosts, /*max_addresses=*/ 10, /*default_port=*/ default_port);
98+
for (const auto & addr : addresses)
99+
if (isLocalAddress({addr.first, addr.second}, default_port))
100+
return true;
101+
102+
return false;
103+
}
104+
}
105+
/// proton: ends
106+
86107
void DDLDependencyVisitor::visit(const ASTStorage & storage, Data & data)
87108
{
88109
if (!storage.engine)
89110
return;
111+
112+
/// proton: starts
113+
/// Because a Timeplus external stream needs to get the structure of the target stream,
114+
/// it depends on the target stream. Thus, if a Timeplus external stream is pointing to
115+
/// a local stream, then add the target stream to dependencies, to make sure that the
116+
/// target stream is loaded before the external stream.
117+
if (storage.engine->name == "ExternalStream")
118+
{
119+
ExternalStreamSettings settings;
120+
settings.loadFromQuery(const_cast<ASTStorage &>(storage));
121+
if (settings.type.value == "timeplus" && hasLocalAddress(settings.hosts, settings.secure))
122+
{
123+
QualifiedTableName name{settings.db, settings.stream};
124+
data.dependencies.emplace(std::move(name));
125+
}
126+
127+
return;
128+
}
129+
/// proton: ends
130+
90131
if (storage.engine->name != "Dictionary")
91132
return;
92133

src/Interpreters/ClusterProxy/executeQuery.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
9494
new_settings.limit = 0;
9595
new_settings.limit.changed = false;
9696
}
97+
/// proton: starts
98+
if (settings.query_mode.value == "table")
99+
new_settings.query_mode = settings.query_mode;
100+
/// proton: ends
97101

98102
auto new_context = Context::createCopy(context);
99103
new_context->setSettings(new_settings);

src/Processors/Sources/RemoteSource.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace DB
99
{
1010

11-
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_)
11+
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_, std::optional<bool> is_streaming_)
1212
: ISource(executor->getHeader(), false, ProcessorID::RemoteSourceID)
1313
, add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))
1414
, async_read(async_read_)
@@ -18,6 +18,11 @@ RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation
1818
for (auto & type : sample.getDataTypes())
1919
if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
2020
add_aggregation_info = true;
21+
22+
/// proton: starts
23+
if (is_streaming_)
24+
setStreaming(is_streaming_.value());
25+
/// proton: ends
2126
}
2227

2328
RemoteSource::~RemoteSource() = default;

src/Processors/Sources/RemoteSource.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class RemoteSource final : public ISource
2020
/// Flag add_aggregation_info tells if AggregatedChunkInfo should be added to result chunk.
2121
/// AggregatedChunkInfo stores the bucket number used for two-level aggregation.
2222
/// This flag should be typically enabled for queries with GROUP BY which are executed till WithMergeableState.
23-
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_);
23+
/// `is_streaming_`: when it holds a value, the constructor passes that value to setStreaming();
/// when it is empty (the default), the constructor does not call setStreaming().
RemoteSource(
    RemoteQueryExecutorPtr executor,
    bool add_aggregation_info_,
    bool async_read_,
    std::optional<bool> is_streaming_ = std::nullopt); /// proton: added is_streaming_
2424
~RemoteSource() override;
2525

2626
Status prepare() override;

src/Storages/ExternalStream/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ endif()
1010

1111
add_headers_and_sources(external_stream Kafka)
1212
add_headers_and_sources(external_stream Pulsar)
13+
add_headers_and_sources(external_stream Timeplus)
1314

1415
add_library(external_stream ${external_stream_headers} ${external_stream_sources})
1516

src/Storages/ExternalStream/ExternalStreamSettings.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,19 @@ class ASTStorage;
4949
M(UInt64, memory_limit, 0, "Configure a limit on the amount of memory that will be allocated by this external stream. Setting this to 0 will disable the limit. By default this is disabled.", 0) \
5050
M(UInt64, io_threads, 1, "Set the number of IO threads to be used by the Pulsar client. Default is 1 thread.", 0)
5151

52+
/// Settings specific to Timeplus external streams (type = 'timeplus').
/// Each entry is expanded as M(type, name, default value, description, flags).
/// NOTE: the ALIAS parameter is accepted but not used by any entry below; it is kept so that
/// existing two-argument invocations of this macro keep compiling.
#define TIMEPLUS_EXTERNAL_STREAM_SETTINGS(M, ALIAS) \
    M(String, hosts, "", "A remote server address or an expression that generates multiple addresses of remote servers. Format: host or host:port.", 0) \
    M(String, db, "default", "Database name.", 0) \
    M(String, stream, "", "Stream name.", 0) \
    M(String, user, "", "User name. If not specified, `default` is used.", 0) \
    M(Bool, secure, false, "Use secure connection.", 0)
58+
5259
#define ALL_EXTERNAL_STREAM_SETTINGS(M) \
5360
M(String, type, "", "External stream type", 0) \
5461
KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
5562
LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \
56-
PULSAR_EXTERNAL_STREAM_SETTINGS(M)
63+
PULSAR_EXTERNAL_STREAM_SETTINGS(M) \
64+
TIMEPLUS_EXTERNAL_STREAM_SETTINGS(M, ALIAS)
5765

5866
#define LIST_OF_EXTERNAL_STREAM_SETTINGS(M) \
5967
ALL_EXTERNAL_STREAM_SETTINGS(M) \

src/Storages/ExternalStream/ExternalStreamTypes.h

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace StreamTypes
77
const String KAFKA = "kafka";
88
const String REDPANDA = "redpanda";
99
const String PULSAR = "pulsar";
10+
const String TIMEPLUS = "timeplus";
1011
const String LOG = "log";
1112
}
1213
}

src/Storages/ExternalStream/StorageExternalStream.cpp

+9-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <Storages/ExternalStream/ExternalStreamTypes.h>
1010
#include <Storages/ExternalStream/Kafka/Kafka.h>
1111
#include <Storages/ExternalStream/Pulsar/Pulsar.h>
12+
#include <Storages/ExternalStream/Timeplus/Timeplus.h>
1213
#ifdef OS_LINUX
1314
# include <Storages/ExternalStream/Log/FileLog.h>
1415
#endif
@@ -66,6 +67,7 @@ StoragePtr createExternalStream(
6667
IStorage * storage,
6768
ExternalStreamSettingsPtr settings,
6869
const ASTs & engine_args,
70+
StorageInMemoryMetadata & storage_metadata,
6971
bool attach,
7072
ExternalStreamCounterPtr external_stream_counter,
7173
ContextPtr context_)
@@ -79,6 +81,9 @@ StoragePtr createExternalStream(
7981
if (type == StreamTypes::KAFKA || type == StreamTypes::REDPANDA)
8082
return std::make_unique<Kafka>(storage, std::move(settings), engine_args, attach, external_stream_counter, std::move(context_));
8183

84+
if (type == StreamTypes::TIMEPLUS)
85+
return std::make_unique<ExternalStream::Timeplus>(storage, storage_metadata, std::move(settings), attach, std::move(context_));
86+
8287
#ifdef OS_LINUX
8388
if (type == StreamTypes::LOG && context_->getSettingsRef()._tp_enable_log_stream_expr.value)
8489
return std::make_unique<FileLog>(storage, std::move(settings), std::move(context_));
@@ -121,7 +126,7 @@ StorageExternalStream::StorageExternalStream(
121126
}
122127
}
123128

124-
if (columns_.empty())
129+
if (columns_.empty() && external_stream_settings->type.value != StreamTypes::TIMEPLUS)
125130
/// This is the same error reported by InterpreterCreateQuery
126131
throw Exception(
127132
ErrorCodes::INCORRECT_QUERY, "Incorrect CREATE query: required list of column descriptions or AS section or SELECT.");
@@ -145,8 +150,10 @@ StorageExternalStream::StorageExternalStream(
145150

146151
auto metadata = getInMemoryMetadata();
147152
auto stream = createExternalStream(
148-
this, std::move(external_stream_settings), engine_args, attach, external_stream_counter, std::move(context_));
153+
this, std::move(external_stream_settings), engine_args, metadata, attach, external_stream_counter, std::move(context_));
149154
external_stream.swap(stream);
155+
/// Some external streams fetch the structure in other ways, so the metadata needs to be set again here in case it was updated.
156+
setInMemoryMetadata(metadata);
150157
}
151158

152159
void StorageExternalStream::read(

0 commit comments

Comments
 (0)