Skip to content

Commit 0f2265b

Browse files
authored
feature: support more secure settings for HTTPS for schema registry (#584)
1 parent 7ac683c commit 0f2265b

9 files changed

+237
-37
lines changed

src/Core/Settings.h

+4
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,10 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
771771
M(String, rawstore_time_extraction_rule, "", "_tp_time extraction rule (string, json, regex)", 0) \
772772
M(URI, kafka_schema_registry_url, "", "For ProtobufSingle format: Kafka Schema Registry URL.", 0) \
773773
M(String, kafka_schema_registry_credentials, "", "Credentials to be used to fetch schema from the `kafka_schema_registry_url`, with format '<username>:<password>'.", 0) \
774+
M(String, kafka_schema_registry_private_key_file, "", "Path to the private key file used for encryption. Can be empty if no private key file is used.", 0) \
775+
M(String, kafka_schema_registry_cert_file, "", "Path to the certificate file (in PEM format). If the private key and the certificate are stored in the same file, this can be empty if kafka_schema_registry_private_key_file is given.", 0) \
776+
M(String, kafka_schema_registry_ca_location, "", "Path to the file or directory containing the CA/root certificates.", 0) \
777+
M(Bool, kafka_schema_registry_skip_cert_check, false, "If set to true, ignore server certificate check result.", 0) \
774778
/** proton: ends. */
775779
// End of FORMAT_FACTORY_SETTINGS
776780
// Please add settings non-related to formats into the COMMON_SETTINGS above.

src/Formats/FormatFactory.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,12 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
110110
format_settings.schema.format_schema_path = context->getFormatSchemaPath();
111111
format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
112112
/// proton: starts
113-
format_settings.schema.kafka_schema_registry_url = settings.kafka_schema_registry_url.toString();
114-
format_settings.schema.kafka_schema_registry_credentials = settings.kafka_schema_registry_credentials;
113+
format_settings.kafka_schema_registry.url = settings.kafka_schema_registry_url.toString();
114+
format_settings.kafka_schema_registry.credentials = settings.kafka_schema_registry_credentials;
115+
format_settings.kafka_schema_registry.private_key_file = settings.kafka_schema_registry_private_key_file;
116+
format_settings.kafka_schema_registry.certificate_file = settings.kafka_schema_registry_cert_file;
117+
format_settings.kafka_schema_registry.ca_location = settings.kafka_schema_registry_ca_location;
118+
format_settings.kafka_schema_registry.skip_cert_check = settings.kafka_schema_registry_skip_cert_check;
115119
/// proton: ends
116120
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
117121
format_settings.template_settings.resultset_format = settings.format_template_resultset;

src/Formats/FormatSettings.h

+12-4
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,20 @@ struct FormatSettings
207207
std::string format_schema;
208208
std::string format_schema_path;
209209
bool is_server = false;
210-
/// proton: starts
211-
std::string kafka_schema_registry_url;
212-
std::string kafka_schema_registry_credentials;
213-
/// proton: ends
214210
} schema;
215211

212+
/// proton: starts
213+
/// Connection parameters for the Kafka schema registry. getFormatSettings()
/// copies each field from the matching `kafka_schema_registry_*` setting.
struct
214+
{
215+
/// Registry URL, taken from `kafka_schema_registry_url`.
std::string url;
216+
/// Credentials in '<username>:<password>' form; may be empty.
std::string credentials;
217+
/// Path to the private key file; may be empty if no private key file is used.
std::string private_key_file;
218+
/// Path to the certificate file (PEM), taken from `kafka_schema_registry_cert_file`.
std::string certificate_file;
219+
/// Path to the file or directory containing the CA/root certificates.
std::string ca_location;
220+
/// When true, the server certificate check result is ignored.
bool skip_cert_check = false;
221+
} kafka_schema_registry;
222+
/// proton: ends
223+
216224
struct
217225
{
218226
String resultset_format;

src/Formats/KafkaSchemaRegistry.cpp

+12-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,18 @@ namespace ErrorCodes
1515
extern const int INCORRECT_DATA;
1616
}
1717

18-
KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_)
18+
KafkaSchemaRegistry::KafkaSchemaRegistry(
19+
const String & base_url_,
20+
const String & credentials_,
21+
const String & private_key_file_,
22+
const String & certificate_file_,
23+
const String & ca_location_,
24+
bool skip_cert_check)
1925
: base_url(base_url_)
26+
, private_key_file(private_key_file_)
27+
, certificate_file(certificate_file_)
28+
, ca_location(ca_location_)
29+
, Verification_mode(skip_cert_check ? Poco::Net::Context::VERIFY_NONE : Poco::Net::Context::VERIFY_RELAXED)
2030
, logger(&Poco::Logger::get("KafkaSchemaRegistry"))
2131
{
2232
assert(!base_url.empty());
@@ -48,7 +58,7 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id)
4858
if (!credentials.empty())
4959
credentials.authenticate(request);
5060

