diff --git a/build.sh b/build.sh
index 11c5d4c4937..e17f8f15ddf 100755
--- a/build.sh
+++ b/build.sh
@@ -81,6 +81,7 @@ cmake .. \
-DENABLE_DEBUG_FUNCS=ON \
-DENABLE_URL_FUNCS=ON \
-DENABLE_AVRO=ON \
+ -DENABLE_AWS_S3=ON \
-DENABLE_AWS_MSK_IAM=ON \
-DENABLE_CURL=${is_not_darwin} \
-DENABLE_PULSAR=${is_not_darwin}
diff --git a/contrib/aws-cmake/CMakeLists.txt b/contrib/aws-cmake/CMakeLists.txt
index bd3043afc96..5d735f97182 100644
--- a/contrib/aws-cmake/CMakeLists.txt
+++ b/contrib/aws-cmake/CMakeLists.txt
@@ -2,7 +2,7 @@ set(ENABLE_AWS_S3_DEFAULT OFF)
set(ENABLE_AWS_MSK_IAM_DEFAULT OFF) # proton: added
if(ENABLE_LIBRARIES AND (OS_LINUX OR OS_DARWIN) AND TARGET OpenSSL::Crypto)
- set(ENABLE_AWS_S3_DEFAULT OFF) # proton: added https://github.com/timeplus-io/proton/issues/918
+ set(ENABLE_AWS_S3_DEFAULT ON)
set(ENABLE_AWS_MSK_IAM_DEFAULT ON) # proton: added
endif()
diff --git a/docker/packager/packager b/docker/packager/packager
index b1cfc507366..1659c1ca989 100755
--- a/docker/packager/packager
+++ b/docker/packager/packager
@@ -218,7 +218,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
cmake_flags.append('-DENABLE_CYRUS_SASL=ON')
cmake_flags.append('-DENABLE_KRB5=ON')
cmake_flags.append('-DENABLE_BROTLI=ON')
- cmake_flags.append('-DENABLE_S3=ON')
+ cmake_flags.append('-DENABLE_AWS_S3=ON')
cmake_flags.append('-DENABLE_AVRO=ON')
cmake_flags.append('-DENABLE_AWS_MSK_IAM=ON')
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 039e58d5e17..37aae58486a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -78,6 +78,9 @@ list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_dele
add_headers_and_sources(dbms Disks/IO)
add_headers_and_sources(dbms Disks/ObjectStorages)
+
+add_headers_and_sources(dbms Storages/NamedCollections)
+
if (USE_LIBPQXX)
add_headers_and_sources(dbms Core/PostgreSQL)
endif()
@@ -97,6 +100,8 @@ if (TARGET ch_contrib::azure_sdk)
add_headers_and_sources(dbms Disks/ObjectStorages/AzureBlobStorage)
endif()
+add_headers_and_sources(dbms Databases/ApacheIceberg)
+
add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
add_headers_and_sources(dbms Disks/ObjectStorages/Web)
@@ -208,6 +213,7 @@ add_object_library(clickhouse_server_http Server/HTTP)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
# proton : starts
+add_object_library(clickhouse_storages_iceberg Storages/Iceberg)
add_object_library(clickhouse_processors_streaming Processors/Streaming)
add_object_library(clickhouse_formats_avro Formats/Avro)
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index 2b4d165dcf3..25c52ec4b1f 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -626,6 +626,7 @@
M(676, CANNOT_PARSE_IPV6) \
M(677, THREAD_WAS_CANCELED) \
M(678, IO_URING_INIT_FAILED) \
M(679, IO_URING_SUBMIT_ERROR) \
+ M(736, ICEBERG_CATALOG_ERROR) \
M(997, UNSUPPORTED) \
M(998, INVALID_INTEGER_STRING) \
diff --git a/src/Common/Priority.h b/src/Common/Priority.h
new file mode 100644
index 00000000000..8952fe4dd5a
--- /dev/null
+++ b/src/Common/Priority.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include
+
+/// Common type for priority values.
+/// A separate type (rather than `Int64`) is used just to avoid implicit conversion errors and to default-initialize the value.
+struct Priority
+{
+ Int64 value = 0; /// Note that lower value means higher priority.
+ constexpr operator Int64() const { return value; } /// NOLINT
+};
diff --git a/src/Common/SettingsChanges.cpp b/src/Common/SettingsChanges.cpp
index 9fb4f361e09..7790c272606 100644
--- a/src/Common/SettingsChanges.cpp
+++ b/src/Common/SettingsChanges.cpp
@@ -46,4 +46,30 @@ Field * SettingsChanges::tryGet(std::string_view name)
return &change->value;
}
+bool SettingsChanges::insertSetting(std::string_view name, const Field & value)
+{
+ auto it = std::find_if(begin(), end(), [&name](const SettingChange & change) { return change.name == name; });
+ if (it != end())
+ return false;
+ emplace_back(name, value);
+ return true;
+}
+
+void SettingsChanges::setSetting(std::string_view name, const Field & value)
+{
+ if (auto * setting_value = tryGet(name))
+ *setting_value = value;
+ else
+ insertSetting(name, value);
+}
+
+bool SettingsChanges::removeSetting(std::string_view name)
+{
+ auto it = std::find_if(begin(), end(), [&name](const SettingChange & change) { return change.name == name; });
+ if (it == end())
+ return false;
+ erase(it);
+ return true;
+}
+
}
diff --git a/src/Common/SettingsChanges.h b/src/Common/SettingsChanges.h
index 851dd12751c..83db23016ad 100644
--- a/src/Common/SettingsChanges.h
+++ b/src/Common/SettingsChanges.h
@@ -30,6 +30,13 @@ class SettingsChanges : public std::vector
bool tryGet(std::string_view name, Field & out_value) const;
const Field * tryGet(std::string_view name) const;
Field * tryGet(std::string_view name);
+
+ /// Inserts the element if it doesn't exist yet and returns true, otherwise just returns false
+ bool insertSetting(std::string_view name, const Field & value);
+ /// Sets the element to the given value, inserting it if it doesn't exist
+ void setSetting(std::string_view name, const Field & value);
+ /// If the element exists, removes it and returns true, otherwise returns false
+ bool removeSetting(std::string_view name);
};
}
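
Reviewer note: the three helpers above give `SettingsChanges` map-like semantics on top of its vector storage. A minimal usage sketch (illustrative only, mirroring the `setSetting("type", "iceberg")` call that appears later in this patch; not part of the patch itself):

```cpp
#include <Common/SettingsChanges.h>

using namespace DB;

void example()
{
    SettingsChanges changes;

    changes.setSetting("type", "iceberg");    // not present yet -> inserted
    changes.setSetting("type", "s3");         // already present -> value overwritten in place

    bool inserted = changes.insertSetting("type", "iceberg"); // false: key exists, value stays "s3"
    bool removed = changes.removeSetting("type");             // true: entry erased
    bool removed_again = changes.removeSetting("type");       // false: nothing left to remove

    (void)inserted; (void)removed; (void)removed_again;
}
```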
diff --git a/src/Core/BaseSettingsFwdMacros.h b/src/Core/BaseSettingsFwdMacros.h
new file mode 100644
index 00000000000..09a4397e483
--- /dev/null
+++ b/src/Core/BaseSettingsFwdMacros.h
@@ -0,0 +1,7 @@
+#pragma once
+
+#define DECLARE_SETTING_TRAIT(CLASS_NAME, TYPE) using CLASS_NAME##TYPE = SettingField##TYPE CLASS_NAME##Impl::*;
+
+#define DECLARE_SETTING_SUBSCRIPT_OPERATOR(CLASS_NAME, TYPE) \
+ const SettingField##TYPE & operator[](CLASS_NAME##TYPE t) const; \
+ SettingField##TYPE & operator[](CLASS_NAME##TYPE t);
diff --git a/src/Core/BaseSettingsFwdMacrosImpl.h b/src/Core/BaseSettingsFwdMacrosImpl.h
new file mode 100644
index 00000000000..31c6b5dca1f
--- /dev/null
+++ b/src/Core/BaseSettingsFwdMacrosImpl.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#define IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR(CLASS_NAME, TYPE) \
+ const SettingField##TYPE & CLASS_NAME::operator[](CLASS_NAME##TYPE t) const \
+ { \
+ return impl.get()->*t; \
+ } \
+ SettingField##TYPE & CLASS_NAME::operator[](CLASS_NAME##TYPE t) \
+ { \
+ return impl.get()->*t; \
+ }
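
Reviewer note: these two headers implement the settings pimpl pattern via pointers-to-members. A "trait" is a pointer to a `SettingField*` member of the hidden `*Impl` class, and `operator[]` dereferences it through the pimpl, so callers never need the Impl definition. A self-contained sketch of the same technique with plain `int` fields (all names below are invented for illustration, not part of the patch):

```cpp
#include <iostream>
#include <memory>

struct MySettingsImpl              // the hidden implementation (normally lives in a .cpp)
{
    int max_threads = 8;
    int timeout_ms = 1000;
};

// What DECLARE_SETTING_TRAIT produces: an alias for "pointer to a member of the Impl".
using MySettingsInt = int MySettingsImpl::*;

struct MySettings                  // the public facade; only a forward declaration of Impl is needed
{
    std::unique_ptr<MySettingsImpl> impl = std::make_unique<MySettingsImpl>();

    // What IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR produces.
    const int & operator[](MySettingsInt t) const { return impl.get()->*t; }
    int & operator[](MySettingsInt t) { return impl.get()->*t; }
};

// What the INITIALIZE_SETTING_EXTERN-style constants look like to callers.
namespace MySetting
{
    constexpr MySettingsInt max_threads = &MySettingsImpl::max_threads;
    constexpr MySettingsInt timeout_ms = &MySettingsImpl::timeout_ms;
}

int main()
{
    MySettings settings;
    settings[MySetting::timeout_ms] = 250;
    std::cout << settings[MySetting::max_threads] << ' ' << settings[MySetting::timeout_ms] << '\n'; // 8 250
}
```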
diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp
index a0ab9ed2d27..947d9454017 100644
--- a/src/Core/SettingsEnums.cpp
+++ b/src/Core/SettingsEnums.cpp
@@ -139,4 +139,7 @@ IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{"JSON", FormatSettings::EscapingRule::JSON},
{"XML", FormatSettings::EscapingRule::XML},
{"Raw", FormatSettings::EscapingRule::Raw}})
+
+IMPLEMENT_SETTING_ENUM(IcebergCatalogType, ErrorCodes::BAD_ARGUMENTS,
+ {{"rest", IcebergCatalogType::REST}})
}
diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h
index bbed96827ff..c24beab70d8 100644
--- a/src/Core/SettingsEnums.h
+++ b/src/Core/SettingsEnums.h
@@ -168,4 +168,11 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparin
DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)
+enum class IcebergCatalogType : uint8_t
+{
+ REST,
+};
+
+DECLARE_SETTING_ENUM(IcebergCatalogType)
+
}
diff --git a/src/Core/UUID.h b/src/Core/UUID.h
index a24dcebdc9e..2a64c57a37e 100644
--- a/src/Core/UUID.h
+++ b/src/Core/UUID.h
@@ -11,6 +11,29 @@ namespace UUIDHelpers
/// Generate random UUID.
UUID generateV4();
+ constexpr size_t HighBytes = (std::endian::native == std::endian::little) ? 0 : 1;
+ constexpr size_t LowBytes = (std::endian::native == std::endian::little) ? 1 : 0;
+
+ inline uint64_t getHighBytes(const UUID & uuid)
+ {
+ return uuid.toUnderType().items[HighBytes];
+ }
+
+ inline uint64_t & getHighBytes(UUID & uuid)
+ {
+ return uuid.toUnderType().items[HighBytes];
+ }
+
+ inline uint64_t getLowBytes(const UUID & uuid)
+ {
+ return uuid.toUnderType().items[LowBytes];
+ }
+
+ inline uint64_t & getLowBytes(UUID & uuid)
+ {
+ return uuid.toUnderType().items[LowBytes];
+ }
+
const UUID Nil{};
}
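
Reviewer note: the new accessors index the two 64-bit halves of the 128-bit UUID in an endianness-aware way, so `getHighBytes()` refers to the same logical half regardless of platform byte order. A small round-trip sketch using only the helpers declared above (assumes the `Core/UUID.h` header from this patch; not part of the patch itself):

```cpp
#include <Core/UUID.h>
#include <cassert>

void roundTrip()
{
    DB::UUID uuid = DB::UUIDHelpers::generateV4();

    // Split into the two logical halves...
    uint64_t hi = DB::UUIDHelpers::getHighBytes(uuid);
    uint64_t lo = DB::UUIDHelpers::getLowBytes(uuid);

    // ...and reassemble through the mutable overloads.
    DB::UUID copy = DB::UUIDHelpers::Nil;
    DB::UUIDHelpers::getHighBytes(copy) = hi;
    DB::UUIDHelpers::getLowBytes(copy) = lo;

    assert(copy == uuid);
}
```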
diff --git a/src/Databases/ApacheIceberg/ApacheIcebergStorageType.h b/src/Databases/ApacheIceberg/ApacheIcebergStorageType.h
new file mode 100644
index 00000000000..2e70e0ce749
--- /dev/null
+++ b/src/Databases/ApacheIceberg/ApacheIcebergStorageType.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+
+enum class ApacheIcebergStorageType : uint8_t
+{
+ S3,
+ Azure,
+ Local,
+ HDFS,
+};
+
+}
diff --git a/src/Databases/ApacheIceberg/DatabaseIceberg.cpp b/src/Databases/ApacheIceberg/DatabaseIceberg.cpp
new file mode 100644
index 00000000000..289bbce68fb
--- /dev/null
+++ b/src/Databases/ApacheIceberg/DatabaseIceberg.cpp
@@ -0,0 +1,353 @@
+#include
+
+#if USE_AVRO
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+namespace DatabaseApacheIcebergSetting
+{
+extern const DatabaseApacheIcebergSettingsIcebergCatalogType catalog_type;
+extern const DatabaseApacheIcebergSettingsString warehouse;
+extern const DatabaseApacheIcebergSettingsString catalog_credential;
+extern const DatabaseApacheIcebergSettingsString auth_header;
+extern const DatabaseApacheIcebergSettingsString auth_scope;
+extern const DatabaseApacheIcebergSettingsString storage_endpoint;
+extern const DatabaseApacheIcebergSettingsString oauth_server_uri;
+extern const DatabaseApacheIcebergSettingsBool vended_credentials;
+extern const DatabaseApacheIcebergSettingsString rest_catalog_uri;
+extern const DatabaseApacheIcebergSettingsBool rest_catalog_sigv4_enabled;
+extern const DatabaseApacheIcebergSettingsString rest_catalog_signing_region;
+extern const DatabaseApacheIcebergSettingsString rest_catalog_signing_name;
+}
+
+namespace ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+extern const int INVALID_SETTING_VALUE;
+extern const int SUPPORT_IS_DISABLED;
+extern const int UNKNOWN_DATABASE;
+extern const int UNSUPPORTED;
+}
+
+namespace
+{
+/// Parse a string containing at least one dot into two substrings:
+/// A.B.C.D.E -> A.B.C.D and E, where
+/// `A.B.C.D` is a table "namespace".
+/// `E` is a table name.
+std::pair parseTableName(const std::string & name)
+{
+ auto pos = name.rfind('.');
+ if (pos == std::string::npos)
+ return {"", name};
+
+ auto table_name = name.substr(pos + 1);
+ auto namespace_name = name.substr(0, name.size() - table_name.size() - 1);
+ return {std::move(namespace_name), std::move(table_name)};
+}
+
+auto & tableCache()
+{
+ static LRUCache table_cache{/*max_size_=*/1000};
+ return table_cache;
+}
+}
+
+DatabaseApacheIceberg::DatabaseApacheIceberg(
+ const std::string & database_name_,
+ const std::string & url_,
+ const DatabaseApacheIcebergSettings & settings_,
+ ASTPtr database_engine_definition_)
+ : IDatabase(database_name_)
+ , url(url_)
+ , settings(settings_)
+ , database_engine_definition(database_engine_definition_)
+ , log(&Poco::Logger::get(fmt::format("DatabaseIceberg({})", database_name)))
+{
+ validateSettings();
+ initCatalog();
+ if (!getCatalog()->existsNamespace(getDatabaseName()))
+ throw DB::Exception(DB::ErrorCodes::UNKNOWN_DATABASE, "Namespace {} does not exist in the catalog", getDatabaseName());
+}
+
+void DatabaseApacheIceberg::initCatalog()
+{
+ switch (settings[DatabaseApacheIcebergSetting::catalog_type].value)
+ {
+ case DB::IcebergCatalogType::REST:
+ {
+ catalog_impl = std::make_shared(
+ settings[DatabaseApacheIcebergSetting::warehouse].value,
+ url,
+ Apache::Iceberg::RestCatalog::Options{
+ .catalog_credential = settings[DatabaseApacheIcebergSetting::catalog_credential].value,
+ .auth_scope = settings[DatabaseApacheIcebergSetting::auth_scope].value,
+ .auth_header = settings[DatabaseApacheIcebergSetting::auth_header],
+ .oauth_server_uri = settings[DatabaseApacheIcebergSetting::oauth_server_uri].value,
+ .enable_sigv4 = settings[DatabaseApacheIcebergSetting::rest_catalog_sigv4_enabled].value,
+ .signing_region = settings[DatabaseApacheIcebergSetting::rest_catalog_signing_region].value,
+ .signing_name = settings[DatabaseApacheIcebergSetting::rest_catalog_signing_name].value,
+ },
+ Context::getGlobalContextInstance());
+
+ break;
+ }
+ }
+}
+
+void DatabaseApacheIceberg::validateSettings()
+{
+ if (settings[DatabaseApacheIcebergSetting::warehouse].value.empty())
+ {
+ throw Exception(
+ ErrorCodes::INVALID_SETTING_VALUE,
+ "`warehouse` setting cannot be empty. "
+ "Please specify 'SETTINGS warehouse=' in the CREATE DATABASE query");
+ }
+}
+
+Apache::Iceberg::CatalogPtr DatabaseApacheIceberg::getCatalog() const
+{
+ return catalog_impl;
+}
+
+std::string DatabaseApacheIceberg::getStorageEndpointForTable(const Apache::Iceberg::TableMetadata & table_metadata) const
+{
+ auto endpoint_from_settings = settings[DatabaseApacheIcebergSetting::storage_endpoint].value;
+ if (!endpoint_from_settings.empty())
+ {
+ return std::filesystem::path(endpoint_from_settings) / table_metadata.getLocation(/* path_only */ true) / "";
+ }
+ else
+ {
+ return std::filesystem::path(table_metadata.getLocation(/* path_only */ false)) / "";
+ }
+}
+
+bool DatabaseApacheIceberg::empty() const
+{
+ return getCatalog()->empty(getDatabaseName());
+}
+
+bool DatabaseApacheIceberg::isTableExist(const String & name, ContextPtr /* context_ */) const
+{
+ const auto [namespace_name, table_name] = parseTableName(name);
+ return getCatalog()->existsTable(namespace_name.empty() ? getDatabaseName() : fmt::format("{}.{}", getDatabaseName(), namespace_name), table_name);
+}
+
+StoragePtr DatabaseApacheIceberg::tryGetTable(const String & name [[maybe_unused]], ContextPtr context_ [[maybe_unused]]) const
+{
+ auto catalog = getCatalog();
+ auto table_metadata = Apache::Iceberg::TableMetadata().withLocation().withSchema();
+
+ const bool with_vended_credentials = settings[DatabaseApacheIcebergSetting::vended_credentials].value;
+ if (with_vended_credentials)
+ table_metadata = table_metadata.withStorageCredentials();
+
+ auto cache_key = fmt::format("{}.{}", getDatabaseName(), name);
+
+ if (!catalog->tryGetTableMetadata(getDatabaseName(), name, table_metadata))
+ {
+ tableCache().remove(cache_key);
+ return nullptr;
+ }
+
+ auto [table, _] = tableCache().getOrSet(cache_key, [&]() {
+ /// Replace Iceberg Catalog endpoint with storage path endpoint of requested table.
+ auto table_endpoint = getStorageEndpointForTable(table_metadata);
+ LOG_TEST(log, "Using table endpoint: {}", table_endpoint);
+
+ auto stream_settings = std::make_shared();
+ stream_settings->changes.setSetting("type", "iceberg");
+ stream_settings->changes.setSetting("iceberg_storage_endpoint", table_endpoint);
+
+ /// We either fetch storage credentials from catalog
+ /// or get storage credentials from database engine arguments
+ /// in CREATE query (e.g. in `args`).
+ /// Vended credentials can be disabled in catalog itself,
+ /// so we have a separate setting to know whether we should even try to fetch them.
+ if (with_vended_credentials)
+ {
+ auto storage_credentials = table_metadata.getStorageCredentials();
+ if (storage_credentials)
+ storage_credentials->addCredentialsToSettings(*stream_settings);
+ }
+
+ ASTStorage stream_storage;
+ stream_storage.set(stream_storage.settings, stream_settings);
+
+ return StorageExternalStream::create(
+ /*engine_args=*/ASTs{},
+ /*table_id_=*/StorageID{getDatabaseName(), name},
+ context_,
+ ColumnsDescription(table_metadata.getSchema()),
+ /*comment=*/"",
+ &stream_storage,
+ /*attach=*/false);
+ });
+
+ return table;
+}
+
+DatabaseTablesIteratorPtr
+DatabaseApacheIceberg::getTablesIterator(ContextPtr context_, const FilterByNameFunction & filter_by_table_name) const
+{
+ Tables tables;
+ auto catalog = getCatalog();
+ const auto iceberg_tables = catalog->getTables(getDatabaseName());
+
+ for (const auto & table_name : iceberg_tables)
+ {
+ if (filter_by_table_name && !filter_by_table_name(table_name))
+ continue;
+
+ auto storage = tryGetTable(table_name, context_);
+ [[maybe_unused]] bool inserted = tables.emplace(table_name, storage).second;
+ chassert(inserted);
+ }
+
+ return std::make_unique(std::move(tables), getDatabaseName());
+}
+
+ASTPtr DatabaseApacheIceberg::getCreateDatabaseQuery() const
+{
+ auto create_query = std::make_shared();
+ create_query->setDatabase(getDatabaseName());
+ create_query->set(create_query->storage, database_engine_definition);
+ return create_query;
+}
+
+namespace
+{
+
+void validateCreate(const ASTPtr & query)
+{
+ auto create = std::dynamic_pointer_cast(query);
+ if (create->isView())
+ throw Exception(ErrorCodes::UNSUPPORTED, "Views are not supported in Iceberg database");
+
+ if (create->is_dictionary)
+ throw Exception(ErrorCodes::UNSUPPORTED, "Dictionary is not supported in Iceberg database");
+
+ if (create->is_external)
+ {
+ if (create->storage == nullptr || create->storage->settings == nullptr
+ || create->storage->settings->changes.tryGet("type") == nullptr
+ || create->storage->settings->changes.tryGet("type")->get() != "iceberg")
+ throw Exception(ErrorCodes::UNSUPPORTED, "External streams are not supported in Iceberg database");
+ }
+
+ if (create->is_random)
+ throw Exception(ErrorCodes::UNSUPPORTED, "Random stream is not supported in Iceberg database");
+
+ if (create->is_virtual)
+ throw Exception(ErrorCodes::UNSUPPORTED, "Virtual stream is not supported in Iceberg database");
+}
+
+}
+
+void DatabaseApacheIceberg::createTable(ContextPtr /*context*/, const String & name, const StoragePtr & table, const ASTPtr & query)
+{
+ validateCreate(query);
+
+ auto endpoint = settings[DatabaseApacheIcebergSetting::storage_endpoint].value;
+ auto endpoint_uri = Poco::URI(endpoint);
+ /// The table location requires an S3 URI (starting with 's3://')
+ if (endpoint_uri.getScheme().starts_with("http"))
+ {
+ std::vector parts;
+ /// https://Bucket.s3.Region.amazonaws.com
+ parts.reserve(5);
+ splitInto<'.'>(parts, endpoint_uri.getHost());
+ endpoint = "s3://" + parts[0] + "/";
+ }
+
+ getCatalog()->createTable(
+ getDatabaseName(),
+ name,
+ endpoint + (endpoint.ends_with('/') ? "" : "/") + name,
+ table->getInMemoryMetadata().getColumns().getAllPhysical(),
+ std::nullopt,
+ std::nullopt,
+ /*stage_create=*/false,
+ {});
+}
+
+void DatabaseApacheIceberg::dropTable(ContextPtr /*context*/, const String & name, bool /*no_delay*/)
+{
+ getCatalog()->deleteTable(getDatabaseName(), name);
+ tableCache().remove(fmt::format("{}.{}", getDatabaseName(), name));
+}
+
+ASTPtr DatabaseApacheIceberg::getCreateTableQueryImpl(const String & name, ContextPtr /* context_ */, bool /* throw_on_error */) const
+{
+ auto table_metadata = Apache::Iceberg::TableMetadata().withLocation().withSchema();
+
+ const auto [namespace_name, table_name] = parseTableName(name);
+ getCatalog()->getTableMetadata(namespace_name.empty() ? getDatabaseName() : fmt::format("{}.{}", getDatabaseName(), namespace_name), table_name, table_metadata);
+
+ auto create_table_query = std::make_shared();
+
+ auto columns_declare_list = std::make_shared();
+ auto columns_expression_list = std::make_shared();
+
+ columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
+ create_table_query->set(create_table_query->columns_list, columns_declare_list);
+
+ create_table_query->setTable(name);
+ create_table_query->setDatabase(getDatabaseName());
+
+ for (const auto & column_type_and_name : table_metadata.getSchema())
+ {
+ const auto column_declaration = std::make_shared();
+ column_declaration->name = column_type_and_name.name;
+ column_declaration->type = makeASTDataType(column_type_and_name.type->getName());
+ columns_expression_list->children.emplace_back(column_declaration);
+ }
+
+ return create_table_query;
+}
+
+bool DatabaseApacheIceberg::configureTableEngine(ASTCreateQuery & create) const
+{
+ if (create.storage == nullptr)
+ create.set(create.storage, std::make_shared());
+
+ create.is_external = true;
+ if (create.storage->settings == nullptr)
+ create.storage->set(create.storage->settings, std::make_shared());
+
+ create.storage->settings->changes.setSetting("type", "iceberg");
+
+ auto endpoint_from_settings = settings[DatabaseApacheIcebergSetting::storage_endpoint].value;
+ create.storage->settings->changes.setSetting("iceberg_storage_endpoint", endpoint_from_settings);
+
+ return true;
+}
+
+}
+
+#endif
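
Reviewer note: one detail in `createTable()` worth calling out — when `storage_endpoint` is an HTTP(S) virtual-hosted-style URL, the bucket is taken from the first host label and the table location is rewritten to an `s3://` prefix. A standalone sketch of that transformation (function and variable names here are invented for illustration; the patch itself uses `splitInto<'.'>` on the Poco URI host):

```cpp
#include <iostream>
#include <string>

// Illustrative restatement: https://my-bucket.s3.us-east-2.amazonaws.com -> s3://my-bucket/<table>
std::string toS3Location(const std::string & endpoint, const std::string & table)
{
    std::string base = endpoint;
    if (endpoint.rfind("http", 0) == 0)
    {
        auto host_start = endpoint.find("://") + 3;
        auto host_end = endpoint.find_first_of("/.", host_start); // first host label = bucket
        base = "s3://" + endpoint.substr(host_start, host_end - host_start) + "/";
    }
    if (base.back() != '/')
        base += '/';
    return base + table;
}

int main()
{
    std::cout << toS3Location("https://my-bucket.s3.us-east-2.amazonaws.com", "events") << '\n';
    // prints: s3://my-bucket/events
}
```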
diff --git a/src/Databases/ApacheIceberg/DatabaseIceberg.h b/src/Databases/ApacheIceberg/DatabaseIceberg.h
new file mode 100644
index 00000000000..b7a84a492fb
--- /dev/null
+++ b/src/Databases/ApacheIceberg/DatabaseIceberg.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_AVRO
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+class DatabaseApacheIceberg final : public IDatabase, WithContext
+{
+public:
+ explicit DatabaseApacheIceberg(
+ const std::string & database_name_,
+ const std::string & url_,
+ const DatabaseApacheIcebergSettings & settings_,
+ ASTPtr database_engine_definition_);
+
+ String getEngineName() const override { return "Iceberg"; }
+ bool configureTableEngine(ASTCreateQuery &) const override;
+
+ bool canContainMergeTreeTables() const override { return false; }
+ bool canContainDistributedTables() const override { return false; }
+ bool shouldBeEmptyOnDetach() const override { return false; }
+
+ bool empty() const override;
+
+ bool isTableExist(const String & name, ContextPtr context) const override;
+ StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
+
+ DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
+
+ void shutdown() override { }
+
+ ASTPtr getCreateDatabaseQuery() const override;
+
+ void createTable(ContextPtr /*context*/, const String & /*name*/, const StoragePtr & /*table*/, const ASTPtr & /*query*/) override;
+
+ void dropTable(ContextPtr /*context*/, const String & /*name*/, [[maybe_unused]] bool no_delay) override;
+
+ /// FIXME this should not be a public function, just a temporary trick.
+ Apache::Iceberg::CatalogPtr getCatalog() const;
+
+protected:
+ ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
+
+private:
+ void validateSettings();
+ void initCatalog();
+ /*std::shared_ptr getConfiguration(DatabaseIcebergStorageType type) const;*/
+ std::string getStorageEndpointForTable(const Apache::Iceberg::TableMetadata & table_metadata) const;
+
+ /// Iceberg Catalog url.
+ const std::string url;
+ /// SETTINGS from CREATE query.
+ const DatabaseApacheIcebergSettings settings;
+ /// Database engine definition taken from initial CREATE DATABASE query.
+ const ASTPtr database_engine_definition;
+ /// Credentials to authenticate with the Iceberg Catalog.
+ Poco::Net::HTTPBasicCredentials credentials;
+
+ mutable Apache::Iceberg::CatalogPtr catalog_impl;
+
+ const Poco::Logger * log;
+};
+
+}
+#endif
diff --git a/src/Databases/ApacheIceberg/DatabaseIcebergSettings.cpp b/src/Databases/ApacheIceberg/DatabaseIcebergSettings.cpp
new file mode 100644
index 00000000000..1bf9e8667ed
--- /dev/null
+++ b/src/Databases/ApacheIceberg/DatabaseIcebergSettings.cpp
@@ -0,0 +1,93 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int UNKNOWN_SETTING;
+}
+
+#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE) \
+ DECLARE(IcebergCatalogType, catalog_type, IcebergCatalogType::REST, "Catalog type", 0) \
+ DECLARE(String, catalog_credential, "", "", 0) \
+ DECLARE(Bool, vended_credentials, false, "Use vended credentials (storage credentials) from catalog", 0) \
+ DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
+ DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \
+ DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
+ DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: '", 0) \
+ DECLARE(String, storage_endpoint, "", "Storage endpoint", 0) \
+ DECLARE(String, rest_catalog_uri, "", "URI identifying the REST catalog server", 0) \
+ DECLARE( \
+ Bool, rest_catalog_sigv4_enabled, false, "Set to true to sign requests to the REST catalog server using AWS SigV4 protocol", 0) \
+ DECLARE(String, rest_catalog_signing_region, "", "The region to use when SigV4 signing a request", 0) \
+ DECLARE(String, rest_catalog_signing_name, "", "The service signing name to use when SigV4 signing a request", 0)
+
+#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M) DATABASE_ICEBERG_RELATED_SETTINGS(M)
+
+DECLARE_SETTINGS_TRAITS(DatabaseApacheIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
+IMPLEMENT_SETTINGS_TRAITS(DatabaseApacheIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
+
+struct DatabaseApacheIcebergSettingsImpl : public BaseSettings
+{
+};
+
+#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
+ DatabaseApacheIcebergSettings##TYPE NAME = &DatabaseApacheIcebergSettingsImpl ::NAME;
+
+namespace DatabaseApacheIcebergSetting
+{
+LIST_OF_DATABASE_ICEBERG_SETTINGS(INITIALIZE_SETTING_EXTERN)
+}
+
+#undef INITIALIZE_SETTING_EXTERN
+
+DatabaseApacheIcebergSettings::DatabaseApacheIcebergSettings() : impl(std::make_unique())
+{
+}
+
+DatabaseApacheIcebergSettings::DatabaseApacheIcebergSettings(const DatabaseApacheIcebergSettings & settings)
+ : impl(std::make_unique(*settings.impl))
+{
+}
+
+DatabaseApacheIcebergSettings::DatabaseApacheIcebergSettings(DatabaseApacheIcebergSettings && settings) noexcept
+ : impl(std::make_unique(std::move(*settings.impl)))
+{
+}
+
+DatabaseApacheIcebergSettings::~DatabaseApacheIcebergSettings() = default;
+
+DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseApacheIcebergSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
+
+
+void DatabaseApacheIcebergSettings::applyChanges(const SettingsChanges & changes)
+{
+ impl->applyChanges(changes);
+}
+
+void DatabaseApacheIcebergSettings::loadFromQuery(const ASTStorage & storage_def)
+{
+ if (storage_def.settings != nullptr)
+ {
+ try
+ {
+ impl->applyChanges(storage_def.settings->changes);
+ }
+ catch (Exception & e)
+ {
+ if (e.code() == ErrorCodes::UNKNOWN_SETTING)
+ e.addMessage("for database engine " + storage_def.engine->name);
+ throw;
+ }
+ }
+}
+
+}
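
Reviewer note: to make the indirection easier to follow, here is roughly what the macros expand to for a single setting such as `warehouse` (hand-expanded approximation, not generated output; only the const overload of `operator[]` is shown):

```cpp
// DECLARE_SETTING_TRAIT(DatabaseApacheIcebergSettings, String) in the header:
using DatabaseApacheIcebergSettingsString = SettingFieldString DatabaseApacheIcebergSettingsImpl::*;

// INITIALIZE_SETTING_EXTERN(String, warehouse, ...) in this file:
namespace DatabaseApacheIcebergSetting
{
    DatabaseApacheIcebergSettingsString warehouse = &DatabaseApacheIcebergSettingsImpl::warehouse;
}

// IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR(DatabaseApacheIcebergSettings, String):
const SettingFieldString & DatabaseApacheIcebergSettings::operator[](DatabaseApacheIcebergSettingsString t) const
{
    return impl.get()->*t;   // for the constant above this resolves to impl->warehouse
}

// Which is what call sites in DatabaseIceberg.cpp rely on:
//     settings[DatabaseApacheIcebergSetting::warehouse].value
```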
diff --git a/src/Databases/ApacheIceberg/DatabaseIcebergSettings.h b/src/Databases/ApacheIceberg/DatabaseIcebergSettings.h
new file mode 100644
index 00000000000..4a3ede24b76
--- /dev/null
+++ b/src/Databases/ApacheIceberg/DatabaseIcebergSettings.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace DB
+{
+class ASTStorage;
+struct DatabaseApacheIcebergSettingsImpl;
+class SettingsChanges;
+
+/// List of available types supported in the DatabaseApacheIcebergSettings object
+#define DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
+ M(CLASS_NAME, String) \
+ M(CLASS_NAME, UInt64) \
+ M(CLASS_NAME, Bool) \
+ M(CLASS_NAME, IcebergCatalogType)
+
+DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseApacheIcebergSettings, DECLARE_SETTING_TRAIT)
+
+struct DatabaseApacheIcebergSettings
+{
+ DatabaseApacheIcebergSettings();
+ DatabaseApacheIcebergSettings(const DatabaseApacheIcebergSettings & settings);
+ DatabaseApacheIcebergSettings(DatabaseApacheIcebergSettings && settings) noexcept;
+ ~DatabaseApacheIcebergSettings();
+
+ DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseApacheIcebergSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)
+
+ void loadFromQuery(const ASTStorage & storage_def);
+
+ void applyChanges(const SettingsChanges & changes);
+
+private:
+ std::unique_ptr impl;
+};
+}
diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp
index 12cc150c53a..9fe591f6596 100644
--- a/src/Databases/DatabaseFactory.cpp
+++ b/src/Databases/DatabaseFactory.cpp
@@ -1,6 +1,8 @@
#include
#include
+#include
+#include
#include
#include
#include
@@ -83,13 +85,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
static const std::unordered_set database_engines{"Ordinary", "Atomic", "Memory",
"Dictionary", "Lazy", "Replicated", "MySQL",
- "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
+ "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Iceberg"}; /// proton: updated
if (!database_engines.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name);
static const std::unordered_set engines_with_arguments{"MySQL",
- "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
+ "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Iceberg"}; /// proton: updated
static const std::unordered_set engines_with_table_overrides{"MaterializedPostgreSQL"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
@@ -100,7 +102,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
- bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
+ bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL" || engine_name == "Iceberg";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
@@ -131,6 +133,22 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto cache_expiration_time_seconds = safeGetLiteralValue(arguments[0], "Lazy");
return std::make_shared(database_name, metadata_path, cache_expiration_time_seconds, context);
}
+ else if (engine_name == "Iceberg")
+ {
+ ASTs & engine_args = engine_define->engine->arguments->children;
+ if (engine_args.empty())
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg database requires a URL argument");
+
+ for (auto & engine_arg : engine_args)
+ engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
+
+ const auto url = safeGetLiteralValue(engine_args[0], "Iceberg");
+
+ DatabaseApacheIcebergSettings database_settings;
+ if (engine_define->settings != nullptr)
+ database_settings.loadFromQuery(*engine_define);
+ return std::make_shared(database_name, url, database_settings, engine_define->clone());
+ }
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h
index 7e9cd9c4ac6..c5b7209b4bd 100644
--- a/src/Databases/IDatabase.h
+++ b/src/Databases/IDatabase.h
@@ -158,6 +158,10 @@ class IDatabase : public std::enable_shared_from_this
virtual void startupTables(ThreadPool & /*thread_pool*/, bool /*force_restore*/, bool /*force_attach*/) {}
+ /// proton: starts
+ virtual bool configureTableEngine(ASTCreateQuery &) const { return false; }
+ /// proton: ends
+
/// Check the existence of the table in memory (attached).
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;
diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
new file mode 100644
index 00000000000..1371974ec71
--- /dev/null
+++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
@@ -0,0 +1,355 @@
+#include "AsynchronousBoundedReadBuffer.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace CurrentMetrics
+{
+ extern const Metric AsynchronousReadWait;
+}
+
+namespace ProfileEvents
+{
+ extern const Event AsynchronousRemoteReadWaitMicroseconds;
+ extern const Event SynchronousRemoteReadWaitMicroseconds;
+ extern const Event RemoteFSSeeks;
+ extern const Event RemoteFSPrefetches;
+ extern const Event RemoteFSCancelledPrefetches;
+ extern const Event RemoteFSUnusedPrefetches;
+ extern const Event RemoteFSPrefetchedReads;
+ extern const Event RemoteFSUnprefetchedReads;
+ extern const Event RemoteFSPrefetchedBytes;
+ extern const Event RemoteFSUnprefetchedBytes;
+ extern const Event RemoteFSLazySeeks;
+ extern const Event RemoteFSSeeksWithReset;
+ extern const Event RemoteFSBuffers;
+}
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+ extern const int ARGUMENT_OUT_OF_BOUND;
+}
+
+static size_t chooseBufferSize(size_t file_size)
+{
+ /// Buffers used for prefetch or pre-download should be large enough, but not bigger than the whole file.
+ return std::min(DBMS_DEFAULT_BUFFER_SIZE, file_size);
+}
+
+AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
+ ImplPtr impl_,
+ IAsynchronousReader & reader_,
+ const ReadSettings & settings_,
+ AsyncReadCountersPtr async_read_counters_,
+ FilesystemReadPrefetchesLogPtr prefetches_log_)
+ : ReadBufferFromFileBase(chooseBufferSize(impl_->getFileSize()), nullptr, 0)
+ , impl(std::move(impl_))
+ , read_settings(settings_)
+ , reader(reader_)
+ , prefetch_buffer(chooseBufferSize(impl->getFileSize()))
+ , query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
+ , current_reader_id(getRandomASCIIString(8))
+ , log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
+ , async_read_counters(async_read_counters_)
+ , prefetches_log(prefetches_log_)
+{
+ ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
+}
+
+bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
+{
+ if (read_until_position)
+ {
+ if (file_offset_of_buffer_end == *read_until_position) /// Everything is already read.
+ return false;
+
+ if (file_offset_of_buffer_end > *read_until_position)
+ {
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "Read beyond last offset ({} > {}, info: {})",
+ file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
+ }
+ }
+
+ return true;
+}
+
+std::future
+AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, Priority priority)
+{
+ IAsynchronousReader::Request request;
+ request.descriptor = std::make_shared(*impl, async_read_counters);
+ request.buf = data;
+ request.size = size;
+ request.offset = file_offset_of_buffer_end;
+ request.priority = Priority{static_cast(read_settings.priority + priority.value)};
+ request.ignore = bytes_to_ignore;
+ return reader.submit(request);
+}
+
+void AsynchronousBoundedReadBuffer::prefetch()
+{
+ if (prefetch_future.valid())
+ return;
+
+ if (!hasPendingDataToRead())
+ return;
+
+ last_prefetch_info.submit_time = std::chrono::system_clock::now();
+
+ chassert(prefetch_buffer.size() == chooseBufferSize(impl->getFileSize()));
+ prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), {});
+ ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
+}
+
+void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
+{
+ if (!read_until_position || position != *read_until_position)
+ {
+ read_until_position = position;
+
+ /// We must wait on future and reset the prefetch here, because otherwise there might be
+ /// a race between reading the data in the threadpool and impl->setReadUntilPosition()
+ /// which reinitializes internal remote read buffer (because if we have a new read range
+ /// then we need a new range request) and in case of reading from cache we need to request
+ /// and hold more file segment ranges from cache.
+ resetPrefetch(FilesystemPrefetchState::CANCELLED_WITH_RANGE_CHANGE);
+ impl->setReadUntilPosition(*read_until_position);
+ }
+}
+
+void AsynchronousBoundedReadBuffer::appendToPrefetchLog(
+ FilesystemPrefetchState state,
+ int64_t size,
+ const std::unique_ptr & execution_watch)
+{
+ FilesystemReadPrefetchesLogElement elem
+ {
+ .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
+ .query_id = query_id,
+ .path = impl->getFileName(),
+ .offset = file_offset_of_buffer_end,
+ .size = size,
+ .prefetch_submit_time = last_prefetch_info.submit_time,
+ .execution_watch = execution_watch ? std::optional(*execution_watch) : std::nullopt,
+ .priority = last_prefetch_info.priority,
+ .state = state,
+ .thread_id = getThreadId(),
+ .reader_id = current_reader_id,
+ };
+
+ if (prefetches_log)
+ prefetches_log->add(elem);
+}
+
+
+bool AsynchronousBoundedReadBuffer::nextImpl()
+{
+ if (!hasPendingDataToRead())
+ return false;
+
+ chassert(file_offset_of_buffer_end <= impl->getFileSize());
+
+ size_t size, offset;
+ if (prefetch_future.valid())
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
+ CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
+
+ auto result = prefetch_future.get();
+ size = result.size;
+ offset = result.offset;
+
+ prefetch_future = {};
+ prefetch_buffer.swap(memory);
+
+ last_prefetch_info = {};
+
+ ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
+ ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
+ }
+ else
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
+
+ chassert(memory.size() == chooseBufferSize(impl->getFileSize()));
+ std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
+
+ ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
+ ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, size);
+ }
+
+ bytes_to_ignore = 0;
+
+ chassert(size >= offset);
+
+ size_t bytes_read = size - offset;
+ if (bytes_read != 0u)
+ {
+ /// Adjust the working buffer so that it ignores `offset` bytes.
+ internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
+ working_buffer = Buffer(memory.data() + offset, memory.data() + size);
+ pos = working_buffer.begin();
+ }
+
+ file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
+
+ /// In case of multiple files for the same file in clickhouse (i.e. log family)
+ /// file_offset_of_buffer_end will not match getImplementationBufferOffset()
+ /// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
+ chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
+ chassert(file_offset_of_buffer_end <= impl->getFileSize());
+
+ return bytes_read != 0u;
+}
+
+
+off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
+{
+ ProfileEvents::increment(ProfileEvents::RemoteFSSeeks);
+
+ size_t new_pos;
+ if (whence == SEEK_SET)
+ {
+ assert(offset >= 0);
+ new_pos = offset;
+ }
+ else if (whence == SEEK_CUR)
+ {
+ new_pos = static_cast(getPosition()) + offset;
+ }
+ else
+ {
+ throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Expected SEEK_SET or SEEK_CUR as whence");
+ }
+
+ /// Position is unchanged.
+ if (new_pos == static_cast(getPosition()))
+ return new_pos;
+
+ bool read_from_prefetch = false;
+ while (true)
+ {
+ /// The first condition implies bytes_to_ignore = 0.
+ if (!working_buffer.empty() && file_offset_of_buffer_end - working_buffer.size() <= new_pos &&
+ new_pos <= file_offset_of_buffer_end)
+ {
+ /// Position is still inside the buffer.
+ /// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
+
+ pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
+ assert(pos >= working_buffer.begin());
+ assert(pos <= working_buffer.end());
+
+ return new_pos;
+ }
+ else if (prefetch_future.valid())
+ {
+ read_from_prefetch = true;
+
+ /// Read from prefetch buffer and recheck if the new position is valid inside.
+ if (nextImpl())
+ continue;
+ }
+
+ /// Prefetch is cancelled because of seek.
+ if (read_from_prefetch)
+ {
+ ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
+ }
+
+ break;
+ }
+
+ assert(!prefetch_future.valid());
+
+ /// First reset the buffer so the next read will fetch new data to the buffer.
+ resetWorkingBuffer();
+ bytes_to_ignore = 0;
+
+ if (read_until_position && new_pos > *read_until_position)
+ {
+ ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
+ file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
+ impl->seek(file_offset_of_buffer_end, SEEK_SET);
+ return new_pos;
+ }
+
+ /**
+ * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
+ * Note: we read in range [file_offset_of_buffer_end, read_until_position).
+ */
+ if (read_until_position && new_pos < *read_until_position
+ && new_pos > file_offset_of_buffer_end
+ && new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
+ {
+ ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
+ bytes_to_ignore = new_pos - file_offset_of_buffer_end;
+ }
+ else
+ {
+ ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
+ file_offset_of_buffer_end = new_pos;
+ impl->seek(file_offset_of_buffer_end, SEEK_SET);
+ }
+
+ return new_pos;
+}
+
+
+void AsynchronousBoundedReadBuffer::finalize()
+{
+ resetPrefetch(FilesystemPrefetchState::UNNEEDED);
+}
+
+AsynchronousBoundedReadBuffer::~AsynchronousBoundedReadBuffer()
+{
+ try
+ {
+ finalize();
+ }
+ catch (...)
+ {
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ }
+}
+
+void AsynchronousBoundedReadBuffer::resetPrefetch(FilesystemPrefetchState state)
+{
+ if (!prefetch_future.valid())
+ return;
+
+ auto [size, offset] = prefetch_future.get();
+ prefetch_future = {};
+ last_prefetch_info = {};
+
+ ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
+
+ switch (state)
+ {
+ case FilesystemPrefetchState::UNNEEDED:
+ ProfileEvents::increment(ProfileEvents::RemoteFSUnusedPrefetches);
+ break;
+ case FilesystemPrefetchState::CANCELLED_WITH_SEEK:
+ case FilesystemPrefetchState::CANCELLED_WITH_RANGE_CHANGE:
+ ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
+ break;
+ default:
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of prefetch: {}", magic_enum::enum_name(state));
+ }
+}
+
+}
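
Reviewer note: the `seek()` path above avoids a remote round trip for short forward jumps — instead of resetting the buffer it records `bytes_to_ignore` and lets the next read skip them. A self-contained sketch of just that decision (local names and thresholds are illustrative, not part of the patch):

```cpp
#include <cstddef>
#include <iostream>
#include <optional>

struct SeekDecision { bool lazy; size_t bytes_to_ignore; };

// Mirrors the final branch of AsynchronousBoundedReadBuffer::seek().
SeekDecision decideSeek(size_t new_pos, size_t buffer_end_offset,
                        std::optional<size_t> read_until_position,
                        size_t min_bytes_for_seek)
{
    // Short forward jumps inside the allowed range are served by skipping bytes
    // from the already-scheduled read instead of re-issuing a remote request.
    if (read_until_position && new_pos < *read_until_position
        && new_pos > buffer_end_offset
        && new_pos < buffer_end_offset + min_bytes_for_seek)
        return {true, new_pos - buffer_end_offset};
    return {false, 0};   // otherwise reset and seek the underlying buffer
}

int main()
{
    auto d = decideSeek(/*new_pos=*/5000, /*buffer_end=*/4096, /*until=*/1 << 20, /*min_bytes=*/4096);
    std::cout << d.lazy << ' ' << d.bytes_to_ignore << '\n';   // prints: 1 904
}
```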
diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.h b/src/Disks/IO/AsynchronousBoundedReadBuffer.h
new file mode 100644
index 00000000000..a70819d4efa
--- /dev/null
+++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.h
@@ -0,0 +1,94 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+struct AsyncReadCounters;
+using AsyncReadCountersPtr = std::shared_ptr;
+class ReadBufferFromRemoteFSGather;
+
+class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
+{
+public:
+ using Impl = ReadBufferFromFileBase;
+ using ImplPtr = std::unique_ptr;
+
+ explicit AsynchronousBoundedReadBuffer(
+ ImplPtr impl_,
+ IAsynchronousReader & reader_,
+ const ReadSettings & settings_,
+ AsyncReadCountersPtr async_read_counters_ = nullptr,
+ FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
+
+ ~AsynchronousBoundedReadBuffer() override;
+
+ String getFileName() const override { return impl->getFileName(); }
+
+ size_t getFileSize() override { return impl->getFileSize(); }
+
+ String getInfoForLog() override { return impl->getInfoForLog(); }
+
+ off_t seek(off_t offset_, int whence) override;
+
+ void prefetch() override;
+
+ void setReadUntilPosition(size_t position) override; /// [..., position).
+
+ void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
+
+ off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
+
+private:
+ const ImplPtr impl;
+ const ReadSettings read_settings;
+ IAsynchronousReader & reader;
+
+ size_t file_offset_of_buffer_end = 0;
+ std::optional read_until_position;
+ /// If nonzero then working_buffer is empty.
+ /// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
+ size_t bytes_to_ignore = 0;
+
+ Memory<> prefetch_buffer;
+ std::future prefetch_future;
+
+ const std::string query_id;
+ const std::string current_reader_id;
+
+ Poco::Logger * log;
+
+ AsyncReadCountersPtr async_read_counters;
+ FilesystemReadPrefetchesLogPtr prefetches_log;
+
+ struct LastPrefetchInfo
+ {
+ std::chrono::system_clock::time_point submit_time;
+ Priority priority;
+ };
+ LastPrefetchInfo last_prefetch_info;
+
+ bool nextImpl() override;
+
+ void finalize();
+
+ bool hasPendingDataToRead();
+
+ void appendToPrefetchLog(
+ FilesystemPrefetchState state,
+ int64_t size,
+ const std::unique_ptr & execution_watch);
+
+ std::future asyncReadInto(char * data, size_t size, Priority priority);
+
+ void resetPrefetch(FilesystemPrefetchState state);
+
+};
+
+}
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index 6e3ebb43e50..b590ba93d84 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -491,7 +491,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
std::vector part_tags;
- size_t upload_part_size = settings_ptr->request_settings.min_upload_part_size;
+ size_t upload_part_size = settings_ptr->request_settings.getUploadSettings().min_upload_part_size;
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
{
ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp
index 8a3a00929db..c154e64307c 100644
--- a/src/Disks/ObjectStorages/S3/diskSettings.cpp
+++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp
@@ -34,22 +34,7 @@ extern const int BAD_ARGUMENTS;
std::unique_ptr getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
{
const Settings & settings = context->getSettingsRef();
- S3Settings::RequestSettings request_settings;
- request_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", settings.s3_max_single_read_retries);
- request_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", settings.s3_min_upload_part_size);
- request_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", settings.s3_upload_part_size_multiply_factor);
- request_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", settings.s3_upload_part_size_multiply_parts_count_threshold);
- request_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", settings.s3_max_single_part_upload_size);
- request_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", settings.s3_check_objects_after_upload);
- request_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", settings.s3_max_unexpected_write_error_retries);
-
- // NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload, which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
- if (UInt64 max_get_rps = config.getUInt64(config_prefix + ".s3_max_get_rps", settings.s3_max_get_rps))
- request_settings.get_request_throttler = std::make_shared(
- max_get_rps, config.getUInt64(config_prefix + ".s3_max_get_burst", settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * max_get_rps));
- if (UInt64 max_put_rps = config.getUInt64(config_prefix + ".s3_max_put_rps", settings.s3_max_put_rps))
- request_settings.put_request_throttler = std::make_shared(
- max_put_rps, config.getUInt64(config_prefix + ".s3_max_put_burst", settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * max_put_rps));
+ S3Settings::RequestSettings request_settings(config, config_prefix, settings);
return std::make_unique(
request_settings,
@@ -122,7 +107,7 @@ std::shared_ptr getProxyConfiguration(const String & pre
}
-std::unique_ptr getClient(
+std::unique_ptr getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
@@ -158,6 +143,18 @@ std::unique_ptr getClient(
client_configuration.retryStrategy
= std::make_shared(config.getUInt(config_prefix + ".retry_attempts", 10));
+ /// Argument order below follows S3::ClientFactory::instance().create():
+ ///   (cfg, is_virtual_hosted_style, access_key_id, secret_access_key,
+ ///    server_side_encryption_customer_key_base64, sse_kms_config, headers,
+ ///    credentials_configuration, session_token)
return S3::ClientFactory::instance().create(
client_configuration,
uri.is_virtual_hosted_style,
@@ -165,8 +162,11 @@ std::unique_ptr getClient(
config.getString(config_prefix + ".secret_access_key", ""),
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),
{},
- config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
- config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)));
+ {},
+ {
+ .use_environment_credentials = config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
+ .use_insecure_imds_request = config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false))
+ });
}
}
diff --git a/src/Disks/ObjectStorages/S3/diskSettings.h b/src/Disks/ObjectStorages/S3/diskSettings.h
index 04eb7aced8e..9baa317643b 100644
--- a/src/Disks/ObjectStorages/S3/diskSettings.h
+++ b/src/Disks/ObjectStorages/S3/diskSettings.h
@@ -4,6 +4,7 @@
#if USE_AWS_S3
+#include
#include
#include
@@ -22,7 +23,7 @@ struct S3ObjectStorageSettings;
std::unique_ptr getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
-std::unique_ptr getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
+std::unique_ptr getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
}
diff --git a/src/IO/S3/getObjectInfo.cpp b/src/IO/S3/getObjectInfo.cpp
index e848229517a..dfbf42f7d0c 100644
--- a/src/IO/S3/getObjectInfo.cpp
+++ b/src/IO/S3/getObjectInfo.cpp
@@ -24,22 +24,6 @@ namespace DB::S3
namespace
{
-Aws::S3::Model::HeadObjectOutcome headObject(
- const S3::Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
-{
- ProfileEvents::increment(ProfileEvents::S3HeadObject);
- if (for_disk_s3)
- ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
-
- S3::HeadObjectRequest req;
- req.SetBucket(bucket);
- req.SetKey(key);
-
- if (!version_id.empty())
- req.SetVersionId(version_id);
-
- return client.HeadObject(req);
-}
/// Performs a request to get the size and last modification time of an object.
std::pair, Aws::S3::S3Error> tryGetObjectInfo(
@@ -60,6 +44,24 @@ std::pair, Aws::S3::S3Error> tryGetObjectInfo(
return {object_info, {}};
}
+
+}
+
+Aws::S3::Model::HeadObjectOutcome headObject(
+ const S3::Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
+{
+ ProfileEvents::increment(ProfileEvents::S3HeadObject);
+ if (for_disk_s3)
+ ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
+
+ S3::HeadObjectRequest req;
+ req.SetBucket(bucket);
+ req.SetKey(key);
+
+ if (!version_id.empty())
+ req.SetVersionId(version_id);
+
+ return client.HeadObject(req);
}
diff --git a/src/IO/S3/getObjectInfo.h b/src/IO/S3/getObjectInfo.h
index a57d807644b..eae94d8e094 100644
--- a/src/IO/S3/getObjectInfo.h
+++ b/src/IO/S3/getObjectInfo.h
@@ -3,9 +3,9 @@
#include "config.h"
#if USE_AWS_S3
-#include
-#include
-#include
+# include
+# include
+# include
namespace DB::S3
@@ -19,6 +19,9 @@ struct ObjectInfo
std::map metadata = {}; /// Set only if getObjectInfo() is called with `with_metadata = true`.
};
+Aws::S3::Model::HeadObjectOutcome
+headObject(const S3::Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3);
+
ObjectInfo getObjectInfo(
const S3::Client & client,
const String & bucket,
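
Reviewer note: moving `headObject()` out of the anonymous namespace makes it callable from other translation units. A hedged usage sketch based only on the signature declared above (the existence-check wrapper is hypothetical, not part of the patch):

```cpp
#include <IO/S3/getObjectInfo.h>

bool checkObjectExists(const DB::S3::Client & client, const std::string & bucket, const std::string & key)
{
    // HEAD the object; a successful outcome means the object exists.
    auto outcome = DB::S3::headObject(client, bucket, key, /*version_id=*/"", /*for_disk_s3=*/false);
    return outcome.IsSuccess();
}
```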
diff --git a/src/IO/SeekableReadBuffer.cpp b/src/IO/SeekableReadBuffer.cpp
new file mode 100644
index 00000000000..ea8d4b87b0b
--- /dev/null
+++ b/src/IO/SeekableReadBuffer.cpp
@@ -0,0 +1,63 @@
+#include
+
+
+namespace DB
+{
+
+namespace
+{
+template
+class SeekableReadBufferWrapper : public SeekableReadBuffer
+{
+public:
+ SeekableReadBufferWrapper(SeekableReadBuffer & in_, CustomData && custom_data_)
+ : SeekableReadBuffer(in_.buffer().begin(), in_.buffer().size(), in_.offset())
+ , in(in_)
+ , custom_data(std::move(custom_data_))
+ {
+ }
+
+private:
+ SeekableReadBuffer & in;
+ CustomData custom_data;
+
+ bool nextImpl() override
+ {
+ in.position() = position();
+ if (!in.next())
+ {
+ set(in.position(), 0);
+ return false;
+ }
+ BufferBase::set(in.buffer().begin(), in.buffer().size(), in.offset());
+ return true;
+ }
+
+ off_t seek(off_t off, int whence) override
+ {
+ in.position() = position();
+ off_t new_pos = in.seek(off, whence);
+ BufferBase::set(in.buffer().begin(), in.buffer().size(), in.offset());
+ return new_pos;
+ }
+
+ off_t getPosition() override
+ {
+ in.position() = position();
+ return in.getPosition();
+ }
+};
+}
+
+
+std::unique_ptr wrapSeekableReadBufferReference(SeekableReadBuffer & ref)
+{
+ return std::make_unique>(ref, nullptr);
+}
+
+std::unique_ptr wrapSeekableReadBufferPointer(SeekableReadBufferPtr ptr)
+{
+ return std::make_unique>(*ptr, SeekableReadBufferPtr{ptr});
+}
+
+}
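
Reviewer note: the two helpers differ only in ownership — the reference wrapper borrows the inner buffer, while the pointer wrapper stores the `shared_ptr` as `custom_data` so the inner buffer lives as long as the wrapper. A short usage sketch (the buffer is assumed to be obtained elsewhere; not part of the patch):

```cpp
#include <IO/SeekableReadBuffer.h>

void demo(DB::SeekableReadBufferPtr inner)
{
    // Borrowing view: `inner` must stay alive for as long as `view` is used.
    std::unique_ptr<DB::SeekableReadBuffer> view = DB::wrapSeekableReadBufferReference(*inner);

    // Owning wrapper: keeps a copy of the shared_ptr, so it can safely outlive this scope's `inner`.
    std::unique_ptr<DB::SeekableReadBuffer> owner = DB::wrapSeekableReadBufferPointer(inner);
}
```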
diff --git a/src/IO/SeekableReadBuffer.h b/src/IO/SeekableReadBuffer.h
index b4d5779d423..d038003f923 100644
--- a/src/IO/SeekableReadBuffer.h
+++ b/src/IO/SeekableReadBuffer.h
@@ -67,6 +67,37 @@ class SeekableReadBuffer : public ReadBuffer
virtual bool isIntegratedWithFilesystemCache() const { return false; }
};
+/// Useful for reading in parallel.
+/// The created read buffers may outlive the factory.
+///
+/// There are 2 ways to use this:
+/// (1) Never call seek() or getFileSize(), read the file sequentially.
+/// For HTTP, this usually translates to just one HTTP request.
+/// (2) Call checkIfActuallySeekable(), then:
+/// a. If it returned false, go to (1). seek() and getFileSize() are not available (throw if called).
+/// b. If it returned true, seek() and getFileSize() are available, knock yourself out.
+/// For HTTP, checkIfActuallySeekable() sends a HEAD request and returns false if the web server
+/// doesn't support ranges (or doesn't support HEAD requests).
+class SeekableReadBufferFactory : public WithFileSize
+{
+public:
+ ~SeekableReadBufferFactory() override = default;
+
+ // We usually call setReadUntilPosition() and seek() on the returned buffer before reading.
+ // So it's recommended that the returned implementation be lazy, i.e. don't start reading
+ // before the first call to nextImpl().
+ virtual std::unique_ptr getReader() = 0;
+
+ virtual bool checkIfActuallySeekable() { return true; }
+};
+
using SeekableReadBufferPtr = std::shared_ptr;
+using SeekableReadBufferFactoryPtr = std::unique_ptr;
+
+/// Wraps a reference to a SeekableReadBuffer into an unique pointer to SeekableReadBuffer.
+/// This function is like wrapReadBufferReference() but for SeekableReadBuffer.
+std::unique_ptr wrapSeekableReadBufferReference(SeekableReadBuffer & ref);
+std::unique_ptr wrapSeekableReadBufferPointer(SeekableReadBufferPtr ptr);
+
}
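
Reviewer note: a sketch of the two usage patterns described in the `SeekableReadBufferFactory` comment above (assumes `getReader()` returns a `std::unique_ptr` to a `SeekableReadBuffer`, as the header suggests; error handling omitted, offsets are illustrative):

```cpp
#include <IO/SeekableReadBuffer.h>
#include <cstdio>   // SEEK_SET

void consume(DB::SeekableReadBufferFactory & factory)
{
    if (factory.checkIfActuallySeekable())
    {
        // Pattern (2b): random access is allowed.
        auto reader = factory.getReader();
        reader->seek(1024, SEEK_SET);
        // ... read from the middle of the file ...
    }
    else
    {
        // Pattern (1)/(2a): sequential only; seek()/getFileSize() would throw.
        auto reader = factory.getReader();
        // ... read from the beginning to the end ...
    }
}
```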
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index a0f381ed2bf..e443dfff33b 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -7,6 +7,7 @@
#include
#include
+#include
#include
#include
#include
@@ -71,7 +72,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr client_ptr_,
const String & bucket_,
const String & key_,
- const S3Settings::RequestSettings & request_settings_,
+ const S3Settings::RequestSettings & request_settings,
std::optional> object_metadata_,
size_t buffer_size_,
ThreadPoolCallbackRunner schedule_,
@@ -79,10 +80,12 @@ WriteBufferFromS3::WriteBufferFromS3(
: BufferWithOwnMemory(buffer_size_, nullptr, 0)
, bucket(bucket_)
, key(key_)
- , request_settings(request_settings_)
+ , settings(request_settings.getUploadSettings())
+ , check_objects_after_upload(request_settings.check_objects_after_upload)
+ , max_unexpected_write_error_retries(request_settings.max_unexpected_write_error_retries)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
- , upload_part_size(request_settings_.min_upload_part_size)
+ , upload_part_size(settings.min_upload_part_size)
, schedule(std::move(schedule_))
, write_settings(write_settings_)
{
@@ -91,7 +94,7 @@ WriteBufferFromS3::WriteBufferFromS3(
void WriteBufferFromS3::nextImpl()
{
- if (!offset())
+ if (offset() == 0u)
return;
/// Buffer in a bad state after exception
@@ -111,7 +114,7 @@ void WriteBufferFromS3::nextImpl()
write_settings.remote_throttler->add(offset());
/// Data size exceeds singlepart upload threshold, need to use multipart upload.
- if (multipart_upload_id.empty() && last_part_size > request_settings.max_single_part_upload_size)
+ if (multipart_upload_id.empty() && last_part_size > settings.max_single_part_upload_size)
createMultipartUpload();
if (!multipart_upload_id.empty() && last_part_size > upload_part_size)
@@ -176,7 +179,7 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty())
completeMultipartUpload();
- if (request_settings.check_objects_after_upload)
+ if (check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
@@ -302,6 +305,20 @@ void WriteBufferFromS3::writePart()
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number)
{
+ /// Increase part number.
+ ++part_number;
+ if (!multipart_upload_id.empty() && (part_number > settings.max_part_number))
+ {
+ throw Exception(
+ ErrorCodes::INVALID_CONFIG_PARAMETER,
+ "Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
+ "upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}",
+ settings.max_part_number, count(), settings.min_upload_part_size, settings.max_upload_part_size,
+ settings.upload_part_size_multiply_factor, settings.upload_part_size_multiply_parts_count_threshold,
+ settings.max_single_part_upload_size);
+ }
+
+ /// Setup request.
req.SetBucket(bucket);
req.SetKey(key);
req.SetPartNumber(part_number);
@@ -311,6 +328,13 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
req.SetContentType("binary/octet-stream");
+
+ /// Maybe increase `upload_part_size` (we sometimes need to increase it to keep `part_number` less than or equal to `max_part_number`).
+ if (!multipart_upload_id.empty() && (part_number % settings.upload_part_size_multiply_parts_count_threshold == 0))
+ {
+ upload_part_size *= settings.upload_part_size_multiply_factor;
+ upload_part_size = std::min(upload_part_size, settings.max_upload_part_size);
+ }
}
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
@@ -356,7 +380,7 @@ void WriteBufferFromS3::completeMultipartUpload()
req.SetMultipartUpload(multipart_upload);
- size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
+ size_t max_retry = std::max(max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@@ -462,7 +486,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
{
- size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
+ size_t max_retry = std::max(max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);
diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h
index ebc75dedd09..4b5272b3a07 100644
--- a/src/IO/WriteBufferFromS3.h
+++ b/src/IO/WriteBufferFromS3.h
@@ -50,7 +50,7 @@ class WriteBufferFromS3 final : public BufferWithOwnMemory<WriteBuffer>
std::shared_ptr client_ptr_,
const String & bucket_,
const String & key_,
- const S3Settings::RequestSettings & request_settings_,
+ const S3Settings::RequestSettings & request_settings,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ThreadPoolCallbackRunner schedule_ = {},
@@ -88,7 +88,9 @@ class WriteBufferFromS3 final : public BufferWithOwnMemory<WriteBuffer>
const String bucket;
const String key;
- const S3Settings::RequestSettings request_settings;
+ const S3Settings::RequestSettings::PartUploadSettings settings;
+ const bool check_objects_after_upload = false;
+ const size_t max_unexpected_write_error_retries = 4;
const std::shared_ptr client_ptr;
const std::optional<std::map<String, String>> object_metadata;
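
The part-number/part-size logic added to fillUploadRequest() above can be summarized in isolation. With default-like values of min_upload_part_size = 16 MiB and max_part_number = 10000, a fixed part size would cap a multipart object at roughly 156 GiB; growing the part size geometrically every upload_part_size_multiply_parts_count_threshold parts removes that cap (up to the 5 TiB S3 object limit) while still respecting max_upload_part_size. A standalone sketch of the rule, with the settings passed as plain numbers (illustrative only, the function is not part of the patch):

    size_t nextUploadPartSize(size_t current_part_size, size_t part_number,
                              size_t multiply_factor, size_t multiply_threshold, size_t max_part_size)
    {
        /// Every `multiply_threshold` parts, multiply the part size, but never exceed `max_part_size`.
        if (part_number % multiply_threshold == 0)
            current_part_size = std::min(current_part_size * multiply_factor, max_part_size);
        return current_part_size;
    }
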
diff --git a/src/Interpreters/FilesystemReadPrefetchesLog.cpp b/src/Interpreters/FilesystemReadPrefetchesLog.cpp
new file mode 100644
index 00000000000..d1cc61b94ba
--- /dev/null
+++ b/src/Interpreters/FilesystemReadPrefetchesLog.cpp
@@ -0,0 +1,61 @@
+#include <Interpreters/FilesystemReadPrefetchesLog.h>
+#include <DataTypes/DataTypeDate.h>
+#include <DataTypes/DataTypeDateTime.h>
+#include <DataTypes/DataTypeDateTime64.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+
+
+namespace DB
+{
+
+NamesAndTypesList FilesystemReadPrefetchesLogElement::getNamesAndTypes()
+{
+ return {
+ {"event_date", std::make_shared<DataTypeDate>()},
+ {"event_time", std::make_shared<DataTypeDateTime>()},
+ {"query_id", std::make_shared<DataTypeString>()},
+ {"path", std::make_shared<DataTypeString>()},
+ {"offset", std::make_shared<DataTypeUInt64>()},
+ {"size", std::make_shared<DataTypeInt64>()},
+ {"prefetch_submit_time", std::make_shared<DataTypeDateTime64>(6)},
+ {"priority", std::make_shared<DataTypeInt64>()},
+ {"prefetch_execution_start_time", std::make_shared<DataTypeDateTime64>(6)},
+ {"prefetch_execution_end_time", std::make_shared<DataTypeDateTime64>(6)},
+ {"prefetch_execution_time_us", std::make_shared<DataTypeUInt64>()},
+ {"state", std::make_shared<DataTypeString>()}, /// Was this prefetch used or we downloaded it in vain?
+ {"thread_id", std::make_shared<DataTypeUInt64>()},
+ {"reader_id", std::make_shared<DataTypeString>()},
+ };
+}
+
+void FilesystemReadPrefetchesLogElement::appendToBlock(MutableColumns & columns) const
+{
+ size_t i = 0;
+
+ columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
+ columns[i++]->insert(event_time);
+ columns[i++]->insert(query_id);
+ columns[i++]->insert(path);
+ columns[i++]->insert(offset);
+ columns[i++]->insert(size);
+ columns[i++]->insert(std::chrono::duration_cast<std::chrono::microseconds>(prefetch_submit_time.time_since_epoch()).count());
+ columns[i++]->insert(priority.value);
+ if (execution_watch)
+ {
+ columns[i++]->insert(execution_watch->getStart() / 1000);
+ columns[i++]->insert(execution_watch->getEnd() / 1000);
+ columns[i++]->insert(execution_watch->elapsedMicroseconds());
+ }
+ else
+ {
+ columns[i++]->insertDefault();
+ columns[i++]->insertDefault();
+ columns[i++]->insertDefault();
+ }
+ columns[i++]->insert(magic_enum::enum_name(state));
+ columns[i++]->insert(thread_id);
+ columns[i++]->insert(reader_id);
+}
+
+}
diff --git a/src/Interpreters/FilesystemReadPrefetchesLog.h b/src/Interpreters/FilesystemReadPrefetchesLog.h
new file mode 100644
index 00000000000..313c8ab5872
--- /dev/null
+++ b/src/Interpreters/FilesystemReadPrefetchesLog.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <Common/Priority.h>
+#include <Common/Stopwatch.h>
+#include <Core/NamesAndAliases.h>
+#include <Core/NamesAndTypes.h>
+#include <Interpreters/SystemLog.h>
+
+namespace DB
+{
+
+enum class FilesystemPrefetchState
+{
+ USED,
+ CANCELLED_WITH_SEEK,
+ CANCELLED_WITH_RANGE_CHANGE,
+ UNNEEDED,
+};
+
+struct FilesystemReadPrefetchesLogElement
+{
+ time_t event_time{};
+ String query_id;
+ String path;
+ UInt64 offset;
+ Int64 size; /// -1 means unknown
+ std::chrono::system_clock::time_point prefetch_submit_time;
+ std::optional<Stopwatch> execution_watch;
+ Priority priority;
+ FilesystemPrefetchState state;
+ UInt64 thread_id;
+ String reader_id;
+
+ static std::string name() { return "FilesystemReadPrefetchesLog"; }
+
+ static NamesAndTypesList getNamesAndTypes();
+ static NamesAndAliases getNamesAndAliases() { return {}; }
+
+ void appendToBlock(MutableColumns & columns) const;
+ static const char * getCustomColumnList() { return nullptr; }
+};
+
+class FilesystemReadPrefetchesLog : public SystemLog<FilesystemReadPrefetchesLogElement>
+{
+public:
+ using SystemLog::SystemLog;
+};
+
+using FilesystemReadPrefetchesLogPtr = std::shared_ptr<FilesystemReadPrefetchesLog>;
+
+}
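
As a usage illustration (not part of the patch), a prefetching reader could record an entry into this log roughly as follows. The helper name and the surrounding reader code are hypothetical; the element struct and SystemLog::add() are what the header above declares and inherits:

    void logPrefetch(const FilesystemReadPrefetchesLogPtr & log, const String & query_id,
                     const String & path, UInt64 offset, Int64 size, Priority priority)
    {
        if (!log)
            return;

        FilesystemReadPrefetchesLogElement elem;
        elem.event_time = time(nullptr);
        elem.query_id = query_id;
        elem.path = path;
        elem.offset = offset;
        elem.size = size;
        elem.prefetch_submit_time = std::chrono::system_clock::now();
        elem.priority = priority;
        elem.state = FilesystemPrefetchState::USED; /// or CANCELLED_* / UNNEEDED, depending on the outcome
        elem.thread_id = getThreadId();
        log->add(elem);
    }
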
diff --git a/src/Parsers/ASTDataType.cpp b/src/Parsers/ASTDataType.cpp
new file mode 100644
index 00000000000..a0e4cd723ac
--- /dev/null
+++ b/src/Parsers/ASTDataType.cpp
@@ -0,0 +1,67 @@
+#include <Parsers/ASTDataType.h>
+#include <Common/SipHash.h>
+#include <IO/Operators.h>
+
+
+namespace DB
+{
+
+String ASTDataType::getID(char delim) const
+{
+ return "DataType" + (delim + name);
+}
+
+ASTPtr ASTDataType::clone() const
+{
+ auto res = std::make_shared<ASTDataType>(*this);
+ res->children.clear();
+
+ if (arguments)
+ {
+ res->arguments = arguments->clone();
+ res->children.push_back(res->arguments);
+ }
+
+ return res;
+}
+
+void ASTDataType::updateTreeHashImpl(SipHash & hash_state) const
+{
+ hash_state.update(name.size());
+ hash_state.update(name);
+ /// Children are hashed automatically.
+}
+
+void ASTDataType::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
+{
+ settings.ostr << (settings.hilite ? hilite_function : "") << name;
+
+ if (arguments && !arguments->children.empty())
+ {
+ settings.ostr << '(' << (settings.hilite ? hilite_none : "");
+
+ if (!settings.one_line && settings.print_pretty_type_names && name == "Tuple")
+ {
+ ++frame.indent;
+ std::string indent_str = settings.one_line ? "" : "\n" + std::string(4 * frame.indent, ' ');
+ for (size_t i = 0, size = arguments->children.size(); i < size; ++i)
+ {
+ if (i != 0)
+ settings.ostr << ',';
+ settings.ostr << indent_str;
+ arguments->children[i]->formatImpl(settings, state, frame);
+ }
+ }
+ else
+ {
+ frame.expression_list_prepend_whitespace = false;
+ arguments->formatImpl(settings, state, frame);
+ }
+
+ settings.ostr << (settings.hilite ? hilite_function : "") << ')';
+ }
+
+ settings.ostr << (settings.hilite ? hilite_none : "");
+}
+
+}
diff --git a/src/Parsers/ASTDataType.h b/src/Parsers/ASTDataType.h
new file mode 100644
index 00000000000..f8fc4e16a0d
--- /dev/null
+++ b/src/Parsers/ASTDataType.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <Parsers/ASTExpressionList.h>
+
+
+namespace DB
+{
+
+/// AST for data types, e.g. UInt8 or Tuple(x UInt8, y Enum(a = 1))
+class ASTDataType : public IAST
+{
+public:
+ String name;
+ ASTPtr arguments;
+
+ String getID(char delim) const override;
+ ASTPtr clone() const override;
+ void updateTreeHashImpl(SipHash & hash_state) const override;
+
+protected:
+ void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
+};
+
+template <typename... Args>
+std::shared_ptr<ASTDataType> makeASTDataType(const String & name, Args &&... args)
+{
+ auto data_type = std::make_shared<ASTDataType>();
+ data_type->name = name;
+
+ if constexpr (sizeof...(args))
+ {
+ data_type->arguments = std::make_shared<ASTExpressionList>();
+ data_type->children.push_back(data_type->arguments);
+ data_type->arguments->children = { std::forward<Args>(args)... };
+ }
+
+ return data_type;
+}
+
+}
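
As a quick usage illustration (not from the patch), a parameterized type AST can be assembled with the helper above; ASTLiteral is the existing literal AST node:

    auto nullable_uint8 = makeASTDataType("Nullable", makeASTDataType("UInt8"));
    auto decimal_10_2 = makeASTDataType("Decimal",
                                        std::make_shared<ASTLiteral>(Field(UInt64(10))),
                                        std::make_shared<ASTLiteral>(Field(UInt64(2))));
    /// nullable_uint8 formats as Nullable(UInt8), decimal_10_2 as Decimal(10, 2).
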
diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h
index 3a92cec025a..454fb0f05d0 100644
--- a/src/Processors/Formats/IOutputFormat.h
+++ b/src/Processors/Formats/IOutputFormat.h
@@ -170,4 +170,7 @@ class IOutputFormat : public IProcessor
size_t result_rows = 0;
size_t result_bytes = 0;
};
+
+using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
+
}
diff --git a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
index 2af060fe4e1..b846d8329e5 100644
--- a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
+++ b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp
@@ -650,7 +650,7 @@ namespace DB
{
arrow_fields.reserve(header.columns());
header_columns.reserve(header.columns());
- for (auto column : header.getColumnsWithTypeAndName())
+ for (size_t field_id = 1; auto column : header.getColumnsWithTypeAndName())
{
if (!low_cardinality_as_dictionary)
{
@@ -659,7 +659,12 @@ namespace DB
}
bool is_column_nullable = false;
auto arrow_type = getArrowType(column.type, column.column, column.name, format_name, &is_column_nullable);
- arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, arrow_type, is_column_nullable));
+ /// proton: starts
+ /// FIXME: field_id is needed for Iceberg; we need to ensure that the field_ids match the ones defined in the Iceberg table schema.
+ auto kv_metadata = std::make_shared<arrow::KeyValueMetadata>();
+ kv_metadata->Append("PARQUET:field_id", fmt::format("{}", field_id++)); /// field_id starts from 1 and is incremented per column
+ arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, arrow_type, is_column_nullable, std::move(kv_metadata)));
+ /// proton: ends
header_columns.emplace_back(std::move(column));
}
}
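
Since the FIXME above notes that the PARQUET:field_id values must line up with the Iceberg table schema, a sanity check over the resulting Arrow schema could look like the sketch below (illustrative only; the helper is not part of the patch). It only verifies the sequential 1-based numbering that the loop above produces:

    void assertSequentialFieldIds(const arrow::Schema & schema)
    {
        int expected = 1;
        for (const auto & field : schema.fields())
        {
            auto metadata = field->metadata();
            if (!metadata)
                throw std::runtime_error("missing PARQUET:field_id metadata on field " + field->name());

            auto id = metadata->Get("PARQUET:field_id"); /// arrow::Result<std::string>
            if (!id.ok() || *id != std::to_string(expected++))
                throw std::runtime_error("unexpected PARQUET:field_id on field " + field->name());
        }
    }
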
diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
index 57819eb7fd0..fa39144d919 100644
--- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
@@ -39,9 +39,12 @@ void ParquetBlockOutputFormat::consume(Chunk chunk)
auto sink = std::make_shared(out);
parquet::WriterProperties::Builder builder;
-#if USE_SNAPPY
+ /// proton: FIXME
+ /// Make compression configurable.
+#if 0 /// USE_SNAPPY
builder.compression(parquet::Compression::SNAPPY);
#endif
+ builder.compression(parquet::Compression::ZSTD);
auto props = builder.build();
auto result = parquet::arrow::FileWriter::Open(
*arrow_table->schema(),
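
One possible way to resolve the FIXME above is to map a compression-method setting to a Parquet codec before building the writer properties. The setting name used here (format_settings.parquet.output_compression_method) is an assumption, not something this patch defines; the parquet::Compression values are the standard Arrow/Parquet ones:

    parquet::Compression::type chooseParquetCompression(const std::string & method)
    {
        if (method == "snappy")
            return parquet::Compression::SNAPPY;
        if (method == "zstd")
            return parquet::Compression::ZSTD;
        if (method == "gzip")
            return parquet::Compression::GZIP;
        if (method == "none")
            return parquet::Compression::UNCOMPRESSED;
        return parquet::Compression::ZSTD; /// keep the patch's current hard-coded default
    }

    /// ... then in consume():
    /// builder.compression(chooseParquetCompression(format_settings.parquet.output_compression_method));
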
diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h
index c0421a4d99f..c8017de49e6 100644
--- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h
@@ -33,6 +33,8 @@ class ParquetBlockOutputFormat : public IOutputFormat
String getContentType() const override { return "application/octet-stream"; }
+ const parquet::arrow::FileWriter & fileWriter() const { return *file_writer; } /// proton: added
+
private:
void consume(Chunk) override;
void finalizeImpl() override;
diff --git a/src/Processors/ProcessorID.h b/src/Processors/ProcessorID.h
index f1510305c70..77480324bf3 100644
--- a/src/Processors/ProcessorID.h
+++ b/src/Processors/ProcessorID.h
@@ -249,15 +249,16 @@ enum class ProcessorID : UInt32
GenerateRandomSourceID = 10'045,
SourceFromQueryPipelineID = 10'046,
ConvertingAggregatedToChunksSourceShuffledID = 10'047,
- /// proton: starts
+
ClickHouseSourceID = 11'000,
PulsarSourceID = 11'054,
- /// proton: ends
+ IcebergSourceID = 11'100,
/// Sink Processors
EmptySinkID = 20'000,
NullSinkID = 20'001,
ExternalTableDataSinkID = 20'002,
+ IcebergSinkID = 20'100,
};
inline ProcessorID toProcessID(UInt32 v)
diff --git a/src/Storages/ExternalStream/CMakeLists.txt b/src/Storages/ExternalStream/CMakeLists.txt
index a74389d9973..11b8db28888 100644
--- a/src/Storages/ExternalStream/CMakeLists.txt
+++ b/src/Storages/ExternalStream/CMakeLists.txt
@@ -6,6 +6,7 @@ add_headers_and_sources(external_stream Log)
add_headers_and_sources(external_stream Kafka)
add_headers_and_sources(external_stream Pulsar)
add_headers_and_sources(external_stream Timeplus)
+add_headers_and_sources(external_stream Iceberg)
add_library(external_stream ${external_stream_headers} ${external_stream_sources})
@@ -19,6 +20,14 @@ if (TARGET ch_contrib::pulsar)
target_link_libraries(external_stream PRIVATE ch_contrib::pulsar)
endif()
+if (TARGET ch_contrib::avrocpp)
+ target_link_libraries(external_stream PRIVATE ch_contrib::avrocpp)
+endif()
+
+if (TARGET ch_contrib::parquet)
+ target_link_libraries(external_stream PRIVATE ch_contrib::parquet)
+endif()
+
if (ENABLE_TESTS)
add_subdirectory(tests)
endif ()
diff --git a/src/Storages/ExternalStream/ExternalStreamSettings.h b/src/Storages/ExternalStream/ExternalStreamSettings.h
index 135cac4ce94..a709c995863 100644
--- a/src/Storages/ExternalStream/ExternalStreamSettings.h
+++ b/src/Storages/ExternalStream/ExternalStreamSettings.h
@@ -27,6 +27,8 @@ class ASTStorage;
M(String, message_key, "", "(Deprecated) An expression which will be evaluated on each row of data returned by the query to compute a string which will be used as the message key. This setting is deprecated, please define a `_tp_message_key` column in the external stream instead.", 0) \
M(Bool, one_message_per_row, false, "If set to true, when send data to the Kafka external stream with row-based data format like `JSONEachRow`, it will produce one message per row.", 0) \
M(String, region, "", "The AWS region to target.", 0) \
+ M(String, access_key_id, "", "The access key ID.", 0) \
+ M(String, secret_access_key, "", "The secret access key.", 0) \
M(Bool, log_stats, false, "If set to true, print statistics to the logs. Note that, the statistics could contain quite a lot of data. The frequency of the statistics logs is control by the statistics.interval.ms property.", 0) \
M(Milliseconds, consumer_stall_timeout_ms, 60 * 1000, "Define the amount of time when a consumer is not making any progress, then consider the consumer stalled, and then a new consumer will be created. Adjust the value based on how busy a topic is. Use small values for a busy topic to avoid big latency. Use big values for less busy topics to avoid disruption. Set to 0 to disable the behavior.", 0)
@@ -59,13 +61,17 @@ class ASTStorage;
M(UInt64, memory_limit, 0, "Configure a limit on the amount of memory that will be allocated by this external stream. Setting this to 0 will disable the limit. By default this is disabled.", 0) \
M(UInt64, io_threads, 1, "Set the number of IO threads to be used by the Pulsar client. Default is 1 thread.", 0)
+#define ICEBERG_EXTERNAL_STREAM_SETTINGS(M) \
+ M(String, iceberg_storage_endpoint, "", "Endpoint for data storage.", 0)
+
#define ALL_EXTERNAL_STREAM_SETTINGS(M) \
M(String, type, "", "External stream type", 0) \
M(String, config_file, "", "External stream configuration file path", 0) \
KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \
TIMEPLUS_EXTERNAL_STREAM_SETTINGS(M) \
- PULSAR_EXTERNAL_STREAM_SETTINGS(M)
+ PULSAR_EXTERNAL_STREAM_SETTINGS(M) \
+ ICEBERG_EXTERNAL_STREAM_SETTINGS(M)
#define LIST_OF_EXTERNAL_STREAM_SETTINGS(M) \
ALL_EXTERNAL_STREAM_SETTINGS(M) \
diff --git a/src/Storages/ExternalStream/ExternalStreamTypes.h b/src/Storages/ExternalStream/ExternalStreamTypes.h
index aec5f56fd48..0bda1a3472f 100644
--- a/src/Storages/ExternalStream/ExternalStreamTypes.h
+++ b/src/Storages/ExternalStream/ExternalStreamTypes.h
@@ -9,5 +9,6 @@ namespace StreamTypes
const String PULSAR = "pulsar";
const String TIMEPLUS = "timeplus";
const String LOG = "log";
+ const String ICEBERG = "iceberg";
}
}
diff --git a/src/Storages/ExternalStream/Iceberg/Iceberg.cpp b/src/Storages/ExternalStream/Iceberg/Iceberg.cpp
new file mode 100644
index 00000000000..b5553cc6223
--- /dev/null
+++ b/src/Storages/ExternalStream/Iceberg/Iceberg.cpp
@@ -0,0 +1,352 @@
+#include <Storages/ExternalStream/Iceberg/Iceberg.h>
+
+#if USE_AVRO
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int INVALID_SETTING_VALUE;
+extern const int NOT_IMPLEMENTED;
+}
+
+namespace
+{
+
+void updateFormatFactorySettings(FormatFactorySettings & settings, const ContextPtr & context)
+{
+ const auto & changes = context->getSettingsRef().changes();
+ for (const auto & change : changes)
+ {
+ if (settings.has(change.name))
+ settings.set(change.name, change.value);
+ }
+}
+
+}
+
+namespace ExternalStream
+{
+
+Iceberg::Iceberg(IStorage * storage, ExternalStreamSettingsPtr settings_, ExternalStreamCounterPtr, ContextPtr context)
+ : StorageExternalStreamImpl(storage, std::move(settings_), context)
+{
+ auto default_virtuals = NamesAndTypesList{
+ {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
+ {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
+
+ auto columns = storage->getInMemoryMetadata().getSampleBlock().getNamesAndTypesList();
+ virtual_columns = getVirtualsForStorage(columns, default_virtuals);
+ for (const auto & column : virtual_columns)
+ virtual_block.insert({column.type->createColumn(), column.type, column.name});
+
+ IcebergS3Configuration configuration;
+
+ auto storage_endpoint = settings->iceberg_storage_endpoint.value;
+ if (storage_endpoint.empty())
+ throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "iceberg_storage_endpoint cannot be empty");
+
+ configuration.url = S3::URI(storage_endpoint);
+ configuration.format = "Parquet";
+
+ configuration.min_upload_file_size = context->getSettingsRef().s3_min_upload_file_size;
+ configuration.max_upload_idle_seconds = context->getSettingsRef().s3_max_upload_idle_seconds;
+
+ configuration.auth_settings.access_key_id = settings->access_key_id;
+ configuration.auth_settings.secret_access_key = settings->secret_access_key;
+ configuration.auth_settings.region = configuration.url.region;
+ configuration.auth_settings.use_environment_credentials = true; ///settings->use_environment_credentials;
+
+ configuration.request_settings.updateFromSettings(context->getSettingsRef());
+
+ s3_configuration = std::move(configuration);
+ s3_configuration.createClient(context);
+}
+
+Apache::Iceberg::CatalogPtr Iceberg::getCatalog() const
+{
+ auto database = DatabaseCatalog::instance().getDatabase(getStorageID().database_name);
+ auto iceberg_db = std::dynamic_pointer_cast(database);
+ assert(iceberg_db);
+
+ return iceberg_db->getCatalog();
+}
+
+Apache::Iceberg::TableMetadata Iceberg::getTableMetadata() const
+{
+ auto storage_id = getStorageID();
+ /// TODO withCredentials()
+ Apache::Iceberg::TableMetadata metadata;
+ metadata.withSchema().withLocation();
+ getCatalog()->getTableMetadata(storage_id.database_name, storage_id.table_name, metadata);
+
+ return metadata;
+}
+
+std::list<Apache::Iceberg::ManifestList> Iceberg::fetchManifestList(const Apache::Iceberg::TableMetadata & table_metadata) const
+{
+ std::list<Apache::Iceberg::ManifestList> manifest_lists;
+
+ const auto & manifest_list_uri = table_metadata.getManifestList();
+ if (manifest_list_uri.empty())
+ {
+ LOG_INFO(logger, "Table metadata does not have a manifest list.");
+ return manifest_lists;
+ }
+
+ ReadBufferFromS3 buf{
+ s3_configuration.client,
+ s3_configuration.url.bucket,
+ Poco::URI(manifest_list_uri).getPath(),
+ /*version_id_=*/"",
+ s3_configuration.request_settings,
+ /*settings_=*/{}};
+
+ auto is = std::make_unique(buf);
+
+ auto schema = avro::compileJsonSchemaFromString(Apache::Iceberg::AvroSchemas::MANIFEST_LIST);
+ avro::DataFileReader<Apache::Iceberg::ManifestList> reader{std::move(is), schema};
+
+ Apache::Iceberg::ManifestList manifest_list;
+ auto enc = avro::jsonEncoder(schema);
+ while (reader.read(manifest_list))
+ {
+ LOG_INFO(logger, "Got manifest_list sn = {} sid = {}", manifest_list.sequence_number, manifest_list.added_snapshot_id);
+ manifest_lists.push_back(std::move(manifest_list));
+ }
+
+ return manifest_lists;
+}
+
+void Iceberg::fetchDataFiles(const Apache::Iceberg::ManifestList & manifest_list, std::list<String> & data_files) const
+{
+ LOG_INFO(logger, "Fetching manifest from {}", manifest_list.manifest_path);
+ ReadBufferFromS3 buf{
+ s3_configuration.client,
+ s3_configuration.url.bucket,
+ Poco::URI(manifest_list.manifest_path).getPath(),
+ /*version_id_=*/"",
+ s3_configuration.request_settings,
+ /*settings_=*/{}};
+
+ auto is = std::make_unique(buf);
+
+ auto schema = avro::compileJsonSchemaFromString(Apache::Iceberg::AvroSchemas::MANIFEST);
+ avro::DataFileReader<Apache::Iceberg::Manifest> reader{std::move(is), schema};
+
+ Apache::Iceberg::Manifest manifest;
+ auto enc = avro::jsonEncoder(schema);
+ while (reader.read(manifest))
+ {
+ LOG_INFO(logger, "Got manifest data_file = {}", manifest.data_file.file_path);
+ if (!boost::iequals(manifest.data_file.file_format, "parquet"))
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Data file format {} is not supported", manifest.data_file.file_format);
+ if (!manifest.data_file.file_path.empty())
+ data_files.push_back(std::move(manifest.data_file.file_path));
+ }
+}
+
+namespace
+{
+std::shared_ptr createFileIterator(
+ Strings keys,
+ IcebergS3Configuration s3_configuration,
+ bool distributed_processing,
+ ContextPtr local_context,
+ ASTPtr query,
+ const Block & virtual_block,
+ IcebergSource::ObjectInfos * object_infos)
+{
+ if (distributed_processing)
+ {
+ return std::make_shared(local_context->getReadTaskCallback());
+ }
+ else
+ {
+ return std::make_shared(
+ s3_configuration.client,
+ s3_configuration.url.version_id,
+ keys,
+ s3_configuration.url.bucket,
+ s3_configuration.request_settings,
+ query,
+ virtual_block,
+ local_context,
+ object_infos);
+ }
+}
+}
+
+Pipe Iceberg::read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr local_context,
+ QueryProcessingStage::Enum /*processed_stage*/,
+ size_t max_block_size,
+ size_t num_streams)
+{
+ auto table_metadata = getTableMetadata();
+ auto manifest_lists = fetchManifestList(table_metadata);
+
+ auto header = storage_snapshot->getSampleBlockForColumns(column_names);
+
+ Pipes pipes;
+
+ if (manifest_lists.empty())
+ { /// No data in the table
+ LOG_INFO(logger, "No manifest list, no data to read");
+ pipes.reserve(1);
+ pipes.emplace_back(std::make_shared<NullSource>(header));
+ }
+ else
+ {
+ std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
+ std::vector<NameAndTypePair> requested_virtual_columns;
+
+ for (const auto & virtual_column : getVirtuals())
+ {
+ if (column_names_set.contains(virtual_column.name))
+ requested_virtual_columns.push_back(virtual_column);
+ }
+
+ std::list<String> data_files;
+ for (const auto & manifest_list : manifest_lists)
+ fetchDataFiles(manifest_list, data_files);
+
+ LOG_INFO(logger, "Reading {} data files", data_files.size());
+
+ Strings keys;
+ keys.reserve(data_files.size());
+ for (const auto & data_file : data_files)
+ keys.push_back(Poco::URI(data_file).getPath());
+
+ std::shared_ptr iterator_wrapper = createFileIterator(
+ keys,
+ s3_configuration,
+ /*distributed_processing=*/false,
+ local_context,
+ query_info.query,
+ virtual_block,
+ &object_infos);
+
+ ColumnsDescription columns_description;
+ Block block_for_format;
+ if (supportsSubsetOfColumns())
+ {
+ auto fetch_columns = column_names;
+ const auto & virtuals = getVirtuals();
+ std::erase_if(fetch_columns, [&](const String & col) {
+ return std::any_of(
+ virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
+ });
+
+ if (fetch_columns.empty())
+ fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
+
+ columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
+ block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+ }
+ else
+ {
+ columns_description = storage_snapshot->metadata->getColumns();
+ block_for_format = storage_snapshot->metadata->getSampleBlock();
+ }
+
+ const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
+ for (size_t i = 0; i < num_streams; ++i)
+ {
+ pipes.emplace_back(std::make_shared<IcebergSource>(
+ requested_virtual_columns,
+ "Parquet", /// for now, only parquet data files are supported
+ getName(),
+ block_for_format,
+ local_context,
+ getFormatSettings(local_context),
+ columns_description,
+ max_block_size,
+ s3_configuration.request_settings,
+ /*compression_method=*/"none",
+ s3_configuration.client,
+ s3_configuration.url.bucket,
+ s3_configuration.url.version_id,
+ iterator_wrapper,
+ max_download_threads));
+ }
+ }
+
+ auto pipe = Pipe::unitePipes(std::move(pipes));
+
+ return pipe;
+}
+
+SinkToStoragePtr Iceberg::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
+{
+ auto table_metadata = getTableMetadata();
+ auto manifest_lists = fetchManifestList(table_metadata);
+
+ auto min_upload_file_size_ = local_context->getSettingsRef().s3_min_upload_file_size.changed
+ ? local_context->getSettingsRef().s3_min_upload_file_size.value
+ : s3_configuration.min_upload_file_size;
+
+ auto max_upload_idle_seconds_ = local_context->getSettingsRef().s3_max_upload_idle_seconds.changed
+ ? local_context->getSettingsRef().s3_max_upload_idle_seconds.value
+ : s3_configuration.max_upload_idle_seconds;
+
+ auto sample_block = metadata_snapshot->getSampleBlock();
+
+ auto format_settings = getFormatSettings(local_context);
+
+ return std::make_shared<IcebergSink>(
+ getStorageID(),
+ "Parquet",
+ sample_block,
+ format_settings,
+ s3_configuration,
+ s3_configuration.url.bucket,
+ getStorageID().getTableName(),
+ min_upload_file_size_,
+ max_upload_idle_seconds_,
+ std::move(table_metadata),
+ std::move(manifest_lists),
+ getCatalog(),
+ local_context);
+}
+
+FormatSettings Iceberg::getFormatSettings(const ContextPtr & local_context) const
+{
+ FormatFactorySettings settings = format_factory_settings.has_value() ? *format_factory_settings : FormatFactorySettings();
+ updateFormatFactorySettings(settings, local_context);
+ auto format_settings = DB::getFormatSettings(local_context, settings);
+
+ /// This is needed because, otherwise, when an external stream with the ProtobufSingle format is used as the target stream
+ /// of a MV (or in `INSERT ... SELECT ...`), i.e. when more than one row is sent to the stream, an exception will be thrown.
+ format_settings.protobuf.allow_multiple_rows_without_delimiter = true;
+
+ /// This is required; otherwise tools like Spark cannot read the data of string columns.
+ format_settings.parquet.output_string_as_string = true;
+ return format_settings;
+}
+
+}
+
+}
+#endif
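
To summarize the read path implemented above: the external stream asks the catalog (via the ApacheIceberg database) for the table metadata, downloads the Avro manifest list referenced by the current snapshot, downloads each Avro manifest to collect the Parquet data-file paths, and finally reads those Parquet objects from S3 through the source processors. A condensed, member-style sketch of the metadata walk (the helper itself is hypothetical and not declared in the patch):

    Strings Iceberg::collectDataFileKeys() const
    {
        auto table_metadata = getTableMetadata();                             /// catalog request
        std::list<String> data_files;
        for (const auto & manifest_list : fetchManifestList(table_metadata))  /// Avro manifest list on S3
            fetchDataFiles(manifest_list, data_files);                        /// Avro manifests on S3

        Strings keys;
        keys.reserve(data_files.size());
        for (const auto & data_file : data_files)
            keys.push_back(Poco::URI(data_file).getPath());                   /// keep only the object key inside the bucket
        return keys;
    }
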
diff --git a/src/Storages/ExternalStream/Iceberg/Iceberg.h b/src/Storages/ExternalStream/Iceberg/Iceberg.h
new file mode 100644
index 00000000000..84db14b47a7
--- /dev/null
+++ b/src/Storages/ExternalStream/Iceberg/Iceberg.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_AWS_S3 && USE_AVRO
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ExternalStream
+{
+
+class Iceberg final : public StorageExternalStreamImpl
+{
+ using ObjectInfos = IcebergSource::ObjectInfos;
+
+public:
+ Iceberg(IStorage *, ExternalStreamSettingsPtr, ExternalStreamCounterPtr, ContextPtr);
+ ~Iceberg() override = default;
+
+ String getName() const override { return "IcebergExternalStream"; }
+
+ /// For now, only the Parquet format is supported, and it supports reading a subset of columns.
+ bool supportsSubsetOfColumns() const override { return true; }
+
+ Pipe read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ size_t num_streams) override;
+
+ SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
+
+ /// FIXME this is a temporary trick.
+ void setIcebergSchema(const std::string & schema) { iceberg_schema_json = schema; }
+
+private:
+ Apache::Iceberg::CatalogPtr getCatalog() const;
+ Apache::Iceberg::TableMetadata getTableMetadata() const;
+ std::list<Apache::Iceberg::ManifestList> fetchManifestList(const Apache::Iceberg::TableMetadata &) const;
+ void fetchDataFiles(const Apache::Iceberg::ManifestList &, std::list<String> & data_files) const;
+
+ FormatSettings getFormatSettings(const ContextPtr & local_context) const;
+
+ NamesAndTypesList virtual_columns;
+ Block virtual_block;
+ /// Temporary. FIXME
+ ObjectInfos object_infos;
+
+ IcebergS3Configuration s3_configuration;
+ std::optional<FormatFactorySettings> format_factory_settings;
+
+ std::string iceberg_schema_json;
+};
+
+}
+
+}
+
+#endif
diff --git a/src/Storages/ExternalStream/Iceberg/IcebergS3Configuration.cpp b/src/Storages/ExternalStream/Iceberg/IcebergS3Configuration.cpp
new file mode 100644
index 00000000000..d50041ef20c
--- /dev/null
+++ b/src/Storages/ExternalStream/Iceberg/IcebergS3Configuration.cpp
@@ -0,0 +1,58 @@
+#include <Storages/ExternalStream/Iceberg/IcebergS3Configuration.h>
+
+#if USE_AWS_S3 && USE_AVRO
+
+#include
+
+namespace DB
+{
+
+namespace ExternalStream
+{
+
+void IcebergS3Configuration::createClient(const ContextPtr & ctx)
+{
+ if (client)
+ return;
+
+ request_settings.updateFromSettings(ctx->getSettings());
+
+ DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
+ auth_settings.region,
+ ctx->getRemoteHostFilter(),
+ static_cast<unsigned>(ctx->getGlobalContext()->getSettingsRef().s3_max_redirects),
+ ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
+ /* for_disk_s3 = */ false,
+ request_settings.get_request_throttler,
+ request_settings.put_request_throttler);
+
+ client_configuration.endpointOverride = url.endpoint;
+ client_configuration.maxConnections = static_cast<unsigned>(request_settings.max_connections);
+
+ auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
+ auto headers = auth_settings.headers;
+ if (!headers_from_ast.empty())
+ headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end());
+
+ client = DB::S3::ClientFactory::instance().create(
+ client_configuration,
+ url.is_virtual_hosted_style,
+ credentials.GetAWSAccessKeyId(),
+ credentials.GetAWSSecretKey(),
+ auth_settings.server_side_encryption_customer_key_base64,
+ auth_settings.server_side_encryption_kms_config,
+ std::move(headers),
+ S3::CredentialsConfiguration{
+ /*use_environment_credentials=*/true,
+ auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
+ auth_settings.expiration_window_seconds.value_or(
+ ctx->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
+ auth_settings.no_sign_request.value_or(ctx->getConfigRef().getBool("s3.no_sign_request", false)),
+ });
+}
+
+}
+
+}
+
+#endif
diff --git a/src/Storages/ExternalStream/Iceberg/IcebergS3Configuration.h b/src/Storages/ExternalStream/Iceberg/IcebergS3Configuration.h
new file mode 100644
index 00000000000..901cd274153
--- /dev/null
+++ b/src/Storages/ExternalStream/Iceberg/IcebergS3Configuration.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_AWS_S3 && USE_AVRO
+
+#include
+#include
+#include