Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions mysql-test/suite/secondary_engine/r/shannon_partition_table.result
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ alter table LINEITEM change L_ORDERKEY L_ORDERKEY BIGINT NOT NULL COMMENT 'RAPID
set use_secondary_engine=forced;
alter table LINEITEM secondary_engine RAPID;
alter table LINEITEM secondary_load;
select * from LINEITEM;
L_ORDERKEY L_PARTKEY L_SUPPKEY L_LINENUMBER L_QUANTITY L_EXTENDEDPRICE L_DISCOUNT L_TAX L_RETURNFLAG L_LINESTATUS L_SHIPDATE L_COMMITDATE L_RECEIPTDATE L_SHIPINSTRUCT L_SHIPMODE L_COMMENT
23999974 770178 30217 6 48.00 59910.72 0.02 0.08 N O 1997-03-15 1997-05-26 1997-03-26 TAKE BACK RETURN FOB fy packages nag evenly
23999975 625094 15140 2 23.00 23438.38 0.05 0.06 N O 1996-10-23 1996-09-11 1996-11-12 DELIVER IN PERSON AIR ly regular realms after the
24000000 677354 17387 2 30.00 39939.60 0.00 0.01 N O 1996-04-21 1996-06-08 1996-05-08 TAKE BACK RETURN REG AIR le? blithely unusual deposits above
24000000 277642 37643 3 20.00 32392.60 0.01 0.02 N O 1996-05-14 1996-06-11 1996-06-04 TAKE BACK RETURN REG AIR s. blithely regular instructi
23999975 353843 3852 1 1.00 1896.83 0.05 0.01 N O 1996-08-03 1996-09-14 1996-08-15 TAKE BACK RETURN RAIL pending instructions affix qu
23999975 350648 30649 3 18.00 30575.34 0.08 0.01 N O 1996-10-07 1996-08-22 1996-10-19 DELIVER IN PERSON TRUCK ully pending dependencies hagg
23999974 311819 31820 4 22.00 40277.60 0.08 0.01 N O 1997-03-25 1997-04-24 1997-03-28 COLLECT COD REG AIR uickly ironic pains. bli
24000000 712607 2625 4 7.00 11336.99 0.09 0.00 N O 1996-07-16 1996-06-18 1996-08-04 COLLECT COD SHIP ckly express requests. even, pendin
23999974 737406 7461 5 2.00 2886.74 0.03 0.07 N O 1997-04-25 1997-05-31 1997-05-21 NONE REG AIR nding accounts. ironic, final
24000000 651246 21263 1 5.00 5986.05 0.06 0.05 N O 1996-06-11 1996-05-16 1996-06-22 NONE TRUCK nts use. ir
alter table LINEITEM secondary_unload;
alter table LINEITEM secondary_load PARTITION(p0, p1, p10);
alter table LINEITEM secondary_unload;
Expand All @@ -38,7 +50,7 @@ alter table LINEITEM secondary_unload PARTITION(p0, p10, p1);
alter table LINEITEM secondary_load PARTITION(p0, p1, p10);
alter table LINEITEM secondary_unload PARTITION(p0, p10);
alter table LINEITEM secondary_unload;
ERROR HY000: DDLs on a table with a secondary engine defined are not allowed.
ERROR HY000: Secondary engine operation failed. test_partition.LINEITEM table is not loaded into rapid yet.
drop table LINEITEM;
CREATE TABLE LINEITEM1 ( L_ORDERKEY BIGINT NOT NULL,
L_PARTKEY INTEGER NOT NULL,
Expand Down Expand Up @@ -70,7 +82,7 @@ alter table LINEITEM1 secondary_engine RAPID;
alter table LINEITEM1 secondary_load;
explain select * from LINEITEM1;
id select_type table partitions type possible_keys key key_len ref rows filtered Extra
1 SIMPLE LINEITEM1 p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17 ALL NULL NULL NULL NULL 10 100.00 Using secondary engine Rapid
1 SIMPLE LINEITEM1 p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17 ALL NULL NULL NULL NULL 0 0.00 Using secondary engine Rapid
Warnings:
Note 1003 /* select#1 */ select `test_partition`.`LINEITEM1`.`L_ORDERKEY` AS `L_ORDERKEY`,`test_partition`.`LINEITEM1`.`L_PARTKEY` AS `L_PARTKEY`,`test_partition`.`LINEITEM1`.`L_SUPPKEY` AS `L_SUPPKEY`,`test_partition`.`LINEITEM1`.`L_LINENUMBER` AS `L_LINENUMBER`,`test_partition`.`LINEITEM1`.`L_QUANTITY` AS `L_QUANTITY`,`test_partition`.`LINEITEM1`.`L_EXTENDEDPRICE` AS `L_EXTENDEDPRICE`,`test_partition`.`LINEITEM1`.`L_DISCOUNT` AS `L_DISCOUNT`,`test_partition`.`LINEITEM1`.`L_TAX` AS `L_TAX`,`test_partition`.`LINEITEM1`.`L_RETURNFLAG` AS `L_RETURNFLAG`,`test_partition`.`LINEITEM1`.`L_LINESTATUS` AS `L_LINESTATUS`,`test_partition`.`LINEITEM1`.`L_SHIPDATE` AS `L_SHIPDATE`,`test_partition`.`LINEITEM1`.`L_COMMITDATE` AS `L_COMMITDATE`,`test_partition`.`LINEITEM1`.`L_RECEIPTDATE` AS `L_RECEIPTDATE`,`test_partition`.`LINEITEM1`.`L_SHIPINSTRUCT` AS `L_SHIPINSTRUCT`,`test_partition`.`LINEITEM1`.`L_SHIPMODE` AS `L_SHIPMODE`,`test_partition`.`LINEITEM1`.`L_COMMENT` AS `L_COMMENT` from `test_partition`.`LINEITEM1`
alter table LINEITEM1 secondary_unload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ set use_secondary_engine=forced;
alter table LINEITEM secondary_engine RAPID;

alter table LINEITEM secondary_load;

select * from LINEITEM;

alter table LINEITEM secondary_unload;

alter table LINEITEM secondary_load PARTITION(p0, p1, p10);
Expand All @@ -59,7 +62,7 @@ alter table LINEITEM secondary_unload PARTITION(p0, p10, p1);

alter table LINEITEM secondary_load PARTITION(p0, p1, p10);
alter table LINEITEM secondary_unload PARTITION(p0, p10);
--error 3890
--error 3889
alter table LINEITEM secondary_unload;
--enable_warnings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class VectorizedTableScanIterator : public TableRowIterator {
auto str_id = *(uint32 *)data_ptr;

std::string fld_name(field->field_name);
auto rpd_field = m_data_table.get()->source()->get_field(fld_name);
auto rpd_field = m_data_table.get()->table()->get_field(fld_name);
if (!rpd_field) return;

auto str_ptr = rpd_field->header()->m_local_dict->get(str_id);
Expand Down
30 changes: 16 additions & 14 deletions storage/rapid_engine/handler/ha_shannon_rapid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ int ha_rapid::info(unsigned int flags) {
std::string sch_tb;
sch_tb.append(table_share->db.str).append(":").append(table_share->table_name.str);

Rapid_load_context context;
Rapid_scan_context context;
context.m_trx = Transaction::get_or_create_trx(m_thd);
context.m_trx->begin();

Expand Down Expand Up @@ -275,12 +275,16 @@ unsigned long ha_rapid::index_flags(unsigned int idx, unsigned int part, bool al
}

int ha_rapid::records(ha_rows *num_rows) {
Rapid_load_context context;
Rapid_scan_context context;
context.m_trx = Transaction::get_or_create_trx(m_thd);
context.m_trx->begin();

std::string sch_tb;
sch_tb.append(table_share->db.str).append(":").append(table_share->table_name.str);
auto rpd_tb = Imcs::Imcs::instance()->get_table(sch_tb);
*num_rows = rpd_tb->rows(&context);

context.m_trx->commit();
return ShannonBase::SHANNON_SUCCESS;
}

Expand Down Expand Up @@ -461,7 +465,7 @@ int ha_rapid::rnd_next(uchar *buf) {
}
}

ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
return error;
}

Expand Down Expand Up @@ -498,8 +502,7 @@ int ha_rapid::index_read(uchar *buf, const uchar *key, uint key_len, ha_rkey_fun

m_data_table->set_end_range(end_range);
err = m_data_table->index_read(buf, key, key_len, find_flag);
ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);

if (err == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
return err;
}

Expand All @@ -511,29 +514,30 @@ int ha_rapid::index_read_last(uchar *buf, const uchar *key, uint key_len) {
int ha_rapid::index_next(uchar *buf) {
ut_ad(m_start_of_scan && inited == handler::INDEX);

ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
return m_data_table->index_next(buf);
auto error = m_data_table->index_next(buf);
if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
return error;
}

int ha_rapid::index_next_same(uchar *buf, const uchar *, uint) {
ut_ad(m_start_of_scan && inited == handler::INDEX);

ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);

return m_data_table->index_next(buf);
auto error = m_data_table->index_next(buf);
if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
return error;
}

int ha_rapid::index_first(uchar *buf) {
DBUG_TRACE;
ut_ad(m_start_of_scan && inited == handler::INDEX);
ha_statistic_increment(&System_status_var::ha_read_first_count);

int error;
if (end_range) {
m_data_table->set_end_range(end_range);
error = m_data_table->index_read(buf, end_range->key, end_range->length, end_range->flag);
} else
error = m_data_table->index_next(buf);
if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_first_count);
return error;
}

