diff --git a/Cargo.lock b/Cargo.lock index 91638f90808..b94bb1ab688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,7 +297,7 @@ dependencies = [ "encoding_rs", "flate2", "form_urlencoded", - "fred", + "fred 9.4.0", "futures", "futures-test", "graphql_client", @@ -2441,6 +2441,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "float-cmp" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2542,13 +2551,13 @@ dependencies = [ "bytes", "bytes-utils", "crossbeam-queue", - "float-cmp", + "float-cmp 0.9.0", "fred-macros", "futures", "log", "parking_lot", "rand 0.8.5", - "redis-protocol", + "redis-protocol 5.0.1", "rustls", "rustls-native-certs 0.7.3", "semver", @@ -2561,6 +2570,31 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "fred" +version = "10.1.0" +source = "git+https://github.com/SimonSapin/fred.rs?rev=32391ec56ac18ee83d02e9f112910b0382bc082c#32391ec56ac18ee83d02e9f112910b0382bc082c" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "float-cmp 0.10.0", + "fred-macros", + "futures", + "log", + "parking_lot", + "rand 0.8.5", + "redis-protocol 6.0.0", + "semver", + "socket2", + "tokio", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + [[package]] name = "fred-macros" version = "0.1.0" @@ -5275,6 +5309,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis-experiments" +version = "0.1.0" +dependencies = [ + "fred 10.1.0", + "tokio", + "uuid", +] + [[package]] name = "redis-protocol" version = "5.0.1" @@ -5289,6 +5332,20 @@ dependencies = [ "nom", ] +[[package]] +name = "redis-protocol" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdba59219406899220fc4cdfd17a95191ba9c9afb719b5fa5a083d63109a9f1" +dependencies = [ + "bytes", + 
"bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom", +] + [[package]] name = "redox_syscall" version = "0.5.7" diff --git a/Cargo.toml b/Cargo.toml index eba725b7b5a..a9de408d788 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "examples/throw-error/rhai", "fuzz", "fuzz/subgraph", + "redis-experiments", # Note that xtask is not in the workspace member because it relies on dependencies that are incompatible with the router. Notably hyperx but there are others. ] diff --git a/redis-experiments/Cargo.toml b/redis-experiments/Cargo.toml new file mode 100644 index 00000000000..2dfd839cf0d --- /dev/null +++ b/redis-experiments/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "redis-experiments" +version = "0.1.0" +edition = "2024" + +[dependencies] +# env_logger = "0.11.5" +# fred = { version = "10.1.0", features = ["i-redisearch"] } +# fred = { path = "../../fred.rs", features = ["i-redisearch"] } +tokio = "1.44.2" +uuid = { version = "1.16.0", features = ["v4"] } + +[dependencies.fred] +# https://github.com/aembke/fred.rs/issues/346 and https://github.com/aembke/fred.rs/pull/349 +git = "https://github.com/SimonSapin/fred.rs" +rev = "32391ec56ac18ee83d02e9f112910b0382bc082c" +features = ["i-redisearch"] \ No newline at end of file diff --git a/redis-experiments/src/lib.rs b/redis-experiments/src/lib.rs new file mode 100644 index 00000000000..a32f5493553 --- /dev/null +++ b/redis-experiments/src/lib.rs @@ -0,0 +1,231 @@ +use std::borrow::Cow; + +use fred::bytes_utils::Str; +use fred::prelude::*; +use fred::types::MultipleKeys; +use fred::types::RespVersion; +use fred::types::redisearch::FtCreateOptions; +use fred::types::redisearch::FtSearchOptions; +use fred::types::redisearch::IndexKind; +use fred::types::redisearch::SearchField; +use fred::types::redisearch::SearchParameter; +use fred::types::redisearch::SearchSchema; +use fred::types::redisearch::SearchSchemaKind; + +const INVALIDATION_KEY_SEPARATOR: char = '\t'; +const 
INVALIDATION_BATCH_SIZE: i64 = 1_000; + +pub struct Cache { + pub client: Client, + pub config: CacheConfig, +} + +pub struct CacheConfig { + pub namespace: Option<String>, + pub temporary_seconds: Option<u64>, + pub index_name: String, + pub indexed_document_id_prefix: String, + pub invalidation_keys_field_name: String, +} + +#[derive(Debug, Clone, Copy)] +pub enum Expire { + In { seconds: i64 }, + At { timestamp: i64 }, +} + +impl CacheConfig { + pub fn random_namespace() -> String { + uuid::Uuid::new_v4().simple().to_string() + } +} + +impl Cache { + /// Wraps the `FT.CREATE` command, ignoring "Index already exists" errors + pub async fn create_index_if_not_exists(&self) -> FredResult<()> { + let result = self.create_index().await; + if let Err(err) = &result { + if err.details() == "Index already exists" { + return Ok(()); + } + } + result + } + + /// Wraps the `FT.CREATE` command + pub async fn create_index(&self) -> FredResult<()> { + self.client + .ft_create( + self.index_name(), + FtCreateOptions { + on: Some(IndexKind::Hash), + prefixes: vec![ + self.document_id(&self.config.indexed_document_id_prefix) + .into(), + ], + temporary: self.config.temporary_seconds, + ..Default::default() + }, + vec![SearchSchema { + field_name: self.config.invalidation_keys_field_name.as_str().into(), + alias: None, + kind: SearchSchemaKind::Tag { + sortable: false, + unf: false, + separator: Some(INVALIDATION_KEY_SEPARATOR), + casesensitive: false, + withsuffixtrie: false, + noindex: false, + }, + }], + ) + .await + } + + /// Wraps the `FT.DROPINDEX` command + pub async fn drop_index(&self, drop_indexed_documents: bool) -> FredResult<()> { + self.client + .ft_dropindex(self.index_name(), drop_indexed_documents) + .await + } + + /// Wraps the `HSET` command, adding an indexed field for invalidation keys + /// + /// If `document_id` already exists and is a hash document, new values are "merged" with it + /// (existing fields not specified here are not removed). 
+ pub async fn insert_hash_document<InvalidationKey, MapKey, MapValue>( + &self, + document_id: &str, + expire: Expire, + invalidation_keys: impl IntoIterator<Item = InvalidationKey>, + values: impl IntoIterator<Item = (MapKey, MapValue)>, + ) -> FredResult<()> + where + InvalidationKey: AsRef<str>, + MapKey: Into<Key>, + MapValue: Into<Value>, + { + let mut invalidation_keys = invalidation_keys.into_iter(); + let invalidation_keys: Option<String> = invalidation_keys.next().map(|first| { + let mut separated = first.as_ref().to_owned(); + for next in invalidation_keys { + separated.push(INVALIDATION_KEY_SEPARATOR); + separated.push_str(next.as_ref()); + } + // Looks like "{key1}{INVALIDATION_KEY_SEPARATOR}{key2}" + separated + }); + let maybe_invalidation_keys_field = invalidation_keys.map(|v| { + let k = Key::from(&self.config.invalidation_keys_field_name); + (k, v.into()) + }); + let map = fred::types::Map::from_iter( + values + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .chain(maybe_invalidation_keys_field), + ); + let id = self.document_id(document_id); + let _: () = self.client.hset(id.as_ref(), map).await?; + match expire { + Expire::In { seconds } => self.client.expire(id.as_ref(), seconds, None).await, + Expire::At { timestamp } => self.client.expire_at(id.as_ref(), timestamp, None).await, + } + } + + /// Wraps the `HMGET` command + pub async fn get_hash_document<V: FromValue>( + &self, + document_id: &str, + fields: impl Into<MultipleKeys> + Send, + ) -> FredResult<V> { + self.client + .hmget(self.document_id(document_id).as_ref(), fields) + .await + } + + /// Deletes all documents that have `invalidation_key`. + /// + /// Returns the number of deleted documents. + pub async fn invalidate(&self, invalidation_key: impl Into<Str>) -> FredResult<u64> { + // We want `NOCONTENT` but it’s apparently not supported on AWS: + // TODO: test this, they also don’t document `DIALECT 2` but give an example with it. 
+ // https://docs.aws.amazon.com/memorydb/latest/devguide/vector-search-commands-ft.search.html + // + // A work-around is `RETURN 0` (an empty list of zero fields) but it’s not supported by Fred: + // https://github.com/aembke/fred.rs/issues/345 + // + // As a work-around for the work-around, we request a single field that we know exists + let options = FtSearchOptions { + // nocontent: true, + // r#return: vec![], + r#return: vec![SearchField { + identifier: self.config.invalidation_keys_field_name.as_str().into(), + property: None, + }], + + limit: Some((0, INVALIDATION_BATCH_SIZE)), + dialect: Some(2), + params: vec![SearchParameter { + name: Str::from_static("key"), + value: invalidation_key.into(), + }], + + ..Default::default() + }; + let query = format!("@{}:{{$key}}", self.config.invalidation_keys_field_name); + let mut count = 0; + + // https://redis.io/docs/latest/develop/reference/protocol-spec/#resp-versions + // > Future versions of Redis may change the default protocol version + // + // The result of FT.SEARCH is a map in RESP3 vs. an array in RESP2. 
+ assert_eq!(self.client.protocol_version(), RespVersion::RESP2); + loop { + let search_result = self + .client + .ft_search(self.index_name(), &query, options.clone()) + .await?; + let Value::Array(array) = search_result else { + return Err(Error::new( + ErrorKind::Parse, + "Expected an array from FT.SEARCH", + )); + }; + let mut iter = array.into_iter(); + let _count = iter.next(); + if iter.len() == 0 { + return Ok(count); + } + let mut keys_to_delete = Vec::with_capacity(iter.len() / 2); + while let Some(id_value) = iter.next() { + let Value::String(id_string) = id_value else { + return Err(Error::new( + ErrorKind::Parse, + "Expected a string for document ID from FT.SEARCH", + )); + }; + keys_to_delete.push(id_string); + let _values = iter.next(); + } + let deleted: u64 = self.client.del(keys_to_delete).await?; + count += deleted; + } + } + + fn document_id<'a>(&self, id: &'a str) -> Cow<'a, str> { + if let Some(ns) = &self.config.namespace { + format!("{ns}:{id}").into() + } else { + id.into() + } + } + + fn index_name(&self) -> Cow<'_, str> { + if let Some(ns) = &self.config.namespace { + format!("{ns}:{}", self.config.index_name).into() + } else { + self.config.index_name.as_str().into() + } + } +} diff --git a/redis-experiments/src/main.rs b/redis-experiments/src/main.rs new file mode 100644 index 00000000000..1b687d6df60 --- /dev/null +++ b/redis-experiments/src/main.rs @@ -0,0 +1,104 @@ +use std::time::Instant; + +use fred::prelude::*; +use redis_experiments::Cache; +use redis_experiments::CacheConfig; +use redis_experiments::Expire; + +const TEMPORARY: bool = true; + +async fn cache() -> Cache { + // let config = Config::from_url("redis://localhost:6379").unwrap(); + // let client = Builder::from_config(config).build().unwrap(); + let client = Client::default(); + client.init().await.unwrap(); + Cache { + client, + config: CacheConfig { + namespace: Some(CacheConfig::random_namespace()), + temporary_seconds: TEMPORARY.then_some(600), + index_name: 
"invalidation".into(), + indexed_document_id_prefix: "".into(), + invalidation_keys_field_name: "invalidation_keys".into(), + }, + } +} + +#[tokio::main] +async fn main() { + // env_logger::init(); + test_create_existing_index().await; + test_few().await; + test_many().await; + println!("Done!") +} + +async fn test_create_existing_index() { + let cache = cache().await; + cache.create_index().await.unwrap(); + cache.create_index_if_not_exists().await.unwrap(); + cache.drop_index(true).await.unwrap(); +} + +async fn test_few() { + let cache = cache().await; + cache.create_index().await.unwrap(); + let expire = Expire::In { seconds: 600 }; + cache + .insert_hash_document("docA", expire, ["key1", "key2"], [("data", "A")]) + .await + .unwrap(); + cache + .insert_hash_document("docB", expire, ["key1"], [("data", "B")]) + .await + .unwrap(); + cache + .insert_hash_document("docC", expire, ["key2"], [("data", "C")]) + .await + .unwrap(); + let get = async |id: &str| { + cache + .get_hash_document::>(id, "data") + .await + .unwrap() + }; + assert_eq!(get("docA").await.as_deref(), Some("A")); + assert_eq!(get("docB").await.as_deref(), Some("B")); + assert_eq!(get("docC").await.as_deref(), Some("C")); + assert_eq!(cache.invalidate("key1").await.unwrap(), 2); + assert_eq!(get("docA").await.as_deref(), None); + assert_eq!(get("docB").await.as_deref(), None); + assert_eq!(get("docC").await.as_deref(), Some("C")); + cache.drop_index(true).await.unwrap(); +} + +async fn test_many() { + let cache = cache().await; + cache.create_index().await.unwrap(); + let expire = Expire::In { seconds: 600 }; + let start = Instant::now(); + for count in [100, 1_000, 10_000, 100_000] { + println!("{count} entries…"); + for i in 0..count { + cache + .insert_hash_document( + &format!("doc{i}"), + expire, + ["key1", "key2"], + [("data", "A")], + ) + .await + .unwrap(); + } + let duration = start.elapsed(); + println!("… inserted (one by one) in {} ms", duration.as_millis()); + + let start = 
Instant::now(); + let deleted = cache.invalidate("key2").await.unwrap(); + let duration = start.elapsed(); + println!("… invalidated (in batch) in {} ms", duration.as_millis()); + println!(); + assert_eq!(deleted, count) + } + cache.drop_index(true).await.unwrap(); +}