-
Notifications
You must be signed in to change notification settings - Fork 1k
feat(search_family): Add basic support for the FT.CONFIG command #4779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,8 +24,10 @@ | |
#include "server/conn_context.h" | ||
#include "server/container_utils.h" | ||
#include "server/engine_shard_set.h" | ||
#include "server/main_service.h" | ||
#include "server/search/aggregator.h" | ||
#include "server/search/doc_index.h" | ||
#include "server/server_family.h" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please remove |
||
#include "server/transaction.h" | ||
#include "src/core/overloaded.h" | ||
|
||
|
@@ -599,8 +601,116 @@ void SearchReply(const SearchParams& params, std::optional<search::AggregationIn | |
} | ||
} | ||
|
||
constexpr size_t kOptionsCount = 2; | ||
constexpr std::string_view kSearchLimit = "MAXSEARCHRESULTS"sv; | ||
constexpr std::string_view kAggregateLimit = "MAXAGGREGATERESULTS"sv; | ||
|
||
template <typename V> | ||
using ConfigOptionsMap = std::array<std::pair<const std::string_view, V>, kOptionsCount>; | ||
using Config = ConfigOptionsMap<uint64_t>; | ||
|
||
// Do not forget to update kConfigOptions after adding new option | ||
constexpr ConfigOptionsMap<const std::string_view> kConfigOptionsHelp = {{ | ||
{kSearchLimit, "Maximum number of results from ft.search command"sv}, | ||
{kAggregateLimit, "Maximum number of results from ft.aggregate command"sv}, | ||
}}; | ||
|
||
// Do not forget to update kConfigOptionsHelp after adding new option | ||
thread_local Config kConfigOptions = {{ | ||
{kSearchLimit, 10000}, | ||
{kAggregateLimit, 10000}, | ||
}}; | ||
|
||
static_assert( | ||
kConfigOptions.size() == kConfigOptionsHelp.size() && kConfigOptions.size() == kOptionsCount, | ||
"kConfigOptions and kConfigOptionsHelp must have the same size and equal to kOptionsCount."); | ||
|
||
template <typename V> | ||
std::optional<V> FindOptionsMapValue(const ConfigOptionsMap<V>& options, std::string_view name) { | ||
auto it = std::find_if(options.begin(), options.end(), [name](const auto& opt) { | ||
return absl::EqualsIgnoreCase(opt.first, name); | ||
}); | ||
return it != options.end() ? it->second : std::optional<V>{}; | ||
} | ||
|
||
void UpdateConfigOption(std::string_view option_name, uint64_t value) { | ||
auto it = std::find_if( | ||
kConfigOptions.begin(), kConfigOptions.end(), | ||
[option_name](const auto& opt) { return absl::EqualsIgnoreCase(opt.first, option_name); }); | ||
|
||
DCHECK(it != kConfigOptions.end()); | ||
it->second = value; | ||
} | ||
|
||
void FtConfigHelp(CmdArgParser* parser, RedisReplyBuilder* rb) { | ||
string_view option = parser->Next(); | ||
|
||
auto send_value = [&](string_view option_name) { | ||
auto value = FindOptionsMapValue(kConfigOptions, option_name); | ||
DCHECK(value.has_value()); | ||
rb->SendLong(value.value()); | ||
}; | ||
|
||
if (option == "*"sv) { | ||
rb->StartArray(kOptionsCount); | ||
for (const auto& option_help : kConfigOptionsHelp) { | ||
rb->StartArray(5); | ||
rb->SendBulkString(option_help.first); | ||
rb->SendBulkString("Description"sv); | ||
rb->SendBulkString(option_help.second); | ||
rb->SendBulkString("Value"sv); | ||
send_value(option_help.first); | ||
} | ||
return; | ||
} | ||
|
||
auto option_description = FindOptionsMapValue(kConfigOptionsHelp, option); | ||
if (option_description) { | ||
rb->StartArray(1); | ||
rb->StartArray(5); | ||
rb->SendBulkString(absl::AsciiStrToUpper(option)); | ||
rb->SendBulkString("Description"sv); | ||
rb->SendBulkString(option_description.value()); | ||
rb->SendBulkString("Value"sv); | ||
send_value(option); | ||
return; | ||
} | ||
|
||
LOG(WARNING) << "Unknown configuration option: " << option; | ||
rb->SendEmptyArray(); | ||
} | ||
|
||
void FtConfigGet(CmdArgParser* parser, RedisReplyBuilder* rb) { | ||
string_view option = parser->Next(); | ||
|
||
if (option == "*"sv) { | ||
rb->StartArray(kOptionsCount); | ||
for (const auto& option_help : kConfigOptions) { | ||
rb->StartArray(2); | ||
rb->SendBulkString(option_help.first); | ||
rb->SendLong(option_help.second); | ||
} | ||
return; | ||
} | ||
|
||
auto option_value = FindOptionsMapValue(kConfigOptions, option); | ||
if (option_value) { | ||
rb->StartArray(1); | ||
rb->StartArray(2); | ||
rb->SendBulkString(absl::AsciiStrToUpper(option)); | ||
rb->SendLong(option_value.value()); | ||
return; | ||
} | ||
|
||
LOG(WARNING) << "Unknown configuration option: " << option; | ||
rb->SendEmptyArray(); | ||
} | ||
|
||
} // namespace | ||
|
||
SearchFamily::SearchFamily(ServerFamily* server_family) : server_family_(server_family) { | ||
} | ||
|
||
void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) { | ||
auto* builder = cmd_cntx.rb; | ||
if (cmd_cntx.conn_cntx->conn_state.db_index != 0) { | ||
|
@@ -815,6 +925,10 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) { | |
if (!search_algo.Init(query_str, ¶ms->query_params, sort_opt)) | ||
return builder->SendError("Query syntax error"); | ||
|
||
auto search_limit = FindOptionsMapValue(kConfigOptions, kSearchLimit); | ||
DCHECK(search_limit.has_value()); | ||
params->limit_total = std::min(params->limit_total, search_limit.value()); | ||
|
||
// Because our coordinator thread may not have a shard, we can't check ahead if the index exists. | ||
atomic<bool> index_not_found{false}; | ||
vector<SearchResult> docs(shard_set->size()); | ||
|
@@ -966,6 +1080,50 @@ void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) { | |
} | ||
} | ||
|
||
void SearchFamily::FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx) { | ||
CmdArgParser parser{args}; | ||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb); | ||
|
||
string_view option = parser.Next(); | ||
uint64_t value = parser.Next<uint64_t>(); | ||
|
||
if (auto err = parser.Error(); err) | ||
return rb->SendError(err->MakeReply()); | ||
|
||
auto option_exists = FindOptionsMapValue(kConfigOptions, option); | ||
if (option_exists) { | ||
DVLOG(2) << "Setting " << option << " to " << value; | ||
|
||
auto cb = [option, value](util::ProactorBase*) { UpdateConfigOption(option, value); }; | ||
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't meed server_family, you can use shard_set->pool()->AwaitFiberOnAll
Comment on lines
+1083
to
+1098
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what will happen if we have 2 simultaneous commands? |
||
|
||
// Schedule empty callback inorder to journal command via transaction framework. | ||
cmd_cntx.tx->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; }); | ||
|
||
rb->SendOk(); | ||
} else { | ||
rb->SendError("Invalid option"sv); | ||
} | ||
} | ||
|
||
void SearchFamily::FtConfig(CmdArgList args, const CommandContext& cmd_cntx) { | ||
CmdArgParser parser{args}; | ||
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb); | ||
|
||
if (parser.Check("SET")) { | ||
FtConfigSet(parser.Tail(), cmd_cntx); | ||
return; | ||
} | ||
|
||
auto func = parser.TryMapNext("GET", &FtConfigGet, "HELP", &FtConfigHelp); | ||
|
||
if (func) { | ||
return func.value()(&parser, rb); | ||
} else { | ||
return rb->SendError("Unknown subcommand"); | ||
} | ||
} | ||
|
||
void SearchFamily::FtTagVals(CmdArgList args, const CommandContext& cmd_cntx) { | ||
string_view index_name = ArgS(args, 0); | ||
string_view field_name = ArgS(args, 1); | ||
|
@@ -1052,11 +1210,19 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx) | |
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb); | ||
auto sortable_value_sender = SortableValueSender(rb); | ||
|
||
const size_t result_size = agg_results.values.size(); | ||
auto aggregate_limit = FindOptionsMapValue(kConfigOptions, kAggregateLimit); | ||
DCHECK(aggregate_limit.has_value()); | ||
size_t result_size = std::min(agg_results.values.size(), aggregate_limit.value()); | ||
|
||
rb->StartArray(result_size + 1); | ||
rb->SendLong(result_size); | ||
|
||
for (const auto& value : agg_results.values) { | ||
if (result_size == 0) { | ||
break; | ||
} | ||
result_size--; | ||
|
||
size_t fields_count = 0; | ||
for (const auto& field : agg_results.fields_to_print) { | ||
if (value.find(field) != value.end()) { | ||
|
@@ -1075,7 +1241,13 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx) | |
} | ||
} | ||
|
||
#define HFUNC(x) SetHandler(&SearchFamily::x) | ||
using EngineFunc = void (SearchFamily::*)(CmdArgList args, const CommandContext& cmd_cntx); | ||
|
||
inline CommandId::Handler3 HandlerFunc(SearchFamily* se, EngineFunc f) { | ||
return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); }; | ||
} | ||
|
||
#define HFUNC(x) SetHandler(HandlerFunc(this, &SearchFamily::x)) | ||
|
||
// Redis search is a module. Therefore we introduce dragonfly extension search | ||
// to set as the default for the search family of commands. More sensible defaults, | ||
|
@@ -1089,18 +1261,19 @@ void SearchFamily::Register(CommandRegistry* registry) { | |
CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL; | ||
|
||
registry->StartFamily(); | ||
*registry << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( | ||
FtCreate) | ||
<< CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter) | ||
<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( | ||
FtDropIndex) | ||
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) | ||
// Underscore same as in RediSearch because it's "temporary" (long time already) | ||
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) | ||
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch) | ||
<< CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate) | ||
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile) | ||
<< CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals); | ||
*registry | ||
<< CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate) | ||
<< CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter) | ||
<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( | ||
FtDropIndex) | ||
<< CI{"FT.CONFIG", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtConfig) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also not sure what options should I set here, But I think since we are going to remove mutexes it sould be GLOBAL_TRANS There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be GLOBAL anycase There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe you also need to add NO_KEY_TRANSACTIONAL There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just global is enough |
||
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) | ||
// Underscore same as in RediSearch because it's "temporary" (long time already) | ||
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) | ||
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch) | ||
<< CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate) | ||
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile) | ||
<< CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals); | ||
} | ||
|
||
} // namespace dfly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need it?