From 1aabaed98276f7a90c14291d79492d2e44153845 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Wed, 19 Mar 2025 14:21:59 +0100 Subject: [PATCH] feat(search_family): Add basic support for the FT.CONFIG fixes dragonflydb#4352 Signed-off-by: Stepan Bagritsevich --- src/server/main_service.cc | 5 +- src/server/main_service.h | 2 + src/server/search/search_family.cc | 201 ++++++++++++++++++++++-- src/server/search/search_family.h | 35 +++-- src/server/search/search_family_test.cc | 72 +++++++++ 5 files changed, 287 insertions(+), 28 deletions(-) diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 09f7ac8276a9..8089309884ef 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -714,7 +714,8 @@ Service::Service(ProactorPool* pp) : pp_(*pp), acl_family_(&user_registry_, pp), server_family_(this), - cluster_family_(&server_family_) { + cluster_family_(&server_family_), + search_family_(&server_family_) { CHECK(pp); CHECK(shard_set == NULL); @@ -2694,10 +2695,10 @@ void Service::RegisterCommands() { JsonFamily::Register(®istry_); BitOpsFamily::Register(®istry_); HllFamily::Register(®istry_); - SearchFamily::Register(®istry_); BloomFamily::Register(®istry_); server_family_.Register(®istry_); cluster_family_.Register(®istry_); + search_family_.Register(®istry_); // AclFamily should always be registered last // If we add a new familly, register that first above and *not* below diff --git a/src/server/main_service.h b/src/server/main_service.h index 32a7386deca8..a95f4c3a95d9 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -16,6 +16,7 @@ #include "server/command_registry.h" #include "server/config_registry.h" #include "server/engine_shard_set.h" +#include "server/search/search_family.h" #include "server/server_family.h" namespace util { @@ -185,6 +186,7 @@ class Service : public facade::ServiceInterface { acl::AclFamily acl_family_; ServerFamily server_family_; cluster::ClusterFamily cluster_family_; + SearchFamily search_family_; CommandRegistry registry_; absl::flat_hash_map unknown_cmds_; diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index 08c5e5484cab..de71b3863c64 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -24,8 +24,10 @@ #include "server/conn_context.h" #include "server/container_utils.h" #include "server/engine_shard_set.h" +#include "server/main_service.h" #include "server/search/aggregator.h" #include "server/search/doc_index.h" +#include "server/server_family.h" #include "server/transaction.h" #include "src/core/overloaded.h" @@ -599,8 +601,116 @@ void SearchReply(const SearchParams& params, std::optional +using ConfigOptionsMap = std::array, kOptionsCount>; +using Config = ConfigOptionsMap; + +// Do not forget to update kConfigOptions after adding new option +constexpr ConfigOptionsMap kConfigOptionsHelp = {{ + {kSearchLimit, "Maximum number of results from ft.search command"sv}, + {kAggregateLimit, "Maximum number of results from ft.aggregate command"sv}, +}}; + +// Do not forget to update kConfigOptionsHelp after adding new option +thread_local Config kConfigOptions = {{ + {kSearchLimit, 10000}, + {kAggregateLimit, 10000}, +}}; + +static_assert( + kConfigOptions.size() == kConfigOptionsHelp.size() && kConfigOptions.size() == kOptionsCount, + "kConfigOptions and kConfigOptionsHelp must have the same size and equal to kOptionsCount."); + +template +std::optional FindOptionsMapValue(const ConfigOptionsMap& options, std::string_view name) { + auto it = std::find_if(options.begin(), options.end(), [name](const auto& opt) { + return absl::EqualsIgnoreCase(opt.first, name); + }); + return it != options.end() ? it->second : std::optional{}; +} + +void UpdateConfigOption(std::string_view option_name, uint64_t value) { + auto it = std::find_if( + kConfigOptions.begin(), kConfigOptions.end(), + [option_name](const auto& opt) { return absl::EqualsIgnoreCase(opt.first, option_name); }); + + DCHECK(it != kConfigOptions.end()); + it->second = value; +} + +void FtConfigHelp(CmdArgParser* parser, RedisReplyBuilder* rb) { + string_view option = parser->Next(); + + auto send_value = [&](string_view option_name) { + auto value = FindOptionsMapValue(kConfigOptions, option_name); + DCHECK(value.has_value()); + rb->SendLong(value.value()); + }; + + if (option == "*"sv) { + rb->StartArray(kOptionsCount); + for (const auto& option_help : kConfigOptionsHelp) { + rb->StartArray(5); + rb->SendBulkString(option_help.first); + rb->SendBulkString("Description"sv); + rb->SendBulkString(option_help.second); + rb->SendBulkString("Value"sv); + send_value(option_help.first); + } + return; + } + + auto option_description = FindOptionsMapValue(kConfigOptionsHelp, option); + if (option_description) { + rb->StartArray(1); + rb->StartArray(5); + rb->SendBulkString(absl::AsciiStrToUpper(option)); + rb->SendBulkString("Description"sv); + rb->SendBulkString(option_description.value()); + rb->SendBulkString("Value"sv); + send_value(option); + return; + } + + LOG(WARNING) << "Unknown configuration option: " << option; + rb->SendEmptyArray(); +} + +void FtConfigGet(CmdArgParser* parser, RedisReplyBuilder* rb) { + string_view option = parser->Next(); + + if (option == "*"sv) { + rb->StartArray(kOptionsCount); + for (const auto& option_help : kConfigOptions) { + rb->StartArray(2); + rb->SendBulkString(option_help.first); + rb->SendLong(option_help.second); + } + return; + } + + auto option_value = FindOptionsMapValue(kConfigOptions, option); + if (option_value) { + rb->StartArray(1); + rb->StartArray(2); + rb->SendBulkString(absl::AsciiStrToUpper(option)); + rb->SendLong(option_value.value()); + return; + } + + LOG(WARNING) << "Unknown configuration option: " << option; + rb->SendEmptyArray(); +} + } // namespace +SearchFamily::SearchFamily(ServerFamily* server_family) : server_family_(server_family) { +} + void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) { auto* builder = cmd_cntx.rb; if (cmd_cntx.conn_cntx->conn_state.db_index != 0) { @@ -815,6 +925,10 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) { if (!search_algo.Init(query_str, ¶ms->query_params, sort_opt)) return builder->SendError("Query syntax error"); + auto search_limit = FindOptionsMapValue(kConfigOptions, kSearchLimit); + DCHECK(search_limit.has_value()); + params->limit_total = std::min(params->limit_total, search_limit.value()); + // Because our coordinator thread may not have a shard, we can't check ahead if the index exists. atomic index_not_found{false}; vector docs(shard_set->size()); @@ -966,6 +1080,50 @@ void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) { } } +void SearchFamily::FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx) { + CmdArgParser parser{args}; + auto* rb = static_cast(cmd_cntx.rb); + + string_view option = parser.Next(); + uint64_t value = parser.Next(); + + if (auto err = parser.Error(); err) + return rb->SendError(err->MakeReply()); + + auto option_exists = FindOptionsMapValue(kConfigOptions, option); + if (option_exists) { + DVLOG(2) << "Setting " << option << " to " << value; + + auto cb = [option, value](util::ProactorBase*) { UpdateConfigOption(option, value); }; + server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); + + // Schedule empty callback inorder to journal command via transaction framework. + cmd_cntx.tx->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; }); + + rb->SendOk(); + } else { + rb->SendError("Invalid option"sv); + } +} + +void SearchFamily::FtConfig(CmdArgList args, const CommandContext& cmd_cntx) { + CmdArgParser parser{args}; + auto* rb = static_cast(cmd_cntx.rb); + + if (parser.Check("SET")) { + FtConfigSet(parser.Tail(), cmd_cntx); + return; + } + + auto func = parser.TryMapNext("GET", &FtConfigGet, "HELP", &FtConfigHelp); + + if (func) { + return func.value()(&parser, rb); + } else { + return rb->SendError("Unknown subcommand"); + } +} + void SearchFamily::FtTagVals(CmdArgList args, const CommandContext& cmd_cntx) { string_view index_name = ArgS(args, 0); string_view field_name = ArgS(args, 1); @@ -1052,11 +1210,19 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx) auto* rb = static_cast(cmd_cntx.rb); auto sortable_value_sender = SortableValueSender(rb); - const size_t result_size = agg_results.values.size(); + auto aggregate_limit = FindOptionsMapValue(kConfigOptions, kAggregateLimit); + DCHECK(aggregate_limit.has_value()); + size_t result_size = std::min(agg_results.values.size(), aggregate_limit.value()); + rb->StartArray(result_size + 1); rb->SendLong(result_size); for (const auto& value : agg_results.values) { + if (result_size == 0) { + break; + } + result_size--; + size_t fields_count = 0; for (const auto& field : agg_results.fields_to_print) { if (value.find(field) != value.end()) { @@ -1075,7 +1241,13 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx) } } -#define HFUNC(x) SetHandler(&SearchFamily::x) +using EngineFunc = void (SearchFamily::*)(CmdArgList args, const CommandContext& cmd_cntx); + +inline CommandId::Handler3 HandlerFunc(SearchFamily* se, EngineFunc f) { + return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); }; +} + +#define HFUNC(x) SetHandler(HandlerFunc(this, &SearchFamily::x)) // Redis search is a module. Therefore we introduce dragonfly extension search // to set as the default for the search family of commands. More sensible defaults, @@ -1089,18 +1261,19 @@ void SearchFamily::Register(CommandRegistry* registry) { CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL; registry->StartFamily(); - *registry << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( - FtCreate) - << CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter) - << CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( - FtDropIndex) - << CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) - // Underscore same as in RediSearch because it's "temporary" (long time already) - << CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) - << CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch) - << CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate) - << CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile) - << CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals); + *registry + << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate) + << CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter) + << CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( + FtDropIndex) + << CI{"FT.CONFIG", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtConfig) + << CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) + // Underscore same as in RediSearch because it's "temporary" (long time already) + << CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) + << CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch) + << CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate) + << CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile) + << CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals); } } // namespace dfly diff --git a/src/server/search/search_family.h b/src/server/search/search_family.h index 104c9a92076c..bdc38bd60c43 100644 --- a/src/server/search/search_family.h +++ b/src/server/search/search_family.h @@ -14,24 +14,35 @@ class SinkReplyBuilder; } // namespace facade namespace dfly { +class ServerFamily; class CommandRegistry; struct CommandContext; class SearchFamily { - using SinkReplyBuilder = facade::SinkReplyBuilder; + public: + explicit SearchFamily(ServerFamily* server_family); - static void FtCreate(CmdArgList args, const CommandContext& cmd_cntx); - static void FtAlter(CmdArgList args, const CommandContext& cmd_cntx); - static void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx); - static void FtInfo(CmdArgList args, const CommandContext& cmd_cntx); - static void FtList(CmdArgList args, const CommandContext& cmd_cntx); - static void FtSearch(CmdArgList args, const CommandContext& cmd_cntx); - static void FtProfile(CmdArgList args, const CommandContext& cmd_cntx); - static void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx); - static void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx); + void Register(CommandRegistry* registry); - public: - static void Register(CommandRegistry* registry); + private: + using SinkReplyBuilder = facade::SinkReplyBuilder; + + void FtCreate(CmdArgList args, const CommandContext& cmd_cntx); + void FtAlter(CmdArgList args, const CommandContext& cmd_cntx); + void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx); + void FtInfo(CmdArgList args, const CommandContext& cmd_cntx); + void FtList(CmdArgList args, const CommandContext& cmd_cntx); + void FtSearch(CmdArgList args, const CommandContext& cmd_cntx); + void FtProfile(CmdArgList args, const CommandContext& cmd_cntx); + void FtConfig(CmdArgList args, const CommandContext& cmd_cntx); + void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx); + void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx); + + // Uses server_family_ to synchronize config changes + // Should not be registered + void FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx); + + ServerFamily* server_family_; }; } // namespace dfly diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc index 1744e63ecfc7..e20481a566f3 100644 --- a/src/server/search/search_family_test.cc +++ b/src/server/search/search_family_test.cc @@ -2014,4 +2014,76 @@ TEST_F(SearchFamilyTest, InvalidCreateOptions) { EXPECT_THAT(resp, ErrArg(kInvalidIntErr)); } +TEST_F(SearchFamilyTest, ConfigMaxSearchResults) { + Run({"HSET", "doc1", "title", "hello world1"}); + Run({"HSET", "doc2", "title", "hello world2"}); + Run({"FT.CREATE", "index", "ON", "HASH", "SCHEMA", "title", "TEXT"}); + + auto resp = Run({"FT.SEARCH", "index", "*", "NOCONTENT"}); + EXPECT_THAT(resp, IsUnordArray(IntArg(2), "doc1", "doc2")); + + resp = Run({"FT.CONFIG", "GET", "MAXSEARCHRESULTS"}); + EXPECT_THAT(resp, IsArray("MAXSEARCHRESULTS", IntArg(10000))); + + resp = Run({"FT.CONFIG", "SET", "MAXSEARCHRESULTS", "1"}); + EXPECT_EQ(resp, "OK"); + + resp = Run({"FT.SEARCH", "index", "*", "NOCONTENT"}); + EXPECT_THAT(resp, IsUnordArray(IntArg(2), AnyOf("doc1", "doc2"))); + + resp = Run({"FT.CONFIG", "GET", "MAXSEARCHRESULTS"}); + EXPECT_THAT(resp, IsArray("MAXSEARCHRESULTS", IntArg(1))); +} + +TEST_F(SearchFamilyTest, ConfigMaxAggregateResult) { + Run({"HSET", "doc1", "title", "hello world1"}); + Run({"HSET", "doc2", "title", "hello world2"}); + Run({"FT.CREATE", "index", "ON", "HASH", "SCHEMA", "title", "TEXT"}); + + auto resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@title", "REDUCE", "COUNT", "0", + "AS", "count"}); + EXPECT_THAT(resp, IsUnordArrayWithSize(IsMap("title", "hello world1", "count", "1"), + IsMap("title", "hello world2", "count", "1"))); + + resp = Run({"FT.CONFIG", "GET", "MAXAGGREGATERESULTS"}); + EXPECT_THAT(resp, IsArray("MAXAGGREGATERESULTS", IntArg(10000))); + + resp = Run({"FT.CONFIG", "SET", "MAXAGGREGATERESULTS", "1"}); + EXPECT_EQ(resp, "OK"); + + resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@title", "REDUCE", "COUNT", "0", "AS", + "count"}); + EXPECT_THAT(resp, IsUnordArrayWithSize(AnyOf(IsMap("title", "hello world1", "count", "1"), + IsMap("title", "hello world2", "count", "1")))); + + resp = Run({"FT.CONFIG", "GET", "MAXAGGREGATERESULTS"}); + EXPECT_THAT(resp, IsArray("MAXAGGREGATERESULTS", IntArg(1))); +} + +TEST_F(SearchFamilyTest, InvalidConfigOptions) { + // Test with an invalid argument + auto resp = Run({"FT.CONFIG", "INVALIDARG", "INVLIDARG"}); + EXPECT_THAT(resp, ErrArg("Unknown subcommand")); + + // Test with an invalid argument + resp = Run({"FT.CONFIG", "GET", "INVALIDARG"}); + EXPECT_THAT(resp, IsArray()); + + // Test with an invalid argument + resp = Run({"FT.CONFIG", "SET", "INVALIDARG"}); + EXPECT_THAT(resp, ErrArg(kSyntaxErr)); + + // Test with an invalid argument + resp = Run({"FT.CONFIG", "SET", "INVALIDARG", "5"}); + EXPECT_THAT(resp, ErrArg("Invalid option")); + + // Test with an invalid value + resp = Run({"FT.CONFIG", "SET", "MAXSEARCHRESULTS", "not_a_number"}); + EXPECT_THAT(resp, ErrArg(kInvalidIntErr)); + + // Test with an invalid argument + resp = Run({"FT.CONFIG", "HELP", "INVALIDARG"}); + EXPECT_THAT(resp, IsArray()); +} + } // namespace dfly