Skip to content

[DO NOT MERGE] Redis experiments #7417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ dependencies = [
"encoding_rs",
"flate2",
"form_urlencoded",
"fred",
"fred 9.4.0",
"futures",
"futures-test",
"graphql_client",
Expand Down Expand Up @@ -2441,6 +2441,15 @@ dependencies = [
"num-traits",
]

[[package]]
name = "float-cmp"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8"
dependencies = [
"num-traits",
]

[[package]]
name = "fnv"
version = "1.0.7"
Expand Down Expand Up @@ -2542,13 +2551,13 @@ dependencies = [
"bytes",
"bytes-utils",
"crossbeam-queue",
"float-cmp",
"float-cmp 0.9.0",
"fred-macros",
"futures",
"log",
"parking_lot",
"rand 0.8.5",
"redis-protocol",
"redis-protocol 5.0.1",
"rustls",
"rustls-native-certs 0.7.3",
"semver",
Expand All @@ -2561,6 +2570,31 @@ dependencies = [
"urlencoding",
]

[[package]]
name = "fred"
version = "10.1.0"
source = "git+https://github.com/SimonSapin/fred.rs?rev=32391ec56ac18ee83d02e9f112910b0382bc082c#32391ec56ac18ee83d02e9f112910b0382bc082c"
dependencies = [
"arc-swap",
"async-trait",
"bytes",
"bytes-utils",
"float-cmp 0.10.0",
"fred-macros",
"futures",
"log",
"parking_lot",
"rand 0.8.5",
"redis-protocol 6.0.0",
"semver",
"socket2",
"tokio",
"tokio-stream",
"tokio-util",
"url",
"urlencoding",
]

[[package]]
name = "fred-macros"
version = "0.1.0"
Expand Down Expand Up @@ -5275,6 +5309,15 @@ dependencies = [
"crossbeam-utils",
]

[[package]]
name = "redis-experiments"
version = "0.1.0"
dependencies = [
"fred 10.1.0",
"tokio",
"uuid",
]

[[package]]
name = "redis-protocol"
version = "5.0.1"
Expand All @@ -5289,6 +5332,20 @@ dependencies = [
"nom",
]

[[package]]
name = "redis-protocol"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdba59219406899220fc4cdfd17a95191ba9c9afb719b5fa5a083d63109a9f1"
dependencies = [
"bytes",
"bytes-utils",
"cookie-factory",
"crc16",
"log",
"nom",
]

[[package]]
name = "redox_syscall"
version = "0.5.7"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"examples/throw-error/rhai",
"fuzz",
"fuzz/subgraph",
"redis-experiments",
# Note that xtask is not in the workspace member because it relies on dependencies that are incompatible with the router. Notably hyperx but there are others.
]

Expand Down
17 changes: 17 additions & 0 deletions redis-experiments/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "redis-experiments"
version = "0.1.0"
edition = "2024"

[dependencies]
# env_logger = "0.11.5"
# fred = { version = "10.1.0", features = ["i-redisearch"] }
# fred = { path = "../../fred.rs", features = ["i-redisearch"] }
tokio = "1.44.2"
uuid = { version = "1.16.0", features = ["v4"] }

[dependencies.fred]
# https://github.com/aembke/fred.rs/issues/346 and https://github.com/aembke/fred.rs/pull/349
git = "https://github.com/SimonSapin/fred.rs"
rev = "32391ec56ac18ee83d02e9f112910b0382bc082c"
features = ["i-redisearch"]
231 changes: 231 additions & 0 deletions redis-experiments/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use std::borrow::Cow;

use fred::bytes_utils::Str;
use fred::prelude::*;
use fred::types::MultipleKeys;
use fred::types::RespVersion;
use fred::types::redisearch::FtCreateOptions;
use fred::types::redisearch::FtSearchOptions;
use fred::types::redisearch::IndexKind;
use fred::types::redisearch::SearchField;
use fred::types::redisearch::SearchParameter;
use fred::types::redisearch::SearchSchema;
use fred::types::redisearch::SearchSchemaKind;

const INVALIDATION_KEY_SEPARATOR: char = '\t';
const INVALIDATION_BATCH_SIZE: i64 = 1_000;

pub struct Cache {
pub client: Client,
pub config: CacheConfig,
}

pub struct CacheConfig {
pub namespace: Option<String>,
pub temporary_seconds: Option<u64>,
pub index_name: String,
pub indexed_document_id_prefix: String,
pub invalidation_keys_field_name: String,
}

#[derive(Debug, Clone, Copy)]
pub enum Expire {
In { seconds: i64 },
At { timestamp: i64 },
}

impl CacheConfig {
pub fn random_namespace() -> String {
uuid::Uuid::new_v4().simple().to_string()
}
}

