Skip to content

Commit af5e914

Browse files
committed
Streaming aggregate cursor now requires 4.1.9.0 or later for continuation deserialization
This cleans up the `StreamingAggregateCursor` and its helper classes to remove the `createDefaultIfEmpty` option. We'd been carrying that option around as we introduced that field in FoundationDB#3092 and wanted to preserve behavior when upgrading from older versions. The intention had been that all new plans would set the option starting in 4.1, but a bug (fixed in FoundationDB#3211; see FoundationDB#3096) means that we didn't actually enable it until 4.1.9.0. That means that after this change, we'd require 4.1.9.0 or newer in order to safely continue these queries. In theory, we could wait with this change, but 4.1.9.0 has enough fixes that I actually think our recommendation should be that anyone upgrading from 4.0 or below go straight to 4.1.9.0, and then they can proceed safely to a newer version with this change. I was able to validate that this change is compatible with 4.1.9.0 via the cross-version tests run during PRB. When I ran the full mixed mode tests with the `aggregate-index-tests.yamsql`, I found that only 4.1.9.0 worked, which is expected. This resolves FoundationDB#3092.
1 parent fe26701 commit af5e914

File tree

3 files changed

+14
-33
lines changed

3 files changed

+14
-33
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/AggregateCursor.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.apple.foundationdb.record.RecordCursor;
2626
import com.apple.foundationdb.record.RecordCursorContinuation;
2727
import com.apple.foundationdb.record.RecordCursorResult;
28-
import com.apple.foundationdb.record.RecordCursorStartContinuation;
2928
import com.apple.foundationdb.record.RecordCursorVisitor;
3029
import com.apple.foundationdb.record.query.plan.plans.QueryResult;
3130
import com.google.common.base.Verify;
@@ -50,7 +49,6 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
5049
// group aggregator to break incoming records into groups
5150
@Nonnull
5251
private final StreamGrouping<M> streamGrouping;
53-
private final boolean isCreateDefaultOnEmpty;
5452
// Previous record processed by this cursor
5553
@Nullable
5654
private RecordCursorResult<QueryResult> previousResult;
@@ -59,11 +57,9 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
5957
private RecordCursorResult<QueryResult> previousValidResult;
6058

6159
public AggregateCursor(@Nonnull RecordCursor<QueryResult> inner,
62-
@Nonnull final StreamGrouping<M> streamGrouping,
63-
boolean isCreateDefaultOnEmpty) {
60+
@Nonnull final StreamGrouping<M> streamGrouping) {
6461
this.inner = inner;
6562
this.streamGrouping = streamGrouping;
66-
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty;
6763
}
6864

6965
@Nonnull
@@ -77,7 +73,7 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
7773
return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> {
7874
previousResult = innerResult;
7975
if (!innerResult.hasNext()) {
80-
if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) {
76+
if (!isNoRecords()) {
8177
streamGrouping.finalizeGroup();
8278
}
8379
return false;
@@ -93,11 +89,7 @@ public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
9389
}), getExecutor()).thenApply(vignore -> {
9490
if (isNoRecords()) {
9591
// Edge case where there are no records at all
96-
if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) {
97-
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START);
98-
} else {
99-
return RecordCursorResult.exhausted();
100-
}
92+
return RecordCursorResult.exhausted();
10193
}
10294
// Use the last valid result for the continuation as we need non-terminal one here.
10395
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/aggregate/StreamGrouping.java

-4
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,4 @@ private Object evalGroupingKey(@Nullable final Object currentObject) {
193193
final EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject);
194194
return Objects.requireNonNull(groupingKeyValue).eval(store, nestedContext);
195195
}
196-
197-
public boolean isResultOnEmpty() {
198-
return groupingKeyValue == null;
199-
}
200196
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java

+11-18
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.apple.foundationdb.record.PlanDeserializer;
2828
import com.apple.foundationdb.record.PlanHashable;
2929
import com.apple.foundationdb.record.PlanSerializationContext;
30+
import com.apple.foundationdb.record.RecordCoreArgumentException;
3031
import com.apple.foundationdb.record.RecordCursor;
3132
import com.apple.foundationdb.record.cursors.aggregate.AggregateCursor;
3233
import com.apple.foundationdb.record.cursors.aggregate.StreamGrouping;
@@ -96,13 +97,6 @@ public class RecordQueryStreamingAggregationPlan implements RecordQueryPlanWithC
9697
private final CorrelationIdentifier aggregateAlias;
9798
@Nonnull
9899
private final Value completeResultValue;
99-
//
100-
// This flag is needed to distinguish if we need to create a default value on-empty or not (i.e.
101-
// RecordQueryDefaultOnEmptyPlan will do that going forward). We will always plan with that flag set to false going
102-
// forward, but we accept and honor this field coming from proto if we are continuing OR if it not there we imply
103-
// true.
104-
// https://github.com/FoundationDB/fdb-record-layer/issues/3107
105-
private final boolean isCreateDefaultOnEmpty;
106100

