Skip to content

Speed up translog writes by moving buffers where possible #127836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,6 @@ TranslogWriter createWriter(
primaryTermSupplier.getAsLong(),
tragedy,
persistedSequenceNumberConsumer,
bigArrays,
diskIoBufferPool,
operationListener,
operationAsserter,
Expand All @@ -607,9 +606,9 @@ TranslogWriter createWriter(
* @throws IOException if adding the operation to the translog resulted in an I/O exception
*/
public Location add(final Operation operation) throws IOException {
try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
writeOperationWithSize(out, operation);
final BytesReference bytes = out.bytes();
readLock.lock();
try {
ensureOpen();
Expand All @@ -630,7 +629,9 @@ public Location add(final Operation operation) throws IOException {
+ "]"
);
}
return current.add(bytes, operation.seqNo());
var res = current.add(out, operation.seqNo());
out = null;
return res;
} finally {
readLock.unlock();
}
Expand All @@ -640,6 +641,10 @@ public Location add(final Operation operation) throws IOException {
} catch (final Exception ex) {
closeOnTragicEvent(ex);
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
} finally {
if (out != null) {
out.close();
}
}
}

Expand Down Expand Up @@ -1964,7 +1969,6 @@ public static String createEmptyTranslog(
seqNo -> {
throw new UnsupportedOperationException();
},
BigArrays.NON_RECYCLING_INSTANCE,
DiskIoBufferPool.INSTANCE,
TranslogConfig.NOOP_OPERATION_LISTENER,
TranslogOperationAsserter.DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.IOUtils;
Expand Down Expand Up @@ -51,7 +50,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private final ShardId shardId;
private final FileChannel checkpointChannel;
private final Path checkpointPath;
private final BigArrays bigArrays;
// the last checkpoint that was written when the translog was last synced
private volatile Checkpoint lastSyncedCheckpoint;
/* the number of translog operations written to this file */
Expand Down Expand Up @@ -107,7 +105,6 @@ private TranslogWriter(
TranslogHeader header,
TragicExceptionHolder tragedy,
LongConsumer persistedSequenceNumberConsumer,
BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener,
TranslogOperationAsserter operationAsserter,
Expand All @@ -134,7 +131,6 @@ private TranslogWriter(
assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
this.bigArrays = bigArrays;
this.diskIoBufferPool = diskIoBufferPool;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy;
Expand All @@ -158,7 +154,6 @@ public static TranslogWriter create(
long primaryTerm,
TragicExceptionHolder tragedy,
LongConsumer persistedSequenceNumberConsumer,
BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener,
TranslogOperationAsserter operationAsserter,
Expand Down Expand Up @@ -203,7 +198,6 @@ public static TranslogWriter create(
header,
tragedy,
persistedSequenceNumberConsumer,
bigArrays,
diskIoBufferPool,
operationListener,
operationAsserter,
Expand Down Expand Up @@ -235,7 +229,7 @@ private synchronized void closeWithTragicEvent(final Exception ex) {
* @return the location the bytes were written to
* @throws IOException if writing to the translog resulted in an I/O exception
*/
public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
public Translog.Location add(final ReleasableBytesStreamOutput data, final long seqNo) throws IOException {
long bufferedBytesBeforeAdd = this.bufferedBytes;
if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
Expand All @@ -244,13 +238,18 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws
final Translog.Location location;
synchronized (this) {
ensureOpen();
int len = data.size();
final BytesReference bytes;
if (buffer == null) {
buffer = new ReleasableBytesStreamOutput(bigArrays);
assert bufferedBytes == 0;
buffer = data;
bytes = data.bytes();
} else {
assert bufferedBytes == buffer.size();
data.bytes().writeTo(buffer);
data.close();
bytes = buffer.bytes().slice((int) bufferedBytes, len);
}
assert bufferedBytes == buffer.size();
final long offset = totalOffset;
totalOffset += data.length();
data.writeTo(buffer);

assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
Expand All @@ -262,10 +261,12 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws

operationCounter++;

assert assertNoSeqNumberConflict(seqNo, data);
assert assertNoSeqNumberConflict(seqNo, bytes);

location = new Translog.Location(generation, offset, data.length());
operationListener.operationAdded(data, seqNo, location);
final long offset = totalOffset;
totalOffset = offset + len;
location = new Translog.Location(generation, offset, len);
operationListener.operationAdded(bytes, seqNo, location);
bufferedBytes = buffer.size();
}

Expand Down Expand Up @@ -554,7 +555,7 @@ private synchronized ReleasableBytesReference pollOpsToWrite() {
}

private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
try (ReleasableBytesReference toClose = toWrite) {
try {
assert writeLock.isHeldByCurrentThread();
final int length = toWrite.length();
if (length == 0) {
Expand Down Expand Up @@ -585,16 +586,22 @@ private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOExcep
}
}
}
toWrite.close();
toWrite = null;
ioBuffer.flip();
writeToFile(ioBuffer);
} finally {
if (toWrite != null) {
toWrite.close();
}
}
}

@SuppressForbidden(reason = "Channel#write")
private void writeToFile(ByteBuffer ioBuffer) throws IOException {
while (ioBuffer.remaining() > 0) {
do {
channel.write(ioBuffer);
}
} while (ioBuffer.hasRemaining());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

import org.apache.lucene.store.ByteArrayDataOutput;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
Expand All @@ -29,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.index.translog.TranslogTests.wrapAsReleasableOutput;
import static org.hamcrest.Matchers.equalTo;

public class TranslogDeletionPolicyTests extends ESTestCase {
Expand Down Expand Up @@ -93,7 +91,6 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter() thr
randomNonNegativeLong(),
new TragicExceptionHolder(),
seqNo -> {},
BigArrays.NON_RECYCLING_INSTANCE,
TranslogTests.RANDOMIZING_IO_BUFFERS,
TranslogConfig.NOOP_OPERATION_LISTENER,
TranslogOperationAsserter.DEFAULT,
Expand All @@ -106,7 +103,7 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter() thr
for (int ops = randomIntBetween(0, 20); ops > 0; ops--) {
out.reset(bytes);
out.writeInt(ops);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops);
writer.add(wrapAsReleasableOutput(bytes), ops);
}
}
return new Tuple<>(readers, writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -1339,7 +1339,7 @@ public void testTranslogWriter() throws IOException {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seenSeqNos.add(seqNo);
}
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo);
writer.add(wrapAsReleasableOutput(bytes), seqNo);
}
assertThat(persistedSeqNos, empty());
writer.sync();
Expand All @@ -1364,7 +1364,7 @@ public void testTranslogWriter() throws IOException {
byte[] bytes = new byte[4];
DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes));
out.writeInt(2048);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
writer.add(wrapAsReleasableOutput(bytes), randomNonNegativeLong());

if (reader instanceof TranslogReader) {
ByteBuffer buffer = ByteBuffer.allocate(4);
Expand All @@ -1387,6 +1387,16 @@ public void testTranslogWriter() throws IOException {
IOUtils.close(writer);
}

/**
 * Copies {@code bytes} into a fresh non-recycling {@link ReleasableBytesStreamOutput},
 * matching the buffer type that production code now hands to {@code TranslogWriter#add}.
 */
static ReleasableBytesStreamOutput wrapAsReleasableOutput(byte[] bytes) {
    final ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(NON_RECYCLING_INSTANCE);
    try {
        output.write(bytes);
    } catch (IOException unexpected) {
        // Writes go to heap-backed pages, so an IOException here indicates a bug, not an I/O failure.
        throw new AssertionError(unexpected);
    }
    return output;
}

public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir);
Expand Down Expand Up @@ -1460,16 +1470,15 @@ ChannelFactory getChannelFactory() {
TranslogWriter writer = translog.getCurrent();
int initialWriteCalls = writeCalls.get();
byte[] bytes = new byte[256];
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
writer.add(wrapAsReleasableOutput(bytes), 1);
writer.add(wrapAsReleasableOutput(bytes), 2);
writer.add(wrapAsReleasableOutput(bytes), 3);
writer.add(wrapAsReleasableOutput(bytes), 4);
assertThat(persistedSeqNos, empty());
assertEquals(initialWriteCalls, writeCalls.get());

if (randomBoolean()) {
// Since the buffer is full, this will flush before performing the add.
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
writer.add(wrapAsReleasableOutput(bytes), 5);
assertThat(persistedSeqNos, empty());
assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
} else {
Expand All @@ -1479,7 +1488,7 @@ ChannelFactory getChannelFactory() {
assertThat(writeCalls.get(), greaterThan(initialWriteCalls));

// Add after the read flushed the buffer
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
writer.add(wrapAsReleasableOutput(bytes), 5);
}

writer.sync();
Expand Down Expand Up @@ -1578,7 +1587,7 @@ ChannelFactory getChannelFactory() {
byte[] bytes = new byte[4];
DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(new byte[4]));
out.writeInt(1);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
writer.add(wrapAsReleasableOutput(bytes), 1);
assertThat(persistedSeqNos, empty());
startBlocking.set(true);
Thread thread = new Thread(() -> {
Expand All @@ -1592,7 +1601,7 @@ ChannelFactory getChannelFactory() {
writeStarted.await();

// Add will not block even though we are currently writing/syncing
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
writer.add(wrapAsReleasableOutput(bytes), 2);

blocker.countDown();
// Sync against so that both operations are written
Expand Down Expand Up @@ -1693,7 +1702,7 @@ public void testCloseIntoReader() throws IOException {
final byte[] bytes = new byte[4];
final DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes));
out.writeInt(i);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
writer.add(wrapAsReleasableOutput(bytes), randomNonNegativeLong());
}
writer.sync();
final Checkpoint writerCheckpoint = writer.getCheckpoint();
Expand Down