impl Cache {
/// Wraps the `FT.CREATE` command, ignoring "Index already exists" errors
pub async fn create_index_if_not_exists(&self) -> FredResult<()> {
let result = self.create_index().await;
if let Err(err) = &result {
if err.details() == "Index already exists" {
return Ok(());
}
}
result
}

/// Wraps the `FT.CREATE` command
pub async fn create_index(&self) -> FredResult<()> {
self.client
.ft_create(
self.index_name(),
FtCreateOptions {
on: Some(IndexKind::Hash),
prefixes: vec![
self.document_id(&self.config.indexed_document_id_prefix)
.into(),
],
temporary: self.config.temporary_seconds,
..Default::default()
},
vec![SearchSchema {
field_name: self.config.invalidation_keys_field_name.as_str().into(),
alias: None,
kind: SearchSchemaKind::Tag {
sortable: false,
unf: false,
separator: Some(INVALIDATION_KEY_SEPARATOR),
casesensitive: false,
withsuffixtrie: false,
noindex: false,
},
}],
)
.await
}

/// Wraps the `FT.DROPINDEX` command
pub async fn drop_index(&self, drop_indexed_documents: bool) -> FredResult<()> {
self.client
.ft_dropindex(self.index_name(), drop_indexed_documents)
.await
}

/// Wraps the `HSET` command, adding an indexed field for invalidation keys
///
/// If `document_id` already exists and is a hash document, new values are "merged" with it
/// (existing fields not specified here are not removed).
pub async fn insert_hash_document<InvalidationKey, MapKey, MapValue>(
&self,
document_id: &str,
expire: Expire,
invalidation_keys: impl IntoIterator<Item = InvalidationKey>,
values: impl IntoIterator<Item = (MapKey, MapValue)>,
) -> FredResult<()>
where
InvalidationKey: AsRef<str>,
MapKey: Into<Key>,
MapValue: Into<Value>,
{
let mut invalidation_keys = invalidation_keys.into_iter();
let invalidation_keys: Option<String> = invalidation_keys.next().map(|first| {
let mut separated = first.as_ref().to_owned();
for next in invalidation_keys {
separated.push(INVALIDATION_KEY_SEPARATOR);
separated.push_str(next.as_ref());
}
// Looks like "{key1}{INVALIDATION_KEY_SEPARATOR}{key2}"
separated
});
let maybe_invalidation_keys_field = invalidation_keys.map(|v| {
let k = Key::from(&self.config.invalidation_keys_field_name);
(k, v.into())
});
let map = fred::types::Map::from_iter(
values
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.chain(maybe_invalidation_keys_field),
);
let id = self.document_id(document_id);
let _: () = self.client.hset(id.as_ref(), map).await?;
match expire {
Expire::In { seconds } => self.client.expire(id.as_ref(), seconds, None).await,
Expire::At { timestamp } => self.client.expire_at(id.as_ref(), timestamp, None).await,
}
}

/// Wraps the `HMGET` command
pub async fn get_hash_document<R: FromValue>(
&self,
document_id: &str,
fields: impl Into<MultipleKeys> + Send,
) -> FredResult<R> {
self.client
.hmget(self.document_id(document_id).as_ref(), fields)
.await
}

/// Deletes all documents that have `invalidation_key`.
///
/// Returns the number of deleted documents.
pub async fn invalidate(&self, invalidation_key: impl Into<Str>) -> FredResult<u64> {
// We want `NOCONTENT` but it’s apparently not supported on AWS:
// TODO: test this, they also don’t document `DIALECT 2` but give an example with it.
// https://docs.aws.amazon.com/memorydb/latest/devguide/vector-search-commands-ft.search.html
//
// A work-around is `RETURN 0` (a empty list of zero fields) but it’s not supported by Fred:
// https://github.com/aembke/fred.rs/issues/345
//
// As a work-around for a the work-around, we request a single field that we know exists
let options = FtSearchOptions {
// nocontent: true,
// r#return: vec![],
r#return: vec![SearchField {
identifier: self.config.invalidation_keys_field_name.as_str().into(),
property: None,
}],

limit: Some((0, INVALIDATION_BATCH_SIZE)),
dialect: Some(2),
params: vec![SearchParameter {
name: Str::from_static("key"),
value: invalidation_key.into(),
}],

..Default::default()
};
let query = format!("@{}:{{$key}}", self.config.invalidation_keys_field_name);
let mut count = 0;

// https://redis.io/docs/latest/develop/reference/protocol-spec/#resp-versions
// > Future versions of Redis may change the default protocol version
//
// The result of FT.SEARCH is a map in RESP3 v.s. an array in RESP2.
assert_eq!(self.client.protocol_version(), RespVersion::RESP2);
loop {
let search_result = self
.client
.ft_search(self.index_name(), &query, options.clone())
.await?;
let Value::Array(array) = search_result else {
return Err(Error::new(
ErrorKind::Parse,
"Expected an array from FT.SEARCH",
));
};
let mut iter = array.into_iter();
let _count = iter.next();
if iter.len() == 0 {
return Ok(count);
}
let mut keys_to_delete = Vec::with_capacity(iter.len() / 2);
while let Some(id_value) = iter.next() {
let Value::String(id_string) = id_value else {
return Err(Error::new(
ErrorKind::Parse,
"Expected a string for document ID from FT.SEARCH",
));
};
keys_to_delete.push(id_string);
let _values = iter.next();
}
let deleted: u64 = self.client.del(keys_to_delete).await?;
count += deleted;
}
}

fn document_id<'a>(&self, id: &'a str) -> Cow<'a, str> {
if let Some(ns) = &self.config.namespace {
format!("{ns}:{id}").into()
} else {
id.into()
}
}

fn index_name(&self) -> Cow<'_, str> {
if let Some(ns) = &self.config.namespace {
format!("{ns}:{}", self.config.index_name).into()
} else {
self.config.index_name.as_str().into()
}
}
}
Loading