Skip to content

feat(search_family): Add basic support for the FT.CONFIG command #4779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,8 @@ Service::Service(ProactorPool* pp)
: pp_(*pp),
acl_family_(&user_registry_, pp),
server_family_(this),
cluster_family_(&server_family_) {
cluster_family_(&server_family_),
search_family_(&server_family_) {
CHECK(pp);
CHECK(shard_set == NULL);

Expand Down Expand Up @@ -2694,10 +2695,10 @@ void Service::RegisterCommands() {
JsonFamily::Register(&registry_);
BitOpsFamily::Register(&registry_);
HllFamily::Register(&registry_);
SearchFamily::Register(&registry_);
BloomFamily::Register(&registry_);
server_family_.Register(&registry_);
cluster_family_.Register(&registry_);
search_family_.Register(&registry_);

// AclFamily should always be registered last
// If we add a new familly, register that first above and *not* below
Expand Down
2 changes: 2 additions & 0 deletions src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "server/command_registry.h"
#include "server/config_registry.h"
#include "server/engine_shard_set.h"
#include "server/search/search_family.h"
#include "server/server_family.h"

namespace util {
Expand Down Expand Up @@ -185,6 +186,7 @@ class Service : public facade::ServiceInterface {
acl::AclFamily acl_family_;
ServerFamily server_family_;
cluster::ClusterFamily cluster_family_;
SearchFamily search_family_;
CommandRegistry registry_;
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;

Expand Down
201 changes: 187 additions & 14 deletions src/server/search/search_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
#include "server/conn_context.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/main_service.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need it?

#include "server/search/aggregator.h"
#include "server/search/doc_index.h"
#include "server/server_family.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove

#include "server/transaction.h"
#include "src/core/overloaded.h"

Expand Down Expand Up @@ -599,8 +601,116 @@ void SearchReply(const SearchParams& params, std::optional<search::AggregationIn
}
}

constexpr size_t kOptionsCount = 2;
constexpr std::string_view kSearchLimit = "MAXSEARCHRESULTS"sv;
constexpr std::string_view kAggregateLimit = "MAXAGGREGATERESULTS"sv;

template <typename V>
using ConfigOptionsMap = std::array<std::pair<const std::string_view, V>, kOptionsCount>;
using Config = ConfigOptionsMap<uint64_t>;

// Do not forget to update kConfigOptions after adding new option
constexpr ConfigOptionsMap<const std::string_view> kConfigOptionsHelp = {{
{kSearchLimit, "Maximum number of results from ft.search command"sv},
{kAggregateLimit, "Maximum number of results from ft.aggregate command"sv},
}};

// Do not forget to update kConfigOptionsHelp after adding new option
thread_local Config kConfigOptions = {{
{kSearchLimit, 10000},
{kAggregateLimit, 10000},
}};

static_assert(
kConfigOptions.size() == kConfigOptionsHelp.size() && kConfigOptions.size() == kOptionsCount,
"kConfigOptions and kConfigOptionsHelp must have the same size and equal to kOptionsCount.");

template <typename V>
std::optional<V> FindOptionsMapValue(const ConfigOptionsMap<V>& options, std::string_view name) {
auto it = std::find_if(options.begin(), options.end(), [name](const auto& opt) {
return absl::EqualsIgnoreCase(opt.first, name);
});
return it != options.end() ? it->second : std::optional<V>{};
}

void UpdateConfigOption(std::string_view option_name, uint64_t value) {
auto it = std::find_if(
kConfigOptions.begin(), kConfigOptions.end(),
[option_name](const auto& opt) { return absl::EqualsIgnoreCase(opt.first, option_name); });

DCHECK(it != kConfigOptions.end());
it->second = value;
}

void FtConfigHelp(CmdArgParser* parser, RedisReplyBuilder* rb) {
string_view option = parser->Next();

auto send_value = [&](string_view option_name) {
auto value = FindOptionsMapValue(kConfigOptions, option_name);
DCHECK(value.has_value());
rb->SendLong(value.value());
};

if (option == "*"sv) {
rb->StartArray(kOptionsCount);
for (const auto& option_help : kConfigOptionsHelp) {
rb->StartArray(5);
rb->SendBulkString(option_help.first);
rb->SendBulkString("Description"sv);
rb->SendBulkString(option_help.second);
rb->SendBulkString("Value"sv);
send_value(option_help.first);
}
return;
}

auto option_description = FindOptionsMapValue(kConfigOptionsHelp, option);
if (option_description) {
rb->StartArray(1);
rb->StartArray(5);
rb->SendBulkString(absl::AsciiStrToUpper(option));
rb->SendBulkString("Description"sv);
rb->SendBulkString(option_description.value());
rb->SendBulkString("Value"sv);
send_value(option);
return;
}

LOG(WARNING) << "Unknown configuration option: " << option;
rb->SendEmptyArray();
}

void FtConfigGet(CmdArgParser* parser, RedisReplyBuilder* rb) {
string_view option = parser->Next();

if (option == "*"sv) {
rb->StartArray(kOptionsCount);
for (const auto& option_help : kConfigOptions) {
rb->StartArray(2);
rb->SendBulkString(option_help.first);
rb->SendLong(option_help.second);
}
return;
}

auto option_value = FindOptionsMapValue(kConfigOptions, option);
if (option_value) {
rb->StartArray(1);
rb->StartArray(2);
rb->SendBulkString(absl::AsciiStrToUpper(option));
rb->SendLong(option_value.value());
return;
}

LOG(WARNING) << "Unknown configuration option: " << option;
rb->SendEmptyArray();
}

} // namespace

SearchFamily::SearchFamily(ServerFamily* server_family) : server_family_(server_family) {
}

void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) {
auto* builder = cmd_cntx.rb;
if (cmd_cntx.conn_cntx->conn_state.db_index != 0) {
Expand Down Expand Up @@ -815,6 +925,10 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
if (!search_algo.Init(query_str, &params->query_params, sort_opt))
return builder->SendError("Query syntax error");

auto search_limit = FindOptionsMapValue(kConfigOptions, kSearchLimit);
DCHECK(search_limit.has_value());
params->limit_total = std::min(params->limit_total, search_limit.value());

// Because our coordinator thread may not have a shard, we can't check ahead if the index exists.
atomic<bool> index_not_found{false};
vector<SearchResult> docs(shard_set->size());
Expand Down Expand Up @@ -966,6 +1080,50 @@ void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) {
}
}

void SearchFamily::FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args};
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);

