Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private CompletableFuture<Void> handleStateAndDoBuildIndexAsync(boolean markRead

return AsyncUtil.whenAll(indexesToClear.stream().map(store::clearAndMarkIndexWriteOnly).collect(Collectors.toList()))
.thenCompose(vignore -> markIndexesWriteOnly(continuedBuild, store))
.thenCompose(vignore -> setIndexingTypeOrThrow(store, continuedBuild))
.thenCompose(vignore -> checkAndSetIndexingType(store, continuedBuild))
.thenApply(ignore -> true);
}), common.indexLogMessageKeyValues("IndexingBase::handleIndexingState")
).thenCompose(doIndex ->
Expand Down Expand Up @@ -343,7 +343,7 @@ public void enforceStampOverwrite() {

@Nonnull
@SuppressWarnings("PMD.CloseResource")
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
private CompletableFuture<Void> checkAndSetIndexingType(FDBRecordStore store, boolean continuedBuild) {
// continuedBuild is set if this session isn't a continuation of a previous indexing
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);
final IndexBuildProto.IndexBuildIndexingStamp.Method method = indexingTypeStamp.getMethod();
Expand All @@ -352,12 +352,12 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
method == IndexBuildProto.IndexBuildIndexingStamp.Method.SCRUB_REPAIR;
heartbeat = new IndexingHeartbeat(common.getIndexerId(), indexingTypeStamp.getMethod().toString(), common.config.getLeaseLengthMillis(), allowMutual);

return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp)
return forEachTargetIndex(index -> checkAndSetIndexingType(store, continuedBuild, index, indexingTypeStamp)
.thenCompose(ignore -> updateHeartbeat(store, index)));
}

@Nonnull
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Index index, IndexBuildProto.IndexBuildIndexingStamp newStamp) {
private CompletableFuture<Void> checkAndSetIndexingType(FDBRecordStore store, boolean continuedBuild, Index index, IndexBuildProto.IndexBuildIndexingStamp newStamp) {
if (forceStampOverwrite && !continuedBuild) {
// Fresh session + overwrite = no questions asked
store.saveIndexingTypeStamp(index, newStamp);
Expand All @@ -383,7 +383,7 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
}
if (isTypeStampBlocked(savedStamp) && !policy.shouldAllowUnblock(savedStamp.getBlockID())) {
// Indexing is blocked
throw newPartlyBuiltException(savedStamp, newStamp, index);
return failedFutureVoid(newPartlyBuiltException(savedStamp, newStamp, index));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I understand how the change here would change any behavior?
I think it's a bit cleaner, but everywhere that you were throwing, you now return, and anything above shouldn't really know whether it completed exceptionally because that was called or an exception was thrown, I think.

}
if (areSimilar(newStamp, savedStamp)) {
// Similar stamps, replace it
Expand All @@ -404,7 +404,7 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
throwUnlessNoRecordWasScanned(noRecordScanned, store, index, newStamp, savedStamp));
}
// fall down to exception
throw newPartlyBuiltException(savedStamp, newStamp, index);
return failedFutureVoid(newPartlyBuiltException(savedStamp, newStamp, index));
});
}

Expand All @@ -425,6 +425,12 @@ private static IndexBuildProto.IndexBuildIndexingStamp blocklessStampOf(IndexBui
.build();
}

private CompletableFuture<Void> failedFutureVoid(Throwable ex) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it would be worthwhile to put this in MoreAsyncUtil and make it generic (CompletableFuture<T>).
There's definitely other code doing this. (honestly I was a bit surprised it wasn't supported in CompletableFuture or already existing in MoreAsyncUtil)

CompletableFuture<Void> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(ex);
return failedFuture;
}

@Nonnull
private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned,
FDBRecordStore store,
Expand All @@ -448,7 +454,7 @@ private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean
.toString());
}
final IndexBuildProto.IndexBuildIndexingStamp fakeSavedStamp = IndexingMultiTargetByRecords.compileSingleTargetLegacyIndexingTypeStamp();
throw newPartlyBuiltException(fakeSavedStamp, indexingTypeStamp, index);
return failedFutureVoid(newPartlyBuiltException(fakeSavedStamp, indexingTypeStamp, index));
}

@Nonnull
Expand All @@ -464,7 +470,7 @@ private CompletableFuture<Void> throwUnlessNoRecordWasScanned(boolean noRecordSc
return AsyncUtil.DONE;
}
// A force overwrite cannot be allowed when partly built
throw newPartlyBuiltException(savedStamp, indexingTypeStamp, index);
return failedFutureVoid(newPartlyBuiltException(savedStamp, indexingTypeStamp, index));
}

@Nonnull
Expand Down Expand Up @@ -990,7 +996,7 @@ public CompletableFuture<Void> rebuildIndexAsync(@Nonnull FDBRecordStore store)
IndexingRangeSet rangeSet = IndexingRangeSet.forIndexBuild(store, index);
return rangeSet.insertRangeAsync(null, null);
}))
.thenCompose(vignore -> setIndexingTypeOrThrow(store, false))
.thenCompose(vignore -> checkAndSetIndexingType(store, false))
.thenCompose(vignore -> rebuildIndexInternalAsync(store))
.whenComplete((ignore, ignoreEx) -> clearHeartbeats(store));
}
Expand Down
Loading