Expand All @@ -545,8 +549,6 @@ int ha_rapid::index_prev(uchar *buf) {
int ha_rapid::index_last(uchar *buf) {
DBUG_TRACE;

ha_statistic_increment(&System_status_var::ha_read_last_count);

m_data_table->set_end_range(end_range);
int error = m_data_table->index_read(buf, nullptr, 0, HA_READ_BEFORE_KEY);

Expand All @@ -555,7 +557,7 @@ int ha_rapid::index_last(uchar *buf) {
if (error == HA_ERR_KEY_NOT_FOUND) {
error = HA_ERR_END_OF_FILE;
}

if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_last_count);
return error;
}

Expand Down
6 changes: 3 additions & 3 deletions storage/rapid_engine/handler/ha_shannon_rapid.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class ha_rapid : public handler {
public:
ha_rapid(handlerton *hton, TABLE_SHARE *table_share);

protected:
std::unique_ptr<ShannonBase::Imcs::DataTable> m_data_table;

private:
int create(const char *, TABLE *, HA_CREATE_INFO *, dd::Table *) override { return HA_ERR_WRONG_COMMAND; }

Expand Down Expand Up @@ -175,10 +178,7 @@ class ha_rapid : public handler {
not yet fetched any row, else false */
bool m_start_of_scan{false};

std::unique_ptr<ShannonBase::Imcs::DataTable> m_data_table;
std::string m_failed_reason;

ha_rapidpart *m_part_handler{nullptr};
};

} // namespace ShannonBase
Expand Down
72 changes: 57 additions & 15 deletions storage/rapid_engine/handler/ha_shannon_rapidpart.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,73 @@ Created jun 6, 2025 */
#include "storage/rapid_engine/utils/utils.h"

namespace ShannonBase {

extern int rpd_async_column_threshold;
ha_rapidpart::ha_rapidpart(handlerton *hton, TABLE_SHARE *table)
: ha_rapid(hton, table), Partition_helper(this), m_thd(ha_thd()), m_share(nullptr) {}

int ha_rapidpart::records(ha_rows *num_rows) { return ShannonBase::SHANNON_SUCCESS; }

int ha_rapidpart::rnd_pos(uchar *record, uchar *pos) { return ShannonBase::SHANNON_SUCCESS; }

int ha_rapidpart::rnd_init(bool scan) {
m_current_part_empty = false;

if (m_data_table->init()) {
m_start_of_scan = false;
return HA_ERR_GENERIC;
}

inited = handler::RND;
m_start_of_scan = true;
return (Partition_helper::ph_rnd_init(scan));
}

int ha_rapidpart::rnd_init_in_part(uint part_id, bool scan) {
// int err = change_active_index(part_id, table_share->primary_key);
/* Don't use semi-consistent read in random row reads (by position).
This means we must disable semi_consistent_read if scan is false. */
std::string part_key;
auto part_name = m_data_table->source()->part_info->partitions[part_id]->partition_name;
part_key.append(part_name).append("#").append(std::to_string(part_id));

const auto &rpd_table = m_data_table->table_source();
auto partition_ptr = down_cast<ShannonBase::Imcs::PartTable *>(rpd_table)->get_partition(part_key);
auto n_rows = partition_ptr->rows(nullptr);
m_current_part_empty = (n_rows) ? false : true;

if (!m_current_part_empty) m_data_table->active_table(partition_ptr);

m_last_part = part_id;
m_start_of_scan = true;
return ShannonBase::SHANNON_SUCCESS;
}

int ha_rapidpart::rnd_next_in_part(uint part_id, uchar *buf) {
assert(false);
return ShannonBase::SHANNON_SUCCESS;
int error{HA_ERR_END_OF_FILE};
if (m_current_part_empty) return error;

if (inited == handler::RND && m_start_of_scan) {
if (table_share->fields <= static_cast<uint>(ShannonBase::rpd_async_column_threshold)) {
error = m_data_table->next(buf);
} else {
auto reader_pool = ShannonBase::Imcs::Imcs::pool();
std::future<int> fut =
boost::asio::co_spawn(*reader_pool, m_data_table->next_async(buf), boost::asio::use_future);
error = fut.get(); // co_await m_data_table->next_async(buf); // index_first(buf);
if (error == HA_ERR_KEY_NOT_FOUND) {
error = HA_ERR_END_OF_FILE;
}
}
}

// increase the row count.
if (error == ShannonBase::SHANNON_SUCCESS) ha_statistic_increment(&System_status_var::ha_read_rnd_next_count);
return error;
}

int ha_rapidpart::rnd_end_in_part(uint, bool) {
assert(false);
int ha_rapidpart::rnd_end_in_part(uint, bool) { return ShannonBase::SHANNON_SUCCESS; }

int ha_rapidpart::rnd_end() {
if (m_data_table->end()) return HA_ERR_GENERIC;

m_start_of_scan = false;
inited = handler::NONE;
return ShannonBase::SHANNON_SUCCESS;
}

Expand Down Expand Up @@ -132,18 +174,18 @@ int ha_rapidpart::load_table(const TABLE &table, bool *skip_metadata_update) {
if (shannon_loaded_tables->get(table.s->db.str, table.s->table_name.str) != nullptr) {
std::string err;
err.append(table.s->db.str).append(".").append(table.s->table_name.str).append(" already loaded");
my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), err.c_str());
return HA_ERR_KEY_NOT_FOUND;
my_error(ER_SECONDARY_ENGINE, MYF(0), err.c_str());
return HA_ERR_GENERIC;
}

for (auto idx = 0u; idx < table.s->fields; idx++) {
auto fld = *(table.field + idx);
if (fld->is_flag_set(NOT_SECONDARY_FLAG)) continue;
if (!bitmap_is_set(table.read_set, idx) || fld->is_flag_set(NOT_SECONDARY_FLAG)) continue;

if (!ShannonBase::Utils::Util::is_support_type(fld->type())) {
std::string err;
err.append(table.s->table_name.str).append(fld->field_name).append(" type not allowed");
my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), err.c_str());
my_error(ER_SECONDARY_ENGINE, MYF(0), err.c_str());
return HA_ERR_GENERIC;
}
}
Expand Down Expand Up @@ -177,7 +219,7 @@ int ha_rapidpart::load_table(const TABLE &table, bool *skip_metadata_update) {
}