string_view option = parser.Next();
uint64_t value = parser.Next<uint64_t>();

if (auto err = parser.Error(); err)
return rb->SendError(err->MakeReply());

auto option_exists = FindOptionsMapValue(kConfigOptions, option);
if (option_exists) {
DVLOG(2) << "Setting " << option << " to " << value;

auto cb = [option, value](util::ProactorBase*) { UpdateConfigOption(option, value); };
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't meed server_family, you can use shard_set->pool()->AwaitFiberOnAll

Comment on lines +1083 to +1098
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will happen if we have 2 simultaneous commands?


// Schedule empty callback inorder to journal command via transaction framework.
cmd_cntx.tx->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });

rb->SendOk();
} else {
rb->SendError("Invalid option"sv);
}
}

void SearchFamily::FtConfig(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args};
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);

if (parser.Check("SET")) {
FtConfigSet(parser.Tail(), cmd_cntx);
return;
}

auto func = parser.TryMapNext("GET", &FtConfigGet, "HELP", &FtConfigHelp);

if (func) {
return func.value()(&parser, rb);
} else {
return rb->SendError("Unknown subcommand");
}
}

void SearchFamily::FtTagVals(CmdArgList args, const CommandContext& cmd_cntx) {
string_view index_name = ArgS(args, 0);
string_view field_name = ArgS(args, 1);
Expand Down Expand Up @@ -1052,11 +1210,19 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx)
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
auto sortable_value_sender = SortableValueSender(rb);

const size_t result_size = agg_results.values.size();
auto aggregate_limit = FindOptionsMapValue(kConfigOptions, kAggregateLimit);
DCHECK(aggregate_limit.has_value());
size_t result_size = std::min(agg_results.values.size(), aggregate_limit.value());

