diff --git a/mysql-test/suite/secondary_engine/r/shannon_partition_table.result b/mysql-test/suite/secondary_engine/r/shannon_partition_table.result index 9629b8d57..e11db3d3f 100644 --- a/mysql-test/suite/secondary_engine/r/shannon_partition_table.result +++ b/mysql-test/suite/secondary_engine/r/shannon_partition_table.result @@ -30,6 +30,18 @@ alter table LINEITEM change L_ORDERKEY L_ORDERKEY BIGINT NOT NULL COMMENT 'RAPID set use_secondary_engine=forced; alter table LINEITEM secondary_engine RAPID; alter table LINEITEM secondary_load; +select * from LINEITEM; +L_ORDERKEY L_PARTKEY L_SUPPKEY L_LINENUMBER L_QUANTITY L_EXTENDEDPRICE L_DISCOUNT L_TAX L_RETURNFLAG L_LINESTATUS L_SHIPDATE L_COMMITDATE L_RECEIPTDATE L_SHIPINSTRUCT L_SHIPMODE L_COMMENT +23999974 770178 30217 6 48.00 59910.72 0.02 0.08 N O 1997-03-15 1997-05-26 1997-03-26 TAKE BACK RETURN FOB fy packages nag evenly +23999975 625094 15140 2 23.00 23438.38 0.05 0.06 N O 1996-10-23 1996-09-11 1996-11-12 DELIVER IN PERSON AIR ly regular realms after the +24000000 677354 17387 2 30.00 39939.60 0.00 0.01 N O 1996-04-21 1996-06-08 1996-05-08 TAKE BACK RETURN REG AIR le? blithely unusual deposits above +24000000 277642 37643 3 20.00 32392.60 0.01 0.02 N O 1996-05-14 1996-06-11 1996-06-04 TAKE BACK RETURN REG AIR s. blithely regular instructi +23999975 353843 3852 1 1.00 1896.83 0.05 0.01 N O 1996-08-03 1996-09-14 1996-08-15 TAKE BACK RETURN RAIL pending instructions affix qu +23999975 350648 30649 3 18.00 30575.34 0.08 0.01 N O 1996-10-07 1996-08-22 1996-10-19 DELIVER IN PERSON TRUCK ully pending dependencies hagg +23999974 311819 31820 4 22.00 40277.60 0.08 0.01 N O 1997-03-25 1997-04-24 1997-03-28 COLLECT COD REG AIR uickly ironic pains. bli +24000000 712607 2625 4 7.00 11336.99 0.09 0.00 N O 1996-07-16 1996-06-18 1996-08-04 COLLECT COD SHIP ckly express requests. even, pendin +23999974 737406 7461 5 2.00 2886.74 0.03 0.07 N O 1997-04-25 1997-05-31 1997-05-21 NONE REG AIR nding accounts. ironic, final +24000000 651246 21263 1 5.00 5986.05 0.06 0.05 N O 1996-06-11 1996-05-16 1996-06-22 NONE TRUCK nts use. ir alter table LINEITEM secondary_unload; alter table LINEITEM secondary_load PARTITION(p0, p1, p10); alter table LINEITEM secondary_unload; @@ -38,7 +50,7 @@ alter table LINEITEM secondary_unload PARTITION(p0, p10, p1); alter table LINEITEM secondary_load PARTITION(p0, p1, p10); alter table LINEITEM secondary_unload PARTITION(p0, p10); alter table LINEITEM secondary_unload; -ERROR HY000: DDLs on a table with a secondary engine defined are not allowed. +ERROR HY000: Secondary engine operation failed. test_partition.LINEITEM table is not loaded into rapid yet. drop table LINEITEM; CREATE TABLE LINEITEM1 ( L_ORDERKEY BIGINT NOT NULL, L_PARTKEY INTEGER NOT NULL, @@ -70,7 +82,7 @@ alter table LINEITEM1 secondary_engine RAPID; alter table LINEITEM1 secondary_load; explain select * from LINEITEM1; id select_type table partitions type possible_keys key key_len ref rows filtered Extra -1 SIMPLE LINEITEM1 p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17 ALL NULL NULL NULL NULL 10 100.00 Using secondary engine Rapid +1 SIMPLE LINEITEM1 p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17 ALL NULL NULL NULL NULL 0 0.00 Using secondary engine Rapid Warnings: Note 1003 /* select#1 */ select `test_partition`.`LINEITEM1`.`L_ORDERKEY` AS `L_ORDERKEY`,`test_partition`.`LINEITEM1`.`L_PARTKEY` AS `L_PARTKEY`,`test_partition`.`LINEITEM1`.`L_SUPPKEY` AS `L_SUPPKEY`,`test_partition`.`LINEITEM1`.`L_LINENUMBER` AS `L_LINENUMBER`,`test_partition`.`LINEITEM1`.`L_QUANTITY` AS `L_QUANTITY`,`test_partition`.`LINEITEM1`.`L_EXTENDEDPRICE` AS `L_EXTENDEDPRICE`,`test_partition`.`LINEITEM1`.`L_DISCOUNT` AS `L_DISCOUNT`,`test_partition`.`LINEITEM1`.`L_TAX` AS `L_TAX`,`test_partition`.`LINEITEM1`.`L_RETURNFLAG` AS `L_RETURNFLAG`,`test_partition`.`LINEITEM1`.`L_LINESTATUS` AS `L_LINESTATUS`,`test_partition`.`LINEITEM1`.`L_SHIPDATE` AS `L_SHIPDATE`,`test_partition`.`LINEITEM1`.`L_COMMITDATE` AS `L_COMMITDATE`,`test_partition`.`LINEITEM1`.`L_RECEIPTDATE` AS `L_RECEIPTDATE`,`test_partition`.`LINEITEM1`.`L_SHIPINSTRUCT` AS `L_SHIPINSTRUCT`,`test_partition`.`LINEITEM1`.`L_SHIPMODE` AS `L_SHIPMODE`,`test_partition`.`LINEITEM1`.`L_COMMENT` AS `L_COMMENT` from `test_partition`.`LINEITEM1` alter table LINEITEM1 secondary_unload; diff --git a/mysql-test/suite/secondary_engine/t/shannon_partition_table.test b/mysql-test/suite/secondary_engine/t/shannon_partition_table.test index c63aa0f4a..444b69e8e 100644 --- a/mysql-test/suite/secondary_engine/t/shannon_partition_table.test +++ b/mysql-test/suite/secondary_engine/t/shannon_partition_table.test @@ -49,6 +49,9 @@ set use_secondary_engine=forced; alter table LINEITEM secondary_engine RAPID; alter table LINEITEM secondary_load; + +select * from LINEITEM; + alter table LINEITEM secondary_unload; alter table LINEITEM secondary_load PARTITION(p0, p1, p10); @@ -59,7 +62,7 @@ alter table LINEITEM secondary_unload PARTITION(p0, p10, p1); alter table LINEITEM secondary_load PARTITION(p0, p1, p10); alter table LINEITEM secondary_unload PARTITION(p0, p10); ---error 3890 +--error 3889 alter table LINEITEM secondary_unload; --enable_warnings diff --git a/storage/rapid_engine/executor/iterators/table_scan_iterator.h b/storage/rapid_engine/executor/iterators/table_scan_iterator.h index f194ef3a9..4f73b9f08 100644 --- a/storage/rapid_engine/executor/iterators/table_scan_iterator.h +++ b/storage/rapid_engine/executor/iterators/table_scan_iterator.h @@ -193,7 +193,7 @@ class VectorizedTableScanIterator : public TableRowIterator { auto str_id = *(uint32 *)data_ptr; std::string fld_name(field->field_name); - auto rpd_field = m_data_table.get()->source()->get_field(fld_name); + auto rpd_field = m_data_table.get()->table()->get_field(fld_name); if (!rpd_field) return; auto str_ptr = rpd_field->header()->m_local_dict->get(str_id); diff --git a/storage/rapid_engine/handler/ha_shannon_rapid.cc b/storage/rapid_engine/handler/ha_shannon_rapid.cc index a191475d5..5c9ef93ab 100644 --- a/storage/rapid_engine/handler/ha_shannon_rapid.cc +++ b/storage/rapid_engine/handler/ha_shannon_rapid.cc @@ -211,7 +211,7 @@ int ha_rapid::info(unsigned int flags) { std::string sch_tb; sch_tb.append(table_share->db.str).append(":").append(table_share->table_name.str); - Rapid_load_context context; + Rapid_scan_context context; context.m_trx = Transaction::get_or_create_trx(m_thd); context.m_trx->begin(); @@ -275,12 +275,16 @@ unsigned long ha_rapid::index_flags(unsigned int idx, unsigned int part, bool al } int ha_rapid::records(ha_rows *num_rows) { - Rapid_load_context context; + Rapid_scan_context context; context.m_trx = Transaction::get_or_create_trx(m_thd); + context.m_trx->begin(); + std::string sch_tb; sch_tb.append(table_share->db.str).append(":").append(table_share->table_name.str); auto rpd_tb = Imcs::Imcs::instance()->get_table(sch_tb); *num_rows = rpd_tb->rows(&context); + + context.m_trx->commit(); return ShannonBase::SHANNON_SUCCESS; } @@ -461,7 +465,7 @@ int ha_rapid::rnd_next(uchar *buf) { } } - ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); + if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); return error; } @@ -498,8 +502,7 @@ int ha_rapid::index_read(uchar *buf, const uchar *key, uint key_len, ha_rkey_fun m_data_table->set_end_range(end_range); err = m_data_table->index_read(buf, key, key_len, find_flag); - ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); - + if (err == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); return err; } @@ -511,22 +514,22 @@ int ha_rapid::index_read_last(uchar *buf, const uchar *key, uint key_len) { int ha_rapid::index_next(uchar *buf) { ut_ad(m_start_of_scan && inited == handler::INDEX); - ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); - return m_data_table->index_next(buf); + auto error = m_data_table->index_next(buf); + if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); + return error; } int ha_rapid::index_next_same(uchar *buf, const uchar *, uint) { ut_ad(m_start_of_scan && inited == handler::INDEX); - ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); - - return m_data_table->index_next(buf); + auto error = m_data_table->index_next(buf); + if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); + return error; } int ha_rapid::index_first(uchar *buf) { DBUG_TRACE; ut_ad(m_start_of_scan && inited == handler::INDEX); - ha_statistic_increment(&System_status_var::ha_read_first_count); int error; if (end_range) { @@ -534,6 +537,7 @@ int ha_rapid::index_first(uchar *buf) { error = m_data_table->index_read(buf, end_range->key, end_range->length, end_range->flag); } else error = m_data_table->index_next(buf); + if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_first_count); return error; } @@ -545,8 +549,6 @@ int ha_rapid::index_prev(uchar *buf) { int ha_rapid::index_last(uchar *buf) { DBUG_TRACE; - ha_statistic_increment(&System_status_var::ha_read_last_count); - m_data_table->set_end_range(end_range); int error = m_data_table->index_read(buf, nullptr, 0, HA_READ_BEFORE_KEY); @@ -555,7 +557,7 @@ int ha_rapid::index_last(uchar *buf) { if (error == HA_ERR_KEY_NOT_FOUND) { error = HA_ERR_END_OF_FILE; } - + if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_last_count); return error; } diff --git a/storage/rapid_engine/handler/ha_shannon_rapid.h b/storage/rapid_engine/handler/ha_shannon_rapid.h index db4755c7d..88a460c7c 100644 --- a/storage/rapid_engine/handler/ha_shannon_rapid.h +++ b/storage/rapid_engine/handler/ha_shannon_rapid.h @@ -86,6 +86,9 @@ class ha_rapid : public handler { public: ha_rapid(handlerton *hton, TABLE_SHARE *table_share); + protected: + std::unique_ptr m_data_table; + private: int create(const char *, TABLE *, HA_CREATE_INFO *, dd::Table *) override { return HA_ERR_WRONG_COMMAND; } @@ -175,10 +178,7 @@ class ha_rapid : public handler { not yet fetched any row, else false */ bool m_start_of_scan{false}; - std::unique_ptr m_data_table; std::string m_failed_reason; - - ha_rapidpart *m_part_handler{nullptr}; }; } // namespace ShannonBase diff --git a/storage/rapid_engine/handler/ha_shannon_rapidpart.cc b/storage/rapid_engine/handler/ha_shannon_rapidpart.cc index a9f20b3e6..ab88f8872 100644 --- a/storage/rapid_engine/handler/ha_shannon_rapidpart.cc +++ b/storage/rapid_engine/handler/ha_shannon_rapidpart.cc @@ -42,31 +42,73 @@ Created jun 6, 2025 */ #include "storage/rapid_engine/utils/utils.h" namespace ShannonBase { - +extern int rpd_async_column_threshold; ha_rapidpart::ha_rapidpart(handlerton *hton, TABLE_SHARE *table) : ha_rapid(hton, table), Partition_helper(this), m_thd(ha_thd()), m_share(nullptr) {} -int ha_rapidpart::records(ha_rows *num_rows) { return ShannonBase::SHANNON_SUCCESS; } - int ha_rapidpart::rnd_pos(uchar *record, uchar *pos) { return ShannonBase::SHANNON_SUCCESS; } +int ha_rapidpart::rnd_init(bool scan) { + m_current_part_empty = false; + + if (m_data_table->init()) { + m_start_of_scan = false; + return HA_ERR_GENERIC; + } + + inited = handler::RND; + m_start_of_scan = true; + return (Partition_helper::ph_rnd_init(scan)); +} + int ha_rapidpart::rnd_init_in_part(uint part_id, bool scan) { // int err = change_active_index(part_id, table_share->primary_key); /* Don't use semi-consistent read in random row reads (by position). This means we must disable semi_consistent_read if scan is false. */ + std::string part_key; + auto part_name = m_data_table->source()->part_info->partitions[part_id]->partition_name; + part_key.append(part_name).append("#").append(std::to_string(part_id)); + + const auto &rpd_table = m_data_table->table_source(); + auto partition_ptr = down_cast(rpd_table)->get_partition(part_key); + auto n_rows = partition_ptr->rows(nullptr); + m_current_part_empty = (n_rows) ? false : true; + + if (!m_current_part_empty) m_data_table->active_table(partition_ptr); - m_last_part = part_id; - m_start_of_scan = true; return ShannonBase::SHANNON_SUCCESS; } int ha_rapidpart::rnd_next_in_part(uint part_id, uchar *buf) { - assert(false); - return ShannonBase::SHANNON_SUCCESS; + int error{HA_ERR_END_OF_FILE}; + if (m_current_part_empty) return error; + + if (inited == handler::RND && m_start_of_scan) { + if (table_share->fields <= static_cast(ShannonBase::rpd_async_column_threshold)) { + error = m_data_table->next(buf); + } else { + auto reader_pool = ShannonBase::Imcs::Imcs::pool(); + std::future fut = + boost::asio::co_spawn(*reader_pool, m_data_table->next_async(buf), boost::asio::use_future); + error = fut.get(); // co_await m_data_table->next_async(buf); // index_first(buf); + if (error == HA_ERR_KEY_NOT_FOUND) { + error = HA_ERR_END_OF_FILE; + } + } + } + + // increase the row count. + if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); + return error; } -int ha_rapidpart::rnd_end_in_part(uint, bool) { - assert(false); +int ha_rapidpart::rnd_end_in_part(uint, bool) { return ShannonBase::SHANNON_SUCCESS; } + +int ha_rapidpart::rnd_end() { + if (m_data_table->end()) return HA_ERR_GENERIC; + + m_start_of_scan = false; + inited = handler::NONE; return ShannonBase::SHANNON_SUCCESS; } @@ -132,18 +174,18 @@ int ha_rapidpart::load_table(const TABLE &table, bool *skip_metadata_update) { if (shannon_loaded_tables->get(table.s->db.str, table.s->table_name.str) != nullptr) { std::string err; err.append(table.s->db.str).append(".").append(table.s->table_name.str).append(" already loaded"); - my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), err.c_str()); - return HA_ERR_KEY_NOT_FOUND; + my_error(ER_SECONDARY_ENGINE, MYF(0), err.c_str()); + return HA_ERR_GENERIC; } for (auto idx = 0u; idx < table.s->fields; idx++) { auto fld = *(table.field + idx); - if (fld->is_flag_set(NOT_SECONDARY_FLAG)) continue; + if (!bitmap_is_set(table.read_set, idx) || fld->is_flag_set(NOT_SECONDARY_FLAG)) continue; if (!ShannonBase::Utils::Util::is_support_type(fld->type())) { std::string err; err.append(table.s->table_name.str).append(fld->field_name).append(" type not allowed"); - my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), err.c_str()); + my_error(ER_SECONDARY_ENGINE, MYF(0), err.c_str()); return HA_ERR_GENERIC; } } @@ -177,7 +219,7 @@ int ha_rapidpart::load_table(const TABLE &table, bool *skip_metadata_update) { } if (Imcs::Imcs::instance()->load_parttable(&context, const_cast(&table))) { - my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), table.s->db.str, table.s->table_name.str); + my_error(ER_SECONDARY_ENGINE, MYF(0), table.s->db.str, table.s->table_name.str); return HA_ERR_GENERIC; } @@ -197,7 +239,7 @@ int ha_rapidpart::unload_table(const char *db_name, const char *table_name, bool if (error_if_not_loaded && !share) { std::string err(db_name); err.append(".").append(table_name).append(" table is not loaded into rapid yet"); - my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), err.c_str()); + my_error(ER_SECONDARY_ENGINE, MYF(0), err.c_str()); return HA_ERR_GENERIC; } diff --git a/storage/rapid_engine/handler/ha_shannon_rapidpart.h b/storage/rapid_engine/handler/ha_shannon_rapidpart.h index 148ba4bae..aec69f28b 100644 --- a/storage/rapid_engine/handler/ha_shannon_rapidpart.h +++ b/storage/rapid_engine/handler/ha_shannon_rapidpart.h @@ -158,13 +158,13 @@ class ha_rapidpart : public ha_rapid, public Partition_helper, public Partition_ Partition_handler *get_partition_handler() override { return (static_cast(this)); } protected: - int rnd_init(bool scan) override { return (Partition_helper::ph_rnd_init(scan)); } + int rnd_init(bool scan) override; int rnd_next(uchar *record) override { return (Partition_helper::ph_rnd_next(record)); } - int rnd_pos(uchar *record, uchar *pos) override; + int rnd_end() override; - int records(ha_rows *num_rows) override; + int rnd_pos(uchar *record, uchar *pos) override; int index_next(uchar *record) override { return (Partition_helper::ph_index_next(record)); } @@ -180,11 +180,14 @@ class ha_rapidpart : public ha_rapid, public Partition_helper, public Partition_ private: THD *m_thd{nullptr}; + RapidPartShare *m_share{nullptr}; /** this is set to 1 when we are starting a table scan but have not yet fetched any row, else false */ bool m_start_of_scan{false}; + + bool m_current_part_empty = false; }; } // namespace ShannonBase diff --git a/storage/rapid_engine/imcs/chunk.cpp b/storage/rapid_engine/imcs/chunk.cpp index e42cb7dc4..6226ee6c4 100644 --- a/storage/rapid_engine/imcs/chunk.cpp +++ b/storage/rapid_engine/imcs/chunk.cpp @@ -346,7 +346,7 @@ void Chunk::check_data_type(size_t type_size) { } } -int Chunk::is_null(const Rapid_load_context *context, row_id_t pos) { +int Chunk::is_null(const Rapid_scan_context *context, row_id_t pos) { std::shared_lock lk(m_header_mutex); if (!m_header->m_null_mask.get()) return static_cast(false); @@ -354,7 +354,7 @@ int Chunk::is_null(const Rapid_load_context *context, row_id_t pos) { return Utils::Util::bit_array_get(m_header->m_null_mask.get(), pos); } -int Chunk::is_deleted(const Rapid_load_context *context, row_id_t pos) { +int Chunk::is_deleted(const Rapid_scan_context *context, row_id_t pos) { std::shared_lock lk(m_header_mutex); if (!m_header->m_del_mask.get()) return SHANNON_SUCCESS; diff --git a/storage/rapid_engine/imcs/chunk.h b/storage/rapid_engine/imcs/chunk.h index 66d05ec75..e5f193d75 100644 --- a/storage/rapid_engine/imcs/chunk.h +++ b/storage/rapid_engine/imcs/chunk.h @@ -41,7 +41,9 @@ #include "storage/rapid_engine/trx/transaction.h" //Transaction namespace ShannonBase { +class Rapid_context; class Rapid_load_context; +class Rapid_scan_context; namespace Imcs { class Cu; /** @@ -188,10 +190,10 @@ class Chunk : public MemoryObject { int GC(); // gets null bit flag. - int is_null(const Rapid_load_context *context, row_id_t pos); + int is_null(const Rapid_scan_context *context, row_id_t pos); // gets the delete flag. - int is_deleted(const Rapid_load_context *context, row_id_t pos); + int is_deleted(const Rapid_scan_context *context, row_id_t pos); // get the normalized pack length inline size_t normalized_pack_length() { return m_header->m_normalized_pack_length; } diff --git a/storage/rapid_engine/imcs/cu.cpp b/storage/rapid_engine/imcs/cu.cpp index 8e2e1438e..1d2343e28 100644 --- a/storage/rapid_engine/imcs/cu.cpp +++ b/storage/rapid_engine/imcs/cu.cpp @@ -38,9 +38,9 @@ namespace ShannonBase { namespace Imcs { -SHANNON_THREAD_LOCAL Cu::LocalDataBuffer Cu::m_buff; +// SHANNON_THREAD_LOCAL Cu::LocalDataBuffer Cu::m_buff; Cu::Cu(RapidTable *owner, const Field *field) { - static_assert(alignof(m_buff) >= CACHE_LINE_SIZE, "Alignment failed"); + // static_assert(alignof(m_buff) >= CACHE_LINE_SIZE, "Alignment failed"); ut_a(field && !field->is_flag_set(NOT_SECONDARY_FLAG)); m_header = std::make_unique(); @@ -263,11 +263,12 @@ uchar *Cu::write_row(const Rapid_load_context *context, row_id_t rowid, uchar *d ut_a((data && len != UNIV_SQL_NULL) || (!data && len == UNIV_SQL_NULL)); auto dlen = (len == UNIV_SQL_NULL) ? sizeof(uint32) : ((len < sizeof(uint32)) ? sizeof(uint32) : len); + uchar local_buff[MAX_FIELD_WIDTH]; std::unique_ptr datum(dlen < MAX_FIELD_WIDTH ? nullptr : new uchar[dlen]); uchar *pdatum{nullptr}; if (data) { // not null. ut_a(len != UNIV_SQL_NULL); - pdatum = (dlen < MAX_FIELD_WIDTH) ? m_buff.data : datum.get(); + pdatum = (dlen < MAX_FIELD_WIDTH) ? local_buff : datum.get(); std::memset(pdatum, 0x0, (dlen < MAX_FIELD_WIDTH) ? MAX_FIELD_WIDTH : dlen); std::memcpy(pdatum, data, len); } @@ -369,10 +370,11 @@ uchar *Cu::update_row_from_log(const Rapid_load_context *context, row_id_t rowid ut_a((data && len != UNIV_SQL_NULL) || (!data && len == UNIV_SQL_NULL)); auto dlen = (len < sizeof(uint32)) ? sizeof(uint32) : len; - std::unique_ptr datum((dlen < MAX_FIELD_WIDTH) ? nullptr : new uchar[dlen]); + uchar local_buff[MAX_FIELD_WIDTH]; + std::unique_ptr datum(dlen < MAX_FIELD_WIDTH ? nullptr : new uchar[dlen]); uchar *pdatum{nullptr}; if (data) { - pdatum = (dlen < MAX_FIELD_WIDTH) ? m_buff.data : datum.get(); + pdatum = (dlen < MAX_FIELD_WIDTH) ? local_buff : datum.get(); std::memset(pdatum, 0x0, (dlen < MAX_FIELD_WIDTH) ? MAX_FIELD_WIDTH : dlen); std::memcpy(pdatum, data, len); } diff --git a/storage/rapid_engine/imcs/cu.h b/storage/rapid_engine/imcs/cu.h index b3895de99..744cc1fe4 100644 --- a/storage/rapid_engine/imcs/cu.h +++ b/storage/rapid_engine/imcs/cu.h @@ -230,7 +230,8 @@ class Cu : public MemoryObject { typedef struct alignas(CACHE_LINE_SIZE) LocalDataBuffer_t { uchar data[MAX_FIELD_WIDTH]; } LocalDataBuffer; - static SHANNON_THREAD_LOCAL LocalDataBuffer m_buff; + // TODO: it corrupt under multithread, will fixed in future. + // static SHANNON_THREAD_LOCAL LocalDataBuffer m_buff; // the meta stat mutex. std::mutex m_stats_mutex; diff --git a/storage/rapid_engine/imcs/data_table.cpp b/storage/rapid_engine/imcs/data_table.cpp index 7501a4bd4..d6cc0cd5f 100644 --- a/storage/rapid_engine/imcs/data_table.cpp +++ b/storage/rapid_engine/imcs/data_table.cpp @@ -54,8 +54,10 @@ namespace ShannonBase { namespace Imcs { ShannonBase::Utils::SimpleRatioAdjuster DataTable::m_adaptive_ratio(0.3); -DataTable::DataTable(TABLE *source, RapidTable *rpd) : m_initialized{false}, m_data_source(source), m_rapid_table(rpd) { - ut_a(m_data_source && m_rapid_table); +DataTable::DataTable(TABLE *source, RapidTable *rpd) + : m_initialized{false}, m_data_source(source), m_rapid_table(rpd), m_source_rpd_table(rpd) { + // if m_rapid_table is null, means we will get its real imp by part_id when it used. + ut_a(m_data_source); m_rowid.store(0); } @@ -82,7 +84,7 @@ int DataTable::init() { m_initialized.store(true); m_rowid.store(0); - m_context = std::make_unique(); + m_context = std::make_unique(); m_context->m_thd = current_thd; m_context->m_extra_info.m_keynr = m_active_index; @@ -99,6 +101,15 @@ int DataTable::init() { return ShannonBase::SHANNON_SUCCESS; } +int DataTable::end() { + m_context->m_trx->release_snapshot(); + m_context->m_trx->commit(); + + m_rowid.store(0); + m_initialized.store(false); + return ShannonBase::SHANNON_SUCCESS; +} + void DataTable::encode_key_parts(uchar *encoded_key, const uchar *original_key, uint key_len, KEY *key_info) { if (!encoded_key || !original_key || !key_info) return; @@ -395,15 +406,6 @@ int DataTable::next_batch(size_t batch_size, std::vectorm_trx->release_snapshot(); - m_context->m_trx->commit(); - - m_rowid.store(0); - m_initialized.store(false); - return ShannonBase::SHANNON_SUCCESS; -} - int DataTable::index_init(uint keynr, bool sorted) { init(); m_active_index = keynr; diff --git a/storage/rapid_engine/imcs/data_table.h b/storage/rapid_engine/imcs/data_table.h index aa6782ed0..6a5761096 100644 --- a/storage/rapid_engine/imcs/data_table.h +++ b/storage/rapid_engine/imcs/data_table.h @@ -41,7 +41,9 @@ class TABLE; class key_range; namespace ShannonBase { +class Rapid_context; class Rapid_load_context; +class Rapid_scan_context; namespace Imcs { class Imcs; class Cu; @@ -74,7 +76,19 @@ class DataTable : public MemoryObject { DataTable(TABLE *source_table, RapidTable *rpd); virtual ~DataTable(); - RapidTable *source() { return m_rapid_table; } + // gets its active rapid table. + inline RapidTable *table() const { return m_rapid_table; } + + // the parent table of partitions. + inline RapidTable *table_source() const { return m_source_rpd_table; } + + // to reset to a new rpd table source. + inline void active_table(RapidTable *rpd_table) { + m_rowid.store(0); + m_rapid_table = rpd_table; + } + + inline TABLE *source() const { return m_data_source; } // open a cursor on db_table to read/write. int open(); @@ -144,13 +158,13 @@ class DataTable : public MemoryObject { TABLE *m_data_source{nullptr}; // rapid table. - RapidTable *m_rapid_table; + RapidTable *m_rapid_table{nullptr}, *m_source_rpd_table{nullptr}; // start from where. std::atomic m_rowid{0}; // context - std::unique_ptr m_context{nullptr}; + std::unique_ptr m_context{nullptr}; // index iterator. std::unique_ptr m_index_iter; diff --git a/storage/rapid_engine/imcs/imcs.cpp b/storage/rapid_engine/imcs/imcs.cpp index d41d3bb78..01c8f5c59 100644 --- a/storage/rapid_engine/imcs/imcs.cpp +++ b/storage/rapid_engine/imcs/imcs.cpp @@ -30,14 +30,20 @@ #include #include -#include +#include #include #include +#include +#include #include "include/decimal.h" -#include "include/my_dbug.h" //DBUG_EXECUTE_IF -#include "include/row0pread-adapter.h" //Parallel Reader +#include "include/my_dbug.h" //DBUG_EXECUTE_IF +#include "include/row0pread-adapter.h" //Parallel Reader +#include "sql/dd_table_share.h" +#include "sql/histograms/table_histograms.h" // decrement_reference_counter #include "sql/partitioning/partition_handler.h" //partition handler +#include "sql/sql_base.h" +#include "sql/transaction.h" // trans_rollback_stmt, trans_commit_stmt #include "storage/innobase/handler/ha_innopart.h" #include "storage/innobase/include/data0type.h" @@ -52,6 +58,7 @@ namespace ShannonBase { extern ulonglong rpd_para_load_threshold; +SHANNON_THREAD_LOCAL std::string Rapid_load_context::extra_info_t::m_active_part_key; namespace Imcs { Imcs *Imcs::m_instance{nullptr}; std::unique_ptr Imcs::m_imcs_pool{nullptr}; @@ -59,6 +66,100 @@ std::once_flag Imcs::one; SHANNON_THREAD_LOCAL Imcs *current_imcs_instance = Imcs::instance(); +bool PartitionLoadThreadContext::initialize(const Rapid_load_context *context) { + // Create THD + m_thd = new THD; + if (!m_thd) return true; + + m_thd->set_new_thread_id(); + m_thd->thread_stack = (char *)this; + m_thd->set_command(COM_DAEMON); + m_thd->security_context()->skip_grants(); + m_thd->system_thread = NON_SYSTEM_THREAD; + m_thd->store_globals(); + m_thd->lex->sql_command = SQLCOM_SELECT; + + // Open table from source table share. + TABLE_SHARE *share = context->m_table->s; + m_table = (TABLE *)m_thd->mem_root->Alloc(sizeof(TABLE)); + if (!m_table) return true; + // get a copy of source TABLE object with its table share. TABLE will be used for feteching data from part tables. + // we will clone a new handler for using multi-cursor. The invoker[mysql_secodary_load_unload] hold the refcnt + // of shhare, here, we dont need to warry about its be released. + if (open_table_from_share(m_thd, share, share->path.str, 0, SKIP_NEW_HANDLER, 0, m_table, false, nullptr)) { + return true; + } + + m_table->in_use = m_thd; + m_table->alias_name_used = context->m_table->alias_name_used; + m_table->read_set = context->m_table->read_set; + m_table->write_set = context->m_table->write_set; + + return false; +} + +int PartitionLoadThreadContext::end_transactions() { + auto ret{ShannonBase::SHANNON_SUCCESS}; + if (m_transactions_ended || !m_thd) return ret; + ret = (m_error.load()) ? (trans_rollback_stmt(m_thd) || trans_rollback(m_thd)) + : (trans_commit_stmt(m_thd) || trans_commit(m_thd)); + m_transactions_ended = true; + return ret; +} + +void PartitionLoadThreadContext::cleanup() { + // Ensure transactions are ended first (idempotent) + end_transactions(); + + if (m_handler) { + m_handler->ha_close(); + m_handler = nullptr; + } + + if (m_table) { + closefrm(m_table, false); // should be freed by mysql_secondary_load_or_unload. in `closefrm`, it dont decrease + // refcnt of m_histograms. + + /** + * in `open_table_from_share` `m_histograms` ref_cnt is increased, therefore, here, we should decrease its refcnt by + * onw. + */ + if (m_table->histograms) { + mysql_mutex_lock(&LOCK_open); + m_table->s->m_histograms->release(m_table->histograms); + mysql_mutex_unlock(&LOCK_open); + } + + m_table = nullptr; + } + if (m_thd) { + m_thd->restore_globals(); + delete m_thd; + m_thd = nullptr; + } +} + +bool PartitionLoadThreadContext::clone_handler(ha_innopart *file, const Rapid_load_context *context, + std::mutex &clone_mutex) { + std::lock_guard lock(clone_mutex); + THD *original_thd = context->m_table->in_use; + context->m_table->in_use = m_thd; + m_handler = static_cast(file->clone(context->m_table->s->normalized_path.str, m_thd->mem_root)); + context->m_table->in_use = original_thd; + + if (!m_handler) return true; + + m_handler->change_table_ptr(m_table, m_table->s); + m_table->file = m_handler; + + // Note: ha_open() is not needed because: + // 1. ha_innopart::clone() inherits the open state from the source handler + // 2. change_table_ptr() updates internal pointers while preserving the open state + // 3. Partition-level operations (rnd_init_in_part/rnd_next_in_part) work directly + + return false; +} + int Imcs::initialize() { if (!m_inited.load()) { m_inited.store(1); @@ -85,10 +186,7 @@ int Imcs::create_table_memo(const Rapid_load_context *context, const TABLE *sour ut_a(source); auto ret{ShannonBase::SHANNON_SUCCESS}; std::unique_ptr table{nullptr}; - if (context->m_extra_info.m_partition_infos.size()) - table = std::make_unique(source->s->db.str, source->s->table_name.str); - else - table = std::make_unique
(source->s->db.str, source->s->table_name.str); + table = std::make_unique
(source->s->db.str, source->s->table_name.str); // step 1: build the Cus meta info for every column. if ((ret = table.get()->create_fields_memo(context))) return ret; @@ -101,10 +199,7 @@ int Imcs::create_table_memo(const Rapid_load_context *context, const TABLE *sour // Adding the Table meta obj into m_tables/loaded tables meta information. std::string keypart; keypart.append(source->s->db.str).append(":").append(source->s->table_name.str); - if (context->m_extra_info.m_partition_infos.size()) - m_parttables.emplace(keypart, std::move(table)); - else - m_tables.emplace(keypart, std::move(table)); + m_tables.emplace(keypart, std::move(table)); return ShannonBase::SHANNON_SUCCESS; /* in secondary load phase, the table not loaded into imcs. therefore, it can be seen @@ -119,6 +214,29 @@ int Imcs::create_table_memo(const Rapid_load_context *context, const TABLE *sour */ } +int Imcs::create_parttable_memo(const Rapid_load_context *context, const TABLE *source) { + ut_a(source); + + std::string parttb_key; + parttb_key.append(source->s->db.str).append(":").append(source->s->table_name.str); + std::unique_ptr table = + std::make_unique(source->s->db.str, source->s->table_name.str, parttb_key); + + if (table->build_partitions(context)) { + std::string errmsg; + errmsg.append("try to build ") + .append(context->m_schema_name.c_str()) + .append(".") + .append(context->m_table_name.c_str()) + .append(" partitions failed"); + my_error(ER_SECONDARY_ENGINE, MYF(0), errmsg.c_str()); + return HA_ERR_GENERIC; + } + + m_parttables.emplace(parttb_key, std::move(table)); + return ShannonBase::SHANNON_SUCCESS; +} + int Imcs::build_indexes_from_keys(const Rapid_load_context *context, std::map &keys, row_id_t rowid) { std::string sch_tb(context->m_schema_name); @@ -376,34 +494,38 @@ int Imcs::load_innodb_parallel(const Rapid_load_context *context, ha_innobase *f int Imcs::load_innodbpart(const Rapid_load_context *context, ha_innopart *file) { std::string sch_name(context->m_schema_name.c_str()), table_name(context->m_table_name.c_str()), key; key.append(sch_name).append(":").append(table_name); - ut_a(m_parttables.find(key) != m_parttables.end()); + if (m_parttables.find(key) == m_parttables.end()) return ShannonBase::SHANNON_SUCCESS; + auto part_tb_ptr = down_cast(m_parttables[key].get()); + assert(part_tb_ptr); context->m_thd->set_sent_row_count(0); - for (auto &part : context->m_extra_info.m_partition_infos) { - auto partkey(part.first); - partkey.append("#").append(std::to_string(part.second)); - const_cast(context)->m_extra_info.m_active_part_key = partkey; + for (auto &[part_name, part_id] : context->m_extra_info.m_partition_infos) { + auto partkey{part_name}; + partkey.append("#").append(std::to_string(part_id)); + auto partition_ptr = part_tb_ptr->get_partition(partkey); + + Rapid_load_context::extra_info_t::m_active_part_key = partkey; // should be RC isolation level. set_tx_isolation(m_thd, ISO_READ_COMMITTED, true); - if (file->inited == handler::NONE && file->rnd_init_in_part(part.second, true)) { + if (file->inited == handler::NONE && file->rnd_init_in_part(part_id, true)) { my_error(ER_NO_SUCH_TABLE, MYF(0), sch_name, table_name); return HA_ERR_GENERIC; } int tmp{HA_ERR_GENERIC}; - while ((tmp = file->rnd_next_in_part(part.second, context->m_table->record[0])) != HA_ERR_END_OF_FILE) { + while ((tmp = file->rnd_next_in_part(part_id, context->m_table->record[0])) != HA_ERR_END_OF_FILE) { /*** ha_rnd_next can return RECORD_DELETED for MyISAM when one thread is reading and another deleting without locks. Now, do full scan, but multi-thread scan will impl in future. */ if (tmp == HA_ERR_KEY_NOT_FOUND) break; DBUG_EXECUTE_IF("secondary_engine_rapid_load_table_error", { my_error(ER_SECONDARY_ENGINE, MYF(0), sch_name, table_name); - file->rnd_end_in_part(part.second, true); + file->rnd_end_in_part(part_id, true); return HA_ERR_GENERIC; }); // ref to `row_sel_store_row_id_to_prebuilt` in row0sel.cc - if (m_parttables[key].get()->write(context, context->m_table->record[0])) { - file->rnd_end_in_part(part.second, true); + if (partition_ptr->write(context, context->m_table->record[0])) { + file->rnd_end_in_part(part_id, true); std::string errmsg; errmsg.append("load data from ").append(sch_name).append(".").append(table_name).append(" to imcs failed"); my_error(ER_SECONDARY_ENGINE, MYF(0), errmsg.c_str()); @@ -412,10 +534,175 @@ int Imcs::load_innodbpart(const Rapid_load_context *context, ha_innopart *file) context->m_thd->inc_sent_row_count(1); if (tmp == HA_ERR_RECORD_DELETED && !context->m_thd->killed) continue; } + // end of load the data from innodb to imcs. - file->rnd_end_in_part(part.second, true); + file->rnd_end_in_part(part_id, true); + } + + return ShannonBase::SHANNON_SUCCESS; +} + +int Imcs::load_innodbpart_parallel(const Rapid_load_context *context, ha_innopart *file) { + std::string sch_name(context->m_schema_name.c_str()), table_name(context->m_table_name.c_str()), key; + key.append(sch_name).append(":").append(table_name); + if (m_parttables.find(key) == m_parttables.end()) return ShannonBase::SHANNON_SUCCESS; + auto part_tb_ptr = down_cast(m_parttables[key].get()); + assert(part_tb_ptr); + + context->m_thd->set_sent_row_count(0); + + unsigned int num_threads = std::thread::hardware_concurrency() * 0.8; + if (num_threads == 0) num_threads = SHANNON_PARTS_PARALLEL; + + std::vector tasks; + tasks.reserve(context->m_extra_info.m_partition_infos.size()); + + for (auto &[part_name, part_id] : context->m_extra_info.m_partition_infos) { + partition_load_task_t task; + task.part_id = part_id; + task.part_key = part_name + "#" + std::to_string(part_id); + task.result = ShannonBase::SHANNON_SUCCESS; + task.rows_loaded = 0; + tasks.push_back(std::move(task)); + } + + num_threads = std::min(num_threads, static_cast(tasks.size())); + + std::mutex error_mutex, clone_mutex; + std::atomic total_rows{0}; + std::atomic has_error{false}; + + std::vector col_offsets(context->m_table->s->fields); + std::vector null_byte_offsets(context->m_table->s->fields); + std::vector null_bitmasks(context->m_table->s->fields); + + for (uint idx = 0; idx < context->m_table->s->fields; idx++) { + auto fld = *(context->m_table->field + idx); + col_offsets[idx] = fld->offset(context->m_table->record[0]); + null_byte_offsets[idx] = fld->null_offset(); + null_bitmasks[idx] = fld->null_bit; } + auto load_one_partition = [&](partition_load_task_t &task, + ha_innopart *task_handler) -> int { // Lambda: load a partition. + int result{ShannonBase::SHANNON_SUCCESS}; + task.rows_loaded = 0; + + if (task_handler == nullptr) { + std::lock_guard lock(error_mutex); + task.error_msg = "Handler clone is null for partition " + std::to_string(task.part_id); + task.result = HA_ERR_GENERIC; + return HA_ERR_GENERIC; + } + + Rapid_load_context::extra_info_t::m_active_part_key = task.part_key; + + bool part_initialized = false; + struct PartitionGuard { // Scope guard using local struct - will be called at scope exit + bool &initialized; + ha_innopart *handler; + uint part_id; + + ~PartitionGuard() { + if (initialized && handler) { + handler->rnd_end_in_part(part_id, true); + } + } + } part_guard{part_initialized, task_handler, task.part_id}; + + if (task_handler->inited == handler::NONE && task_handler->rnd_init_in_part(task.part_id, true)) { + std::lock_guard lock(error_mutex); + task.error_msg = "Failed to initialize partition " + std::to_string(task.part_id); + task.result = HA_ERR_GENERIC; + return HA_ERR_GENERIC; + } + part_initialized = true; + + int tmp{HA_ERR_GENERIC}; + std::unique_ptr rec_buff = std::make_unique(context->m_table->s->rec_buff_length); + memset(rec_buff.get(), 0, context->m_table->s->rec_buff_length); + while ((tmp = task_handler->rnd_next_in_part(task.part_id, rec_buff.get())) != HA_ERR_END_OF_FILE) { + if (tmp == HA_ERR_KEY_NOT_FOUND) break; + + DBUG_EXECUTE_IF("secondary_engine_rapid_part_table_load_error", { + std::lock_guard lock(error_mutex); + task.error_msg = "Secondary engine part table loaded error"; + task.result = HA_ERR_GENERIC; + return HA_ERR_GENERIC; + }); + + auto partition_ptr = part_tb_ptr->get_partition(task.part_key); + if (!partition_ptr) { + std::lock_guard lock(error_mutex); + task.error_msg = "partition not found: " + task.part_key; + task.result = HA_ERR_GENERIC; + return HA_ERR_GENERIC; + } + // parttable is shared_ptr/unique_ptr to PartTable + if (partition_ptr->write(context, rec_buff.get(), context->m_table->s->reclength, col_offsets.data(), + context->m_table->s->fields, null_byte_offsets.data(), null_bitmasks.data())) { + std::lock_guard lock(error_mutex); + task.error_msg = "load data from " + sch_name + "." + table_name + " to imcs failed"; + task.result = HA_ERR_GENERIC; + return HA_ERR_GENERIC; + } + + memset(rec_buff.get(), 0, context->m_table->s->rec_buff_length); + task.rows_loaded++; + if (tmp == HA_ERR_RECORD_DELETED && !context->m_thd->killed) continue; + } + + task.result = ShannonBase::SHANNON_SUCCESS; + return result; + }; + + std::atomic_size_t task_idx{0}; + std::vector workers_pool; // thread pool. + auto worker_func = [&]() { + PartitionLoadThreadContext ctx; + std::unique_ptr handler_lock{nullptr}; + if (ctx.initialize(context) || ctx.clone_handler(file, context, clone_mutex)) { + has_error.store(true); + ctx.set_error(); + return; + } + handler_lock = std::make_unique(ctx.handler(), ctx.thd(), F_RDLCK); + + while (true) { + size_t current_task = task_idx.fetch_add(1); + if (current_task >= tasks.size() || has_error.load()) break; + + auto result = load_one_partition(tasks[current_task], ctx.handler()); + if (result != ShannonBase::SHANNON_SUCCESS) { + has_error.store(true); + ctx.set_error(); + break; + } + total_rows.fetch_add(tasks[current_task].rows_loaded); + } + + handler_lock.reset(); // Release handler lock first + ctx.end_transactions(); // Then end transactions + }; + + for (unsigned int i = 0; i < num_threads; ++i) { // to start the worker threads. + workers_pool.emplace_back(worker_func); + } + + for (auto &worker : workers_pool) { + if (worker.joinable()) worker.join(); + } + + if (has_error.load()) { + for (const auto &task : tasks) { + if (task.result == ShannonBase::SHANNON_SUCCESS) continue; + task.error_msg.size() ? my_error(ER_SECONDARY_ENGINE, MYF(0), task.error_msg.c_str()) + : my_error(ER_NO_SUCH_TABLE, MYF(0), sch_name.c_str(), table_name.c_str()); + return HA_ERR_GENERIC; + } + } + + context->m_thd->set_sent_row_count(total_rows.load()); return ShannonBase::SHANNON_SUCCESS; } @@ -443,7 +730,7 @@ int Imcs::load_table(const Rapid_load_context *context, const TABLE *source) { } int Imcs::load_parttable(const Rapid_load_context *context, const TABLE *source) { - if (create_table_memo(context, source)) { + if (create_parttable_memo(context, source)) { std::string sch(source->s->db.str), table(source->s->table_name.str), errmsg; cleanup(sch, table); errmsg.append("create table memo for ") @@ -456,10 +743,20 @@ int Imcs::load_parttable(const Rapid_load_context *context, const TABLE *source) } auto ret{ShannonBase::SHANNON_SUCCESS}; - if ((ret = load_innodbpart(context, dynamic_cast(source->file)))) { - // if load partition table failed, then do normal load mode, therefore clear partition info. - const_cast(context)->m_extra_info.m_partition_infos.clear(); - ret = load_innodb(context, dynamic_cast(source->file)); + auto parall_scan = (context->m_extra_info.m_partition_infos.size() > SHANNON_PARTS_PARALLEL) ? true : false; + ret = parall_scan ? load_innodbpart_parallel(context, dynamic_cast(source->file)) + : load_innodbpart(context, dynamic_cast(source->file)); + + if (ret) { + std::string sch(source->s->db.str), table(source->s->table_name.str), errmsg; + cleanup(sch, table); + errmsg.append("load data from") + .append(context->m_schema_name) + .append(".") + .append(context->m_table_name) + .append(" failed."); + my_error(ER_SECONDARY_ENGINE, MYF(0), errmsg.c_str()); + return HA_ERR_GENERIC; } return ret; } diff --git a/storage/rapid_engine/imcs/imcs.h b/storage/rapid_engine/imcs/imcs.h index 89ad1de1d..8c3953f3c 100644 --- a/storage/rapid_engine/imcs/imcs.h +++ b/storage/rapid_engine/imcs/imcs.h @@ -79,6 +79,9 @@ class Imcs : public MemoryObject { /**create all cus needed by source table, and ready to write the data into.*/ int create_table_memo(const Rapid_load_context *context, const TABLE *source); + /**create all cus needed by source table, and ready to write the data into.*/ + int create_parttable_memo(const Rapid_load_context *context, const TABLE *source); + /** load the current table rows data into imcs. the caller's responsible for moving to next row */ int load_table(const Rapid_load_context *context, const TABLE *source); @@ -159,6 +162,7 @@ class Imcs : public MemoryObject { int load_innodb(const Rapid_load_context *context, ha_innobase *file); int load_innodb_parallel(const Rapid_load_context *context, ha_innobase *file); int load_innodbpart(const Rapid_load_context *context, ha_innopart *file); + int load_innodbpart_parallel(const Rapid_load_context *context, ha_innopart *file); int unload_innodb(const Rapid_load_context *context, const char *db_name, const char *table_name, bool error_if_not_loaded); @@ -167,6 +171,7 @@ class Imcs : public MemoryObject { bool error_if_not_loaded); private: + // Thread context for parallel scanning operations typedef struct { // if you dont use this, remove the boost_thread and boost_system libs in cmake file. // thread id @@ -185,6 +190,16 @@ class Imcs : public MemoryObject { std::vector null_bitmasks; } parall_scan_cookie_t; + // a partition loading task with results + typedef struct PartitionLoadTask { + uint part_id; + std::string part_key; + int result; + uint64_t rows_loaded; + std::string error_msg; + PartitionLoadTask() : part_id(0), result(0), rows_loaded(0) {} + } partition_load_task_t; + // imcs instance static Imcs *m_instance; @@ -208,6 +223,197 @@ class Imcs : public MemoryObject { const char *m_magic = "SHANNON_MAGIC_IMCS"; }; + +/** + * Multi-threaded Partition Loading Design + * ======================================== + * + * Overview: + * This implementation enables parallel loading of partitioned InnoDB tables by creating + * independent thread contexts that share metadata while maintaining separate cursors. + * + * Architecture: + * + * Main Thread (Secondary Load Context) + * ┌─────────────────────────────────────────────────────────────┐ + * │ context->m_table │ + * │ ├─ s: TABLE_SHARE (metadata) ← Shared by all threads │ + * │ ├─ file: ha_innopart (opened) ← Source for cloning │ + * │ └─ read_set/write_set ← Copied to worker threads │ + * └─────────────────────────────────────────────────────────────┘ + * │ + * │ Spawn worker threads + * ↓ + * ┌─────────────────────────────────────────────────────────────┐ + * │ Worker Thread 1 Worker Thread 2 ... │ + * ├─────────────────────────────────────────────────────────────┤ + * │ PartitionLoadThreadContext PartitionLoadThreadContext │ + * │ ├─ m_thd (independent) ├─ m_thd (independent) │ + * │ ├─ m_table (independent) ├─ m_table (independent) │ + * │ │ ├─ s → TABLE_SHARE │ ├─ s → TABLE_SHARE │ + * │ │ └─ file: m_handler │ └─ file: m_handler │ + * │ └─ m_handler (cloned) └─ m_handler (cloned) │ + * │ └─ Partitions 0,3,6... └─ Partitions 1,4,7... │ + * └─────────────────────────────────────────────────────────────┘ + * │ + * │ Parallel partition scanning + * ↓ + * ┌─────────────────────────────────────────────────────────────┐ + * │ InnoDB Storage Engine │ + * │ Partition 0 Partition 1 Partition 2 Partition 3 ... │ + * └─────────────────────────────────────────────────────────────┘ + * + * Key Design Principles: + * + * 1. Shared Metadata (TABLE_SHARE): + * - All worker threads share the same TABLE_SHARE from the main thread + * - This saves memory and ensures consistent schema information + * - Shared via open_table_from_share() with SKIP_NEW_HANDLER flag + * + * 2. Independent Thread Context (THD): + * - Each worker creates its own THD for thread-local storage + * - Enables independent transaction contexts and memory management + * - Each THD has its own mem_root for automatic memory cleanup + * + * 3. Independent TABLE Objects: + * - Each worker has its own TABLE struct for cursor state + * - TABLE objects share the TABLE_SHARE but have independent: + * * read_set/write_set bitmaps + * * handler pointers + * * buffer spaces + * + * 4. Cloned Handlers (ha_innopart): + * - Each worker clones the main handler using handler::clone() + * - Cloned handlers inherit the opened state from the source handler + * - No explicit ha_open() needed because: + * a) ha_innopart::clone() inherits partition handlers' open state + * b) change_table_ptr() updates internal pointers to new TABLE + * c) Partition-level operations (rnd_*_in_part) work directly + * - Each cloned handler maintains independent cursors for concurrent reads + * + * 5. Resource Management: + * - Handler and TABLE memory allocated on THD's mem_root + * - closefrm() automatically closes handlers and frees TABLE resources + * - Deleting THD releases all mem_root allocations (handlers, TABLE, etc.) + * - No manual delete/free needed for mem_root-allocated objects + * + * Thread Safety: + * - clone_mutex: Protects handler cloning (some internal states may not be thread-safe) + * - Each thread reads from different partitions to avoid contention + * - Shared TABLE_SHARE is read-only after initialization + * + * Workflow: + * 1. Main thread initializes context with opened table + * 2. Each worker thread: + * a) Creates independent THD + * b) Creates TABLE sharing the main TABLE_SHARE (SKIP_NEW_HANDLER) + * c) Clones handler from main thread's handler + * d) Associates cloned handler with new TABLE via change_table_ptr() + * 3. Workers process assigned partitions in parallel + * 4. Cleanup: closefrm() + delete THD handles all resource release + */ + +// Manages THD lifecycle, table opening, and handler cloning. +// Ensures THD globals are restored properly on destruction, preventing +// thread-local state corruption in multi-threaded parallel loading +class PartitionLoadThreadContext { + public: + PartitionLoadThreadContext() : m_thd(nullptr), m_handler(nullptr), m_table(nullptr) { my_thread_init(); } + + ~PartitionLoadThreadContext() { + cleanup(); + my_thread_end(); + } + + /** + * Initializes the thread context for parallel partition loading. + * + * This method creates an isolated execution environment for a worker thread by: + * 1. Creating a new THD (Thread Descriptor) with daemon privileges + * 2. Setting up thread-specific global variables + * 3. Creating a private TABLE instance from the shared table definition + * 4. Configuring the table for read operations with proper column sets + * + * Each worker thread needs its own THD and TABLE instances to avoid conflicts + * when accessing MySQL internals and to maintain proper isolation during + * parallel data loading operations. + * + * @param context The rapid load context containing table and schema information + * @return true if initialization failed, false if successful + */ + bool initialize(const Rapid_load_context *context); + + /** + * Clones the partition handler for exclusive use by this worker thread. + * + * This method creates a private copy of the partition handler (ha_innopart) + * to enable concurrent data scanning from multiple partitions without + * resource contention. Handler cloning is thread-sensitive and must be + * protected by a mutex since the original handler's state may be modified + * during the cloning process. + * + * Key benefits: + * - Each thread gets independent handler instance avoiding lock contention + * - Isolated cursor positions and internal states for parallel scanning + * - Thread-safe access to underlying storage engine structures + * + * @param file The source partition handler to clone from + * @param context The rapid load context with table information + * @param clone_mutex Mutex to protect the cloning operation (thread-sensitive) + * @return true if cloning failed, false if successful + */ + bool clone_handler(ha_innopart *file, const Rapid_load_context *context, std::mutex &clone_mutex); + + inline THD *thd() { return m_thd; } + inline ha_innopart *handler() { return m_handler; } + inline TABLE *table() { return m_table; } + + // Prevent copying + PartitionLoadThreadContext(const PartitionLoadThreadContext &) = delete; + PartitionLoadThreadContext &operator=(const PartitionLoadThreadContext &) = delete; + + inline void set_error() { m_error.store(true); } + + int end_transactions(); + void cleanup(); + + private: + std::atomic m_error{false}; + bool m_transactions_ended{false}; + THD *m_thd; + ha_innopart *m_handler; + TABLE *m_table; +}; + +// Manages InnoDB transaction state by acquiring/releasing external locks. +// 1. ha_external_lock(F_RDLCK) starts the transaction before reading data +// 2. ha_external_lock(F_UNLCK) releases the lock on destruction +class PartitionLoadHandlerLock { + public: + PartitionLoadHandlerLock(handler *h, THD *thd, int lock_type) : m_handler(h), m_thd(thd), m_locked(false) { + if (m_handler && m_handler->ha_external_lock(m_thd, lock_type) == 0) { + m_locked = true; + } + } + + virtual ~PartitionLoadHandlerLock() { + if (m_locked && m_handler) { + m_handler->ha_external_lock(m_thd, F_UNLCK); + } + } + + inline bool is_locked() const { return m_locked; } + + // Prevent copying + PartitionLoadHandlerLock(const PartitionLoadHandlerLock &) = delete; + PartitionLoadHandlerLock &operator=(const PartitionLoadHandlerLock &) = delete; + + private: + handler *m_handler; + THD *m_thd; + bool m_locked; +}; + } // namespace Imcs } // namespace ShannonBase #endif //__SHANNONBASE_IMCS_H__ \ No newline at end of file diff --git a/storage/rapid_engine/imcs/table.cpp b/storage/rapid_engine/imcs/table.cpp index 53514306d..f4c775636 100644 --- a/storage/rapid_engine/imcs/table.cpp +++ b/storage/rapid_engine/imcs/table.cpp @@ -173,6 +173,8 @@ int Table::create_fields_memo(const Rapid_load_context *context) { size_t chunk_size = SHANNON_ROWS_IN_CHUNK * Utils::Util::normalized_length(field); if (likely(ShannonBase::rapid_allocated_mem_size + chunk_size > ShannonBase::rpd_mem_sz_max)) { my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "Rapid allocated memory exceeds over the maximum"); + + m_fields.clear(); return HA_ERR_GENERIC; } @@ -341,9 +343,31 @@ int Table::write(const Rapid_load_context *context, uchar *data) { case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: case MYSQL_TYPE_LONG_BLOB: { + data_ptr = fld->field_ptr(); // TODO: BLOB data maybe not in the page. stores off the page. - data_ptr = const_cast(fld->data_ptr()); - data_len = down_cast(fld)->get_length(); + auto bfld = down_cast(fld); + uint pack_len = bfld->pack_length_no_ptr(); + switch (pack_len) { + case 1: + data_len = *data_ptr; + break; + case 2: + data_len = uint2korr(data_ptr); + break; + case 3: + data_len = uint3korr(data_ptr); + break; + case 4: + data_len = uint4korr(data_ptr); + break; + } + // Advance past length prefix + data_ptr += pack_len; + + // For BLOBs, the data_ptr now points to a pointer to the actual blob data + uchar *blob_ptr = nullptr; + memcpy(&blob_ptr, data_ptr, sizeof(uchar *)); + data_ptr = blob_ptr; } break; case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { @@ -406,7 +430,8 @@ int Table::write(const Rapid_load_context *context, uchar *rowdata, size_t len, case MYSQL_TYPE_LONG_BLOB: { // TODO: BLOB data maybe not in the page. stores off the page. auto bfld = down_cast(fld); - switch (bfld->pack_length_no_ptr()) { + uint pack_len = bfld->pack_length_no_ptr(); + switch (pack_len) { case 1: data_len = *data_ptr; break; @@ -420,6 +445,13 @@ int Table::write(const Rapid_load_context *context, uchar *rowdata, size_t len, data_len = uint4korr(data_ptr); break; } + // Advance past length prefix + data_ptr += pack_len; + + // For BLOBs, the data_ptr now points to a pointer to the actual blob data + uchar *blob_ptr = nullptr; + memcpy(&blob_ptr, data_ptr, sizeof(uchar *)); + data_ptr = blob_ptr; } break; case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { @@ -566,414 +598,35 @@ int Table::delete_rows(const Rapid_load_context *context, const std::vectorm_table); +int PartTable::build_partitions(const Rapid_load_context *context) { + auto ret{ShannonBase::SHANNON_SUCCESS}; auto source = context->m_table; - for (auto index = 0u; index < source->s->fields; index++) { - auto field = *(source->field + index); - for (auto &part : context->m_extra_info.m_partition_infos) { - size_t chunk_size = SHANNON_ROWS_IN_CHUNK * Utils::Util::normalized_length(field); - if (likely(ShannonBase::rapid_allocated_mem_size + chunk_size > ShannonBase::rpd_mem_sz_max)) { - my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "Rapid allocated memory exceeds over the maximum"); - return HA_ERR_GENERIC; - } - auto part_name = part.first; - part_name.append("#").append(std::to_string(part.second)).append(":").append(field->field_name); - m_fields.emplace(part_name, std::make_unique(this, field, part_name)); - } - } - - return ShannonBase::SHANNON_SUCCESS; -} - -Cu *PartTable::get_field(std::string field_name) { - std::shared_lock lk(m_fields_mutex); - if (m_fields.find(field_name) == m_fields.end()) return nullptr; - return m_fields[field_name].get(); -} - -int PartTable::create_index_memo(const Rapid_load_context *context) { - auto source = context->m_table; - ut_a(source); - // no.1: primary key. using row_id as the primary key when missing user-defined pk. - if (source->s->is_missing_primary_key()) build_hidden_index_memo(context); - - // no.2: user-defined indexes. - build_user_defined_index_memo(context); - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::build_hidden_index_memo(const Rapid_load_context *context) { - for (auto &part : context->m_extra_info.m_partition_infos) { - auto partkey = part.first; - partkey.append("#").append(std::to_string(part.second)).append(":"); - partkey.append(ShannonBase::SHANNON_PRIMARY_KEY_NAME); + // start to add partitions. + for (auto &[part_name, part_id] : context->m_extra_info.m_partition_infos) { + auto part_key = part_name; + part_key.append("#").append(std::to_string(part_id)); + auto table = std::make_unique
(source->s->db.str, source->s->table_name.str, part_key); - m_source_keys.emplace(partkey, - std::make_pair(SHANNON_DATA_DB_ROW_ID_LEN, std::vector{SHANNON_DB_ROW_ID})); - m_indexes.emplace(partkey, std::make_unique>(SHANNON_DB_ROW_ID)); - m_index_mutexes.emplace(ShannonBase::SHANNON_PRIMARY_KEY_NAME, std::make_unique()); - } - - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::build_user_defined_index_memo(const Rapid_load_context *context) { - auto source = context->m_table; - for (auto &part : context->m_extra_info.m_partition_infos) { - auto keypart = part.first; - keypart.append("#").append(std::to_string(part.second)).append(":"); - - for (auto ind = 0u; ind < source->s->keys; ind++) { - auto key_info = source->key_info + ind; - auto keyname(keypart); - keyname.append(key_info->name); - - std::vector key_parts_names; - for (uint i = 0u; i < key_info->user_defined_key_parts /**actual_key_parts*/; i++) { - key_parts_names.push_back(key_info->key_part[i].field->field_name); - } - m_source_keys.emplace(keyname, std::make_pair(key_info->key_length, key_parts_names)); - m_indexes.emplace(keyname, std::make_unique>(keyname)); - m_index_mutexes.emplace(keyname, std::make_unique()); + // step 1: build the Cus meta info for every column. + if ((ret = table.get()->create_fields_memo(context))) { + my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "Build fields memo for partition failed"); + return ret; } - } - - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::build_index_impl(const Rapid_load_context *context, const KEY *key, row_id_t rowid) { - // this is come from ha_innodb.cc postion(), when postion() changed, the part should be changed respondingly. - // why we dont not change the impl of postion() directly? because the postion() is impled in innodb engine. - // we want to decouple with innodb engine. - auto source = context->m_table; - auto active_partkey = context->m_extra_info.m_active_part_key; - - if (key == nullptr) { - /* No primary key was defined for the table and we generated the clustered index - from row id: the row reference will be the row id, not any key value that MySQL - knows of */ - ut_a(source->file->ref_length == ShannonBase::SHANNON_DATA_DB_ROW_ID_LEN); - - ut_a(const_cast(context)->m_extra_info.m_key_len == source->file->ref_length); - const_cast(context)->m_extra_info.m_key_len = source->file->ref_length; - const_cast(context)->m_extra_info.m_key_buff = - std::make_unique(source->file->ref_length); - memset(context->m_extra_info.m_key_buff.get(), 0x0, source->file->ref_length); - memcpy(context->m_extra_info.m_key_buff.get(), source->file->ref, source->file->ref_length); - } else { - /* Copy primary key as the row reference */ - auto from_record = source->record[0]; - const_cast(context)->m_extra_info.m_key_len = key->key_length; - const_cast(context)->m_extra_info.m_key_buff = std::make_unique(key->key_length); - memset(context->m_extra_info.m_key_buff.get(), 0x0, key->key_length); - auto to_key = context->m_extra_info.m_key_buff.get(); - encode_row_key(to_key, from_record, key, key->key_length); - } - - auto keyname = key ? key->name : ShannonBase::SHANNON_PRIMARY_KEY_NAME; - auto active_key = active_partkey.append(":").append(keyname); - { - std::lock_guard lock(*m_index_mutexes[active_key].get()); - m_indexes[active_key].get()->insert(context->m_extra_info.m_key_buff.get(), context->m_extra_info.m_key_len, &rowid, - sizeof(rowid)); - } - const_cast(context)->m_extra_info.m_key_len = 0; - const_cast(context)->m_extra_info.m_key_buff.reset(nullptr); - return SHANNON_SUCCESS; -} - -// using for parallel load. -int PartTable::build_index_impl(const Rapid_load_context *context, const KEY *key, row_id_t rowid, uchar *rowdata, - ulong *col_offsets, ulong *null_byte_offsets, ulong *null_bitmasks) { - // this is come from ha_innodb.cc postion(), when postion() changed, the part should be changed respondingly. - // why we dont not change the impl of postion() directly? because the postion() is impled in innodb engine. - // we want to decouple with innodb engine. - auto source = context->m_table; - auto active_partkey = context->m_extra_info.m_active_part_key; - - std::unique_ptr key_buff{nullptr}; - auto key_len{0u}; - if (key == nullptr) { - /* No primary key was defined for the table and we generated the clustered index - from row id: the row reference will be the row id, not any key value that MySQL - knows of */ - ut_a(source->file->ref_length == ShannonBase::SHANNON_DATA_DB_ROW_ID_LEN); - ut_a(const_cast(context)->m_extra_info.m_key_len == source->file->ref_length); - key_len = source->file->ref_length; - key_buff = std::make_unique(key_len); - memset(key_buff.get(), 0x0, key_len); - memcpy(key_buff.get(), source->file->ref, key_len); - } else { - /* Copy primary key as the row reference */ - key_len = key->key_length; - key_buff = std::make_unique(key_len); - memset(key_buff.get(), 0x0, key_len); - auto to_key = key_buff.get(); - encode_key_from_row(rowdata, col_offsets, null_byte_offsets, null_bitmasks, key, to_key, m_key_buff_mutex); - } - - auto keyname = key ? key->name : ShannonBase::SHANNON_PRIMARY_KEY_NAME; - auto active_key = active_partkey.append(":").append(keyname); - { - std::lock_guard lock(*m_index_mutexes[active_key].get()); - m_indexes[active_key].get()->insert(key_buff.get(), key_len, &rowid, sizeof(rowid)); - } - - return SHANNON_SUCCESS; -} - -int PartTable::build_index(const Rapid_load_context *context, const KEY *key, row_id_t rowid) { - return build_index_impl(context, key, rowid); -} - -// using for parallel load. -int PartTable::build_index(const Rapid_load_context *context, const KEY *key, row_id_t rowid, uchar *rowdata, - ulong *col_offsets, ulong *null_byte_offsets, ulong *null_bitmasks) { - return build_index_impl(context, key, rowid, rowdata, col_offsets, null_byte_offsets, null_bitmasks); -} - -// IMPORTANT NOTIC: IF YOU CHANGE THE CODE HERE, YOU SHOULD CHANGE THE PARTITIAL TABLE `Table::write` CORRESPONDINGLY. -int PartTable::write(const Rapid_load_context *context, uchar *data) { - ut_a(context && data); - - auto rowid = reserver_rowid(); - for (auto index = 0u; index < context->m_table->s->fields; index++) { - auto fld = *(context->m_table->field + index); - if (fld->is_flag_set(NOT_SECONDARY_FLAG)) continue; - - auto data_len{0u}, extra_offset{0u}; - uchar *data_ptr{nullptr}; - if (fld->is_null()) { - data_len = UNIV_SQL_NULL; - data_ptr = nullptr; - } else { - switch (fld->type()) { - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: { - data_ptr = const_cast(fld->data_ptr()); - data_len = down_cast(fld)->get_length(); - } break; - case MYSQL_TYPE_VARCHAR: - case MYSQL_TYPE_VAR_STRING: { - extra_offset = (fld->field_length > 256 ? 2 : 1); - data_ptr = fld->field_ptr() + extra_offset; - if (extra_offset == 1) - data_len = mach_read_from_1(fld->field_ptr()); - else if (extra_offset == 2) - data_len = mach_read_from_2_little_endian(fld->field_ptr()); - } break; - default: { - data_ptr = fld->field_ptr(); - data_len = fld->pack_length(); - } break; - } + // step 2: build indexes. + if ((ret = table.get()->create_index_memo(context))) { + my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "Build indexes memo for partition failed"); + return ret; } - auto active_part = context->m_extra_info.m_active_part_key; - auto key = active_part.append(":").append(fld->field_name); - if (!(m_fields[key]->write_row(context, rowid, data_ptr, data_len))) { - // TODO: mark this row to be junk. - return HA_ERR_GENERIC; - } - } + // step 3: set load type. + table.get()->set_load_type(RapidTable::LoadType::USER_LOADED); - if (context->m_table->s->is_missing_primary_key()) { - context->m_table->file->position((const uchar *)context->m_table->record[0]); // to set DB_ROW_ID. - if (build_index(context, nullptr, rowid)) return HA_ERR_GENERIC; + // step 4: Adding the Table meta obj into partitions table meta information. + m_partitions.emplace(part_key, std::move(table)); } - for (auto index = 0u; index < context->m_table->s->keys; index++) { - auto key_info = context->m_table->key_info + index; - if (build_index(context, key_info, rowid)) return HA_ERR_GENERIC; - } - - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::write(const Rapid_load_context *context, uchar *rowdata, size_t len, ulong *col_offsets, size_t n_cols, - ulong *null_byte_offsets, ulong *null_bitmasks) { - ut_a(context->m_table->s->fields == n_cols); - uchar *data_ptr{nullptr}; - uint data_len{0}; - - auto rowid = reserver_rowid(); - for (auto col_ind = 0u; col_ind < context->m_table->s->fields; col_ind++) { - auto fld = *(context->m_table->field + col_ind); - if (fld->is_flag_set(NOT_SECONDARY_FLAG)) continue; - - data_ptr = rowdata + col_offsets[col_ind]; - auto is_null = (fld->is_nullable()) ? is_field_null(col_ind, rowdata, null_byte_offsets, null_bitmasks) : false; - if (is_null) { - data_len = UNIV_SQL_NULL; - data_ptr = nullptr; - } else { - switch (fld->type()) { - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: { - auto bfld = down_cast(fld); - switch (bfld->pack_length_no_ptr()) { - case 1: - data_len = *data_ptr; - break; - case 2: - data_len = uint2korr(data_ptr); - break; - case 3: - data_len = uint3korr(data_ptr); - break; - case 4: - data_len = uint4korr(data_ptr); - break; - } - } break; - case MYSQL_TYPE_VARCHAR: - case MYSQL_TYPE_VAR_STRING: { - auto extra_offset = (fld->field_length > 256 ? 2 : 1); - if (extra_offset == 1) - data_len = mach_read_from_1(data_ptr); - else if (extra_offset == 2) - data_len = mach_read_from_2_little_endian(data_ptr); - data_ptr = data_ptr + ptrdiff_t(extra_offset); - } break; - default: { - data_len = fld->pack_length(); - } break; - } - } - - if (!(m_fields[fld->field_name]->write_row(context, rowid, data_ptr, data_len))) { - // TODO: mark this row to be junk. - return HA_ERR_GENERIC; - } - } - - if (context->m_table->s->is_missing_primary_key()) { - context->m_table->file->position((const uchar *)rowdata); // to set DB_ROW_ID. - if (build_index(context, nullptr, rowid, rowdata, col_offsets, null_byte_offsets, null_bitmasks)) - return HA_ERR_GENERIC; - } - - for (auto index = 0u; index < context->m_table->s->keys; index++) { - auto key_info = context->m_table->key_info + index; - if (build_index(context, key_info, rowid, rowdata, col_offsets, null_byte_offsets, null_bitmasks)) - return HA_ERR_GENERIC; - } - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::rollback_changes_by_trxid(Transaction::ID trxid) { - for (auto &cu : m_fields) { - auto chunk_sz = cu.second.get()->chunks(); - for (auto index = 0u; index < chunk_sz; index++) { - auto &version_infos = cu.second.get()->chunk(index)->header()->m_smu->version_info(); - if (!version_infos.size()) continue; - - for (auto &ver : version_infos) { - std::lock_guard lock(ver.second.vec_mutex); - auto rowid = ver.first; - std::for_each(ver.second.items.begin(), ver.second.items.end(), [&](ReadView::SMU_item &item) { - if (item.trxid == trxid) { - // To update rows status. - if (item.oper_type == OPER_TYPE::OPER_INSERT) { // - if (!cu.second.get()->chunk(index)->header()->m_del_mask.get()) { // the del mask not exists now. - cu.second.get()->chunk(index)->header()->m_del_mask = - std::make_unique(SHANNON_ROWS_IN_CHUNK); - } - Utils::Util::bit_array_set(cu.second.get()->chunk(index)->header()->m_del_mask.get(), rowid); - } - if (item.oper_type == OPER_TYPE::OPER_DELETE) { - Utils::Util::bit_array_reset(cu.second.get()->chunk(index)->header()->m_del_mask.get(), rowid); - } - item.tm_committed = ShannonBase::SHANNON_MAX_STMP; // reset commit timestamp to max, mean it rollbacked. - // has been rollbacked, invisible to all readview. - } - }); - } - } - } - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::write_row_from_log(const Rapid_load_context *context, row_id_t rowid, - std::unordered_map &fields) { - for (auto &field_val : fields) { - auto key_name = field_val.first; - // escape the db_trx_id field and the filed is set to NOT_SECONDARY[not loaded int imcs] - if (key_name == SHANNON_DB_TRX_ID || m_fields.find(key_name) == m_fields.end()) continue; - // if data is nullptr, means it's 'NULL'. - auto len = field_val.second.mlength; - if (!m_fields[key_name]->write_row(context, rowid, field_val.second.data.get(), len)) { - // TODO: mark this row to be junk. - return HA_ERR_WRONG_IN_RECORD; - } - } - - m_stats.prows.fetch_add(1); - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::update_row(const Rapid_load_context *context, row_id_t rowid, std::string &field_key, - const uchar *new_field_data, size_t nlen) { - if (m_fields.find(field_key) == m_fields.end()) return HA_ERR_GENERIC; - - auto ret = m_fields[field_key]->update_row(context, rowid, const_cast(new_field_data), nlen); - if (!ret) return HA_ERR_GENERIC; - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::update_row_from_log(const Rapid_load_context *context, row_id_t rowid, - std::unordered_map &upd_recs) { - for (auto &field_val : upd_recs) { - auto key_name = field_val.first; - // escape the db_trx_id field and the filed is set to NOT_SECONDARY[not loaded int imcs] - if (key_name == SHANNON_DB_TRX_ID || m_fields.find(key_name) == m_fields.end()) continue; - // if data is nullptr, means it's 'NULL'. - auto len = field_val.second.mlength; - if (!m_fields[key_name]->update_row_from_log(context, rowid, field_val.second.data.get(), len)) - return HA_ERR_WRONG_IN_RECORD; - } - - return ShannonBase::SHANNON_SUCCESS; -} - -int PartTable::delete_row(const Rapid_load_context *context, row_id_t rowid) { - for (auto it = m_fields.begin(); it != m_fields.end();) { - if (!it->second->delete_row(context, rowid)) { - return HA_ERR_GENERIC; - } - ++it; - } - - return SHANNON_SUCCESS; -} - -int PartTable::delete_rows(const Rapid_load_context *context, const std::vector &rowids) { - if (!m_fields.size()) return SHANNON_SUCCESS; - - auto deleted_cnt{0}; - if (rowids.empty()) { // delete all rows. - for (auto &cu : m_fields) { - assert(cu.second); - if (!cu.second->delete_row_all(context)) return HA_ERR_GENERIC; - } - - return ShannonBase::SHANNON_SUCCESS; - } - - for (auto &rowid : rowids) { // delete some rows. - for (auto &cu : m_fields) { - assert(cu.second); - if (!cu.second->delete_row(context, rowid)) return HA_ERR_GENERIC; - } - deleted_cnt++; - } return ShannonBase::SHANNON_SUCCESS; } diff --git a/storage/rapid_engine/imcs/table.h b/storage/rapid_engine/imcs/table.h index d433f00e6..5f4803bf3 100644 --- a/storage/rapid_engine/imcs/table.h +++ b/storage/rapid_engine/imcs/table.h @@ -40,7 +40,9 @@ class TABLE; class Field; namespace ShannonBase { +class Rapid_context; class Rapid_load_context; +class Rapid_scan_context; namespace Imcs { class Cu; class RapidTable : public MemoryObject { @@ -108,8 +110,10 @@ class RapidTable : public MemoryObject { virtual int write(const Rapid_load_context *, uchar *) = 0; virtual int write(const Rapid_load_context *, uchar *, size_t, ulong *, size_t, ulong *, ulong *) = 0; + virtual int build_partitions(const Rapid_load_context *) = 0; + // gets the # of physical rows. - virtual row_id_t rows(const Rapid_load_context *) = 0; + virtual row_id_t rows(const Rapid_context *) = 0; // to reserer a row place for this operation. virtual row_id_t reserve_id(const Rapid_load_context *) = 0; @@ -167,6 +171,7 @@ class Table : public RapidTable { public: Table() = default; Table(std::string schema, std::string table) : RapidTable(schema, table) {} + Table(std::string schema, std::string table, std::string partkey) : RapidTable(schema, table), m_part_key(partkey) {} virtual ~Table() { m_fields.clear(); m_source_keys.clear(); @@ -197,11 +202,13 @@ class Table : public RapidTable { return ShannonBase::SHANNON_SUCCESS; } + virtual int build_partitions(const Rapid_load_context *) final { return ShannonBase::SHANNON_SUCCESS; } + // rollback a modified record. virtual int rollback_changes_by_trxid(Transaction::ID trxid) final; // gets the # of physical rows. - virtual row_id_t rows(const Rapid_load_context *) final { return m_stats.prows.load(); } + virtual row_id_t rows(const Rapid_context *) final { return m_stats.prows.load(); } // to reserer a row place for this operation. virtual row_id_t reserve_id(const Rapid_load_context *) final { return m_stats.prows.fetch_add(1); } @@ -244,12 +251,16 @@ class Table : public RapidTable { int build_index_impl(const Rapid_load_context *context, const KEY *key, row_id_t rowid); int build_index_impl(const Rapid_load_context *context, const KEY *key, row_id_t rowid, uchar *rowdata, ulong *col_offsets, ulong *null_byte_offsets, ulong *null_bitmasks); + + private: + std::string m_part_key; }; class PartTable : public RapidTable { public: PartTable() = default; - PartTable(std::string schema, std::string table) : RapidTable(schema, table) {} + PartTable(std::string schema, std::string table, std::string part_key) + : RapidTable(schema, table), m_part_key(part_key) {} virtual ~PartTable() { m_fields.clear(); m_source_keys.clear(); @@ -257,49 +268,90 @@ class PartTable : public RapidTable { } virtual TYPE type() final { return RapidTable::TYPE::PARTTABLE; } - virtual int create_fields_memo(const Rapid_load_context *) final; - virtual int create_index_memo(const Rapid_load_context *context) final; - virtual int write(const Rapid_load_context *context, uchar *data) final; - virtual int write(const Rapid_load_context *context, uchar *rowdata, size_t len, ulong *col_offsets, size_t n_cols, - ulong *null_byte_offsets, ulong *null_bitmasks) final; + virtual int create_fields_memo(const Rapid_load_context *) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } - virtual int delete_row(const Rapid_load_context *context, row_id_t rowid) final; - virtual int delete_rows(const Rapid_load_context *context, const std::vector &rowids) final; + virtual int create_index_memo(const Rapid_load_context *) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } - virtual int build_index(const Rapid_load_context *context, const KEY *key, row_id_t rowid) final; - virtual int build_index(const Rapid_load_context *context, const KEY *key, row_id_t rowid, uchar *rowdata, - ulong *col_offsets, ulong *null_byte_offsets, ulong *null_bitmasks) final; + virtual int write(const Rapid_load_context *, uchar *) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int write(const Rapid_load_context *, uchar *, size_t, ulong *, size_t, ulong *, ulong *) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int delete_row(const Rapid_load_context *, row_id_t) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int delete_rows(const Rapid_load_context *, const std::vector &) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int build_index(const Rapid_load_context *, const KEY *, row_id_t) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int build_index(const Rapid_load_context *, const KEY *, row_id_t, uchar *, ulong *, ulong *, ulong *) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int write_row_from_log(const Rapid_load_context *, row_id_t, + std::unordered_map &) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int update_row(const Rapid_load_context *, row_id_t, std::string &, const uchar *, size_t) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } + + virtual int update_row_from_log(const Rapid_load_context *, row_id_t, + std::unordered_map &) final { + assert(false); + return ShannonBase::SHANNON_SUCCESS; + } - virtual int write_row_from_log(const Rapid_load_context *context, row_id_t rowid, - std::unordered_map &fields) final; - virtual int update_row(const Rapid_load_context *context, row_id_t rowid, std::string &field_key, - const uchar *new_field_data, size_t nlen) final; - virtual int update_row_from_log(const Rapid_load_context *context, row_id_t rowid, - std::unordered_map &upd_recs) final; virtual int truncate() final { assert(false); return ShannonBase::SHANNON_SUCCESS; } + virtual int build_partitions(const Rapid_load_context *context) final; + // rollback a modified record. - virtual int rollback_changes_by_trxid(Transaction::ID trxid) final; + virtual int rollback_changes_by_trxid(Transaction::ID) final { assert(false); } // gets the # of physical rows. - virtual row_id_t rows(const Rapid_load_context *) final { return m_stats.prows.load(); } + virtual row_id_t rows(const Rapid_context *) final { return m_stats.prows.load(); } // to reserer a row place for this operation. virtual row_id_t reserve_id(const Rapid_load_context *) final { return m_stats.prows.fetch_add(1); } virtual Cu *first_field() final { return m_fields.begin()->second.get(); } - virtual Cu *get_field(std::string field_name) final; + virtual Cu *get_field(std::string) final { + assert(false); + return nullptr; + } - virtual Index::Index *get_index(std::string key_name) final { - if (m_indexes.find(key_name) == m_indexes.end()) - return nullptr; - else - return m_indexes[key_name].get(); + virtual Index::Index *get_index(std::string) final { + assert(false); + return nullptr; } virtual std::unordered_map> &get_fields() final { return m_fields; } @@ -310,6 +362,11 @@ class PartTable : public RapidTable { virtual std::string &name() final { return m_table_name; } virtual row_id_t reserver_rowid() final { return m_stats.prows.fetch_add(1); } + inline RapidTable *get_partition(std::string part_key) { + if (m_partitions.find(part_key) == m_partitions.end()) return nullptr; + return m_partitions[part_key].get(); + } + private: bool is_field_null(int field_index, const uchar *rowdata, const ulong *null_byte_offsets, const ulong *null_bitmasks) { @@ -329,6 +386,13 @@ class PartTable : public RapidTable { int build_index_impl(const Rapid_load_context *context, const KEY *key, row_id_t rowid); int build_index_impl(const Rapid_load_context *context, const KEY *key, row_id_t rowid, uchar *rowdata, ulong *col_offsets, ulong *null_byte_offsets, ulong *null_bitmasks); + + private: + // all the partition sub-tables. + std::unordered_map> m_partitions; + + // part_name+"#"+ part_id + std::string m_part_key; }; } // namespace Imcs diff --git a/storage/rapid_engine/include/rapid_const.h b/storage/rapid_engine/include/rapid_const.h index 350bf456d..9242886bc 100644 --- a/storage/rapid_engine/include/rapid_const.h +++ b/storage/rapid_engine/include/rapid_const.h @@ -84,6 +84,8 @@ #define SHANNON_VECTOR_WIDTH 128 // ARM NEON #endif +constexpr uint64 SHANNON_PARTS_PARALLEL = 32; + extern char *mysql_llm_home_ptr; namespace ShannonBase { using row_id_t = size_t; diff --git a/storage/rapid_engine/include/rapid_context.h b/storage/rapid_engine/include/rapid_context.h index f8fc62561..20cbb6882 100644 --- a/storage/rapid_engine/include/rapid_context.h +++ b/storage/rapid_engine/include/rapid_context.h @@ -81,9 +81,38 @@ class Rapid_execution_context : public Secondary_engine_execution_context { double m_best_cost; }; +class Rapid_pop_context : public Secondary_engine_execution_context { + public: + uint64_t m_start_lsn; + // current schema name and table name. + std::string m_schema_name, m_table_name; + // trx id. + Transaction::ID m_trxid{0}; + + // key length, DATA_ROW_ID_LEN OR KEY LEN; + uint8 m_key_len{0}; + + // key info. which is rowid or primary key/unique key. + std::unique_ptr m_key_buff{nullptr}; +}; + class ha_rapid; // used in imcs. -class Rapid_load_context : public Secondary_engine_execution_context { +class Rapid_ha_data { + public: + Rapid_ha_data() : m_trx(nullptr) {} + + ~Rapid_ha_data() {} + + ShannonBase::Transaction *get_trx() const { return m_trx; } + + void set_trx(ShannonBase::Transaction *t) { m_trx = t; } + + private: + ShannonBase::Transaction *m_trx; +}; + +class Rapid_context : public Secondary_engine_execution_context { public: class extra_info_t { public: @@ -113,15 +142,20 @@ class Rapid_load_context : public Secondary_engine_execution_context { std::unordered_map m_partition_infos; // active partitio info. - std::string m_active_part_key; + static SHANNON_THREAD_LOCAL std::string m_active_part_key; }; - Rapid_load_context() : m_trx(nullptr), m_table(nullptr), m_local_dict(nullptr), m_thd{nullptr} {} - virtual ~Rapid_load_context() = default; - // current schema name and table name. std::string m_schema_name, m_table_name; + extra_info_t m_extra_info; +}; + +class Rapid_load_context : public Rapid_context { + public: + Rapid_load_context() : m_trx(nullptr), m_table(nullptr), m_local_dict(nullptr), m_thd{nullptr} {} + virtual ~Rapid_load_context() = default; + // current transaction. Transaction *m_trx{nullptr}; @@ -133,37 +167,18 @@ class Rapid_load_context : public Secondary_engine_execution_context { // current thd here. THD *m_thd{nullptr}; - - extra_info_t m_extra_info; }; -class Rapid_pop_context : public Secondary_engine_execution_context { +class Rapid_scan_context : public Rapid_context { public: - uint64_t m_start_lsn; - // current schema name and table name. - std::string m_schema_name, m_table_name; - // trx id. - Transaction::ID m_trxid{0}; - - // key length, DATA_ROW_ID_LEN OR KEY LEN; - uint8 m_key_len{0}; - - // key info. which is rowid or primary key/unique key. - std::unique_ptr m_key_buff{nullptr}; -}; - -class Rapid_ha_data { - public: - Rapid_ha_data() : m_trx(nullptr) {} - - ~Rapid_ha_data() {} - - ShannonBase::Transaction *get_trx() const { return m_trx; } + // current transaction. + Transaction *m_trx{nullptr}; - void set_trx(ShannonBase::Transaction *t) { m_trx = t; } + // the primary key of this table. + TABLE *m_table{nullptr}; - private: - ShannonBase::Transaction *m_trx; + // current thd here. + THD *m_thd{nullptr}; }; } // namespace ShannonBase diff --git a/storage/rapid_engine/trx/readview.cpp b/storage/rapid_engine/trx/readview.cpp index 0064279df..70cf993ff 100644 --- a/storage/rapid_engine/trx/readview.cpp +++ b/storage/rapid_engine/trx/readview.cpp @@ -134,7 +134,7 @@ smu_item_t::smu_item_t(size_t size) { tm_stamp = tm_committed = std::chrono::high_resolution_clock::now(); } -uchar *smu_item_vec_t::reconstruct_data(Rapid_load_context *context, uchar *in_place, size_t &in_place_len, +uchar *smu_item_vec_t::reconstruct_data(Rapid_scan_context *context, uchar *in_place, size_t &in_place_len, uint8 &status) { std::lock_guard lock(vec_mutex); auto ret = in_place; @@ -191,7 +191,7 @@ uchar *smu_item_vec_t::reconstruct_data(Rapid_load_context *context, uchar *in_p void Snapshot_meta_unit::set_owner(ShannonBase::Imcs::Chunk *owner) { m_owner = owner; } -uchar *Snapshot_meta_unit::build_prev_vers(Rapid_load_context *context, ShannonBase::row_id_t rowid, uchar *in_place, +uchar *Snapshot_meta_unit::build_prev_vers(Rapid_scan_context *context, ShannonBase::row_id_t rowid, uchar *in_place, size_t &in_place_len, uint8 &status) { if (m_owner->is_deleted(context, rowid)) status |= static_cast(RECONSTRUCTED_STATUS::STAT_DELETED); if (m_owner->is_null(context, rowid)) { @@ -204,7 +204,7 @@ uchar *Snapshot_meta_unit::build_prev_vers(Rapid_load_context *context, ShannonB : m_version_info[rowid].reconstruct_data(context, in_place, in_place_len, status); } -BitmapResult Snapshot_meta_unit::build_prev_vers_batch(Rapid_load_context *context, ShannonBase::row_id_t row_start, +BitmapResult Snapshot_meta_unit::build_prev_vers_batch(Rapid_scan_context *context, ShannonBase::row_id_t row_start, size_t row_count, const uchar *chunk_base_ptr, size_t normalized_len, uchar *reconstruct_buf) { BitmapResult result; diff --git a/storage/rapid_engine/trx/readview.h b/storage/rapid_engine/trx/readview.h index 4c78c2e66..7728b63e1 100644 --- a/storage/rapid_engine/trx/readview.h +++ b/storage/rapid_engine/trx/readview.h @@ -35,6 +35,8 @@ #include "storage/rapid_engine/trx/transaction.h" namespace ShannonBase { +class Rapid_context; +class Rapid_scan_context; class Rapid_load_context; namespace Imcs { class Chunk; @@ -147,7 +149,7 @@ using SMU_items = struct smu_item_vec_t { * if in_place_len == UNIV_SQL_NULL, means in_place value is null. flag is to indicate flag bit * of the data, such as deleted, null, etc. **/ - uchar *reconstruct_data(Rapid_load_context *context, uchar *in_place, size_t &in_place_len, uint8 &status); + uchar *reconstruct_data(Rapid_scan_context *context, uchar *in_place, size_t &in_place_len, uint8 &status); std::mutex vec_mutex; std::deque items; @@ -174,7 +176,7 @@ class Snapshot_meta_unit { */ // in_place, means the current version. flag indicates the flag of reconstructed data. // such as is null or not, is deleted marked or not. - uchar *build_prev_vers(Rapid_load_context *context, ShannonBase::row_id_t rowid, uchar *in_place, + uchar *build_prev_vers(Rapid_scan_context *context, ShannonBase::row_id_t rowid, uchar *in_place, size_t &in_place_len, uint8 &status); // - row_start: starting global row id @@ -186,7 +188,7 @@ class Snapshot_meta_unit { // - reconstruct_buf: caller-provided buffer of size >= row_count * normalized_len // // Returns BitmapResult where bit i indicates visibility of row (row_start + i). - BitmapResult build_prev_vers_batch(Rapid_load_context *context, ShannonBase::row_id_t row_start, size_t row_count, + BitmapResult build_prev_vers_batch(Rapid_scan_context *context, ShannonBase::row_id_t row_start, size_t row_count, const uchar *chunk_base_ptr, size_t normalized_len, uchar *reconstruct_buf); // gets the rowid's versions.