diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index b1a203616b120..f7c56d1e50c07 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -587,7 +587,6 @@ TranslogWriter createWriter( primaryTermSupplier.getAsLong(), tragedy, persistedSequenceNumberConsumer, - bigArrays, diskIoBufferPool, operationListener, operationAsserter, @@ -607,9 +606,9 @@ TranslogWriter createWriter( * @throws IOException if adding the operation to the translog resulted in an I/O exception */ public Location add(final Operation operation) throws IOException { - try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) { + ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); + try { writeOperationWithSize(out, operation); - final BytesReference bytes = out.bytes(); readLock.lock(); try { ensureOpen(); @@ -630,7 +629,9 @@ public Location add(final Operation operation) throws IOException { + "]" ); } - return current.add(bytes, operation.seqNo()); + var res = current.add(out, operation.seqNo()); + out = null; + return res; } finally { readLock.unlock(); } @@ -640,6 +641,10 @@ public Location add(final Operation operation) throws IOException { } catch (final Exception ex) { closeOnTragicEvent(ex); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex); + } finally { + if (out != null) { + out.close(); + } } } @@ -1964,7 +1969,6 @@ public static String createEmptyTranslog( seqNo -> { throw new UnsupportedOperationException(); }, - BigArrays.NON_RECYCLING_INSTANCE, DiskIoBufferPool.INSTANCE, TranslogConfig.NOOP_OPERATION_LISTENER, TranslogOperationAsserter.DEFAULT, diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 36b6709661017..2c19c4430dfbf 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; @@ -51,7 +50,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final ShardId shardId; private final FileChannel checkpointChannel; private final Path checkpointPath; - private final BigArrays bigArrays; // the last checkpoint that was written when the translog was last synced private volatile Checkpoint lastSyncedCheckpoint; /* the number of translog operations written to this file */ @@ -107,7 +105,6 @@ private TranslogWriter( TranslogHeader header, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer, - BigArrays bigArrays, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, TranslogOperationAsserter operationAsserter, @@ -134,7 +131,6 @@ private TranslogWriter( assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer; - this.bigArrays = bigArrays; this.diskIoBufferPool = diskIoBufferPool; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; this.tragedy = tragedy; @@ -158,7 +154,6 @@ public static TranslogWriter create( long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer, - BigArrays bigArrays, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, TranslogOperationAsserter operationAsserter, @@ -203,7 +198,6 @@ public static TranslogWriter create( header, tragedy, persistedSequenceNumberConsumer, - bigArrays, diskIoBufferPool, operationListener, operationAsserter, @@ -235,7 +229,7 @@ private synchronized void closeWithTragicEvent(final Exception ex) { * @return the location the bytes were written to * @throws IOException if writing to the translog resulted in an I/O exception */ - public Translog.Location add(final BytesReference data, final long seqNo) throws IOException { + public Translog.Location add(final ReleasableBytesStreamOutput data, final long seqNo) throws IOException { long bufferedBytesBeforeAdd = this.bufferedBytes; if (bufferedBytesBeforeAdd >= forceWriteThreshold) { writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4); @@ -244,13 +238,18 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws final Translog.Location location; synchronized (this) { ensureOpen(); + int len = data.size(); + final BytesReference bytes; if (buffer == null) { - buffer = new ReleasableBytesStreamOutput(bigArrays); + assert bufferedBytes == 0; + buffer = data; + bytes = data.bytes(); + } else { + assert bufferedBytes == buffer.size(); + data.bytes().writeTo(buffer); + data.close(); + bytes = buffer.bytes().slice((int) bufferedBytes, len); } - assert bufferedBytes == buffer.size(); - final long offset = totalOffset; - totalOffset += data.length(); - data.writeTo(buffer); assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; @@ -262,10 +261,12 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws operationCounter++; - assert assertNoSeqNumberConflict(seqNo, data); + assert assertNoSeqNumberConflict(seqNo, bytes); - location = new Translog.Location(generation, offset, data.length()); - operationListener.operationAdded(data, seqNo, location); + final long offset = totalOffset; + totalOffset = offset + len; + location = new Translog.Location(generation, offset, len); + operationListener.operationAdded(bytes, seqNo, location); bufferedBytes = buffer.size(); } @@ -554,7 +555,7 @@ private synchronized ReleasableBytesReference pollOpsToWrite() { } private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException { - try (ReleasableBytesReference toClose = toWrite) { + try { assert writeLock.isHeldByCurrentThread(); final int length = toWrite.length(); if (length == 0) { @@ -585,16 +586,22 @@ private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOExcep } } } + toWrite.close(); + toWrite = null; ioBuffer.flip(); writeToFile(ioBuffer); + } finally { + if (toWrite != null) { + toWrite.close(); + } } } @SuppressForbidden(reason = "Channel#write") private void writeToFile(ByteBuffer ioBuffer) throws IOException { - while (ioBuffer.remaining() > 0) { + do { channel.write(ioBuffer); - } + } while (ioBuffer.hasRemaining()); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 9aa847c837e96..39282755f3fe5 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -11,9 +11,6 @@ import org.apache.lucene.store.ByteArrayDataOutput; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; @@ -29,6 +26,7 @@ import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.index.translog.TranslogTests.wrapAsReleasableOutput; import static org.hamcrest.Matchers.equalTo; public class TranslogDeletionPolicyTests extends ESTestCase { @@ -93,7 +91,6 @@ private Tuple, TranslogWriter> createReadersAndWriter() thr randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, - BigArrays.NON_RECYCLING_INSTANCE, TranslogTests.RANDOMIZING_IO_BUFFERS, TranslogConfig.NOOP_OPERATION_LISTENER, TranslogOperationAsserter.DEFAULT, @@ -106,7 +103,7 @@ private Tuple, TranslogWriter> createReadersAndWriter() thr for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { out.reset(bytes); out.writeInt(ops); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops); + writer.add(wrapAsReleasableOutput(bytes), ops); } } return new Tuple<>(readers, writer); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 69cf9d856ed4f..b81805fcfb220 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -34,10 +34,10 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -1339,7 +1339,7 @@ public void testTranslogWriter() throws IOException { if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { seenSeqNos.add(seqNo); } - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo); + writer.add(wrapAsReleasableOutput(bytes), seqNo); } assertThat(persistedSeqNos, empty()); writer.sync(); @@ -1364,7 +1364,7 @@ public void testTranslogWriter() throws IOException { byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); out.writeInt(2048); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(wrapAsReleasableOutput(bytes), randomNonNegativeLong()); if (reader instanceof TranslogReader) { ByteBuffer buffer = ByteBuffer.allocate(4); @@ -1387,6 +1387,16 @@ public void testTranslogWriter() throws IOException { IOUtils.close(writer); } + static ReleasableBytesStreamOutput wrapAsReleasableOutput(byte[] bytes) { + var out = new ReleasableBytesStreamOutput(NON_RECYCLING_INSTANCE); + try { + out.write(bytes); + } catch (IOException e) { + throw new AssertionError(e); + } + return out; + } + public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { Path tempDir = createTempDir(); final TranslogConfig temp = getTranslogConfig(tempDir); @@ -1460,16 +1470,15 @@ ChannelFactory getChannelFactory() { TranslogWriter writer = translog.getCurrent(); int initialWriteCalls = writeCalls.get(); byte[] bytes = new byte[256]; - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); + writer.add(wrapAsReleasableOutput(bytes), 1); + writer.add(wrapAsReleasableOutput(bytes), 2); + writer.add(wrapAsReleasableOutput(bytes), 3); + writer.add(wrapAsReleasableOutput(bytes), 4); assertThat(persistedSeqNos, empty()); - assertEquals(initialWriteCalls, writeCalls.get()); if (randomBoolean()) { // Since the buffer is full, this will flush before performing the add. - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); + writer.add(wrapAsReleasableOutput(bytes), 5); assertThat(persistedSeqNos, empty()); assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); } else { @@ -1479,7 +1488,7 @@ ChannelFactory getChannelFactory() { assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); // Add after we the read flushed the buffer - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); + writer.add(wrapAsReleasableOutput(bytes), 5); } writer.sync(); @@ -1578,7 +1587,7 @@ ChannelFactory getChannelFactory() { byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(new byte[4])); out.writeInt(1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); + writer.add(wrapAsReleasableOutput(bytes), 1); assertThat(persistedSeqNos, empty()); startBlocking.set(true); Thread thread = new Thread(() -> { @@ -1592,7 +1601,7 @@ ChannelFactory getChannelFactory() { writeStarted.await(); // Add will not block even though we are currently writing/syncing - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); + writer.add(wrapAsReleasableOutput(bytes), 2); blocker.countDown(); // Sync against so that both operations are written @@ -1693,7 +1702,7 @@ public void testCloseIntoReader() throws IOException { final byte[] bytes = new byte[4]; final DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); out.writeInt(i); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(wrapAsReleasableOutput(bytes), randomNonNegativeLong()); } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint();