diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/ExecuteProperties.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/ExecuteProperties.java index 825fe56d3e..a16cae9209 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/ExecuteProperties.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/ExecuteProperties.java @@ -72,12 +72,13 @@ public class ExecuteProperties { // how record scan limit reached is handled -- false: return early with continuation, true: throw exception private final boolean failOnScanLimitReached; private final boolean isDryRun; + private final boolean kvCursorContSerializeToNew; private final CursorStreamingMode defaultCursorStreamingMode; @SuppressWarnings("java:S107") private ExecuteProperties(int skip, int rowLimit, @Nonnull IsolationLevel isolationLevel, long timeLimit, - @Nonnull ExecuteState state, boolean failOnScanLimitReached, @Nonnull CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun) { + @Nonnull ExecuteState state, boolean failOnScanLimitReached, @Nonnull CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun, boolean kvCursorContSerializeToNew) { this.skip = skip; this.rowLimit = rowLimit; this.isolationLevel = isolationLevel; @@ -86,6 +87,7 @@ private ExecuteProperties(int skip, int rowLimit, @Nonnull IsolationLevel isolat this.failOnScanLimitReached = failOnScanLimitReached; this.defaultCursorStreamingMode = defaultCursorStreamingMode; this.isDryRun = isDryRun; + this.kvCursorContSerializeToNew = kvCursorContSerializeToNew; } @Nonnull @@ -102,7 +104,7 @@ public ExecuteProperties setSkip(final int skip) { if (skip == this.skip) { return this; } - return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } public boolean isDryRun() { @@ -114,9 +116,12 @@ public ExecuteProperties setDryRun(final boolean isDryRun) { if (isDryRun == this.isDryRun) { return this; } - return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } + public boolean isKvCursorContSerializeToNew() { + return kvCursorContSerializeToNew; + } /** * Get the limit on the number of rows that will be returned as it would be passed to FDB. @@ -137,7 +142,7 @@ public ExecuteProperties setReturnedRowLimit(final int rowLimit) { if (newLimit == this.rowLimit) { return this; } - return copy(skip, newLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, newLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -184,7 +189,7 @@ public ExecuteState getState() { */ @Nonnull public ExecuteProperties setState(@Nonnull ExecuteState newState) { - return copy(skip, rowLimit, timeLimit, isolationLevel, newState, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, newState, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -193,7 +198,7 @@ public ExecuteProperties setState(@Nonnull ExecuteState newState) { */ @Nonnull public ExecuteProperties clearState() { - return copy(skip, rowLimit, timeLimit, isolationLevel, new ExecuteState(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, new ExecuteState(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -209,7 +214,7 @@ public ExecuteProperties setFailOnScanLimitReached(boolean failOnScanLimitReache if (failOnScanLimitReached == this.failOnScanLimitReached) { return this; } - return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } @Nonnull @@ -217,7 +222,7 @@ public ExecuteProperties clearReturnedRowLimit() { if (getReturnedRowLimit() == ReadTransaction.ROW_LIMIT_UNLIMITED) { return this; } - return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -229,7 +234,7 @@ public ExecuteProperties clearRowAndTimeLimits() { if (getTimeLimit() == UNLIMITED_TIME && getReturnedRowLimit() == ReadTransaction.ROW_LIMIT_UNLIMITED) { return this; } - return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, UNLIMITED_TIME, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, UNLIMITED_TIME, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -241,7 +246,7 @@ public ExecuteProperties clearSkipAndLimit() { if (skip == 0 && rowLimit == ReadTransaction.ROW_LIMIT_UNLIMITED) { return this; } - return copy(0, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(0, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -254,7 +259,7 @@ public ExecuteProperties clearSkipAndAdjustLimit() { return this; } return copy(0, rowLimit == ReadTransaction.ROW_LIMIT_UNLIMITED ? ReadTransaction.ROW_LIMIT_UNLIMITED : rowLimit + skip, - timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -305,7 +310,7 @@ public ExecuteProperties setDefaultCursorStreamingMode(CursorStreamingMode defau if (defaultCursorStreamingMode == this.defaultCursorStreamingMode) { return this; } - return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -315,7 +320,7 @@ public ExecuteProperties setDefaultCursorStreamingMode(CursorStreamingMode defau */ @Nonnull public ExecuteProperties resetState() { - return copy(skip, rowLimit, timeLimit, isolationLevel, state.reset(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return copy(skip, rowLimit, timeLimit, isolationLevel, state.reset(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } /** @@ -333,8 +338,8 @@ public ExecuteProperties resetState() { @SuppressWarnings("java:S107") @Nonnull protected ExecuteProperties copy(int skip, int rowLimit, long timeLimit, @Nonnull IsolationLevel isolationLevel, - @Nonnull ExecuteState state, boolean failOnScanLimitReached, CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun) { - return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + @Nonnull ExecuteState state, boolean failOnScanLimitReached, CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun, boolean kvCursorContSerializeToNew) { + return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } @Nonnull @@ -408,6 +413,7 @@ public static class Builder { private ExecuteState executeState = null; private boolean failOnScanLimitReached = false; private boolean isDryRun = false; + private boolean kvCursorContSerializeToNew = false; private CursorStreamingMode defaultCursorStreamingMode = CursorStreamingMode.ITERATOR; private Builder() { @@ -422,6 +428,7 @@ private Builder(ExecuteProperties executeProperties) { this.failOnScanLimitReached = executeProperties.failOnScanLimitReached; this.defaultCursorStreamingMode = executeProperties.defaultCursorStreamingMode; this.isDryRun = executeProperties.isDryRun; + this.kvCursorContSerializeToNew = executeProperties.kvCursorContSerializeToNew; } @Nonnull @@ -455,6 +462,12 @@ public Builder setDryRun(boolean isDryRun) { return this; } + @Nonnull + public Builder setKvCursorContSerializeToNew(boolean kvCursorContSerializeToNew) { + this.kvCursorContSerializeToNew = kvCursorContSerializeToNew; + return this; + } + @Nonnull public Builder setReturnedRowLimit(int rowLimit) { this.rowLimit = validateAndNormalizeRowLimit(rowLimit); @@ -607,7 +620,7 @@ public ExecuteProperties build() { } else { state = new ExecuteState(RecordScanLimiterFactory.enforce(scannedRecordsLimit), ByteScanLimiterFactory.enforce(scannedBytesLimit)); } - return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun); + return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew); } } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexPrefetchRangeKeyValueCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexPrefetchRangeKeyValueCursor.java index df4ad626ec..71bda251a3 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexPrefetchRangeKeyValueCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexPrefetchRangeKeyValueCursor.java @@ -41,9 +41,10 @@ private IndexPrefetchRangeKeyValueCursor(@Nonnull final FDBRecordContext context @Nonnull final AsyncIterator iterator, int prefixLength, @Nonnull final CursorLimitManager limitManager, - int valuesLimit) { + int valuesLimit, + SerializationMode serializationMode) { - super(context, iterator, prefixLength, limitManager, valuesLimit); + super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode); } /** @@ -69,7 +70,7 @@ public IndexPrefetchRangeKeyValueCursor build() { AsyncIterator iterator = getTransaction() .getMappedRange(getBegin(), getEnd(), mapper, getLimit(), isReverse(), getStreamingMode()) .iterator(); - return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit()); + return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode); } @Override diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java index 6534ceb5a4..ed5ea7e3b5 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java @@ -38,8 +38,9 @@ private KeyValueCursor(@Nonnull final FDBRecordContext context, @Nonnull final AsyncIterator iterator, int prefixLength, @Nonnull final CursorLimitManager limitManager, - int valuesLimit) { - super(context, iterator, prefixLength, limitManager, valuesLimit); + int valuesLimit, + SerializationMode serializationMode) { + super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode); } /** @@ -77,7 +78,7 @@ public KeyValueCursor build() { final AsyncIterator iterator = getTransaction() .getRange(getBegin(), getEnd(), getLimit(), isReverse(), getStreamingMode()) .iterator(); - return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit()); + return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode); } @Override diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java index ec5f7b9d17..11eb4b9f4a 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorBase.java @@ -33,6 +33,7 @@ import com.apple.foundationdb.record.KeyRange; import com.apple.foundationdb.record.RecordCoreException; import com.apple.foundationdb.record.RecordCursorContinuation; +import com.apple.foundationdb.record.RecordCursorProto; import com.apple.foundationdb.record.RecordCursorResult; import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.TupleRange; @@ -42,11 +43,13 @@ import com.apple.foundationdb.subspace.Subspace; import com.apple.foundationdb.tuple.Tuple; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.ZeroCopyByteString; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -65,18 +68,22 @@ public abstract class KeyValueCursorBase extends AsyncIterat // the pointer may be mutated, but the actual array must never be mutated or continuations will break @Nullable private byte[] lastKey; + @Nonnull + private final SerializationMode serializationMode; protected KeyValueCursorBase(@Nonnull final FDBRecordContext context, @Nonnull final AsyncIterator iterator, int prefixLength, @Nonnull final CursorLimitManager limitManager, - int valuesLimit) { + int valuesLimit, + @Nonnull SerializationMode serializationMode) { super(context.getExecutor(), iterator); this.context = context; this.prefixLength = prefixLength; this.limitManager = limitManager; this.valuesLimit = valuesLimit; + this.serializationMode = serializationMode; context.instrument(FDBStoreTimer.DetailEvents.GET_SCAN_RANGE_RAW_FIRST_CHUNK, iterator.onHasNext()); } @@ -131,21 +138,23 @@ public RecordCursorResult getNext() { @Nonnull private RecordCursorContinuation continuationHelper() { - return new Continuation(lastKey, prefixLength); + return new Continuation(lastKey, prefixLength, serializationMode); } - private static class Continuation implements RecordCursorContinuation { + public static class Continuation implements RecordCursorContinuation { @Nullable private final byte[] lastKey; private final int prefixLength; + private final SerializationMode serializationMode; - public Continuation(@Nullable final byte[] lastKey, final int prefixLength) { + public Continuation(@Nullable final byte[] lastKey, final int prefixLength, final SerializationMode serializationMode) { // Note that doing this without a full copy is dangerous if the array is ever mutated. // Currently, this never happens and the only thing that changes is which array lastKey points to. // However, if logic in KeyValueCursor or KeyValue changes, this could break continuations. // To resolve it, we could resort to doing a full copy here, although that's somewhat expensive. this.lastKey = lastKey; this.prefixLength = prefixLength; + this.serializationMode = serializationMode; } @Override @@ -156,21 +165,64 @@ public boolean isEnd() { @Nonnull @Override public ByteString toByteString() { - if (lastKey == null) { - return ByteString.EMPTY; + if (serializationMode == SerializationMode.TO_OLD) { + if (lastKey == null) { + return ByteString.EMPTY; + } + ByteString base = ZeroCopyByteString.wrap(lastKey); + return base.substring(prefixLength, lastKey.length); + } else { + return toProto().toByteString(); } - ByteString base = ZeroCopyByteString.wrap(lastKey); - return base.substring(prefixLength, lastKey.length); } @Nullable @Override public byte[] toBytes() { + ByteString byteString = toByteString(); + return byteString.isEmpty() ? null : byteString.toByteArray(); + } + + @Nullable + public byte[] getInnerContinuationInBytes() { if (lastKey == null) { return null; } return Arrays.copyOfRange(lastKey, prefixLength, lastKey.length); } + + @Nonnull + public ByteString getInnerContinuationInByteString() { + if (lastKey == null) { + return ByteString.EMPTY; + } + ByteString base = ZeroCopyByteString.wrap(lastKey); + return base.substring(prefixLength, lastKey.length); + } + + public static byte[] fromRawBytes(@Nonnull byte[] rawBytes, SerializationMode serializationMode) { + if (serializationMode == SerializationMode.TO_OLD) { + return rawBytes; + } + try { + RecordCursorProto.KeyValueCursorContinuation continuationProto = RecordCursorProto.KeyValueCursorContinuation.parseFrom(rawBytes); + return continuationProto.getContinuation().toByteArray(); + } catch (InvalidProtocolBufferException ipbe) { + return rawBytes; + } + } + + @Nonnull + private RecordCursorProto.KeyValueCursorContinuation toProto() { + RecordCursorProto.KeyValueCursorContinuation.Builder builder = RecordCursorProto.KeyValueCursorContinuation.newBuilder(); + ByteString base = ZeroCopyByteString.wrap(Objects.requireNonNull(lastKey)); + return builder.setContinuation(base.substring(prefixLength, lastKey.length)).build(); + } + } + + public enum SerializationMode { + TO_OLD, + TO_NEW } /** @@ -208,6 +260,7 @@ public abstract static class Builder> { private StreamingMode streamingMode; private KeySelector begin; private KeySelector end; + protected SerializationMode serializationMode; protected Builder(@Nonnull Subspace subspace) { this.subspace = subspace; @@ -247,10 +300,23 @@ protected void prepare() { prefixLength = calculatePrefixLength(); reverse = scanProperties.isReverse(); + serializationMode = scanProperties.getExecuteProperties().isKvCursorContSerializeToNew() ? SerializationMode.TO_NEW : SerializationMode.TO_OLD; + if (continuation != null) { - final byte[] continuationBytes = new byte[prefixLength + continuation.length]; + byte[] realContinuation; + if (serializationMode == SerializationMode.TO_OLD) { + realContinuation = continuation; + } else { + try { + RecordCursorProto.KeyValueCursorContinuation keyValueCursorContinuation = RecordCursorProto.KeyValueCursorContinuation.parseFrom(continuation); + realContinuation = keyValueCursorContinuation.getContinuation().toByteArray(); + } catch (InvalidProtocolBufferException ex) { + realContinuation = continuation; + } + } + final byte[] continuationBytes = new byte[prefixLength + realContinuation.length]; System.arraycopy(lowBytes, 0, continuationBytes, 0, prefixLength); - System.arraycopy(continuation, 0, continuationBytes, prefixLength, continuation.length); + System.arraycopy(realContinuation, 0, continuationBytes, prefixLength, realContinuation.length); if (reverse) { highBytes = continuationBytes; highEndpoint = EndpointType.CONTINUATION; @@ -334,6 +400,11 @@ public T setHigh(@Nonnull byte[] highBytes, @Nonnull EndpointType highEndpoint) return self(); } + public T setSerializationMode(@Nonnull final SerializationMode serializationMode) { + this.serializationMode = serializationMode; + return self(); + } + /** * Calculate the key prefix length for the returned values. This will be used to derive the primary key used in * the calculated continuation. diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java index 8e2ade285f..99dd4be764 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlan.java @@ -57,6 +57,7 @@ import com.apple.foundationdb.record.provider.foundationdb.IndexScanComparisons; import com.apple.foundationdb.record.provider.foundationdb.IndexScanParameters; import com.apple.foundationdb.record.provider.foundationdb.IndexScanRange; +import com.apple.foundationdb.record.provider.foundationdb.KeyValueCursorBase; import com.apple.foundationdb.record.provider.foundationdb.MultidimensionalIndexScanComparisons; import com.apple.foundationdb.record.provider.foundationdb.UnsupportedRemoteFetchIndexException; import com.apple.foundationdb.record.query.plan.AvailableFields; @@ -328,7 +329,7 @@ private RecordCursor executeEntriesWithOverScan( @Nonnull FDBRecordStoreBase store, @Nonnull Index index, @Nullable byte[] continuation, @Nonnull ExecuteProperties executeProperties) { final byte[] prefixBytes = getRangePrefixBytes(tupleScanRange); - final IndexScanContinuationConvertor continuationConvertor = new IndexScanContinuationConvertor(prefixBytes); + final IndexScanContinuationConvertor continuationConvertor = new IndexScanContinuationConvertor(prefixBytes, executeProperties.isKvCursorContSerializeToNew() ? KeyValueCursorBase.SerializationMode.TO_NEW : KeyValueCursorBase.SerializationMode.TO_OLD); // Scan a wider range, and then halt when either this scans outside the given range final IndexScanRange newScanRange = new IndexScanRange(IndexScanType.BY_VALUE, widenedScanRange); @@ -745,9 +746,12 @@ public static RecordQueryIndexPlan fromProto(@Nonnull final PlanSerializationCon private static class IndexScanContinuationConvertor implements RecordCursor.ContinuationConvertor { @Nonnull private final byte[] prefixBytes; + @Nonnull + private final KeyValueCursorBase.SerializationMode serializationMode; - public IndexScanContinuationConvertor(@Nonnull byte[] prefixBytes) { + public IndexScanContinuationConvertor(@Nonnull byte[] prefixBytes, @Nonnull final KeyValueCursorBase.SerializationMode serializationMode) { this.prefixBytes = prefixBytes; + this.serializationMode = serializationMode; } @Nullable @@ -757,7 +761,8 @@ public byte[] unwrapContinuation(@Nullable final byte[] continuation) { return null; } // Add the prefix back to the inner continuation - return ByteArrayUtil.join(prefixBytes, continuation); + byte[] innerContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode); + return ByteArrayUtil.join(prefixBytes, innerContinuation); } @Override @@ -765,7 +770,11 @@ public RecordCursorContinuation wrapContinuation(@Nonnull final RecordCursorCont if (continuation.isEnd()) { return continuation; } - final byte[] continuationBytes = continuation.toBytes(); + byte[] continuationBytes; + if (!(continuation instanceof KeyValueCursorBase.Continuation)) { + throw new RecordCoreException("can only wrap KeyValueCursorBase.Continuation class"); + } + continuationBytes = ((KeyValueCursorBase.Continuation) continuation).getInnerContinuationInBytes(); if (continuationBytes != null && ByteArrayUtil.startsWith(continuationBytes, prefixBytes)) { // Strip away the prefix. Note that ByteStrings re-use the underlying ByteArray, so this can // save a copy. @@ -796,7 +805,7 @@ public byte[] toBytes() { if (bytes == null) { synchronized (this) { if (bytes == null) { - byte[] baseContinuationBytes = baseContinuation.toBytes(); + byte[] baseContinuationBytes = baseContinuation instanceof KeyValueCursorBase.Continuation ? ((KeyValueCursorBase.Continuation) baseContinuation).getInnerContinuationInBytes() : baseContinuation.toBytes(); if (baseContinuationBytes == null) { return null; } @@ -810,7 +819,7 @@ public byte[] toBytes() { @Nonnull @Override public ByteString toByteString() { - return baseContinuation.toByteString().substring(prefixLength); + return (baseContinuation instanceof KeyValueCursorBase.Continuation) ? ((KeyValueCursorBase.Continuation) baseContinuation).getInnerContinuationInByteString().substring(prefixLength) : baseContinuation.toByteString().substring(prefixLength); } @Override diff --git a/fdb-record-layer-core/src/main/proto/record_cursor.proto b/fdb-record-layer-core/src/main/proto/record_cursor.proto index 04095bf779..f64492ddfd 100644 --- a/fdb-record-layer-core/src/main/proto/record_cursor.proto +++ b/fdb-record-layer-core/src/main/proto/record_cursor.proto @@ -165,3 +165,7 @@ message OneOfTypedState { message RangeCursorContinuation { optional int64 nextPosition = 1; } + +message KeyValueCursorContinuation { + optional bytes continuation = 1; +} diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorTest.java index d97e4b0ef9..1854d84083 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursorTest.java @@ -44,6 +44,8 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Arrays; import java.util.Collections; @@ -90,14 +92,16 @@ public void runBefore() { }); } - @Test - public void all() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void all(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setRange(TupleRange.ALL) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); for (int i = 0; i < 5; i++) { for (int j = 0; j < 5; j++) { @@ -113,6 +117,7 @@ public void all() { .setRange(TupleRange.ALL) .setContinuation(null) .setScanProperties(new ScanProperties(ExecuteProperties.newBuilder().setReturnedRowLimit(10).build())) + .setSerializationMode(serializationMode) .build(); assertEquals(10, (int)cursor.getCount().join()); cursor = KeyValueCursor.Builder.withSubspace(subspace) @@ -120,6 +125,7 @@ public void all() { .setRange(TupleRange.ALL) .setContinuation(cursor.getNext().getContinuation().toBytes()) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); assertEquals(15, (int)cursor.getCount().join()); @@ -127,14 +133,16 @@ public void all() { }); } - @Test - public void beginsWith() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void beginsWith(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setRange(TupleRange.allOf(Tuple.from(3))) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); for (int j = 0; j < 5; j++) { KeyValue kv = cursor.getNext().get(); @@ -148,6 +156,7 @@ public void beginsWith() { .setRange(TupleRange.allOf(Tuple.from(3))) .setContinuation(null) .setScanProperties(new ScanProperties(ExecuteProperties.newBuilder().setReturnedRowLimit(2).build())) + .setSerializationMode(serializationMode) .build(); assertEquals(2, (int)cursor.getCount().join()); cursor = KeyValueCursor.Builder.withSubspace(subspace) @@ -155,6 +164,7 @@ public void beginsWith() { .setRange(TupleRange.allOf(Tuple.from(3))) .setContinuation(cursor.getNext().getContinuation().toBytes()) .setScanProperties(new ScanProperties(ExecuteProperties.newBuilder().setReturnedRowLimit(3).build())) + .setSerializationMode(serializationMode) .build(); assertEquals(3, (int)cursor.getCount().join()); @@ -162,8 +172,9 @@ public void beginsWith() { }); } - @Test - public void inclusiveRange() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void inclusiveRange(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) @@ -171,6 +182,7 @@ public void inclusiveRange() { .setHigh(Tuple.from(4, 2), EndpointType.RANGE_INCLUSIVE) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); assertEquals(Arrays.asList(Tuple.from(3L, 3L), Tuple.from(3L, 4L), Tuple.from(4L, 0L), Tuple.from(4L, 1L), Tuple.from(4L, 2L)), cursor.map(KeyValue::getValue).map(Tuple::fromBytes).asList().join()); @@ -181,6 +193,7 @@ public void inclusiveRange() { .setHigh(Tuple.from(4, 2), EndpointType.RANGE_INCLUSIVE) .setContinuation(null) .setScanProperties(new ScanProperties(ExecuteProperties.newBuilder().setReturnedRowLimit(2).build())) + .setSerializationMode(serializationMode) .build(); assertEquals(Arrays.asList(Tuple.from(3L, 3L), Tuple.from(3L, 4L)), cursor.map(KeyValue::getValue).map(Tuple::fromBytes).asList().join()); @@ -190,6 +203,7 @@ public void inclusiveRange() { .setHigh(Tuple.from(4, 2), EndpointType.RANGE_INCLUSIVE) .setContinuation(cursor.getNext().getContinuation().toBytes()) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); assertEquals(Arrays.asList(Tuple.from(4L, 0L), Tuple.from(4L, 1L), Tuple.from(4L, 2L)), cursor.map(KeyValue::getValue).map(Tuple::fromBytes).asList().join()); @@ -198,8 +212,9 @@ public void inclusiveRange() { }); } - @Test - public void exclusiveRange() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void exclusiveRange(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) @@ -207,6 +222,7 @@ public void exclusiveRange() { .setHigh(Tuple.from(4, 2), EndpointType.RANGE_EXCLUSIVE) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); assertEquals(Arrays.asList(Tuple.from(3L, 4L), Tuple.from(4L, 0L), Tuple.from(4L, 1L)), cursor.map(KeyValue::getValue).map(Tuple::fromBytes).asList().join()); @@ -217,6 +233,7 @@ public void exclusiveRange() { .setHigh(Tuple.from(4, 2), EndpointType.RANGE_EXCLUSIVE) .setContinuation(null) .setScanProperties(new ScanProperties(ExecuteProperties.newBuilder().setReturnedRowLimit(2).build())) + .setSerializationMode(serializationMode) .build(); assertEquals(Arrays.asList(Tuple.from(3L, 4L), Tuple.from(4L, 0L)), cursor.map(KeyValue::getValue).map(Tuple::fromBytes).asList().join()); @@ -226,6 +243,7 @@ public void exclusiveRange() { .setHigh(Tuple.from(4, 2), EndpointType.RANGE_EXCLUSIVE) .setContinuation(cursor.getNext().getContinuation().toBytes()) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); assertEquals(Collections.singletonList(Tuple.from(4L, 1L)), cursor.map(KeyValue::getValue).map(Tuple::fromBytes).asList().join()); @@ -234,8 +252,9 @@ public void exclusiveRange() { }); } - @Test - public void inclusiveNull() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void inclusiveNull(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordCursorIterator cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) @@ -243,6 +262,7 @@ public void inclusiveNull() { .setHigh((Tuple) null, EndpointType.RANGE_INCLUSIVE) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build() .asIterator(); for (int j = 0; j < 5; j++) { @@ -256,8 +276,9 @@ public void inclusiveNull() { }); } - @Test - public void exclusiveNull() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void exclusiveNull(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordCursorIterator cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) @@ -265,6 +286,7 @@ public void exclusiveNull() { .setHigh((Tuple) null, EndpointType.RANGE_EXCLUSIVE) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build() .asIterator(); assertThat(cursor.hasNext(), is(false)); @@ -273,13 +295,15 @@ public void exclusiveNull() { }); } - @Test - public void noNextReasons() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void noNextReasons(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setRange(TupleRange.allOf(Tuple.from(3))) .setContinuation(null) + .setSerializationMode(serializationMode) .setScanProperties(ScanProperties.FORWARD_SCAN.with(props -> props.setReturnedRowLimit(3))) .build(); assertEquals(Arrays.asList(Tuple.from(3L, 0L), Tuple.from(3L, 1L), Tuple.from(3L, 2L)), @@ -290,6 +314,7 @@ public void noNextReasons() { .setContext(context) .setRange(TupleRange.allOf(Tuple.from(3))) .setContinuation(result.getContinuation().toBytes()) + .setSerializationMode(serializationMode) .setScanProperties(ScanProperties.FORWARD_SCAN.with(props -> props.setReturnedRowLimit(3))) .build(); assertEquals(Arrays.asList(Tuple.from(3L, 3L), Tuple.from(3L, 4L)), @@ -306,14 +331,16 @@ private ScanProperties forwardScanWithLimiter(RecordScanLimiter limiter) { return new ScanProperties(ExecuteProperties.SERIAL_EXECUTE.setState(new ExecuteState(limiter, null))); } - @Test - public void simpleScanLimit() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void simpleScanLimit(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordScanLimiter limiter = RecordScanLimiterFactory.enforce(2); KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setRange(TupleRange.ALL) .setScanProperties(forwardScanWithLimiter(limiter)) + .setSerializationMode(serializationMode) .build(); assertEquals(2, (int) cursor.getCount().join()); RecordCursorResult result = cursor.getNext(); @@ -324,8 +351,9 @@ public void simpleScanLimit() { }); } - @Test - public void limitNotReached() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void limitNotReached(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordScanLimiter limiter = RecordScanLimiterFactory.enforce(4); KeyValueCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) @@ -333,6 +361,7 @@ public void limitNotReached() { .setLow(Tuple.from(3, 3), EndpointType.RANGE_EXCLUSIVE) .setHigh(Tuple.from(4, 2), EndpointType.RANGE_EXCLUSIVE) .setScanProperties(forwardScanWithLimiter(limiter)) + .setSerializationMode(serializationMode) .build(); assertEquals(3, (int) cursor.getCount().join()); RecordCursorResult result = cursor.getNext(); @@ -347,15 +376,17 @@ private boolean hasNextAndAdvance(KeyValueCursor cursor) { return cursor.getNext().hasNext(); } - @Test - public void sharedLimiter() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void sharedLimiter(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordScanLimiter limiter = RecordScanLimiterFactory.enforce(4); KeyValueCursor.Builder builder = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setLow(Tuple.from(3, 3), EndpointType.RANGE_EXCLUSIVE) .setHigh(Tuple.from(4, 2), EndpointType.RANGE_EXCLUSIVE) - .setScanProperties(forwardScanWithLimiter(limiter)); + .setScanProperties(forwardScanWithLimiter(limiter)) + .setSerializationMode(serializationMode); KeyValueCursor cursor1 = builder.build(); KeyValueCursor cursor2 = builder.build(); @@ -374,8 +405,9 @@ public void sharedLimiter() { }); } - @Test - public void limiterWithLookahead() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void limiterWithLookahead(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordScanLimiter limiter = RecordScanLimiterFactory.enforce(1); KeyValueCursor kvCursor = KeyValueCursor.Builder.withSubspace(subspace) @@ -383,6 +415,7 @@ public void limiterWithLookahead() { .setLow(Tuple.from(3, 3), EndpointType.RANGE_EXCLUSIVE) .setHigh(Tuple.from(4, 2), EndpointType.RANGE_EXCLUSIVE) .setScanProperties(forwardScanWithLimiter(limiter)) + .setSerializationMode(serializationMode) .build(); RecordCursor cursor = kvCursor.skip(2); // should exhaust limit first RecordCursorResult result = cursor.getNext(); @@ -393,14 +426,16 @@ public void limiterWithLookahead() { }); } - @Test - public void emptyScan() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void emptyScan(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordCursor cursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setRange(TupleRange.allOf(Tuple.from(9))) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); RecordCursorResult result = cursor.getNext(); assertFalse(result.hasNext()); @@ -411,14 +446,16 @@ public void emptyScan() { }); } - @Test - public void emptyScanSplit() { + @ParameterizedTest + @EnumSource(KeyValueCursorBase.SerializationMode.class) + public void emptyScanSplit(KeyValueCursorBase.SerializationMode serializationMode) { fdb.run(context -> { RecordCursor kvCursor = KeyValueCursor.Builder.withSubspace(subspace) .setContext(context) .setRange(TupleRange.allOf(Tuple.from(9))) .setContinuation(null) .setScanProperties(ScanProperties.FORWARD_SCAN) + .setSerializationMode(serializationMode) .build(); RecordCursor cursor = new SplitHelper.KeyValueUnsplitter(context, subspace, kvCursor, false, null, false, new CursorLimitManager(context, ScanProperties.FORWARD_SCAN)); diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/indexes/MultidimensionalIndexTestBase.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/indexes/MultidimensionalIndexTestBase.java index bd5d25564b..78b7fc4671 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/indexes/MultidimensionalIndexTestBase.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/indexes/MultidimensionalIndexTestBase.java @@ -496,7 +496,7 @@ void basicReadWithNullsTest(final boolean useAsync, @Nonnull final String storag } void indexReadTest(final boolean useAsync, final long seed, final int numRecords, @Nonnull final String storage, - final boolean storeHilbertValues, final boolean useNodeSlotIndex) throws Exception { + final boolean storeHilbertValues, final boolean useNodeSlotIndex) throws Exception { final RecordMetaDataHook additionalIndexes = metaDataBuilder -> { addCalendarNameStartEpochIndex(metaDataBuilder); diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlanWithOverScanTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlanWithOverScanTest.java index 98763c8670..d1be40c1f3 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlanWithOverScanTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryIndexPlanWithOverScanTest.java @@ -34,6 +34,7 @@ import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer; +import com.apple.foundationdb.record.provider.foundationdb.KeyValueCursorBase; import com.apple.foundationdb.record.provider.foundationdb.query.FDBRecordStoreQueryTestBase; import com.apple.foundationdb.record.query.ParameterRelationshipGraph; import com.apple.foundationdb.record.query.RecordQuery; @@ -43,6 +44,7 @@ import com.apple.foundationdb.record.query.plan.RecordQueryPlannerConfiguration; import com.apple.test.BooleanSource; import com.apple.test.Tags; +import com.google.protobuf.ByteString; import com.google.protobuf.Message; import org.hamcrest.Matcher; import org.junit.jupiter.api.Tag; @@ -250,7 +252,6 @@ private static RecordCursorContinuation assertSameResults(@Nonnull FDBRecordStor overscanResult = overscanCursor.getNext(); assertEquals(indexResult.getContinuation().toByteString(), overscanResult.getContinuation().toByteString(), "Continuation byte strings should match"); assertArrayEquals(indexResult.getContinuation().toBytes(), overscanResult.getContinuation().toBytes(), "Continuation byte arrays should match"); - assertEquals(indexResult.hasNext(), overscanResult.hasNext(), "Overscan cursor should have next if index result has next"); if (indexResult.hasNext()) { assertEquals(indexResult.get().getRecord(), overscanResult.get().getRecord(), "Result returned via overscan cursor should match regular cursor"); diff --git a/fdb-relational-api/src/main/java/com/apple/foundationdb/relational/api/Options.java b/fdb-relational-api/src/main/java/com/apple/foundationdb/relational/api/Options.java index e9ca6e747a..d066e39838 100644 --- a/fdb-relational-api/src/main/java/com/apple/foundationdb/relational/api/Options.java +++ b/fdb-relational-api/src/main/java/com/apple/foundationdb/relational/api/Options.java @@ -194,7 +194,8 @@ public enum Name { * operations interacting with FDB. * Scope: Engine */ - ASYNC_OPERATIONS_TIMEOUT_MILLIS + ASYNC_OPERATIONS_TIMEOUT_MILLIS, + KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW, } public enum IndexFetchMethod { @@ -229,6 +230,7 @@ public enum IndexFetchMethod { builder.put(Name.CASE_SENSITIVE_IDENTIFIERS, false); builder.put(Name.CONTINUATIONS_CONTAIN_COMPILED_STATEMENTS, true); builder.put(Name.ASYNC_OPERATIONS_TIMEOUT_MILLIS, 10_000L); + builder.put(Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW, false); OPTIONS_DEFAULT_VALUES = builder.build(); } @@ -364,6 +366,7 @@ private static Map> makeContracts() { data.put(Name.VALID_PLAN_HASH_MODES, List.of(TypeContract.stringType())); data.put(Name.CONTINUATIONS_CONTAIN_COMPILED_STATEMENTS, List.of(TypeContract.booleanType())); data.put(Name.ASYNC_OPERATIONS_TIMEOUT_MILLIS, List.of(TypeContract.longType(), RangeContract.of(0L, Long.MAX_VALUE))); + data.put(Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW, List.of(TypeContract.booleanType())); return Collections.unmodifiableMap(data); } diff --git a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java index ed99e1130b..0e233a5cf0 100644 --- a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java +++ b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/RecordLayerIterator.java @@ -90,6 +90,8 @@ private void fetchNextResult() { noNextReason = result.getNoNextReason(); if (noNextReason == RecordCursor.NoNextReason.SOURCE_EXHAUSTED) { this.continuation = ContinuationImpl.END; + } else { + this.continuation = ContinuationImpl.fromRecordCursorContinuation(result.getContinuation()); } } } diff --git a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/PlanGenerator.java b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/PlanGenerator.java index 94a4bfc73c..2ab8ea3814 100644 --- a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/PlanGenerator.java +++ b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/PlanGenerator.java @@ -395,6 +395,7 @@ private static CascadesPlanner createPlanner(@Nonnull RecordMetaData metaData, @Nonnull Options options) throws RelationalException { // todo (yhatem) TODO (Interaction between planner configurations and query cache) Options.IndexFetchMethod indexFetchMethod = options.getOption(Options.Name.INDEX_FETCH_METHOD); + boolean kvCursorContinuationSerializeToNew = options.getOption(Options.Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW); CascadesPlanner planner = new CascadesPlanner(metaData, recordStoreState); // TODO: TODO (Expose planner configuration parameters like index scan preference) RecordQueryPlannerConfiguration configuration = RecordQueryPlannerConfiguration.builder() diff --git a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/QueryPlan.java b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/QueryPlan.java index ee44d48eb9..d02986cf06 100644 --- a/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/QueryPlan.java +++ b/fdb-relational-core/src/main/java/com/apple/foundationdb/relational/recordlayer/query/QueryPlan.java @@ -375,6 +375,7 @@ private RelationalResultSet executePhysicalPlan(@Nonnull final RecordLayerSchema final var executeProperties = connection.getExecuteProperties().toBuilder() .setReturnedRowLimit(options.getOption(Options.Name.MAX_ROWS)) .setDryRun(options.getOption(Options.Name.DRY_RUN)) + .setKvCursorContSerializeToNew(options.getOption(Options.Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW)) .build(); cursor = executionContext.metricCollector.clock( RelationalMetric.RelationalEvent.EXECUTE_RECORD_QUERY_PLAN, () -> recordQueryPlan.executePlan(fdbRecordStore, evaluationContext, diff --git a/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/ForceContinuationQueryTests.java b/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/ForceContinuationQueryTests.java new file mode 100644 index 0000000000..7fed79a85a --- /dev/null +++ b/fdb-relational-core/src/test/java/com/apple/foundationdb/relational/recordlayer/query/ForceContinuationQueryTests.java @@ -0,0 +1,242 @@ +/* + * JoinWithLimitTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2021-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.relational.recordlayer.query; + +import com.apple.foundationdb.relational.api.Continuation; +import com.apple.foundationdb.relational.api.Options; +import com.apple.foundationdb.relational.api.exceptions.ContextualSQLException; +import com.apple.foundationdb.relational.recordlayer.EmbeddedRelationalExtension; +import com.apple.foundationdb.relational.recordlayer.RelationalConnectionRule; +import com.apple.foundationdb.relational.recordlayer.RelationalStatementRule; +import com.apple.foundationdb.relational.recordlayer.UniqueIndexTests; +import com.apple.foundationdb.relational.recordlayer.Utils; +import com.apple.foundationdb.relational.utils.SimpleDatabaseRule; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.sql.SQLException; +import java.util.stream.Stream; + +public class ForceContinuationQueryTests { + + @RegisterExtension + @Order(0) + public final EmbeddedRelationalExtension relationalExtension = new EmbeddedRelationalExtension(); + + private static final String getTemplate_definition = + "create table t1(id bigint, col1 bigint, col2 bigint, primary key(id))\n" + + "create index mv1 as select count(*) from t1\n" + + "create index mv2 as select count(*) from t1 group by col2\n" + + "create index mv3 as select count(col1) from t1\n" + + "create index mv4 as select count(col1) from t1 group by col2\n" + + + "create table t2(id bigint, col1 bigint, col2 bigint, col3 bigint, primary key(id))\n" + + "create index mv5 as select col2 from t2\n" + + "create index mv7 as select min_ever(col3) from t2\n" + + + "create table t3(id bigint, col1 bigint, col2 bigint, primary key(id))\n" + + "create index t3_i1 as select count(*) from t3\n" + + "create index t3_i2 as select count(*) from t3 group by col1\n" + + "create index t3_i3 as select count(col2) from t3\n" + + "create index t3_i4 as select count(col2) from t3 group by col1\n" + + "create index t3_i5 as select sum(col1) from t3\n" + + "create index t3_i6 as select sum(col1) from t3 group by col2"; + + @RegisterExtension + @Order(1) + public final SimpleDatabaseRule db = new SimpleDatabaseRule(relationalExtension, UniqueIndexTests.class, getTemplate_definition); + + @RegisterExtension + @Order(2) + public final RelationalConnectionRule connection = new RelationalConnectionRule(db::getConnectionUri) + .withOptions(Options.builder().withOption(Options.Name.CONTINUATIONS_CONTAIN_COMPILED_STATEMENTS, true).build()) + .withSchema(db.getSchemaName()); + + @RegisterExtension + @Order(3) + public final RelationalStatementRule statement = new RelationalStatementRule(connection); + + public ForceContinuationQueryTests() throws SQLException { + } + + @BeforeAll + public static void beforeAll() { + Utils.enableCascadesDebugger(); + } + + @BeforeEach + void setup() throws Exception { + statement.execute("INSERT INTO T1 VALUES (1, 10, 1), (2, null, 2), (3, null, 2), (4, 12, 2)"); + statement.execute("INSERT INTO T2 VALUES (1, 10, 1, 11), (2, null, 2, 20), (3, null, 2, 20), (4, 12, 2, 20)"); + statement.execute("insert into t3 values (1, 2, 3), (2, 2, 3), (3, 2, 3)"); + statement.execute("delete from t3"); + } + + @ParameterizedTest + @MethodSource("failedQueries") + void testOldSerializationFails(String sql, long result) throws Exception { + Continuation continuation; + statement.setMaxRows(1); + try (var resultSet = statement.executeQuery(sql)) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(result, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + // old kvCursorContinuation cause continuation at beginning exception + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + Assertions.assertThrows(ContextualSQLException.class, preparedStatement::executeQuery); + } + } + + @ParameterizedTest + @MethodSource("failedQueries") + void testNewSerialization(String sql, long result) throws Exception { + connection.setOption(Options.Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW, true); + Continuation continuation; + try (final var s = connection.createStatement()) { + s.setMaxRows(1); + try (var resultSet = s.executeQuery(sql)) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(result, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + } + // new serialization fixed the issue + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertFalse(resultSet.next()); + } + } + } + + @Test + void testOldSerializationWorks() throws Exception { + Continuation continuation; + statement.setMaxRows(1); + try (var resultSet = statement.executeQuery("select count(*) from t2 group by col2")) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(1L, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(3L, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + } + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertFalse(resultSet.next()); + } + } + } + + @Test + void testNewSerializationWorks() throws Exception { + connection.setOption(Options.Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW, true); + Continuation continuation; + try (final var s = connection.createStatement()) { + s.setMaxRows(1); + try (var resultSet = s.executeQuery("select count(*) from t2 group by col2")) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(1L, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + } + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(3L, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + } + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertFalse(resultSet.next()); + } + } + } + + @Test + void testOldThenNewWorks() throws Exception { + Continuation continuation; + try (final var s = connection.createStatement()) { + s.setMaxRows(1); + try (var resultSet = s.executeQuery("select count(*) from t2 group by col2")) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(1L, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + } + connection.setOption(Options.Name.KEYVALUE_CURSOR_CONTINUATION_SERIALIZE_TO_NEW, true); + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertTrue(resultSet.next()); + Assertions.assertEquals(3L, resultSet.getLong(1)); + continuation = resultSet.getContinuation(); + } + } + try (final var preparedStatement = connection.prepareStatement("EXECUTE CONTINUATION ?param")) { + preparedStatement.setMaxRows(1); + preparedStatement.setBytes("param", continuation.serialize()); + try (var resultSet = preparedStatement.executeQuery()) { + Assertions.assertFalse(resultSet.next()); + } + } + } + + private static Stream failedQueries() { + return Stream.of( + // aggregate-index-count.yamsql + Arguments.of("select count(*) from t1", 4L), + Arguments.of("select count(col1) from t1", 2L), + // aggregate-index-tests.yamsql + Arguments.of("select min_ever(col3) from t2", 11L), + // aggregate-empty-table.yamsql + Arguments.of("select count(*) from t3", 0L), + Arguments.of("select count(col2) from t3", 0L), + Arguments.of("select sum(col1) from t3", 0L) + ); + } +} diff --git a/yaml-tests/src/test/resources/aggregate-index-tests-count.yamsql b/yaml-tests/src/test/resources/aggregate-index-tests-count.yamsql index c04499e1ce..5ae240d176 100644 --- a/yaml-tests/src/test/resources/aggregate-index-tests-count.yamsql +++ b/yaml-tests/src/test/resources/aggregate-index-tests-count.yamsql @@ -85,8 +85,6 @@ test_block: - - query: select count(*) from t2 - explain: "ISCAN(MV5 <,>) | MAP (_ AS _0) | AGG (count_star(*) AS _0) | ON EMPTY NULL | MAP (coalesce_long(_._0._0, promote(0l AS LONG)) AS _0)" - # Cannot run with FORCE_CONTINUATIONS due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206 - - maxRows: 0 - result: [{4}] - - query: select count(*) from t2 group by col2 @@ -108,8 +106,6 @@ test_block: - - query: select count(col1) from t2 - explain: "ISCAN(MV5 <,>) | MAP (_ AS _0) | AGG (count(_._0.COL1) AS _0) | ON EMPTY NULL | MAP (coalesce_long(_._0._0, promote(0l AS LONG)) AS _0)" - # Cannot run with FORCE_CONTINUATIONS due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206 - - maxRows: 0 - result: [{2}] - - query: select count(col1) from t2 group by col2 diff --git a/yaml-tests/src/test/resources/catalog.yamsql b/yaml-tests/src/test/resources/catalog.yamsql index 4b9ec7208b..e1aa8c7799 100644 --- a/yaml-tests/src/test/resources/catalog.yamsql +++ b/yaml-tests/src/test/resources/catalog.yamsql @@ -62,7 +62,6 @@ test_block: - query: select sum(cnt) from (select count(*) as cnt, template_name, template_version from schemas group by template_name, template_version having template_name = 'TEST_TEMPLATE_1') as t; - explain: "AISCAN(TEMPLATES_COUNT_INDEX [EQUALS promote(@c29 AS STRING)] BY_GROUP -> [_0: KEY:[0], _1: KEY:[1], _2: VALUE:[0]]) | MAP (_._2 AS CNT, _._0 AS TEMPLATE_NAME, _._1 AS TEMPLATE_VERSION) | MAP (_ AS _0) | AGG (sum_l(_._0.CNT) AS _0) | ON EMPTY NULL | MAP (_._0._0 AS _0)" - - maxRows: 0 # Disable force continuations because of empty continuation due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206 - result: [{4}] - # How many schemas with the specified schemaTemplateName and schemaTemplateVersion exist in this cluster?