-
Notifications
You must be signed in to change notification settings - Fork 103
Fix the bug when aggregation stops before a group is finished #3177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
0ec127b
1ae9d4d
3a153b0
347e151
b4633a0
31da893
58a9aeb
c49f578
1017f00
14c8cfa
41afb52
a6ddf59
8dfc8d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,11 +22,14 @@ | |
|
||
import com.apple.foundationdb.annotation.API; | ||
import com.apple.foundationdb.async.AsyncUtil; | ||
import com.apple.foundationdb.record.ByteArrayContinuation; | ||
import com.apple.foundationdb.record.RecordCursor; | ||
import com.apple.foundationdb.record.RecordCursorContinuation; | ||
import com.apple.foundationdb.record.RecordCursorEndContinuation; | ||
import com.apple.foundationdb.record.RecordCursorResult; | ||
import com.apple.foundationdb.record.RecordCursorStartContinuation; | ||
import com.apple.foundationdb.record.RecordCursorVisitor; | ||
import com.apple.foundationdb.record.provider.foundationdb.KeyValueCursorBase; | ||
import com.apple.foundationdb.record.query.plan.plans.QueryResult; | ||
import com.google.common.base.Verify; | ||
import com.google.protobuf.Message; | ||
|
@@ -57,48 +60,77 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes | |
// Previous non-empty record processed by this cursor | ||
@Nullable | ||
private RecordCursorResult<QueryResult> previousValidResult; | ||
// last row in last group, is null if the current group is the first group | ||
@Nullable | ||
private RecordCursorResult<QueryResult> lastInLastGroup; | ||
byte[] continuation; | ||
|
||
public AggregateCursor(@Nonnull RecordCursor<QueryResult> inner, | ||
@Nonnull final StreamGrouping<M> streamGrouping, | ||
boolean isCreateDefaultOnEmpty) { | ||
boolean isCreateDefaultOnEmpty, | ||
byte[] continuation) { | ||
this.inner = inner; | ||
this.streamGrouping = streamGrouping; | ||
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty; | ||
this.continuation = continuation; | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public CompletableFuture<RecordCursorResult<QueryResult>> onNext() { | ||
if (previousResult != null && !previousResult.hasNext()) { | ||
// we are done | ||
return CompletableFuture.completedFuture(RecordCursorResult.exhausted()); | ||
return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(previousResult.getContinuation(), | ||
previousResult.getNoNextReason())); | ||
} | ||
|
||
return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> { | ||
previousResult = innerResult; | ||
if (!innerResult.hasNext()) { | ||
if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) { | ||
// the method streamGrouping.finalizeGroup() computes previousCompleteResult and resets the accumulator | ||
streamGrouping.finalizeGroup(); | ||
} | ||
return false; | ||
} else { | ||
final QueryResult queryResult = Objects.requireNonNull(innerResult.get()); | ||
boolean groupBreak = streamGrouping.apply(queryResult); | ||
previousValidResult = innerResult; | ||
if (groupBreak) { | ||
lastInLastGroup = previousValidResult; | ||
} else { | ||
previousValidResult = innerResult; | ||
} | ||
return (!groupBreak); | ||
} | ||
}), getExecutor()).thenApply(vignore -> { | ||
if (isNoRecords()) { | ||
// Edge case where there are no records at all | ||
if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) { | ||
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START); | ||
// either innerResult.hasNext() = false; or groupBreak = true | ||
if (Verify.verifyNotNull(previousResult).hasNext()) { | ||
// in this case groupBreak = true, return aggregated result and continuation | ||
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); | ||
previousValidResult = previousResult; | ||
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); | ||
} else { | ||
if (Verify.verifyNotNull(previousResult).getNoNextReason() == NoNextReason.SOURCE_EXHAUSTED) { | ||
if (previousValidResult == null) { | ||
return RecordCursorResult.exhausted(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this need to check to insert the empty result if […]? Perhaps it would be simpler if there were fewer moving parts here. If we wait until 4.2, we could drop support entirely for […] |
||
} else { | ||
RecordCursorContinuation continuation =previousValidResult.getContinuation(); | ||
previousValidResult = previousResult; | ||
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); | ||
} | ||
} else { | ||
return RecordCursorResult.exhausted(); | ||
RecordCursorContinuation currentContinuation; | ||
// in the current scan, if current group is the first group, set the continuation to the start of the current scan | ||
// otherwise set the continuation to the last row in the last group | ||
if (lastInLastGroup == null) { | ||
currentContinuation = continuation == null ? RecordCursorStartContinuation.START : ByteArrayContinuation.fromNullable(continuation); | ||
} else { | ||
currentContinuation = lastInLastGroup.getContinuation(); | ||
} | ||
previousValidResult = lastInLastGroup; | ||
return RecordCursorResult.withoutNextValue(currentContinuation, Verify.verifyNotNull(previousResult).getNoNextReason()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this also set […]? That is: RecordCursor<T> aggCursor = getAggregateCursor();
RecordCursorResult<T> withValueResult = aggCursor.getNext(); // has a value
RecordCursorResult<T> withoutValueResult1 = aggCursor.getNext(); // first result with out-of-bound limit; continuation borrowed from withValueResult
RecordCursorResult<T> withoutValueResult2 = aggCursor.getNext(); // second result with out-of-bound limit; continuation may be greater than withoutValueResult1 In general, once a cursor returns a |
||
} | ||
} | ||
// Use the last valid result for the continuation as we need non-terminal one here. | ||
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation(); | ||
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation); | ||
}); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,9 +20,17 @@ | |
|
||
package com.apple.foundationdb.record.provider.foundationdb.query; | ||
|
||
import com.apple.foundationdb.record.ByteScanLimiterFactory; | ||
import com.apple.foundationdb.record.EvaluationContext; | ||
import com.apple.foundationdb.record.ExecuteProperties; | ||
import com.apple.foundationdb.record.ExecuteState; | ||
import com.apple.foundationdb.record.RecordCursor; | ||
import com.apple.foundationdb.record.RecordCursorContinuation; | ||
import com.apple.foundationdb.record.RecordCursorEndContinuation; | ||
import com.apple.foundationdb.record.RecordCursorResult; | ||
import com.apple.foundationdb.record.RecordCursorStartContinuation; | ||
import com.apple.foundationdb.record.RecordMetaData; | ||
import com.apple.foundationdb.record.RecordScanLimiterFactory; | ||
import com.apple.foundationdb.record.TestRecords1Proto; | ||
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; | ||
import com.apple.foundationdb.record.query.plan.ScanComparisons; | ||
|
@@ -48,15 +56,21 @@ | |
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Tag; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.Arguments; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Function; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
 * Tests related to planning and executing queries with streaming aggregation. | |
