@@ -180,6 +180,11 @@ public CompletableFuture<Void> trim(RecordOffset offset) {
return wal.trim(offset);
}

@Override
public CompletableFuture<Void> truncateTail(RecordOffset offset) {
return wal.truncateTail(offset);
}

private CompletableFuture<? extends WriteAheadLog> buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) {
IdURI uri = IdURI.parse(kraftWalConfigs);
CompletableFuture<Void> cf = walHandle

@@ -292,6 +292,11 @@ private void trim0(long newStartOffset, CompletableFuture<Void> cf) {
}, generalCallbackExecutors);
}

@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return failureHandle(stream.truncateTail(newNextOffset).thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));
}

@Override
public CompletableFuture<Void> close() {
return failureHandle(stream.close().thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));

10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/log/streamaspect/LazyStream.java
@@ -144,6 +144,11 @@ public CompletableFuture<Void> trim(long newStartOffset) {
return inner.trim(newStartOffset);
}

@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return inner.truncateTail(newNextOffset);
}

@Override
public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
return inner.fetch(context, startOffset, endOffset, maxBytesHint);

@@ -228,6 +233,11 @@ public CompletableFuture<Void> trim(long newStartOffset) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
return CompletableFuture.completedFuture(Collections::emptyList);

@@ -165,6 +165,13 @@ public CompletableFuture<Void> trim(long newStartOffset) {
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> truncateTail(long newNextOffset) {
recordMap = new ConcurrentSkipListMap<>(recordMap.headMap(newNextOffset, false));
nextOffsetAlloc.updateAndGet(current -> Math.min(current, newNextOffset));
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
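
A standalone sketch (not part of this change) of the truncation semantics the in-memory implementation above encodes: records at offsets >= newNextOffset disappear, and the next-offset allocator is rewound but never advanced. The offsets and map contents are illustrative.

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class TruncateTailSemantics {
    public static void main(String[] args) {
        // Ten records at offsets 0..9; the next append would be assigned offset 10.
        ConcurrentSkipListMap<Long, String> recordMap = new ConcurrentSkipListMap<>();
        AtomicLong nextOffsetAlloc = new AtomicLong(10);
        for (long offset = 0; offset < 10; offset++) {
            recordMap.put(offset, "record-" + offset);
        }

        // The same two steps as the in-memory truncateTail above.
        long newNextOffset = 5;
        recordMap = new ConcurrentSkipListMap<>(recordMap.headMap(newNextOffset, false)); // keys < 5 survive
        nextOffsetAlloc.updateAndGet(current -> Math.min(current, newNextOffset));         // 10 rewound to 5

        System.out.println(recordMap.keySet());    // [0, 1, 2, 3, 4]
        System.out.println(nextOffsetAlloc.get()); // 5
    }
}
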
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/log/streamaspect/MetaStream.java
@@ -174,6 +174,12 @@ public CompletableFuture<Void> trim(long newStartOffset) {
return innerStream.trim(newStartOffset);
}

@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
metaCache.entrySet().removeIf(entry -> entry.getValue().offset >= newNextOffset);
return innerStream.truncateTail(newNextOffset);
}

@Override
public CompletableFuture<Void> close() {
if (compactionFuture != null) {

8 changes: 8 additions & 0 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
@@ -99,6 +99,14 @@ default CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, i
*/
CompletableFuture<Void> trim(long newStartOffset);

/**
* Truncate the tail of the stream: records at offsets greater than or equal to {@code newNextOffset}
* are discarded, and subsequent appends start from {@code newNextOffset}.
*
* @param newNextOffset new next offset after truncation
* @return a future that completes when the truncation finishes
*/
CompletableFuture<Void> truncateTail(long newNextOffset);

/**
* Close the stream.
*/
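
A hedged caller-side sketch contrasting the existing trim with the new truncateTail, to make the direction of each operation explicit; the stream handle, the offsets 100/500, and the blocking join() calls are illustrative and not taken from this change.

import com.automq.stream.api.Stream;

public class StreamShrinkSketch {
    static void shrink(Stream stream) {
        // trim advances the start offset: records before offset 100 become unreadable.
        stream.trim(100L).join();
        // truncateTail rewinds the end: records at offsets >= 500 are discarded and
        // the next append is assigned offset 500.
        stream.truncateTail(500L).join();
    }
}
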
129 changes: 111 additions & 18 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -236,7 +236,7 @@ static RecoveryBlockResult recoverContinuousRecords(
) {
RecordOffset logEndOffset = null;
Map<Long, Long> streamNextOffsets = new HashMap<>();
Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords = new HashMap<>();
LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(maxCacheSize);

boolean first = true;

@@ -249,11 +249,11 @@ static RecoveryBlockResult recoverContinuousRecords(
first = false;
}
StreamRecordBatch streamRecordBatch = recoverResult.record();
processRecoveredRecord(streamRecordBatch, openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
processRecoveredRecord(streamRecordBatch, recoverResult.recordOffset(), openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
}
} catch (Throwable e) {
// {@link RuntimeIOException} may be thrown by {@code it.next()}
releaseAllRecords(streamDiscontinuousRecords.values());
releaseRecoverRecords(streamDiscontinuousRecords.values());
releaseAllRecords(cacheBlock.records().values());
throw e;
}

@@ -278,8 +278,9 @@
*/
private static void processRecoveredRecord(
StreamRecordBatch streamRecordBatch,
RecordOffset recordOffset,
Map<Long, Long> openingStreamEndOffsets,
Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
LogCache.LogCacheBlock cacheBlock,
Map<Long, Long> streamNextOffsets,
Logger logger

@@ -294,27 +295,27 @@ private static void processRecoveredRecord(
}

Long expectedNextOffset = streamNextOffsets.get(streamId);
Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
Queue<RecoverRecord> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
boolean isContinuous = expectedNextOffset == null || expectedNextOffset == streamRecordBatch.getBaseOffset();
if (!isContinuous) {
// unexpected record, put it into discontinuous records queue.
if (discontinuousRecords == null) {
discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(r -> r.record.getBaseOffset()));
streamDiscontinuousRecords.put(streamId, discontinuousRecords);
}
discontinuousRecords.add(streamRecordBatch);
discontinuousRecords.add(new RecoverRecord(streamRecordBatch, recordOffset));
return;
}
// continuous record: put it into the cache, then check whether any buffered discontinuous records can now be polled.
cacheBlock.put(streamRecordBatch);
cacheBlock.put(streamRecordBatch, recordOffset);
expectedNextOffset = maybePollDiscontinuousRecords(streamRecordBatch, cacheBlock, discontinuousRecords, logger);
streamNextOffsets.put(streamId, expectedNextOffset);
}

private static long maybePollDiscontinuousRecords(
StreamRecordBatch streamRecordBatch,
LogCache.LogCacheBlock cacheBlock,
Queue<StreamRecordBatch> discontinuousRecords,
Queue<RecoverRecord> discontinuousRecords,
Logger logger
) {
long expectedNextOffset = streamRecordBatch.getLastOffset();

@@ -323,25 +324,28 @@ private static long maybePollDiscontinuousRecords(
}
// check and poll historical discontinuous records.
while (!discontinuousRecords.isEmpty()) {
StreamRecordBatch peek = discontinuousRecords.peek();
if (peek.getBaseOffset() != expectedNextOffset) {
RecoverRecord peek = discontinuousRecords.peek();
if (peek.record.getBaseOffset() != expectedNextOffset) {
break;
}
// should never happen, log it.
logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek);
logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek.record);
discontinuousRecords.poll();
cacheBlock.put(peek);
expectedNextOffset = peek.getLastOffset();
cacheBlock.put(peek.record, peek.walOffset);
expectedNextOffset = peek.record.getLastOffset();
}
return expectedNextOffset;
}

private static void releaseDiscontinuousRecords(Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
private static void releaseDiscontinuousRecords(Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
Logger logger) {
streamDiscontinuousRecords.values().stream()
.filter(q -> !q.isEmpty())
.peek(q -> logger.info("drop discontinuous records, records={}", q))
.forEach(S3Storage::releaseRecords);
.forEach(queue -> {
queue.forEach(record -> record.record.release());
queue.clear();
});
}

/**

@@ -375,7 +379,7 @@ private static RecoveryBlockResult filterOutInvalidStreams(LogCache.LogCacheBloc
LogCache.LogCacheBlock newCacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
cacheBlock.records().forEach((streamId, records) -> {
if (!invalidStreams.contains(streamId)) {
records.forEach(newCacheBlock::put);
records.forEach(record -> newCacheBlock.put(record, null));
} else {
// release invalid records.
releaseRecords(records);
Expand All @@ -388,6 +392,13 @@ private static void releaseAllRecords(Collection<? extends Collection<StreamReco
allRecords.forEach(S3Storage::releaseRecords);
}

private static void releaseRecoverRecords(Collection<Queue<RecoverRecord>> allRecords) {
allRecords.forEach(queue -> {
queue.forEach(record -> record.record.release());
queue.clear();
});
}

private static void releaseRecords(Collection<StreamRecordBatch> records) {
records.forEach(StreamRecordBatch::release);
}

@@ -648,6 +659,78 @@ public LogCache snapshotReadCache() {
return snapshotReadCache;
}

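// Tail truncation entry point: reject if any WAL upload task is queued or if the stream has
// inflight WAL uploads, cancel backed-off appends at or beyond the new offset, drop the
// affected cached records, and truncate the delta WAL back to the first removed record's offset.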
@Override
public CompletableFuture<Void> truncateTail(long streamId, long newNextOffset) {
if (streamId < 0) {
return CompletableFuture.failedFuture(new IllegalArgumentException("streamId must be non-negative"));
}
if (newNextOffset < 0) {
return CompletableFuture.failedFuture(new IllegalArgumentException("newNextOffset must be non-negative"));
}
try {
RecordOffset walOffset = prepareTruncateTail(streamId, newNextOffset);
if (walOffset == null) {
return CompletableFuture.completedFuture(null);
}
return deltaWAL.truncateTail(walOffset);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

private RecordOffset prepareTruncateTail(long streamId, long newNextOffset) {
synchronized (this) {
validateTruncateTailState(streamId);
cancelBackoffRecords(streamId, newNextOffset);
RecordOffset walOffset = selectFirstRemovedWalOffset(streamId, newNextOffset);
if (walOffset != null) {
deltaWALCache.setLastRecordOffset(walOffset);
}
return walOffset;
}
}

private void validateTruncateTailState(long streamId) {
if (!walPrepareQueue.isEmpty() || !walCommitQueue.isEmpty()) {
throw new IllegalStateException("Cannot truncate tail when WAL upload tasks are pending");
}
boolean hasInflight = inflightWALUploadTasks.stream().anyMatch(ctx -> ctx.cache.containsStream(streamId));
if (hasInflight) {
throw new IllegalStateException("Cannot truncate tail while stream has inflight WAL uploads");
}
}

private void cancelBackoffRecords(long streamId, long newNextOffset) {
Iterator<WalWriteRequest> iterator = backoffRecords.iterator();
while (iterator.hasNext()) {
WalWriteRequest request = iterator.next();
StreamRecordBatch record = request.record;
if (record.getStreamId() >= 0 && record.getStreamId() == streamId && record.getBaseOffset() >= newNextOffset) {
iterator.remove();
request.cf.completeExceptionally(new IllegalStateException("Append cancelled due to truncate"));
record.release();
}
}
}

private RecordOffset selectFirstRemovedWalOffset(long streamId, long newNextOffset) {
Optional<LogCache.TruncateResult> deltaResult = deltaWALCache.truncateStreamRecords(streamId, newNextOffset);
Optional<LogCache.TruncateResult> snapshotResult = snapshotReadCache == null
? Optional.empty()
: snapshotReadCache.truncateStreamRecords(streamId, newNextOffset);
RecordOffset walOffset = deltaResult
.map(r -> r.firstRemovedWalOffset)
.filter(Objects::nonNull)
.orElse(null);
if (walOffset == null) {
walOffset = snapshotResult
.map(r -> r.firstRemovedWalOffset)
.filter(Objects::nonNull)
.orElse(null);
}
return walOffset;
}
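
LogCache.truncateStreamRecords and LogCache.TruncateResult are called above but defined outside this diff; the following is a minimal sketch of the contract those call sites assume. The shapes are inferred from the usage here, the interface name is illustrative, and RecordOffset is the WAL offset type used throughout this diff.

import java.util.Optional;

// Inferred contract only; not the actual LogCache code.
interface TruncatableStreamCache {
    // Drop cached records of streamId at offsets >= newNextOffset. Empty when the cache holds
    // nothing to remove for the stream; otherwise the result carries the WAL offset of the
    // first removed record (possibly null if no WAL offset was recorded for it).
    Optional<TruncateResult> truncateStreamRecords(long streamId, long newNextOffset);

    final class TruncateResult {
        final RecordOffset firstRemovedWalOffset;

        TruncateResult(RecordOffset firstRemovedWalOffset) {
            this.firstRemovedWalOffset = firstRemovedWalOffset;
        }
    }
}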

@SuppressWarnings({"checkstyle:npathcomplexity"})
@WithSpan
private CompletableFuture<ReadDataBlock> read0(FetchContext context,

@@ -806,7 +889,7 @@ private void handleAppendCallback(WalWriteRequest request) {
private void handleAppendCallback0(WalWriteRequest request) {
final long startTime = System.nanoTime();
request.record.retain();
boolean full = deltaWALCache.put(request.record);
boolean full = deltaWALCache.put(request.record, request.offset);
deltaWALCache.setLastRecordOffset(request.offset);
if (full) {
// cache block is full, trigger WAL upload.

@@ -1062,6 +1145,16 @@ public RecoveryBlockResult(LogCache.LogCacheBlock cacheBlock, RuntimeException e
}
}

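// A recovered StreamRecordBatch paired with the WAL offset it was read from, so recovery can
// replay both into the log cache (see cacheBlock.put(record, walOffset) above).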
static class RecoverRecord {
final StreamRecordBatch record;
final RecordOffset walOffset;

RecoverRecord(StreamRecordBatch record, RecordOffset walOffset) {
this.record = record;
this.walOffset = walOffset;
}
}

public static class LazyCommit {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final long lazyLingerMs;