51-
auto session = makePooledHTTPSession(url, timeouts, 1);
61+
auto session = makePooledHTTPSession(url, private_key_file, certificate_file, ca_location, Verification_mode, timeouts, 1);
5262
std::istream * response_body{};
5363
try
5464
{

src/Formats/KafkaSchemaRegistry.h

+46-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#pragma once
22

3+
#include <Common/SipHash.h>
34
#include <IO/ReadBuffer.h>
45

6+
#include <Poco/Net/Context.h>
57
#include <Poco/Net/HTTPBasicCredentials.h>
68
#include <Poco/URI.h>
79

@@ -14,14 +16,57 @@ class KafkaSchemaRegistry final
1416
public:
1517
static UInt32 readSchemaId(ReadBuffer & in);
1618

19+
/// The key type for caching KafkaSchemaRegistry
20+
struct CacheKey
21+
{
22+
String base_url;
23+
String credentials;
24+
String private_key_file;
25+
String certificate_file;
26+
String ca_location;
27+
bool skip_cert_check;
28+
29+
bool operator==(const CacheKey & rhs) const
30+
{
31+
return std::tie(base_url, credentials, private_key_file, certificate_file, ca_location, skip_cert_check)
32+
== std::tie(rhs.base_url, rhs.credentials, rhs.private_key_file, rhs.certificate_file, rhs.ca_location, rhs.skip_cert_check);
33+
}
34+
};
35+
36+
/// The hash function for caching KafkaSchemaRegistry
37+
struct CacheHasher
38+
{
39+
size_t operator()(const CacheKey & k) const
40+
{
41+
SipHash s;
42+
s.update(k.base_url);
43+
s.update(k.credentials);
44+
s.update(k.private_key_file);
45+
s.update(k.certificate_file);
46+
s.update(k.ca_location);
47+
s.update(k.skip_cert_check);
48+
return s.get64();
49+
}
50+
};
51+
1752
/// \param credentials_ is expected to be formatted in "<username>:<password>".
18-
KafkaSchemaRegistry(const String & base_url_, const String & credentials_);
53+
/// Creates a client for the schema registry at `base_url_` (asserted to be non-empty).
/// \param private_key_file_ path to the private key file; stored and passed to makePooledHTTPSession() when fetching a schema.
/// \param certificate_file_ path to the certificate file; stored and passed along the same way.
/// \param ca_location_ path to the CA/root certificates; stored and passed along the same way.
/// \param skip_cert_check when true the session uses Poco::Net::Context::VERIFY_NONE, otherwise VERIFY_RELAXED.
KafkaSchemaRegistry(
54+
const String & base_url_,
55+
const String & credentials_,
56+
const String & private_key_file_,
57+
const String & certificate_file_,
58+
const String & ca_location_,
59+
bool skip_cert_check);
1960

2061
String fetchSchema(UInt32 id);
2162

2263
private:
2364
Poco::URI base_url;
2465
Poco::Net::HTTPBasicCredentials credentials;
66+
String private_key_file;
67+
String certificate_file;
68+
String ca_location;
69+
Poco::Net::Context::VerificationMode Verification_mode;
2570

2671
Poco::Logger* logger;
2772
};

src/IO/HTTPCommon.cpp

+87-10
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ namespace
6767
throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME);
6868
}
6969