|
@@ -213,6 +227,26 @@ void aggregateThreeGroupByTwo(final boolean useNestedResult) { | |
} | ||
} | ||
|
||
@ParameterizedTest(name = "[{displayName}-{index}] {0}") | ||
@MethodSource("provideArguments") | ||
void aggregateOneGroupByThree(final boolean useNestedResult, final int rowLimit) { | ||
// each group only has one row | ||
try (final var context = openContext()) { | ||
openSimpleRecordStore(context, NO_HOOK); | ||
|
||
final var plan = | ||
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord") | ||
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value)) | ||
.withGroupCriterion("num_value_3_indexed") | ||
.withGroupCriterion("str_value_indexed") | ||
.withGroupCriterion("num_value_unique") | ||
.build(useNestedResult); | ||
|
||
final var result = executePlanWithRowLimit(plan, rowLimit); | ||
assertResults(useNestedResult ? this::assertResultNested : this::assertResultFlattened, result, resultOf(0, "0", 0, 0), resultOf(0, "0", 1, 1), resultOf(1, "0", 2, 2), resultOf(1, "1", 3, 3), resultOf(2, "1", 4, 4), resultOf(2, "1", 5, 5)); | ||
} | ||
} | ||
|
||
@ParameterizedTest(name = "[{displayName}-{index}] {0}") | ||
@BooleanSource | ||
void aggregateNoRecords(final boolean useNestedResult) { | ||
|
@@ -280,6 +314,66 @@ void aggregateNoRecordsNoGroupNoAggregate(final boolean useNestedResult) { | |
} | ||
} | ||
|
||
@Test | ||
void aggregateHitScanLimitReached() { | ||
try (final var context = openContext()) { | ||
openSimpleRecordStore(context, NO_HOOK); | ||
|
||
final var plan = | ||
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord") | ||
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value)) | ||
.withGroupCriterion("str_value_indexed") | ||
.build(false); | ||
|
||
// In the testing data, there are 2 groups, each group has 3 rows. | ||
// recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure why it only scans 4 times instead of 5, but the offset=1 seems consistent. |
||
// although the first group contains exactly 3 rows, we don't know we've finished the first group before we get to the 4th row, so nothing is returned, continuation is back to START | ||
RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null); | ||
Assertions.assertEquals(RecordCursorStartContinuation.START, continuation1); | ||
// recordScanLimit = 6: scans 4 rows, and the 5th scan hits SCAN_LIMIT_REACHED, we know that we've finished the 1st group, aggregated result is returned | ||
RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 6, continuation1.toBytes(), resultOf("0", 3)); | ||
// continue with recordScanLimit = 5, scans 3 rows and hits SCAN_LIMIT_REACHED | ||
// again, we don't know that we've finished the 2nd group, nothing is returned, continuation is back to where the scan starts | ||
RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 5, continuation2.toBytes(), null); | ||
Assertions.assertTrue(Arrays.equals(continuation2.toBytes(), continuation3.toBytes())); | ||
// finish the 2nd group, aggregated result is returned, exhausted the source | ||
RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 6, continuation3.toBytes(), resultOf("1", 12)); | ||
Assertions.assertEquals(RecordCursorEndContinuation.END, continuation4); | ||
} | ||
} | ||
|
||
private RecordCursorContinuation executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, @Nullable List<?> expectedResult) { | ||
List<QueryResult> queryResults = new LinkedList<>(); | ||
RecordCursor<QueryResult> currentCursor = executePlan(plan, 0, recordScanLimit, continuation); | ||
RecordCursorResult<QueryResult> currentCursorResult; | ||
RecordCursorContinuation cursorContinuation; | ||
while (true) { | ||
currentCursorResult = currentCursor.getNext(); | ||
cursorContinuation = currentCursorResult.getContinuation(); | ||
if (!currentCursorResult.hasNext()) { | ||
break; | ||
} | ||
queryResults.add(currentCursorResult.get()); | ||
} | ||
if (expectedResult == null) { | ||
Assertions.assertTrue(queryResults.isEmpty()); | ||
} else { | ||
assertResults(this::assertResultFlattened, queryResults, expectedResult); | ||
} | ||
return cursorContinuation; | ||
} | ||
|
||
private static Stream<Arguments> provideArguments() { | ||
// (boolean, rowLimit) | ||
// setting rowLimit = 0 is equivalent to no limit | ||
List<Arguments> arguments = new LinkedList<>(); | ||
for (int i = 0; i <= 4; i++) { | ||
arguments.add(Arguments.of(false, i)); | ||
arguments.add(Arguments.of(true, i)); | ||
} | ||
return arguments.stream(); | ||
} | ||
|
||
private void populateDB(final int numRecords) throws Exception { | ||
try (FDBRecordContext context = openContext()) { | ||
openSimpleRecordStore(context); | ||
|
@@ -290,12 +384,32 @@ private void populateDB(final int numRecords) throws Exception { | |
recBuilder.setNumValue2(i); | ||
recBuilder.setNumValue3Indexed(i / 2); // some field that changes every 2nd record | ||
recBuilder.setStrValueIndexed(Integer.toString(i / 3)); // some field that changes every 3rd record | ||
recBuilder.setNumValueUnique(i); | ||
recordStore.saveRecord(recBuilder.build()); | ||
} | ||
commit(context); | ||
} | ||
} | ||
|
||
@Nonnull | ||
private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final int recordScanLimit, final byte[] continuation) { | ||
final var types = plan.getDynamicTypes(); | ||
final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build(); | ||
ExecuteState executeState; | ||
if (recordScanLimit > 0) { | ||
executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), ByteScanLimiterFactory.tracking()); | ||
} else { | ||
executeState = ExecuteState.NO_LIMITS; | ||
} | ||
ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE; | ||
executeProperties = executeProperties.setReturnedRowLimit(rowLimit).setState(executeState); | ||
try { | ||
return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties); | ||
} catch (final Throwable t) { | ||
throw Assertions.<RuntimeException>fail(t); | ||
} | ||
} | ||
|
||
@Nonnull | ||
private List<QueryResult> executePlan(final RecordQueryPlan plan) { | ||
final var types = plan.getDynamicTypes(); | ||
|
@@ -307,6 +421,40 @@ private List<QueryResult> executePlan(final RecordQueryPlan plan) { | |
} | ||
} | ||
|
||
@Nonnull | ||
private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final byte[] continuation) { | ||
final var types = plan.getDynamicTypes(); | ||
final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build(); | ||
ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE; | ||
executeProperties = executeProperties.setReturnedRowLimit(rowLimit); | ||
try { | ||
return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties); | ||
} catch (final Throwable t) { | ||
throw Assertions.<RuntimeException>fail(t); | ||
} | ||
} | ||
|
||
private List<QueryResult> executePlanWithRowLimit(final RecordQueryPlan plan, final int rowLimit) { | ||
byte[] continuation = null; | ||
List<QueryResult> queryResults = new LinkedList<>(); | ||
while (true) { | ||
RecordCursor<QueryResult> currentCursor = executePlan(plan, rowLimit, continuation); | ||
RecordCursorResult<QueryResult> currentCursorResult; | ||
while (true) { | ||
currentCursorResult = currentCursor.getNext(); | ||
continuation = currentCursorResult.getContinuation().toBytes(); | ||
if (!currentCursorResult.hasNext()) { | ||
break; | ||
} | ||
queryResults.add(currentCursorResult.get()); | ||
} | ||
if (currentCursorResult.getNoNextReason() == RecordCursor.NoNextReason.SOURCE_EXHAUSTED) { | ||
break; | ||
} | ||
} | ||
return queryResults; | ||
} | ||
|
||
private void assertResults(@Nonnull final BiConsumer<QueryResult, List<?>> checkConsumer, @Nonnull final List<QueryResult> actual, @Nonnull final List<?>... expected) { | ||
Assertions.assertEquals(expected.length, actual.size()); | ||
for (var i = 0 ; i < actual.size() ; i++) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this still hasn't marked the `continuation` field as private. Is there a reason it needs package visibility?