
Indeterministic behavior of read-only command #3239


Open
lalatnayak opened this issue Apr 2, 2025 · 9 comments

@lalatnayak

lalatnayak commented Apr 2, 2025

Bug Report

Current Behavior

I'm using the client to execute the FT.SEARCH command on MemoryDB. The ReadFrom strategy is set to Replica. However, I noticed that all requests are being sent to the master node. I have one master and one replica, so this is a scalability bottleneck. I found that MemoryDB expects commands to be in read-only mode in order to execute them on replicas; otherwise it redirects them to the master. I tried several options to make the command read-only, but the behavior has been nondeterministic: sometimes the requests are sent to the replica, and at other times they are still sent to the master. Within a single JVM lifetime, however, they are consistently sent to the same node; the target only changes after a JVM restart.

I have tried executing a READONLY command before each FT.SEARCH execution. I have also tried executing READONLY on the connection when the (singleton) connection is created, with the same result. I also tried passing a read-only override in the client options when creating the client, with the same behavior.

Input Code
// 1) Per request: send READONLY, then dispatch FT.SEARCH
RedisAdvancedClusterAsyncCommands<String, byte[]> redisCommand = connection.async();
redisCommand.readOnly().get();
RedisFuture<List<Object>> nestedOutputFuture =
        redisCommand.dispatch(MemoryDbCommands.FT_SEARCH, new NestedMultiOutput<>(codec), args);
List<Object> results = nestedOutputFuture.get(commandTimeout, TimeUnit.MILLISECONDS);

// 2) At connection creation (singleton): attempts to put the connection into read-only mode
RedisClusterClient redisClusterClient = LettuceClusterClientBuilder.createRedisClusterClient(redisConfig);
StatefulRedisClusterConnection<String, byte[]> connection =
        redisClusterClient.connect(RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE));
connection.setReadFrom(redisConfig.readFromStrategy());
connection.async().readonly(node -> node.getRole().isReplica());
connection.async().ping().get();
connection.sync().ping();
connection.async().readOnly().get();
connection.sync().readOnly();

// 3) Client options: treat every command as read-only
return ClusterClientOptions.builder()
        .topologyRefreshOptions(topologyOptions)
        .socketOptions(socketOptions)
        .validateClusterNodeMembership(false)
        .publishOnScheduler(true)
        .readOnlyCommands(command -> true)
        .sslOptions(redisConfig.getSslOptions())
        .build();

Expected behavior/code

I would expect all requests to go to the replica, because I have set the ReadFrom strategy to Replica and I'm setting the command to read-only.

Environment

  • Lettuce version(s): 6.4
  • Redis version: 7.1
@tishun
Collaborator

tishun commented Apr 9, 2025

Hey @lalatnayak,

can you help me understand what MemoryDbCommands.FT_SEARCH stands for?

Also, how do you build up the args? These are critical for command routing.

@tishun tishun added status: waiting-for-feedback We need additional information before we can continue and removed status: waiting-for-triage labels Apr 9, 2025
@lalatnayak
Author

lalatnayak commented Apr 9, 2025

@tishun
I use the FT.SEARCH command to perform a k-nearest-neighbor search on an HNSW index in MemoryDB (which uses a flavor of Redis internally).

I created an enum that implements ProtocolKeyword:

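import io.lettuce.core.protocol.ProtocolKeyword;
import java.nio.charset.StandardCharsets;

// Custom command keyword so FT.SEARCH can be sent through dispatch()
public enum MemoryDbCommands implements ProtocolKeyword {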
  FT_SEARCH("FT.SEARCH");

  private final byte[] bytes;

  MemoryDbCommands(final String command) {
    bytes = command.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public byte[] getBytes() {
    return bytes;
  }
}

Then I use dispatch() to execute it:

args = commandBuilder.buildFTSearchCommandArgs(knnSearchArgs, codec);
RedisFuture<List<Object>> nestedOutputFuture =
    connection
        .async()
        .dispatch(MemoryDbCommands.FT_SEARCH, new NestedMultiOutput<>(codec), args);
List<Object> results = nestedOutputFuture.get(commandTimeout, TimeUnit.MILLISECONDS);
searchTimeInMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);

Here is how I build the command arguments (I can't share the exact code):

public @NotNull CommandArgs<String, byte[]> buildFTSearchCommandArgs(
    T knnSearchArgs, RedisCodec<String, byte[]> codec) {
  CommandArgs<String, byte[]> commandArgs =
      new CommandArgs<>(codec)
          .add(knnSearchArgs.indexName)
          .add(knnSearchArgs.knnClause)
          .add("params")
          .add(2)
          .add(knnSearchArgs.getVectorFieldName())
          .add(redisSerialization.serializeVector(knnSearchArgs.getVector()))
          .add("return")
          .add(knnSearchArgs.getReturnFields().size() + 1) // +1 for the KNN score field
          .add(knnSearchArgs.knnScore);
  knnSearchArgs.getReturnFields().forEach(commandArgs::add);
  commandArgs.add("sortby")
      .add(knnSearchArgs.getSortFieldName())
      .add(knnSearchArgs.isDescending() ? "desc" : "asc")
      .add("limit")
      .add(0)
      .add(knnSearchArgs.getK())
      .add("dialect")
      .add(2);
  return commandArgs;
}
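To see what the driver actually receives, here is a dependency-free sketch that lays out the same FT.SEARCH tokens as plain strings, in the order used by buildFTSearchCommandArgs. The KnnQuery record and its field names are hypothetical stand-ins for the redacted args type, and the vector blob is shown as a placeholder. Note that every token is an ordinary argument: as far as I understand Lettuce, cluster routing keys off arguments added with CommandArgs.addKey(...), so this command carries no key at all.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Dependency-free model of the FT.SEARCH argument list.
 * KnnQuery and its field names are hypothetical stand-ins for the redacted args type.
 */
final class FtSearchArgsSketch {

    record KnnQuery(String indexName, String knnClause, String vectorParamName,
                    String scoreField, List<String> returnFields,
                    String sortField, boolean descending, int k) {}

    /** Tokens that follow "FT.SEARCH", in the same order as buildFTSearchCommandArgs. */
    static List<String> tokens(KnnQuery q) {
        List<String> t = new ArrayList<>();
        t.add(q.indexName());          // index name: a plain argument, not a key
        t.add(q.knnClause());          // e.g. "*=>[KNN 5 @vec $BLOB AS score]"
        t.add("params");
        t.add("2");                    // nargs: one name/value pair = 2 arguments
        t.add(q.vectorParamName());
        t.add("<vector bytes>");       // placeholder for the serialized vector blob
        t.add("return");
        t.add(String.valueOf(q.returnFields().size() + 1)); // +1 for the score field
        t.add(q.scoreField());
        t.addAll(q.returnFields());
        t.add("sortby");
        t.add(q.sortField());
        t.add(q.descending() ? "desc" : "asc");
        t.add("limit");
        t.add("0");
        t.add(String.valueOf(q.k()));
        t.add("dialect");
        t.add("2");
        return t;
    }
}
```

With two return fields, the RETURN count comes out as 3 because the score field is listed first.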

@lalatnayak
Author

@tishun
Checking if you have any thoughts on this one.

@tishun tishun added status: waiting-for-triage and removed status: waiting-for-feedback We need additional information before we can continue labels Apr 12, 2025
@tishun
Collaborator

tishun commented Apr 12, 2025

@tishun Checking if you have any thoughts on this one.

Yes, I will reply at length on Monday, but in short: the cause is most likely the way you build the arguments.

@tishun
Collaborator

tishun commented Apr 14, 2025

I was not able to get to this, apologies. I will write back as soon as I have some time to spare.

@tishun
Collaborator

tishun commented Apr 15, 2025

So, given the code you've shared, there are a few things to keep in mind:

  • The FT.SEARCH command does not really have a key, so you've correctly added only normal arguments to it. That being said, when the driver encounters a keyless command on a clustered connection, it always sends the command to the default connection. I am not sure what redisConfig contains, but I assume it uses the master node as the seed node?
  • Once you address that, I seem to be able to route the commands properly.
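The rule in the first bullet can be written as a tiny decision function. This is a hypothetical, simplified model that only restates the behavior described in this comment; it is not Lettuce's routing code. SEED stands for whichever node the default connection points at. Under this model, a keyless FT.SEARCH never reaches a replica through ReadFrom, because there is no slot to look up.

```java
import java.util.Optional;

/**
 * Hypothetical, simplified model of the routing rule described in this comment.
 * Not Lettuce's implementation. SEED = the node behind the default connection.
 */
final class KeylessRoutingSketch {

    enum Target { SEED, SLOT_PRIMARY, SLOT_REPLICA }

    static Target route(Optional<String> firstKey, boolean readOnlyCommand, boolean readFromReplica) {
        if (firstKey.isEmpty()) {
            // No key means no hash slot, so ReadFrom never gets a say.
            return Target.SEED;
        }
        // Keyed commands go to the slot owner; read-only ones may be served by a replica.
        return (readOnlyCommand && readFromReplica) ? Target.SLOT_REPLICA : Target.SLOT_PRIMARY;
    }
}
```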

The command tips documentation states that the default behavior for keyless commands, unless the command provides a tip of its own, is:

The command doesn't accept key name arguments: the client can execute the command on an arbitrary shard.

So the driver really does not do anything wrong here.

Since you only have 1 master and 1 replica do you really need to make a cluster connection? Can't you just make a non-cluster connection to the replica and operate directly with it?

@lalatnayak
Copy link
Author

lalatnayak commented Apr 15, 2025

@tishun
Here is the RedisConfig class. I connect directly to the cluster. In production I have more than one replica.

/**
 * @param endpoint
 * @param port
 * @param timeoutMillis
 * @param socketConnectTimeOutMilliSeconds
 * @param periodicRefreshMinutes
 * @param reconnectDelayLowerBoundMillis
 * @param reconnectDelayUpperBoundMillis
 * @param readFromStrategy
 * @param enableTls
 */
@Builder
public record RedisConfig(@NonNull String endpoint, @NonNull Integer port, @NonNull Integer timeoutMillis,
                          @NonNull Integer socketConnectTimeOutMilliSeconds, @NonNull Integer periodicRefreshMinutes,
                          @NonNull Long reconnectDelayLowerBoundMillis, @NonNull Long reconnectDelayUpperBoundMillis,
                          @NonNull ReadFrom readFromStrategy, @NonNull Boolean enableTls) {
  
  @JsonIgnore
  public SslOptions getSslOptions() {
    return enableTls ? SslOptions.builder().build() : null;
  }
}

Here is the client builder:

/**
 * Utility class to create a Redis cluster client. Use it in Guice modules to wire instances of Redis Cluster clients
 */

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class LettuceClusterClientBuilder {

    public static RedisClusterClient createRedisClusterClient(RedisConfig redisConfig) {

        RedisURI redisUri =
                RedisURI.Builder.redis(redisConfig.endpoint(), redisConfig.port())
                        .withSsl(redisConfig.enableTls())
                        .withTimeout(Duration.ofMillis(redisConfig.socketConnectTimeOutMilliSeconds()))
                        .build();

        ClientResources resources =
                DefaultClientResources.builder()
                        .dnsResolver(new DirContextDnsResolver())
                        .reconnectDelay(
                                Delay.fullJitter(
                                        Duration.ofMillis(redisConfig.reconnectDelayLowerBoundMillis()), // Lower bound
                                        Duration.ofMillis(redisConfig.reconnectDelayUpperBoundMillis()), // Upper bound
                                        1,
                                        TimeUnit.MILLISECONDS))
                        .build();

        RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri);
        clusterClient.addListener(new RedisConnectionEventHandler());
        clusterClient.setOptions(createClusterClientOptions(redisConfig));

        return clusterClient;
    }

    private static ClusterClientOptions createClusterClientOptions(RedisConfig redisConfig) {

        Duration periodicRefreshInterval = Duration.ofMinutes(redisConfig.periodicRefreshMinutes());

        ClusterTopologyRefreshOptions topologyOptions =
                ClusterTopologyRefreshOptions.builder()
                        .enableAllAdaptiveRefreshTriggers()
                        // This lets the cluster client know which are primary and replica nodes as they
                        // can change due to failures. If not provided this defaults to 60 seconds.
                        .enablePeriodicRefresh(periodicRefreshInterval)
                        .dynamicRefreshSources(false)
                        .build();

        Duration socketConnectTimeout =
                Duration.ofMillis(redisConfig.socketConnectTimeOutMilliSeconds());

        SocketOptions socketOptions =
                SocketOptions.builder().connectTimeout(socketConnectTimeout).keepAlive(true).build();

        // Refer https://github.com/lettuce-io/lettuce-core/wiki/Client-Options#cluster-specific-options
        return ClusterClientOptions.builder()
                .topologyRefreshOptions(topologyOptions)
                .socketOptions(socketOptions)
                .validateClusterNodeMembership(false)
                .publishOnScheduler(true)
                .sslOptions(redisConfig.getSslOptions())
                .build();
    }
}

@tishun
Collaborator

tishun commented Apr 15, 2025

Here is the Redis Config Class.

I am still not sure which node you connect to. Do you pass the address of the master?

I connect directly to the cluster. In Production I have more than one replica.

What I meant is to use RedisClient rather than RedisClusterClient.
Using RedisClusterClient helps (among other things) if your environment is sharded, because it routes the requests to the correct shard for you. Is your environment sharded (do you have more than one master)?
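For reference, the shard lookup a cluster client performs for keyed commands follows the Redis Cluster key-slot rule: CRC16 (XMODEM variant) of the key, or of its {hash tag} if one is present, modulo 16384. A minimal sketch of that rule follows; the class and method names are illustrative. FT.SEARCH has no key to feed into it, which is why slot-based routing (and ReadFrom with it) does not come into play.

```java
import java.nio.charset.StandardCharsets;

/** Minimal sketch of Redis Cluster key-to-slot mapping (names are illustrative). */
final class KeySlotSketch {

    /** CRC16-XMODEM: polynomial 0x1021, initial value 0, no reflection. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Hashes only the non-empty text between the first '{' and the next '}', if present. */
    static int slot(String key) {
        String hashed = key;
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                hashed = key.substring(open + 1, close);
            }
        }
        return crc16(hashed.getBytes(StandardCharsets.UTF_8)) % 16384;
    }
}
```

Keys sharing a hash tag, such as {user1000}.following and {user1000}.followers, land in the same slot and therefore on the same shard.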

@lalatnayak
Author

lalatnayak commented Apr 15, 2025

No, it's not sharded at the moment, but it could be in the future.