rb->StartArray(result_size + 1);
rb->SendLong(result_size);

for (const auto& value : agg_results.values) {
if (result_size == 0) {
break;
}
result_size--;

size_t fields_count = 0;
for (const auto& field : agg_results.fields_to_print) {
if (value.find(field) != value.end()) {
Expand All @@ -1075,7 +1241,13 @@ void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx)
}
}

#define HFUNC(x) SetHandler(&SearchFamily::x)
using EngineFunc = void (SearchFamily::*)(CmdArgList args, const CommandContext& cmd_cntx);

inline CommandId::Handler3 HandlerFunc(SearchFamily* se, EngineFunc f) {
return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); };
}

#define HFUNC(x) SetHandler(HandlerFunc(this, &SearchFamily::x))

// Redis search is a module. Therefore we introduce dragonfly extension search
// to set as the default for the search family of commands. More sensible defaults,
Expand All @@ -1089,18 +1261,19 @@ void SearchFamily::Register(CommandRegistry* registry) {
CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL;

registry->StartFamily();
*registry << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
FtCreate)
<< CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter)
<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
FtDropIndex)
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
// Underscore same as in RediSearch because it's "temporary" (long time already)
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch)
<< CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate)
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile)
<< CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals);
*registry
<< CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate)
<< CI{"FT.ALTER", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAlter)
<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
FtDropIndex)
<< CI{"FT.CONFIG", CO::WRITE | CO::GLOBAL_TRANS, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtConfig)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure what options should I set here, But I think since we are going to remove mutexes it sould be GLOBAL_TRANS

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be GLOBAL anycase

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you also need to add NO_KEY_TRANSACTIONAL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just global is enough

<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
// Underscore same as in RediSearch because it's "temporary" (long time already)
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch)
<< CI{"FT.AGGREGATE", kReadOnlyMask, -3, 0, 0, acl::FT_SEARCH}.HFUNC(FtAggregate)
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile)
<< CI{"FT.TAGVALS", kReadOnlyMask, 3, 0, 0, acl::FT_SEARCH}.HFUNC(FtTagVals);
}

} // namespace dfly
35 changes: 23 additions & 12 deletions src/server/search/search_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,35 @@ class SinkReplyBuilder;
} // namespace facade

namespace dfly {
class ServerFamily;
class CommandRegistry;
struct CommandContext;

class SearchFamily {
using SinkReplyBuilder = facade::SinkReplyBuilder;
public:
explicit SearchFamily(ServerFamily* server_family);

static void FtCreate(CmdArgList args, const CommandContext& cmd_cntx);
static void FtAlter(CmdArgList args, const CommandContext& cmd_cntx);
static void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx);
static void FtInfo(CmdArgList args, const CommandContext& cmd_cntx);
static void FtList(CmdArgList args, const CommandContext& cmd_cntx);
static void FtSearch(CmdArgList args, const CommandContext& cmd_cntx);
static void FtProfile(CmdArgList args, const CommandContext& cmd_cntx);
static void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx);
static void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx);
void Register(CommandRegistry* registry);

public:
static void Register(CommandRegistry* registry);
private:
using SinkReplyBuilder = facade::SinkReplyBuilder;

void FtCreate(CmdArgList args, const CommandContext& cmd_cntx);
void FtAlter(CmdArgList args, const CommandContext& cmd_cntx);
void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx);
void FtInfo(CmdArgList args, const CommandContext& cmd_cntx);
void FtList(CmdArgList args, const CommandContext& cmd_cntx);
void FtSearch(CmdArgList args, const CommandContext& cmd_cntx);
void FtProfile(CmdArgList args, const CommandContext& cmd_cntx);
void FtConfig(CmdArgList args, const CommandContext& cmd_cntx);
void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx);
void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx);

// Uses server_family_ to synchronize config changes
// Should not be registered
void FtConfigSet(CmdArgList args, const CommandContext& cmd_cntx);

ServerFamily* server_family_;
};

} // namespace dfly
Loading
Loading