Skip to content

Commit 28f6a63

Browse files
qijun-niu-timepluscerebellumkingmrssss
authored
Feature/issue 315 make telemetry interval configurable (#828)
* Ignore internal query in statistics of select query * - make telemetry interval configurable - provide restful api to enable/disable telemetry collection - add delta of query counter * fix --------- Co-authored-by: cerebellumking <83447519+cerebellumking@users.noreply.github.com> Co-authored-by: Qijun Niu <niuqijun0702@qq.com>
1 parent 870d7d5 commit 28f6a63

File tree

12 files changed

+220
-49
lines changed

12 files changed

+220
-49
lines changed

programs/client/Client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ std::vector<String> Client::loadWarningMessages()
338338
return {};
339339

340340
std::vector<String> messages;
341-
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings SETTINGS _tp_internal_system_open_sesame=true",
341+
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings SETTINGS _tp_internal_system_open_sesame=true,is_internal=true",
342342
{} /* query_parameters */,
343343
"" /* query_id */,
344344
QueryProcessingStage::Complete,

programs/server/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ include(${proton_SOURCE_DIR}/cmake/embed_binary.cmake)
22

33
set(CLICKHOUSE_SERVER_SOURCES
44
MetricsTransmitter.cpp
5-
TelemetryCollector.cpp
65
Server.cpp
76
)
87

programs/server/Server.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@
8787
#include "config_version.h"
8888

8989
/// proton: starts
90-
#include "TelemetryCollector.h"
9190
#include <Checkpoint/CheckpointCoordinator.h>
9291
#include <DataTypes/DataTypeFactory.h>
9392
#include <Functions/UserDefined/ExternalUserDefinedFunctionsLoader.h>
9493
#include <Interpreters/DiskUtilChecker.h>
94+
#include <Interpreters/TelemetryCollector.h>
9595
#include <KafkaLog/KafkaWALPool.h>
9696
#include <NativeLog/Server/NativeLog.h>
9797
#include <Server/RestRouterHandlers/RestRouterFactory.h>
@@ -307,6 +307,9 @@ void initGlobalServices(DB::ContextMutablePtr & global_context)
307307

308308
if (native_log.enabled() && pool.enabled())
309309
throw DB::Exception("Both external Kafka log and internal native log are enabled. This is not a supported configuration", DB::ErrorCodes::UNSUPPORTED);
310+
311+
auto & telemetry_collector = DB::TelemetryCollector::instance(global_context);
312+
telemetry_collector.startup();
310313
}
311314

312315
void initGlobalSingletons(DB::ContextMutablePtr & context)
@@ -317,7 +320,6 @@ void initGlobalSingletons(DB::ContextMutablePtr & context)
317320
DB::DiskUtilChecker::instance(context);
318321
DB::ExternalGrokPatterns::instance(context);
319322
DB::ExternalUserDefinedFunctionsLoader::instance(context);
320-
DB::TelemetryCollector::instance(context);
321323
}
322324

323325
void deinitGlobalSingletons(DB::ContextMutablePtr & context)
@@ -1759,6 +1761,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
17591761
LOG_INFO(log, "Closed connections.");
17601762

17611763
/// proton: start.
1764+
DB::TelemetryCollector::instance(global_context).shutdown();
1765+
17621766
deinitGlobalSingletons(global_context);
17631767

17641768
disposeV8();

programs/server/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ telemetry_enabled:
150150
"@replace": true
151151
"#text": true
152152

153+
telemetry_interval_ms: 300000 # 5 minutes
154+
153155
# gRPC protocol (see src/Server/grpc_protos/proton_grpc.proto for the API)
154156
# grpc_port: 9100
155157
grpc:

src/Client/Suggest.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
9090
query << "SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str
9191
<< " UNION ALL ";
9292
}
93-
query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str << " SETTINGS _tp_internal_system_open_sesame=true";
93+
query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str << " SETTINGS _tp_internal_system_open_sesame=true,is_internal=true";
9494
}
95-
query << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true";
95+
query << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true,is_internal=true";
9696

9797
return query.str();
9898
}

