Skip to content

Fix bug of scan aggregate index returning empty non-end continuation #3397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ public class ExecuteProperties {
// how record scan limit reached is handled -- false: return early with continuation, true: throw exception
private final boolean failOnScanLimitReached;
private final boolean isDryRun;
private final boolean kvCursorContSerializeToNew;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add comments in the next commit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I'm still not sure this really belongs in the ExecuteProperties. The point of the ExecuteProperties configuration is that it's supposed to contain stuff that a user might want to configure differently for other identical invocations of some operation. For example, various limits are in here, which could be applied to different plans. This is different, because for a given plan, it's actually very important that the user correctly serialize/deserialize KV cursor continuations. Which doesn't seem to fit the pattern


private final CursorStreamingMode defaultCursorStreamingMode;

@SuppressWarnings("java:S107")
private ExecuteProperties(int skip, int rowLimit, @Nonnull IsolationLevel isolationLevel, long timeLimit,
@Nonnull ExecuteState state, boolean failOnScanLimitReached, @Nonnull CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun) {
@Nonnull ExecuteState state, boolean failOnScanLimitReached, @Nonnull CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun, boolean kvCursorContSerializeToNew) {
this.skip = skip;
this.rowLimit = rowLimit;
this.isolationLevel = isolationLevel;
Expand All @@ -86,6 +87,7 @@ private ExecuteProperties(int skip, int rowLimit, @Nonnull IsolationLevel isolat
this.failOnScanLimitReached = failOnScanLimitReached;
this.defaultCursorStreamingMode = defaultCursorStreamingMode;
this.isDryRun = isDryRun;
this.kvCursorContSerializeToNew = kvCursorContSerializeToNew;
}

@Nonnull
Expand All @@ -102,7 +104,7 @@ public ExecuteProperties setSkip(final int skip) {
if (skip == this.skip) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

public boolean isDryRun() {
Expand All @@ -114,9 +116,11 @@ public ExecuteProperties setDryRun(final boolean isDryRun) {
if (isDryRun == this.isDryRun) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

public boolean isKvCursorContSerializeToNew() {return kvCursorContSerializeToNew;}


/**
* Get the limit on the number of rows that will be returned as it would be passed to FDB.
Expand All @@ -137,7 +141,7 @@ public ExecuteProperties setReturnedRowLimit(final int rowLimit) {
if (newLimit == this.rowLimit) {
return this;
}
return copy(skip, newLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, newLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand Down Expand Up @@ -184,7 +188,7 @@ public ExecuteState getState() {
*/
@Nonnull
public ExecuteProperties setState(@Nonnull ExecuteState newState) {
return copy(skip, rowLimit, timeLimit, isolationLevel, newState, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, newState, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -193,7 +197,7 @@ public ExecuteProperties setState(@Nonnull ExecuteState newState) {
*/
@Nonnull
public ExecuteProperties clearState() {
return copy(skip, rowLimit, timeLimit, isolationLevel, new ExecuteState(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, new ExecuteState(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -209,15 +213,15 @@ public ExecuteProperties setFailOnScanLimitReached(boolean failOnScanLimitReache
if (failOnScanLimitReached == this.failOnScanLimitReached) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

@Nonnull
public ExecuteProperties clearReturnedRowLimit() {
if (getReturnedRowLimit() == ReadTransaction.ROW_LIMIT_UNLIMITED) {
return this;
}
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -229,7 +233,7 @@ public ExecuteProperties clearRowAndTimeLimits() {
if (getTimeLimit() == UNLIMITED_TIME && getReturnedRowLimit() == ReadTransaction.ROW_LIMIT_UNLIMITED) {
return this;
}
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, UNLIMITED_TIME, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, ReadTransaction.ROW_LIMIT_UNLIMITED, UNLIMITED_TIME, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -241,7 +245,7 @@ public ExecuteProperties clearSkipAndLimit() {
if (skip == 0 && rowLimit == ReadTransaction.ROW_LIMIT_UNLIMITED) {
return this;
}
return copy(0, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(0, ReadTransaction.ROW_LIMIT_UNLIMITED, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -254,7 +258,7 @@ public ExecuteProperties clearSkipAndAdjustLimit() {
return this;
}
return copy(0, rowLimit == ReadTransaction.ROW_LIMIT_UNLIMITED ? ReadTransaction.ROW_LIMIT_UNLIMITED : rowLimit + skip,
timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand Down Expand Up @@ -305,7 +309,7 @@ public ExecuteProperties setDefaultCursorStreamingMode(CursorStreamingMode defau
if (defaultCursorStreamingMode == this.defaultCursorStreamingMode) {
return this;
}
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -315,7 +319,7 @@ public ExecuteProperties setDefaultCursorStreamingMode(CursorStreamingMode defau
*/
@Nonnull
public ExecuteProperties resetState() {
return copy(skip, rowLimit, timeLimit, isolationLevel, state.reset(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return copy(skip, rowLimit, timeLimit, isolationLevel, state.reset(), failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

/**
Expand All @@ -333,8 +337,8 @@ public ExecuteProperties resetState() {
@SuppressWarnings("java:S107")
@Nonnull
protected ExecuteProperties copy(int skip, int rowLimit, long timeLimit, @Nonnull IsolationLevel isolationLevel,
@Nonnull ExecuteState state, boolean failOnScanLimitReached, CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun) {
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
@Nonnull ExecuteState state, boolean failOnScanLimitReached, CursorStreamingMode defaultCursorStreamingMode, boolean isDryRun, boolean kvCursorContSerializeToNew) {
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}

@Nonnull
Expand Down Expand Up @@ -408,6 +412,7 @@ public static class Builder {
private ExecuteState executeState = null;
private boolean failOnScanLimitReached = false;
private boolean isDryRun = false;
private boolean kvCursorContSerializeToNew = false;
private CursorStreamingMode defaultCursorStreamingMode = CursorStreamingMode.ITERATOR;

private Builder() {
Expand All @@ -422,6 +427,7 @@ private Builder(ExecuteProperties executeProperties) {
this.failOnScanLimitReached = executeProperties.failOnScanLimitReached;
this.defaultCursorStreamingMode = executeProperties.defaultCursorStreamingMode;
this.isDryRun = executeProperties.isDryRun;
this.kvCursorContSerializeToNew = executeProperties.kvCursorContSerializeToNew;
}

@Nonnull
Expand Down Expand Up @@ -455,6 +461,12 @@ public Builder setDryRun(boolean isDryRun) {
return this;
}

@Nonnull
public Builder setKvCursorContSerializeToNew(boolean kvCursorContSerializeToNew) {
this.kvCursorContSerializeToNew = kvCursorContSerializeToNew;
return this;
}

@Nonnull
public Builder setReturnedRowLimit(int rowLimit) {
this.rowLimit = validateAndNormalizeRowLimit(rowLimit);
Expand Down Expand Up @@ -607,7 +619,7 @@ public ExecuteProperties build() {
} else {
state = new ExecuteState(RecordScanLimiterFactory.enforce(scannedRecordsLimit), ByteScanLimiterFactory.enforce(scannedBytesLimit));
}
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun);
return new ExecuteProperties(skip, rowLimit, isolationLevel, timeLimit, state, failOnScanLimitReached, defaultCursorStreamingMode, isDryRun, kvCursorContSerializeToNew);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ private IndexPrefetchRangeKeyValueCursor(@Nonnull final FDBRecordContext context
@Nonnull final AsyncIterator<MappedKeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
SerializationMode serializationMode) {

super(context, iterator, prefixLength, limitManager, valuesLimit);
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand All @@ -69,7 +70,7 @@ public IndexPrefetchRangeKeyValueCursor build() {
AsyncIterator<MappedKeyValue> iterator = getTransaction()
.getMappedRange(getBegin(), getEnd(), mapper, getLimit(), isReverse(), getStreamingMode())
.iterator();
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ private KeyValueCursor(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<KeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
super(context, iterator, prefixLength, limitManager, valuesLimit);
int valuesLimit,
SerializationMode serializationMode) {
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand Down Expand Up @@ -77,7 +78,7 @@ public KeyValueCursor build() {
final AsyncIterator<KeyValue> iterator = getTransaction()
.getRange(getBegin(), getEnd(), getLimit(), isReverse(), getStreamingMode())
.iterator();
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Loading
Loading