70-
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host = true)
70+
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, Poco::Net::Context::Ptr context, bool resolve_host = true) /* proton: updated */
7171
{
7272
HTTPSessionPtr session;
7373

7474
if (https)
7575
{
7676
#if USE_SSL
7777
/// Cannot resolve host in advance, otherwise SNI won't work in Poco.
78-
session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
78+
session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port, context);
7979
#else
8080
throw Exception("proton was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
8181
#endif
@@ -108,10 +108,11 @@ namespace
108108
const UInt16 proxy_port;
109109
bool proxy_https;
110110
bool resolve_host;
111+
Poco::Net::Context::Ptr context; /* proton: updated */
111112
using Base = PoolBase<Poco::Net::HTTPClientSession>;
112113
ObjectPtr allocObject() override
113114
{
114-
auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host);
115+
auto session = makeHTTPSessionImpl(host, port, https, true, context, resolve_host);
115116
if (!proxy_host.empty())
116117
{
117118
const String proxy_scheme = proxy_https ? "https" : "http";
@@ -136,6 +137,12 @@ namespace
136137
const std::string & proxy_host_,
137138
UInt16 proxy_port_,
138139
bool proxy_https_,
140+
/// proton: starts
141+
const String & private_key_file,
142+
const String & certificate_file,
143+
const String & ca_location,
144+
Poco::Net::Context::VerificationMode verification_mode,
145+
/// proton: ends
139146
size_t max_pool_size_,
140147
bool resolve_host_ = true)
141148
: Base(static_cast<unsigned>(max_pool_size_), &Poco::Logger::get("HTTPSessionPool"))
@@ -146,6 +153,14 @@ namespace
146153
, proxy_port(proxy_port_)
147154
, proxy_https(proxy_https_)
148155
, resolve_host(resolve_host_)
156+
, context(new Poco::Net::Context(
157+
Poco::Net::SSLManager::instance().defaultClientContext()->usage(),
158+
private_key_file,
159+
certificate_file,
160+
ca_location,
161+
/*verificationMode=*/verification_mode,
162+
/*verificationDepth=*/9,
163+
/*loadDefaultCAs=*/true))
149164
{
150165
}
151166
};
@@ -162,10 +177,21 @@ namespace
162177
UInt16 proxy_port;
163178
bool is_proxy_https;
164179

180+
/// proton: starts
181+
String private_key_file;
182+
String certificate_file;
183+
String ca_location;
184+
Poco::Net::Context::VerificationMode Verification_mode;
185+
/// proton: ends
186+
165187
/// Member-wise equality. After this change the TLS parameters (key file,
/// certificate file, CA location, verification mode) take part in the
/// comparison together with the target and proxy endpoint fields.
bool operator ==(const Key & rhs) const
166188
{
167-
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https)
168-
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https);
189+
/// proton: starts
190+
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https,
191+
private_key_file, certificate_file, ca_location, Verification_mode)
192+
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https,
193+
rhs.private_key_file, rhs.certificate_file, rhs.ca_location, rhs.Verification_mode);
194+
/// proton: ends
169195
}
170196
};
171197

@@ -204,6 +230,12 @@ namespace
204230
Entry getSession(
205231
const Poco::URI & uri,
206232
const Poco::URI & proxy_uri,
233+
/// proton: starts
234+
const String & private_key_file,
235+
const String & certificate_file,
236+
const String & ca_location,
237+
Poco::Net::Context::VerificationMode verification_mode,
238+
/// proton: ends
207239
const ConnectionTimeouts & timeouts,
208240
size_t max_connections_per_endpoint,
209241
bool resolve_host = true)
@@ -224,11 +256,16 @@ namespace
224256
proxy_https = isHTTPS(proxy_uri);
225257
}
226258

227-
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https};
259+
HTTPSessionPool::Key key{
260+
host, port, https, proxy_host, proxy_port, proxy_https,
261+
private_key_file, certificate_file, ca_location, verification_mode};
228262
auto pool_ptr = endpoints_pool.find(key);
229263
if (pool_ptr == endpoints_pool.end())
230264
std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
231-
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, proxy_host, proxy_port, proxy_https, max_connections_per_endpoint, resolve_host));
265+
key, std::make_shared<SingleEndpointHTTPSessionPool>(
266+
host, port, https, proxy_host, proxy_port, proxy_https,
267+
private_key_file, certificate_file, ca_location, verification_mode, /* proton: updated */
268+
max_connections_per_endpoint, resolve_host));
232269

233270
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
234271
auto session = pool_ptr->second->get(retry_timeout);
@@ -280,20 +317,60 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
280317
UInt16 port = uri.getPort();
281318
bool https = isHTTPS(uri);
282319

283-
auto session = makeHTTPSessionImpl(host, port, https, false, resolve_host);
320+
auto session = makeHTTPSessionImpl(host, port, https, false, Poco::Net::SSLManager::instance().defaultClientContext(), resolve_host);
284321
setTimeouts(*session, timeouts);
285322
return session;
286323
}
287324