src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
844844
M(Int64, async_ingest_block_timeout_ms, 120000, "Max duration for a block to commit before it is considered expired during async ingestion", 0) \
845845
M(UInt64, aysnc_ingest_max_outstanding_blocks, 10000, "Max outstanding blocks to be committed during async ingestion", 0) \
846846
M(Bool, _tp_internal_system_open_sesame, true, "Control the access to system.* streams", 0) \
847+
M(Bool, is_internal, false, "Control the statistics of select query", 0) \
847848
M(UInt64, javascript_max_memory_bytes, 100 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \
848849
M(Bool, enable_dependency_check, true, "Enable the dependency check of view/materialized view", 0) \
849850
M(RecoveryPolicy, recovery_policy, RecoveryPolicy::Strict, "Default recovery policy for materialized view when inner query failed. 'strict': always recover from checkpointed; 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \

src/Interpreters/InterpreterFactory.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,13 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
134134
else if (query->as<ASTSelectWithUnionQuery>())
135135
{
136136
auto interpreter = std::make_unique<InterpreterSelectWithUnionQuery>(query, context, options);
137-
ProfileEvents::increment(ProfileEvents::SelectQuery);
138-
ProfileEvents::increment(interpreter->isStreamingQuery() ? ProfileEvents::StreamingSelectQuery : ProfileEvents::HistoricalSelectQuery);
137+
138+
if(!options.is_internal && !context->getSettingsRef().is_internal)
139+
{
140+
ProfileEvents::increment(ProfileEvents::SelectQuery);
141+
ProfileEvents::increment(interpreter->isStreamingQuery() ? ProfileEvents::StreamingSelectQuery : ProfileEvents::HistoricalSelectQuery);
142+
}
143+
139144
return std::move(interpreter);
140145
}
141146
else if (query->as<ASTSelectIntersectExceptQuery>())

programs/server/TelemetryCollector.cpp renamed to src/Interpreters/TelemetryCollector.cpp

Lines changed: 86 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,97 @@
11
#include "TelemetryCollector.h"
22
#include "config_version.h"
33

4-
#include <Poco/Util/AbstractConfiguration.h>
4+
#include <filesystem>
5+
#include <Core/ServerUUID.h>
6+
#include <IO/WriteBufferFromString.h>
7+
#include <IO/WriteHelpers.h>
8+
#include <Interpreters/Context.h>
9+
#include <base/ClockUtils.h>
10+
#include <base/getMemoryAmount.h>
511
#include <Poco/Net/HTTPRequest.h>
612
#include <Poco/Net/HTTPResponse.h>
713
#include <Poco/Net/HTTPSClientSession.h>
8-
#include <base/ClockUtils.h>
9-
#include <base/getMemoryAmount.h>
14+
#include <Poco/Util/AbstractConfiguration.h>
1015
#include <Common/DateLUT.h>
1116
#include <Common/getNumberOfPhysicalCPUCores.h>
12-
#include <Core/ServerUUID.h>
13-
#include <IO/WriteBufferFromString.h>
14-
#include <IO/WriteHelpers.h>
15-
#include <Interpreters/Context.h>
16-
#include <filesystem>
1717

1818
namespace fs = std::filesystem;
1919

2020
namespace ProfileEvents
2121
{
22-
extern const Event SelectQuery;
23-
extern const Event StreamingSelectQuery;
24-
extern const Event HistoricalSelectQuery;
22+
extern const Event SelectQuery;
23+
extern const Event StreamingSelectQuery;
24+
extern const Event HistoricalSelectQuery;
2525
}
2626

2727
namespace DB
2828
{
29+
namespace
30+
{
31+
constexpr auto DEFAULT_INTERVAL_MS = 5 * 60 * 1000;
32+
}
2933

3034
TelemetryCollector::TelemetryCollector(ContextPtr context_)
31-
: log(&Poco::Logger::get("TelemetryCollector")),
32-
pool(context_->getSchedulePool()),
33-
started_on_in_minutes(UTCMinutes::now())
35+
: log(&Poco::Logger::get("TelemetryCollector")), pool(context_->getSchedulePool()), started_on_in_minutes(UTCMinutes::now())
3436
{
3537
const auto & config = context_->getConfigRef();
3638

37-
if (!config.getBool("telemetry_enabled", true))
38-
{
39-
LOG_WARNING(log, "Please note that telemetry is disabled.");
40-
is_shutdown.test_and_set();
41-
return;
42-
}
39+
is_enable = config.getBool("telemetry_enabled", true);
40+
41+
collect_interval_ms = config.getUInt("telemetry_interval_ms", DEFAULT_INTERVAL_MS);
4342

4443
WriteBufferFromOwnString wb;
4544
writeDateTimeTextISO(UTCMilliseconds::now(), 3, wb, DateLUT::instance("UTC"));
4645
started_on = wb.str();
47-
48-
LOG_WARNING(log, "Please note that telemetry is enabled. "
49-
"This is used to collect the version and runtime environment information to Timeplus, Inc. "
50-
"You can disable it by setting telemetry_enabled to false in config.yaml");
51-
52-
collector_task = pool.createTask("TelemetryCollector", [this]() { this->collect(); });
53-
collector_task->activate();
54-
collector_task->schedule();
5546
}
5647

5748
TelemetryCollector::~TelemetryCollector()
5849
{
5950
shutdown();
51+
LOG_INFO(log, "stopped");
52+
}
53+
54+
void TelemetryCollector::startup()
55+
{
56+
collector_task = pool.createTask("TelemetryCollector", [this]() { this->collect(); });
57+
collector_task->activate();
58+
collector_task->schedule();
6059
}
6160

6261
void TelemetryCollector::shutdown()
6362
{
64-
if (!is_shutdown.test_and_set())
63+
if (is_shutdown.test_and_set())
64+
return;
65+
66+
if (collector_task)
6567
{
6668
LOG_INFO(log, "Stopped");
6769
collector_task->deactivate();
6870
}
6971
}
7072

73+
void TelemetryCollector::enable()
74+
{
75+
LOG_WARNING(
76+
log,
77+
"Please note that telemetry is enabled. "
78+
"This is used to collect the version and runtime environment information to Timeplus, Inc. "
79+
"You can disable it by setting telemetry_enabled to false in config.yaml");
80+
is_enable = true;
81+
}
82+
83+
void TelemetryCollector::disable()
84+
{
85+
LOG_WARNING(log, "Please note that telemetry is disabled.");
86+
is_enable = false;
87+
}
88+
7189
void TelemetryCollector::collect()
7290
{
73-
SCOPE_EXIT({
74-
collector_task->scheduleAfter(INTERVAL_MS);
75-
});
91+
SCOPE_EXIT({ collector_task->scheduleAfter(getCollectIntervalMilliseconds()); });
92+
93+
if (!isEnabled())
94+
return;
7695

7796
constexpr auto jitsu_url = "https://data.timeplus.com/api/s/s2s/track";
7897
constexpr auto jitsu_token = "U7qmIGzuZvvkp16iPaYLeBR4IHfKBY6P:Cc6EUDRmEHG9TCO7DX8x23xWrdFg8pBU";
@@ -94,16 +113,26 @@ void TelemetryCollector::collect()
94113
/// https://stackoverflow.com/questions/20010199/how-to-determine-if-a-process-runs-inside-lxc-docker
95114
bool in_docker = fs::exists("/.dockerenv");
96115

97-
auto load_counter = [](const auto & event){
98-
assert (event < ProfileEvents::end());
99-
return ProfileEvents::global_counters[event].load(std::memory_order_relaxed);
116+
auto load_counter = [](const auto & event) {
117+
assert(event < ProfileEvents::end());
118+
return static_cast<Int64>(ProfileEvents::global_counters[event].load(std::memory_order_relaxed));
100119
};
101120

102121
const auto total_select_query = load_counter(ProfileEvents::SelectQuery);
103122
const auto streaming_select_query = load_counter(ProfileEvents::StreamingSelectQuery);
104123
const auto historical_select_query = load_counter(ProfileEvents::HistoricalSelectQuery);
105124

106-
std::string data = fmt::format("{{"
125+
const auto delta_total_select_query = total_select_query - prev_total_select_query;
126+
prev_total_select_query = total_select_query;
127+
128+
const auto delta_streaming_select_query = streaming_select_query - prev_streaming_select_query;
129+
prev_streaming_select_query = streaming_select_query;
130+
131+
const auto delta_historical_select_query = historical_select_query - prev_historical_select_query;
132+
prev_historical_select_query = historical_select_query;
133+
134+
std::string data = fmt::format(
135+
"{{"
107136
"\"type\": \"track\","
108137
"\"event\": \"proton_ping\","
109138
"\"properties\": {{"
@@ -118,14 +147,30 @@ void TelemetryCollector::collect()
118147
" \"docker\": \"{}\","
119148
" \"total_select_query\": \"{}\","
120149
" \"historical_select_query\": \"{}\","
121-
" \"streaming_select_query\": \"{}\""
150+
" \"streaming_select_query\": \"{}\","
151+
" \"delta_total_select_query\": \"{}\","
152+
" \"delta_historical_select_query\": \"{}\","
153+
" \"delta_streaming_select_query\": \"{}\""
122154
"}}"
123-
"}}", cpu, memory_in_gb, EDITION, VERSION_STRING, new_session, started_on, duration_in_minute, server_uuid_str, in_docker, total_select_query, historical_select_query, streaming_select_query);
155+
"}}",
156+
cpu,
157+
memory_in_gb,
158+
EDITION,
159+
VERSION_STRING,
160+
new_session,
161+
started_on,
162+
duration_in_minute,
163+
server_uuid_str,
164+
in_docker,
165+
total_select_query,
166+
historical_select_query,
167+
streaming_select_query,
168+
delta_total_select_query,
169+
delta_historical_select_query,
170+
delta_streaming_select_query);
124171

125172
LOG_TRACE(log, "Sending telemetry: {}.", data);
126173

127-
new_session = false;
128-
129174
request.setContentLength(data.length());
130175
request.setContentType("application/json");
131176
request.add("X-Write-Key", jitsu_token);
@@ -145,6 +190,7 @@ void TelemetryCollector::collect()
145190
return;
146191
}
147192

193+
new_session = false;
148194
LOG_INFO(log, "Telemetry sent successfully.");
149195
}
150196
catch (Poco::Exception & ex)

programs/server/TelemetryCollector.h renamed to src/Interpreters/TelemetryCollector.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ class TelemetryCollector
1414
BackgroundSchedulePool & pool;
1515
BackgroundSchedulePoolTaskHolder collector_task;
1616
std::atomic_flag is_shutdown;
17+
std::atomic_bool is_enable;
1718

1819
std::string started_on;
1920
bool new_session = true;
2021
Int64 started_on_in_minutes;
22+
std::atomic<UInt64> collect_interval_ms;
2123

22-
static constexpr auto INTERVAL_MS = 5 * 60 * 1000; /// sending anonymous telemetry data every 5 minutes
24+
Int64 prev_total_select_query = 0;
25+
Int64 prev_streaming_select_query = 0;
26+
Int64 prev_historical_select_query = 0;
27+
28+
// static constexpr auto INTERVAL_MS = 5 * 60 * 1000; /// sending anonymous telemetry data every 5 minutes
2329

2430
public:
2531
static TelemetryCollector & instance(ContextPtr context_)
@@ -29,8 +35,19 @@ class TelemetryCollector
2935
}
3036

3137
~TelemetryCollector();
38+
39+
void startup();
3240
void shutdown();
3341

42+
void enable();
43+
void disable();
44+
45+
bool isEnabled() const { return is_enable; }
46+
47+
UInt64 getCollectIntervalMilliseconds() const { return collect_interval_ms.load(); }
48+
49+
void setCollectIntervalMilliseconds(UInt64 interval_ms) { collect_interval_ms.store(interval_ms); }
50+
3451
private:
3552
void collect();
3653
TelemetryCollector(ContextPtr context_);

src/Server/RestRouterHandlers/RestRouterFactory.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "StorageInfoHandler.h"
2020
#include "SystemCommandHandler.h"
2121
#include "TabularTableRestRouterHandler.h"
22+
#include "TelemetryHandler.h"
2223
#include "UDFHandler.h"
2324

2425
#include <re2/re2.h>
@@ -216,6 +217,14 @@ class RestRouterFactory final
216217
return std::make_shared<DB::SystemCommandHandler>(query_context);
217218
});
218219

220+
/// GET/POST: /proton/v1/telemetry
221+
factory.registerRouterHandler(
222+
fmt::format("/{}/v1/telemetry", prefix),
223+
"GET/POST",
224+
[](ContextMutablePtr query_context) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA
225+
return std::make_shared<DB::TelemetryHandler>(query_context);
226+
});
227+
219228
factory.registerRouterHandler(
220229
fmt::format("/{}/apis", prefix),
221230
"GET",

0 commit comments

Comments
 (0)