if (Imcs::Imcs::instance()->load_parttable(&context, const_cast<TABLE *>(&table))) {
my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), table.s->db.str, table.s->table_name.str);
my_error(ER_SECONDARY_ENGINE, MYF(0), table.s->db.str, table.s->table_name.str);
return HA_ERR_GENERIC;
}

Expand All @@ -197,7 +239,7 @@ int ha_rapidpart::unload_table(const char *db_name, const char *table_name, bool
if (error_if_not_loaded && !share) {
std::string err(db_name);
err.append(".").append(table_name).append(" table is not loaded into rapid yet");
my_error(ER_SECONDARY_ENGINE_DDL, MYF(0), err.c_str());
my_error(ER_SECONDARY_ENGINE, MYF(0), err.c_str());
return HA_ERR_GENERIC;
}

Expand Down
9 changes: 6 additions & 3 deletions storage/rapid_engine/handler/ha_shannon_rapidpart.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ class ha_rapidpart : public ha_rapid, public Partition_helper, public Partition_
Partition_handler *get_partition_handler() override { return (static_cast<Partition_handler *>(this)); }

protected:
int rnd_init(bool scan) override { return (Partition_helper::ph_rnd_init(scan)); }
int rnd_init(bool scan) override;

int rnd_next(uchar *record) override { return (Partition_helper::ph_rnd_next(record)); }

int rnd_pos(uchar *record, uchar *pos) override;
int rnd_end() override;

int records(ha_rows *num_rows) override;
int rnd_pos(uchar *record, uchar *pos) override;

int index_next(uchar *record) override { return (Partition_helper::ph_index_next(record)); }

Expand All @@ -180,11 +180,14 @@ class ha_rapidpart : public ha_rapid, public Partition_helper, public Partition_

private:
THD *m_thd{nullptr};

RapidPartShare *m_share{nullptr};

/** this is set to 1 when we are starting a table scan but have
not yet fetched any row, else false */
bool m_start_of_scan{false};

bool m_current_part_empty = false;
};

} // namespace ShannonBase
Expand Down
4 changes: 2 additions & 2 deletions storage/rapid_engine/imcs/chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,15 +346,15 @@ void Chunk::check_data_type(size_t type_size) {
}
}