107101
/**
108102
* Construct a new plan.
@@ -119,15 +113,13 @@ private RecordQueryStreamingAggregationPlan(@Nonnull final Quantifier.Physical i
119113
@Nonnull final AggregateValue aggregateValue,
120114
@Nonnull final CorrelationIdentifier groupingKeyAlias,
121115
@Nonnull final CorrelationIdentifier aggregateAlias,
122-
@Nonnull final Value completeResultValue,
123-
final boolean isCreateDefaultOnEmpty) {
116+
@Nonnull final Value completeResultValue) {
124117
this.inner = inner;
125118
this.groupingKeyValue = groupingKeyValue;
126119
this.aggregateValue = aggregateValue;
127120
this.groupingKeyAlias = groupingKeyAlias;
128121
this.aggregateAlias = aggregateAlias;
129122
this.completeResultValue = completeResultValue;
130-
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty;
131123
}
132124

133125
@Nonnull
@@ -148,7 +140,7 @@ public <M extends Message> RecordCursor<QueryResult> executePlan(@Nonnull FDBRec
148140
(FDBRecordStoreBase<Message>)store,
149141
context,
150142
inner.getAlias());
151-
return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty)
143+
return new AggregateCursor<>(innerCursor, streamGrouping)
152144
.skipThenLimit(executeProperties.getSkip(),
153145
executeProperties.getReturnedRowLimit());
154146
}
@@ -206,8 +198,7 @@ public RecordQueryStreamingAggregationPlan translateCorrelations(@Nonnull final
206198
translatedAggregateValue,
207199
groupingKeyAlias,
208200
aggregateAlias,
209-
completeResultValue,
210-
isCreateDefaultOnEmpty);
201+
completeResultValue);
211202
}
212203

213204
@Nonnull
@@ -218,8 +209,7 @@ public RecordQueryStreamingAggregationPlan withChild(@Nonnull final Reference ch
218209
aggregateValue,
219210
groupingKeyAlias,
220211
aggregateAlias,
221-
completeResultValue,
222-
isCreateDefaultOnEmpty);
212+
completeResultValue);
223213
}
224214

225215
@Nonnull
@@ -371,7 +361,7 @@ public PRecordQueryStreamingAggregationPlan toProto(@Nonnull final PlanSerializa
371361
builder.setGroupingKeyAlias(groupingKeyAlias.getId())
372362
.setAggregateAlias(aggregateAlias.getId())
373363
.setCompleteResultValue(completeResultValue.toValueProto(serializationContext))
374-
.setIsCreateDefaultOnEmpty(isCreateDefaultOnEmpty);
364+
.setIsCreateDefaultOnEmpty(false);
375365
return builder.build();
376366
}
377367

@@ -396,7 +386,10 @@ public static RecordQueryStreamingAggregationPlan fromProto(@Nonnull final PlanS
396386
final CorrelationIdentifier aggregateAlias = CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateAlias()));
397387
final Value completeResultValue = Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getCompleteResultValue()));
398388
final boolean isCreateDefaultOnEmpty = recordQueryStreamingAggregationPlanProto.hasIsCreateDefaultOnEmpty() ? recordQueryStreamingAggregationPlanProto.getIsCreateDefaultOnEmpty() : true;
399-
return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue, isCreateDefaultOnEmpty);
389+
if (isCreateDefaultOnEmpty) {
390+
throw new RecordCoreArgumentException("cannot create streaming aggregate plan with default value on empty");
391+
}
392+
return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue);
400393
}
401394

402395
@Nonnull
@@ -428,7 +421,7 @@ public static RecordQueryStreamingAggregationPlan of(@Nonnull final Quantifier.P
428421
final var referencedAggregateValue = ObjectValue.of(aggregateAlias, aggregateValue.getResultType());
429422

430423
return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias,
431-
resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue), false);
424+
resultValueFunction.apply(referencedGroupingKeyValue, referencedAggregateValue));
432425
}
433426

434427
/**

0 commit comments

Comments
 (0)