From 0d87218e93f7f90231c487b24410204715fe8dba Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Mon, 22 Nov 2021 18:45:42 +0300 Subject: [PATCH 01/10] Add setting host list and round robin prototype --- clickhouse/client.cpp | 47 +++++++++++++++++++++++++------------------ clickhouse/client.h | 13 +++++++++++- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 6d06b4fb..932f75b8 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -286,29 +286,36 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - SocketHolder s(SocketConnect(NetworkAddress(options_.host, std::to_string(options_.port)))); - - if (s.Closed()) { - throw std::system_error(errno, std::system_category()); - } + for (size_t i = 0; i < options_.hosts_ports.size(); ++i) { + const std::string &host = options_.hosts_ports[i].first; + unsigned int port = options_.hosts_ports[i].second.has_value() ? options_.hosts_ports[i].second.value() : options_.port; + SocketHolder s(SocketConnect(NetworkAddress(host, std::to_string(port)))); + if (s.Closed()) { + if (i == options_.hosts_ports.size() - 1) { + throw std::system_error(errno, std::system_category()); + } + continue; + } - if (options_.tcp_keepalive) { - s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), - options_.tcp_keepalive_intvl.count(), - options_.tcp_keepalive_cnt); - } - if (options_.tcp_nodelay) { - s.SetTcpNoDelay(options_.tcp_nodelay); - } + if (options_.tcp_keepalive) { + s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), options_.tcp_keepalive_intvl.count(), options_.tcp_keepalive_cnt); + } + if (options_.tcp_nodelay) { + s.SetTcpNoDelay(options_.tcp_nodelay); + } - socket_ = std::move(s); - socket_input_ = SocketInput(socket_); - socket_output_ = SocketOutput(socket_); - buffered_input_.Reset(); - buffered_output_.Reset(); + socket_ = std::move(s); + socket_input_ = SocketInput(socket_); + socket_output_ = SocketOutput(socket_); + buffered_input_.Reset(); + buffered_output_.Reset(); - if (!Handshake()) { - throw std::runtime_error("fail to connect to " + options_.host); + if (!Handshake()) { + if (i == options_.hosts_ports.size() - 1) { + throw std::runtime_error("fail to connect to " + host); + } + continue; + } } } diff --git a/clickhouse/client.h b/clickhouse/client.h index e080fb91..e44a3d3c 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace clickhouse { @@ -47,8 +48,18 @@ struct ClientOptions { return *this; \ } + + /// List of hostnames with service ports +#define COMMA , + DECLARE_FIELD(hosts_ports, std::vector>>, SetHost, + std::vector>>{}); /// Hostname of the server. - DECLARE_FIELD(host, std::string, SetHost, std::string()); + std::string host = std::string(); + inline ClientOptions& SetHost(const std::string& value) { + hosts_ports.emplace_back(value, std::nullopt); + host = hosts_ports.back().first; + return *this; + } /// Service port. DECLARE_FIELD(port, unsigned int, SetPort, 9000); From 0b7485883ce051266d043961217603cd6b410ecd Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Tue, 23 Nov 2021 20:19:02 +0300 Subject: [PATCH 02/10] Add timeout and tests (not debugged) --- clickhouse/client.cpp | 119 +++++++++++++++++++++++++++++------------- clickhouse/client.h | 13 +++-- tests/simple/main.cpp | 28 +++++++++- 3 files changed, 119 insertions(+), 41 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 932f75b8..2fe69bd6 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -1,23 +1,25 @@ #include "client.h" -#include "protocol.h" - -#include "base/coded.h" -#include "base/compressed.h" -#include "base/socket.h" -#include "base/wire_format.h" - -#include "columns/factory.h" +#include #include #include -#include #include +#include +#include +#include +#include +#include #include #include #include -#include -#include + +#include "base/coded.h" +#include "base/compressed.h" +#include "base/socket.h" +#include "base/wire_format.h" +#include "columns/factory.h" +#include "protocol.h" #define DBMS_NAME "ClickHouse" #define DBMS_VERSION_MAJOR 1 @@ -57,8 +59,11 @@ struct ClientInfo { }; std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { - os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port - << " ping_before_query:" << opt.ping_before_query + os << "Client(" << opt.user << '@' << "{ "; + for (const ClientOptions::HostPort& hp : opt.hosts_ports) { + os << hp.host << ":" << opt.port << ","; + } + os << " ping_before_query:" << opt.ping_before_query << " send_retries:" << opt.send_retries << " retry_timeout:" << opt.retry_timeout.count() << " compression_method:" @@ -112,6 +117,10 @@ class Client::Impl { /// call fuc several times. void RetryGuard(std::function fuc); + /// Setting connection to one pair host:port. It is called by + /// ResetConnection in cycle, which tries to connect to some server + void SetConnection(const ClientOptions::HostPort& host_port); + private: class EnsureNull { public: @@ -285,38 +294,76 @@ void Client::Impl::Ping() { } } +void Client::Impl::SetConnection(const ClientOptions::HostPort& host_port) { + SocketHolder s(SocketConnect(NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))))); + + if (s.Closed()) { + throw std::system_error(errno, std::system_category()); + } + + if (options_.tcp_keepalive) { + s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), options_.tcp_keepalive_intvl.count(), options_.tcp_keepalive_cnt); + } + if (options_.tcp_nodelay) { + s.SetTcpNoDelay(options_.tcp_nodelay); + } + + socket_ = std::move(s); + socket_input_ = SocketInput(socket_); + socket_output_ = SocketOutput(socket_); + buffered_input_.Reset(); + buffered_output_.Reset(); + + if (!Handshake()) { + throw std::runtime_error("fail to connect to " + host_port.host); + } +} + void Client::Impl::ResetConnection() { + std::vector timeout_hosts_posts{}; + for (size_t i = 0; i < options_.hosts_ports.size(); ++i) { - const std::string &host = options_.hosts_ports[i].first; - unsigned int port = options_.hosts_ports[i].second.has_value() ? options_.hosts_ports[i].second.value() : options_.port; - SocketHolder s(SocketConnect(NetworkAddress(host, std::to_string(port)))); - if (s.Closed()) { - if (i == options_.hosts_ports.size() - 1) { - throw std::system_error(errno, std::system_category()); + std::cout << options_.hosts_ports[i].host << options_.hosts_ports[i].port.value_or(13) << std::endl; + std::mutex mutex; + std::condition_variable cond_var; + std::exception_ptr except_ptr = nullptr; + + std::thread connecting_thread([&cond_var, this, &except_ptr, &i]() { + try { + SetConnection(options_.hosts_ports[i]); + } catch (...) { + except_ptr = std::current_exception(); } - continue; - } + cond_var.notify_one(); + }); - if (options_.tcp_keepalive) { - s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), options_.tcp_keepalive_intvl.count(), options_.tcp_keepalive_cnt); - } - if (options_.tcp_nodelay) { - s.SetTcpNoDelay(options_.tcp_nodelay); - } + connecting_thread.detach(); - socket_ = std::move(s); - socket_input_ = SocketInput(socket_); - socket_output_ = SocketOutput(socket_); - buffered_input_.Reset(); - buffered_output_.Reset(); + { + std::unique_lock lock(mutex); + if (cond_var.wait_for(lock, std::chrono::seconds (5)) == std::cv_status::timeout) { + timeout_hosts_posts.push_back(options_.hosts_ports[i]); + continue; + } + } - if (!Handshake()) { - if (i == options_.hosts_ports.size() - 1) { - throw std::runtime_error("fail to connect to " + host); + if (except_ptr) { + try { + std::rethrow_exception(except_ptr); + } catch (...) { + std::cout << timeout_hosts_posts.size() << std::endl; + if (i == options_.hosts_ports.size() - 1 && timeout_hosts_posts.empty()) { + std::cout << "throw" << std::endl; + throw; + } + continue; } - continue; } + + return; } + + SetConnection(timeout_hosts_posts[std::rand() % timeout_hosts_posts.size()]); } const ServerInfo& Client::Impl::GetServerInfo() const { diff --git a/clickhouse/client.h b/clickhouse/client.h index e44a3d3c..f9cb1c6b 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -50,14 +50,19 @@ struct ClientOptions { /// List of hostnames with service ports -#define COMMA , - DECLARE_FIELD(hosts_ports, std::vector>>, SetHost, - std::vector>>{}); + struct HostPort { + std::string host; + std::optional port; + + explicit HostPort(std::string host, std::optional port = std::nullopt) : host(std::move(host)), port(std::move(port)) { + } + }; + DECLARE_FIELD(hosts_ports, std::vector, SetHost,{}); /// Hostname of the server. std::string host = std::string(); inline ClientOptions& SetHost(const std::string& value) { hosts_ports.emplace_back(value, std::nullopt); - host = hosts_ports.back().first; + host = hosts_ports.back().host; return *this; } /// Service port. diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 41855514..cba516eb 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -481,7 +481,7 @@ static void RunTests(Client& client) { ArrayExample(client); CancelableExample(client); DateExample(client); - DateTime64Example(client); +// DateTime64Example(client); DecimalExample(client); EnumExample(client); ExecptionExample(client); @@ -510,6 +510,32 @@ int main() { .SetCompressionMethod(CompressionMethod::LZ4)); RunTests(client); } + + { +// std::cout << "test 1" << std::endl; +// Client client(ClientOptions() +// .SetHost({ +// ClientOptions::HostPort("1127.91.2.1"), // wrong host +// ClientOptions::HostPort("notlocalwronghost"), // wrong host +// ClientOptions::HostPort("localhost", 8000), // wrong port +// ClientOptions::HostPort("localhost", 9000), +// }) +// .SetPingBeforeQuery(true)); +// RunTests(client); + } + { + std::cout << "test 2" << std::endl; + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("notlocalwronghost") // wrong host + }) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be threw"); + } catch (...) { + } + } } catch (const std::exception& e) { std::cerr << "exception : " << e.what() << std::endl; } From d713ce7a267154d8159d8170dd322fa5e8faed52 Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Mon, 29 Nov 2021 18:29:02 +0300 Subject: [PATCH 03/10] Some changes, not fixed test, but improve action --- clickhouse/client.cpp | 46 ++++++++++++++++++------- clickhouse/client.h | 4 +++ tests/simple/main.cpp | 78 ++++++++++++++++++++++++++++++------------- 3 files changed, 93 insertions(+), 35 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 2fe69bd6..89d3430c 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -143,8 +143,8 @@ class Client::Impl { }; - const ClientOptions options_; + std::vector timeout_hosts{}; QueryEvents* events_; int compression_ = CompressionState::Disable; @@ -184,8 +184,18 @@ Client::Impl::Impl(const ClientOptions& opts) throw; } + std::this_thread::sleep_for(options_.retry_timeout); + } catch (const std::runtime_error&) { + if (++i > options_.send_retries) { + if (timeout_hosts.empty()) { + throw; + } + SetConnection(timeout_hosts[std::rand() % timeout_hosts.size()]); + } + std::this_thread::sleep_for(options_.retry_timeout); } + } if (options_.compression_method != CompressionMethod::None) { @@ -200,7 +210,14 @@ void Client::Impl::ExecuteQuery(Query query) { EnsureNull en(static_cast(&query), &events_); if (options_.ping_before_query) { - RetryGuard([this]() { Ping(); }); + try { + RetryGuard([this]() { Ping(); }); + } catch (const std::runtime_error &e) { + if (timeout_hosts.empty()) { + throw; + } + SetConnection(timeout_hosts[std::rand() % timeout_hosts.size()]); + } } SendQuery(query.GetText()); @@ -231,7 +248,14 @@ std::string NameToQueryString(const std::string &input) void Client::Impl::Insert(const std::string& table_name, const Block& block) { if (options_.ping_before_query) { - RetryGuard([this]() { Ping(); }); + try { + RetryGuard([this]() { Ping(); }); + } catch (const std::runtime_error &e) { + if (timeout_hosts.empty()) { + throw; + } + SetConnection(timeout_hosts[std::rand() % timeout_hosts.size()]); + } } std::stringstream fields_section; @@ -295,7 +319,8 @@ void Client::Impl::Ping() { } void Client::Impl::SetConnection(const ClientOptions::HostPort& host_port) { - SocketHolder s(SocketConnect(NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))))); + NetworkAddress na = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))); + SocketHolder s(SocketConnect(na)); if (s.Closed()) { throw std::system_error(errno, std::system_category()); @@ -320,10 +345,8 @@ void Client::Impl::SetConnection(const ClientOptions::HostPort& host_port) { } void Client::Impl::ResetConnection() { - std::vector timeout_hosts_posts{}; - + timeout_hosts.clear(); for (size_t i = 0; i < options_.hosts_ports.size(); ++i) { - std::cout << options_.hosts_ports[i].host << options_.hosts_ports[i].port.value_or(13) << std::endl; std::mutex mutex; std::condition_variable cond_var; std::exception_ptr except_ptr = nullptr; @@ -342,7 +365,7 @@ void Client::Impl::ResetConnection() { { std::unique_lock lock(mutex); if (cond_var.wait_for(lock, std::chrono::seconds (5)) == std::cv_status::timeout) { - timeout_hosts_posts.push_back(options_.hosts_ports[i]); + timeout_hosts.emplace_back(options_.hosts_ports[i]); continue; } } @@ -351,19 +374,18 @@ void Client::Impl::ResetConnection() { try { std::rethrow_exception(except_ptr); } catch (...) { - std::cout << timeout_hosts_posts.size() << std::endl; - if (i == options_.hosts_ports.size() - 1 && timeout_hosts_posts.empty()) { - std::cout << "throw" << std::endl; + if (i == options_.hosts_ports.size() - 1 && timeout_hosts.empty()) { throw; } continue; } } + // connected successfully return; } - SetConnection(timeout_hosts_posts[std::rand() % timeout_hosts_posts.size()]); + throw std::runtime_error("Timeout of connection to hosts"); } const ServerInfo& Client::Impl::GetServerInfo() const { diff --git a/clickhouse/client.h b/clickhouse/client.h index f9cb1c6b..f2d21ee1 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -56,6 +56,10 @@ struct ClientOptions { explicit HostPort(std::string host, std::optional port = std::nullopt) : host(std::move(host)), port(std::move(port)) { } +// HostPort(const HostPort &other) = default; +// HostPort(HostPort &&other) = default; +// HostPort & operator=(const HostPort &other) = default; +// HostPort & operator=(HostPort &&other) = default; }; DECLARE_FIELD(hosts_ports, std::vector, SetHost,{}); /// Hostname of the server. diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index cba516eb..459ec80f 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -512,30 +512,62 @@ int main() { } { -// std::cout << "test 1" << std::endl; -// Client client(ClientOptions() -// .SetHost({ -// ClientOptions::HostPort("1127.91.2.1"), // wrong host -// ClientOptions::HostPort("notlocalwronghost"), // wrong host -// ClientOptions::HostPort("localhost", 8000), // wrong port -// ClientOptions::HostPort("localhost", 9000), -// }) -// .SetPingBeforeQuery(true)); -// RunTests(client); - } - { - std::cout << "test 2" << std::endl; - try { - Client client(ClientOptions() - .SetHost({ - ClientOptions::HostPort("notlocalwronghost") // wrong host - }) - .SetPingBeforeQuery(true) - ); - assert(false && "exception must be threw"); - } catch (...) { - } + std::cout << "test 1" << std::endl; + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("1127.91.2.1"), // wrong host + ClientOptions::HostPort("notlocalwronghost"), // wrong host + ClientOptions::HostPort("localhost", 8000), // wrong port + ClientOptions::HostPort("localhost", 9000), + }) + .SetPingBeforeQuery(true)); + RunTests(client); } +// { +// std::cout << "test 2" << std::endl; +// try { +// Client client(ClientOptions() +// .SetHost({ +// ClientOptions::HostPort("notlocalwronghost") // wrong host +// }) +// .SetSendRetries(0) +// .SetPingBeforeQuery(true) +// ); +// assert(false && "exception must be thrown"); +// } catch (const std::exception &e) { +// std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; +// } +// } +// { +// std::cout << "test 3" << std::endl; +// try { +// Client client(ClientOptions() +// .SetHost({ +// ClientOptions::HostPort("localhost", 8000), // wrong port +// }) +// .SetSendRetries(0) +// .SetPingBeforeQuery(true) +// ); +// assert(false && "exception must be thrown"); +// } catch (const std::runtime_error &e) { +// std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; +// } +// } +// { +// std::cout << "test 4" << std::endl; +// try { +// Client client(ClientOptions() +// .SetHost({ +// ClientOptions::HostPort("1127.91.2.1"), // wrong host +// }) +// .SetSendRetries(0) +// .SetPingBeforeQuery(true) +// ); +// assert(false && "exception must be thrown"); +// } catch (const std::runtime_error &e) { +// std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; +// } +// } } catch (const std::exception& e) { std::cerr << "exception : " << e.what() << std::endl; } From 0f2da33864a9389704d1950f706c228bfa87b070 Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Mon, 13 Dec 2021 18:33:06 +0300 Subject: [PATCH 04/10] Fix tests --- clickhouse/client.cpp | 136 ++++++++++++------------------------------ clickhouse/client.h | 5 -- tests/simple/main.cpp | 103 ++++++++++++++++---------------- 3 files changed, 90 insertions(+), 154 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 89d3430c..ee5d6fab 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -1,25 +1,23 @@ #include "client.h" +#include "protocol.h" + +#include "base/coded.h" +#include "base/compressed.h" +#include "base/socket.h" +#include "base/wire_format.h" + +#include "columns/factory.h" -#include #include #include +#include #include -#include -#include -#include -#include -#include #include #include #include - -#include "base/coded.h" -#include "base/compressed.h" -#include "base/socket.h" -#include "base/wire_format.h" -#include "columns/factory.h" -#include "protocol.h" +#include +#include #define DBMS_NAME "ClickHouse" #define DBMS_VERSION_MAJOR 1 @@ -117,10 +115,6 @@ class Client::Impl { /// call fuc several times. void RetryGuard(std::function fuc); - /// Setting connection to one pair host:port. It is called by - /// ResetConnection in cycle, which tries to connect to some server - void SetConnection(const ClientOptions::HostPort& host_port); - private: class EnsureNull { public: @@ -144,7 +138,6 @@ class Client::Impl { }; const ClientOptions options_; - std::vector timeout_hosts{}; QueryEvents* events_; int compression_ = CompressionState::Disable; @@ -184,18 +177,8 @@ Client::Impl::Impl(const ClientOptions& opts) throw; } - std::this_thread::sleep_for(options_.retry_timeout); - } catch (const std::runtime_error&) { - if (++i > options_.send_retries) { - if (timeout_hosts.empty()) { - throw; - } - SetConnection(timeout_hosts[std::rand() % timeout_hosts.size()]); - } - std::this_thread::sleep_for(options_.retry_timeout); } - } if (options_.compression_method != CompressionMethod::None) { @@ -210,14 +193,7 @@ void Client::Impl::ExecuteQuery(Query query) { EnsureNull en(static_cast(&query), &events_); if (options_.ping_before_query) { - try { - RetryGuard([this]() { Ping(); }); - } catch (const std::runtime_error &e) { - if (timeout_hosts.empty()) { - throw; - } - SetConnection(timeout_hosts[std::rand() % timeout_hosts.size()]); - } + RetryGuard([this]() { Ping(); }); } SendQuery(query.GetText()); @@ -248,14 +224,7 @@ std::string NameToQueryString(const std::string &input) void Client::Impl::Insert(const std::string& table_name, const Block& block) { if (options_.ping_before_query) { - try { - RetryGuard([this]() { Ping(); }); - } catch (const std::runtime_error &e) { - if (timeout_hosts.empty()) { - throw; - } - SetConnection(timeout_hosts[std::rand() % timeout_hosts.size()]); - } + RetryGuard([this]() { Ping(); }); } std::stringstream fields_section; @@ -318,74 +287,43 @@ void Client::Impl::Ping() { } } -void Client::Impl::SetConnection(const ClientOptions::HostPort& host_port) { - NetworkAddress na = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))); - SocketHolder s(SocketConnect(na)); - - if (s.Closed()) { - throw std::system_error(errno, std::system_category()); - } - - if (options_.tcp_keepalive) { - s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), options_.tcp_keepalive_intvl.count(), options_.tcp_keepalive_cnt); - } - if (options_.tcp_nodelay) { - s.SetTcpNoDelay(options_.tcp_nodelay); - } - - socket_ = std::move(s); - socket_input_ = SocketInput(socket_); - socket_output_ = SocketOutput(socket_); - buffered_input_.Reset(); - buffered_output_.Reset(); - - if (!Handshake()) { - throw std::runtime_error("fail to connect to " + host_port.host); - } -} - void Client::Impl::ResetConnection() { - timeout_hosts.clear(); for (size_t i = 0; i < options_.hosts_ports.size(); ++i) { - std::mutex mutex; - std::condition_variable cond_var; - std::exception_ptr except_ptr = nullptr; + try { + const ClientOptions::HostPort& host_port = options_.hosts_ports[i]; + NetworkAddress na = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))); + SocketHolder s(SocketConnect(na)); - std::thread connecting_thread([&cond_var, this, &except_ptr, &i]() { - try { - SetConnection(options_.hosts_ports[i]); - } catch (...) { - except_ptr = std::current_exception(); + if (s.Closed()) { + throw std::system_error(errno, std::system_category()); } - cond_var.notify_one(); - }); - - connecting_thread.detach(); - { - std::unique_lock lock(mutex); - if (cond_var.wait_for(lock, std::chrono::seconds (5)) == std::cv_status::timeout) { - timeout_hosts.emplace_back(options_.hosts_ports[i]); - continue; + if (options_.tcp_keepalive) { + s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), options_.tcp_keepalive_intvl.count(), options_.tcp_keepalive_cnt); + } + if (options_.tcp_nodelay) { + s.SetTcpNoDelay(options_.tcp_nodelay); } - } - if (except_ptr) { - try { - std::rethrow_exception(except_ptr); - } catch (...) { - if (i == options_.hosts_ports.size() - 1 && timeout_hosts.empty()) { - throw; - } - continue; + socket_ = std::move(s); + socket_input_ = SocketInput(socket_); + socket_output_ = SocketOutput(socket_); + buffered_input_.Reset(); + buffered_output_.Reset(); + + if (!Handshake()) { + throw std::runtime_error("fail to connect to " + host_port.host); } + } catch (...) { + if (i == options_.hosts_ports.size() - 1) { + throw; + } + continue; } // connected successfully return; } - - throw std::runtime_error("Timeout of connection to hosts"); } const ServerInfo& Client::Impl::GetServerInfo() const { diff --git a/clickhouse/client.h b/clickhouse/client.h index f2d21ee1..e5a0f0cc 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -20,7 +20,6 @@ #include #include #include -#include namespace clickhouse { @@ -56,10 +55,6 @@ struct ClientOptions { explicit HostPort(std::string host, std::optional port = std::nullopt) : host(std::move(host)), port(std::move(port)) { } -// HostPort(const HostPort &other) = default; -// HostPort(HostPort &&other) = default; -// HostPort & operator=(const HostPort &other) = default; -// HostPort & operator=(HostPort &&other) = default; }; DECLARE_FIELD(hosts_ports, std::vector, SetHost,{}); /// Hostname of the server. diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 459ec80f..2e2b20c5 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -515,59 +515,62 @@ int main() { std::cout << "test 1" << std::endl; Client client(ClientOptions() .SetHost({ - ClientOptions::HostPort("1127.91.2.1"), // wrong host - ClientOptions::HostPort("notlocalwronghost"), // wrong host - ClientOptions::HostPort("localhost", 8000), // wrong port - ClientOptions::HostPort("localhost", 9000), - }) + ClientOptions::HostPort("localhost", 8000), // wrong port + ClientOptions::HostPort("localhost", 7000), // wrong port + ClientOptions::HostPort("1127.91.2.1"), // wrong host + ClientOptions::HostPort("1127.91.2.2"), // wrong host + ClientOptions::HostPort("notlocalwronghost"), // wrong host + ClientOptions::HostPort("another_notlocalwronghost"), // wrong host + ClientOptions::HostPort("localhost", 9000), + }) .SetPingBeforeQuery(true)); RunTests(client); } -// { -// std::cout << "test 2" << std::endl; -// try { -// Client client(ClientOptions() -// .SetHost({ -// ClientOptions::HostPort("notlocalwronghost") // wrong host -// }) -// .SetSendRetries(0) -// .SetPingBeforeQuery(true) -// ); -// assert(false && "exception must be thrown"); -// } catch (const std::exception &e) { -// std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; -// } -// } -// { -// std::cout << "test 3" << std::endl; -// try { -// Client client(ClientOptions() -// .SetHost({ -// ClientOptions::HostPort("localhost", 8000), // wrong port -// }) -// .SetSendRetries(0) -// .SetPingBeforeQuery(true) -// ); -// assert(false && "exception must be thrown"); -// } catch (const std::runtime_error &e) { -// std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; -// } -// } -// { -// std::cout << "test 4" << std::endl; -// try { -// Client client(ClientOptions() -// .SetHost({ -// ClientOptions::HostPort("1127.91.2.1"), // wrong host -// }) -// .SetSendRetries(0) -// .SetPingBeforeQuery(true) -// ); -// assert(false && "exception must be thrown"); -// } catch (const std::runtime_error &e) { -// std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; -// } -// } + { + std::cout << "test 2" << std::endl; + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("notlocalwronghost") // wrong host + }) + .SetSendRetries(0) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be thrown"); + } catch (const std::exception &e) { + std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; + } + } + { + std::cout << "test 3" << std::endl; + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("localhost", 8000), // wrong port + }) + .SetSendRetries(0) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be thrown"); + } catch (const std::runtime_error &e) { + std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; + } + } + { + std::cout << "test 4" << std::endl; + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("1127.91.2.1"), // wrong host + }) + .SetSendRetries(0) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be thrown"); + } catch (const std::runtime_error &e) { + std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; + } + } } catch (const std::exception& e) { std::cerr << "exception : " << e.what() << std::endl; } From 25b7fdce699001ad353069b171036b08b3e4b8f5 Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Mon, 13 Dec 2021 18:47:31 +0300 Subject: [PATCH 05/10] Delete extra changes --- clickhouse/client.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index ee5d6fab..28eacb4c 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -137,6 +137,7 @@ class Client::Impl { }; + const ClientOptions options_; QueryEvents* events_; int compression_ = CompressionState::Disable; @@ -291,8 +292,7 @@ void Client::Impl::ResetConnection() { for (size_t i = 0; i < options_.hosts_ports.size(); ++i) { try { const ClientOptions::HostPort& host_port = options_.hosts_ports[i]; - NetworkAddress na = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))); - SocketHolder s(SocketConnect(na)); + SocketHolder s(SocketConnect(NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))))); if (s.Closed()) { throw std::system_error(errno, std::system_category()); From 662a27d19aa6a8adc124f4a8fb7986c4a3c6809c Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Tue, 21 Dec 2021 18:40:09 +0300 Subject: [PATCH 06/10] Fix review comments --- clickhouse/client.cpp | 44 ++++++++++++++++++++++++++++++++++++------- clickhouse/client.h | 13 +++++++------ tests/simple/main.cpp | 10 +++++----- 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 28eacb4c..2c02b17b 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -57,10 +57,28 @@ struct ClientInfo { }; std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { - os << "Client(" << opt.user << '@' << "{ "; - for (const ClientOptions::HostPort& hp : opt.hosts_ports) { - os << hp.host << ":" << opt.port << ","; + os << "Client(" << opt.user << '@'; + + bool many_hosts = int(opt.hosts_ports.size()) - int(!opt.host.empty()) > 1; + if (many_hosts) { + os << "{ "; + if (!opt.host.empty()) { + os << opt.host << ":" << opt.port << ","; + } + for (size_t i = 0; i < opt.hosts_ports.size(); ++i) { + os << opt.hosts_ports[i].host << ":" << opt.hosts_ports[i].port.value_or(opt.port) + << (i != opt.hosts_ports.size() - 1 ? "," : "}"); + } + } + else { + if (opt.host.empty()) { + os << opt.hosts_ports[0].host << ":" << opt.hosts_ports[0].port.value_or(opt.port); + } + else { + os << opt.host << ":" << opt.port; + } } + os << " ping_before_query:" << opt.ping_before_query << " send_retries:" << opt.send_retries << " retry_timeout:" << opt.retry_timeout.count() @@ -87,6 +105,8 @@ class Client::Impl { const ServerInfo& GetServerInfo() const; + const std::optional& GetConnectedHostPort() const; + private: bool Handshake(); @@ -153,6 +173,7 @@ class Client::Impl { CodedOutputStream output_; ServerInfo server_info_; + std::optional connected_host_port_; }; @@ -289,9 +310,10 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - for (size_t i = 0; i < options_.hosts_ports.size(); ++i) { + connected_host_port_.reset(); + for (int i = -1; i < int(options_.hosts_ports.size()); ++i) { + const ClientOptions::HostPort& host_port = i == -1 ? ClientOptions::HostPort(options_.host) : options_.hosts_ports[i]; try { - const ClientOptions::HostPort& host_port = options_.hosts_ports[i]; SocketHolder s(SocketConnect(NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))))); if (s.Closed()) { @@ -315,13 +337,13 @@ void Client::Impl::ResetConnection() { throw std::runtime_error("fail to connect to " + host_port.host); } } catch (...) { - if (i == options_.hosts_ports.size() - 1) { + if (i == int(options_.hosts_ports.size()) - 1) { throw; } continue; } - // connected successfully + connected_host_port_ = host_port; return; } } @@ -330,6 +352,10 @@ const ServerInfo& Client::Impl::GetServerInfo() const { return server_info_; } +const std::optional& Client::Impl::GetConnectedHostPort() const { + return connected_host_port_; +} + bool Client::Impl::Handshake() { if (!SendHello()) { return false; @@ -832,4 +858,8 @@ const ServerInfo& Client::GetServerInfo() const { return impl_->GetServerInfo(); } +const std::optional& Client::GetConnectedHostPort() const { + return impl_->GetConnectedHostPort(); +} + } diff --git a/clickhouse/client.h b/clickhouse/client.h index e5a0f0cc..0587b458 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -55,15 +55,14 @@ struct ClientOptions { explicit HostPort(std::string host, std::optional port = std::nullopt) : host(std::move(host)), port(std::move(port)) { } + + bool operator==(const HostPort& other) const { + return host == other.host && port == other.port; + } }; DECLARE_FIELD(hosts_ports, std::vector, SetHost,{}); /// Hostname of the server. - std::string host = std::string(); - inline ClientOptions& SetHost(const std::string& value) { - hosts_ports.emplace_back(value, std::nullopt); - host = hosts_ports.back().host; - return *this; - } + DECLARE_FIELD(host, std::string, SetHost, std::string()); /// Service port. DECLARE_FIELD(port, unsigned int, SetPort, 9000); @@ -144,6 +143,8 @@ class Client { const ServerInfo& GetServerInfo() const; + const std::optional& GetConnectedHostPort() const; + private: ClientOptions options_; diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 2e2b20c5..8619371c 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -512,7 +512,7 @@ int main() { } { - std::cout << "test 1" << std::endl; + ClientOptions::HostPort correct_host_port = ClientOptions::HostPort("localhost", 9000); Client client(ClientOptions() .SetHost({ ClientOptions::HostPort("localhost", 8000), // wrong port @@ -521,13 +521,15 @@ int main() { ClientOptions::HostPort("1127.91.2.2"), // wrong host ClientOptions::HostPort("notlocalwronghost"), // wrong host ClientOptions::HostPort("another_notlocalwronghost"), // wrong host - ClientOptions::HostPort("localhost", 9000), + correct_host_port, + ClientOptions::HostPort("localhost", 9001), // wrong port + ClientOptions::HostPort("1127.911.2.2"), // wrong host }) .SetPingBeforeQuery(true)); + assert(client.GetConnectedHostPort() == correct_host_port); RunTests(client); } { - std::cout << "test 2" << std::endl; try { Client client(ClientOptions() .SetHost({ @@ -542,7 +544,6 @@ int main() { } } { - std::cout << "test 3" << std::endl; try { Client client(ClientOptions() .SetHost({ @@ -557,7 +558,6 @@ int main() { } } { - std::cout << "test 4" << std::endl; try { Client client(ClientOptions() .SetHost({ From 53affd25cf28f7b20684d00583ca741b9b208b5b Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Fri, 31 Dec 2021 13:59:50 +0300 Subject: [PATCH 07/10] Divided catch (...) block --- clickhouse/client.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 2c02b17b..c82039a3 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -336,6 +336,16 @@ void Client::Impl::ResetConnection() { if (!Handshake()) { throw std::runtime_error("fail to connect to " + host_port.host); } + } catch (const std::system_error &e) { + if (i == int(options_.hosts_ports.size()) - 1) { + throw; + } + continue; + } catch (const std::runtime_error &e) { + if (i == int(options_.hosts_ports.size()) - 1) { + throw; + } + continue; } catch (...) { if (i == int(options_.hosts_ports.size()) - 1) { throw; From fabac4d8b9204f16f4179a17896ea5c8ea9f1eec Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Tue, 11 Jan 2022 01:26:06 +0300 Subject: [PATCH 08/10] Add descriptioin of feature to README --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index 1f75797c..fbcfc4a6 100644 --- a/README.md +++ b/README.md @@ -75,3 +75,16 @@ client.Select("SELECT id, name FROM test.numbers", [] (const Block& block) /// Delete table. client.Execute("DROP TABLE test.numbers"); ``` + +### Features +## Multiple host +It is possible to specify multiple hosts to connect to. The connection +will be set to the first available host. +```cpp +Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("host1.com", 8000), + ClientOptions::HostPort("host2.com"), /// port is ClientOptions.port + })); + +``` \ No newline at end of file From edf5421a15e0ffcba89601c13688fe60f65dedd8 Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Tue, 11 Jan 2022 01:27:42 +0300 Subject: [PATCH 09/10] Fix font size --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fbcfc4a6..80cc596f 100644 --- a/README.md +++ b/README.md @@ -76,8 +76,8 @@ client.Select("SELECT id, name FROM test.numbers", [] (const Block& block) client.Execute("DROP TABLE test.numbers"); ``` -### Features -## Multiple host +## Features +### Multiple host It is possible to specify multiple hosts to connect to. The connection will be set to the first available host. ```cpp From 13294d3b4c242e851ca809ed1fefe6e9088b52cc Mon Sep 17 00:00:00 2001 From: DF5HSE Date: Thu, 13 Jan 2022 19:34:21 +0300 Subject: [PATCH 10/10] Delete unused operator== overload --- clickhouse/client.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clickhouse/client.h b/clickhouse/client.h index 4e0a0025..1cacaf79 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -59,10 +59,6 @@ struct ClientOptions { explicit HostPort(std::string host, std::optional port = std::nullopt) : host(std::move(host)), port(std::move(port)) { } - - bool operator==(const HostPort& other) const { - return host == other.host && port == other.port; - } }; DECLARE_FIELD(hosts_ports, std::vector, SetHost,{}); /// Hostname of the server.