int Chunk::is_null(const Rapid_load_context *context, row_id_t pos) {
int Chunk::is_null(const Rapid_scan_context *context, row_id_t pos) {
std::shared_lock<std::shared_mutex> lk(m_header_mutex);
if (!m_header->m_null_mask.get())
return static_cast<int>(false);
else
return Utils::Util::bit_array_get(m_header->m_null_mask.get(), pos);
}

int Chunk::is_deleted(const Rapid_load_context *context, row_id_t pos) {
int Chunk::is_deleted(const Rapid_scan_context *context, row_id_t pos) {
std::shared_lock<std::shared_mutex> lk(m_header_mutex);
if (!m_header->m_del_mask.get())
return SHANNON_SUCCESS;
Expand Down
6 changes: 4 additions & 2 deletions storage/rapid_engine/imcs/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
#include "storage/rapid_engine/trx/transaction.h" //Transaction

namespace ShannonBase {
class Rapid_context;
class Rapid_load_context;
class Rapid_scan_context;
namespace Imcs {
class Cu;
/**
Expand Down Expand Up @@ -188,10 +190,10 @@ class Chunk : public MemoryObject {
int GC();

// gets null bit flag.
int is_null(const Rapid_load_context *context, row_id_t pos);
int is_null(const Rapid_scan_context *context, row_id_t pos);

// gets the delete flag.
int is_deleted(const Rapid_load_context *context, row_id_t pos);
int is_deleted(const Rapid_scan_context *context, row_id_t pos);

// get the normalized pack length
inline size_t normalized_pack_length() { return m_header->m_normalized_pack_length; }
Expand Down
Loading