325+
/// proton: starts
326+
/// Convenience overload for callers without a proxy: forwards every argument
/// to the proxy-aware TLS overload, passing an empty proxy URI.
PooledHTTPSessionPtr makePooledHTTPSession(
327+
const Poco::URI & uri,
328+
const String & private_key_file,
329+
const String & certificate_file,
330+
const String & ca_location,
331+
Poco::Net::Context::VerificationMode verification_mode,
332+
const ConnectionTimeouts & timeouts,
333+
size_t per_endpoint_pool_size,
334+
bool resolve_host)
335+
{
336+
return makePooledHTTPSession(uri, {},
337+
private_key_file, certificate_file, ca_location, verification_mode,
338+
timeouts, per_endpoint_pool_size, resolve_host);
339+
}
340+
341+
/// Obtains a pooled session for `uri` (optionally through `proxy_uri`) with
/// caller-supplied TLS parameters. All arguments are handed unchanged to
/// HTTPSessionPool::getSession(), where the TLS parameters are part of the
/// key that selects the per-endpoint pool.
PooledHTTPSessionPtr makePooledHTTPSession(
342+
const Poco::URI & uri,
343+
const Poco::URI & proxy_uri,
344+
const String & private_key_file,
345+
const String & certificate_file,
346+
const String & ca_location,
347+
Poco::Net::Context::VerificationMode verification_mode,
348+
const ConnectionTimeouts & timeouts,
349+
size_t per_endpoint_pool_size,
350+
bool resolve_host)
351+
{
352+
return HTTPSessionPool::instance().getSession(uri, proxy_uri,
353+
private_key_file, certificate_file, ca_location, verification_mode,
354+
timeouts, per_endpoint_pool_size, resolve_host);
355+
}
356+
/// proton: ends
357+
288358

289359
/// Overload without proxy and without explicit TLS parameters: forwards to
/// the proxy-aware overload with an empty proxy URI.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
290360
{
291361
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host);
292362
}
293363

294-
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
364+
/// Overload for callers that do not specify TLS parameters. It requests the
/// session with empty private key, certificate and CA paths and with
/// Poco::Net::Context::VERIFY_RELAXED as the verification mode.
PooledHTTPSessionPtr makePooledHTTPSession(
365+
const Poco::URI & uri,
366+
const Poco::URI & proxy_uri,
367+
const ConnectionTimeouts & timeouts,
368+
size_t per_endpoint_pool_size,
369+
bool resolve_host)
295370
{
296-
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host);
371+
return HTTPSessionPool::instance().getSession(uri, proxy_uri,
372+
/*private_key_file=*/"", /*certificate_file=*/"", /*ca_location=*/"", /*verification_mode=*/Poco::Net::Context::VERIFY_RELAXED,
373+
timeouts, per_endpoint_pool_size, resolve_host);
297374
}
298375

299376
/// Tells whether `status` is one of the four redirect statuses checked here:
/// moved permanently, found, see other, or temporary redirect.
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status)
{
    using Response = Poco::Net::HTTPResponse;

    if (status == Response::HTTP_MOVED_PERMANENTLY)
        return true;
    if (status == Response::HTTP_FOUND)
        return true;
    if (status == Response::HTTP_SEE_OTHER)
        return true;
    return status == Response::HTTP_TEMPORARY_REDIRECT;
}

src/IO/HTTPCommon.h

+24
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <memory>
55
#include <mutex>
66

7+
#include <Poco/Net/Context.h>
78
#include <Poco/Net/HTTPClientSession.h>
89
#include <Poco/Net/HTTPRequest.h>
910
#include <Poco/Net/HTTPResponse.h>
@@ -77,6 +78,29 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
7778
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
7879
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
7980

81+
/// proton: starts
82+
/// Same as the plain `makePooledHTTPSession(uri, timeouts, ...)` but with
/// explicit TLS parameters: paths to the private key, the certificate and the
/// CA certificates, plus the certificate verification mode for the session.
PooledHTTPSessionPtr makePooledHTTPSession(
83+
const Poco::URI & uri,
84+
const String & private_key_file,
85+
const String & certificate_file,
86+
const String & ca_location,
87+
Poco::Net::Context::VerificationMode verification_mode,
88+
const ConnectionTimeouts & timeouts,
89+
size_t per_endpoint_pool_size,
90+
bool resolve_host = true);
91+
92+
/// Proxy-aware variant with explicit TLS parameters: paths to the private
/// key, the certificate and the CA certificates, plus the certificate
/// verification mode for the session.
PooledHTTPSessionPtr makePooledHTTPSession(
93+
const Poco::URI & uri,
94+
const Poco::URI & proxy_uri,
95+
const String & private_key_file,
96+
const String & certificate_file,
97+
const String & ca_location,
98+
Poco::Net::Context::VerificationMode verification_mode,
99+
const ConnectionTimeouts & timeouts,
100+
size_t per_endpoint_pool_size,
101+
bool resolve_host = true);
102+
/// proton: ends
103+
80104
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
81105

82106
/** Used to receive response (response headers and possibly body)

0 commit comments

Comments
 (0)