Skip to content

Commit 995fcc4

Browse files
committed
HADOOP-18296. Buffer slicing in checksum fs
Wire up TrackingByteBufferPool to vector read tests, with test suite AbstractContractVectoredReadTest adding test testBufferSlicing() to generate conditions which may trigger slicing. Only files which declare that they may slicing buffers are permitted to return buffers to the pool which aren't known about, or to close the pool with outstanding entries. So: no fix, just public declaration of behavior and test to verify that no other streams are doing it.
1 parent 86d6bac commit 995fcc4

File tree

5 files changed

+196
-30
lines changed

5 files changed

+196
-30
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
5454
}
5555
final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true);
5656
setVerifyChecksum(checksum);
57-
LOG.debug("Checksum verification enabled: {}", checksum);
57+
LOG.debug("Checksum verification enabled={}", checksum);
5858

5959
}
6060

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,8 @@ private void initiateRead() {
416416
public void completed(Integer result, Integer rangeIndex) {
417417
FileRange range = ranges.get(rangeIndex);
418418
ByteBuffer buffer = buffers[rangeIndex];
419-
LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}"
420-
, range, System.identityHashCode(buffer), result, buffer.remaining());
419+
LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}",
420+
range, System.identityHashCode(buffer), result, buffer.remaining());
421421
if (result == -1) {
422422
// no data was read back.
423423
failed(new EOFException("Read past End of File"), rangeIndex);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java

Lines changed: 121 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,38 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
1920
package org.apache.hadoop.fs.impl;
2021

2122
import java.nio.ByteBuffer;
2223
import java.util.HashMap;
2324
import java.util.Map;
2425
import java.util.Objects;
26+
import java.util.concurrent.atomic.AtomicInteger;
2527

2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

2931
import org.apache.hadoop.io.ByteBufferPool;
3032

3133
/**
32-
* A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers are released. It
33-
* throws the related exception at {@link #close()} if any buffer remains un-released. It also clears the buffers at
34-
* release so if they continued being used it'll generate errors.
34+
* A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers
35+
* are released.
36+
* <p>
37+
* It throws the related exception at {@link #close()} if any buffer remains un-released.
38+
* It also clears the buffers at release so if they continued being used it'll generate errors.
3539
* <p>
3640
* To be used for testing only.
3741
* <p>
38-
* The stacktraces of the allocation are not stored by default because it significantly decreases the unit test
39-
* execution performance. Configuring this class to log at DEBUG will trigger their collection.
42+
* The stacktraces of the allocation are not stored by default because
43+
* it can significantly decreases the unit test performance.
44+
* Configuring this class to log at DEBUG will trigger their collection.
4045
* @see ByteBufferAllocationStacktraceException
4146
* <p>
4247
* Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}.
4348
*/
4449
public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable {
4550

46-
/**
47-
48-
*/
49-
private static final boolean DEBUG = true;
5051
private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class);
5152

5253
/**
@@ -60,10 +61,13 @@ public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
6061

6162
/**
6263
* Key for the tracker map.
64+
* This uses the identity hash code of the buffer as the hash code
65+
* for the map.
6366
*/
6467
private static class Key {
6568

6669
private final int hashCode;
70+
6771
private final ByteBuffer buffer;
6872

6973
Key(ByteBuffer buffer) {
@@ -105,7 +109,10 @@ private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) {
105109
}
106110

107111
private LeakDetectorHeapByteBufferPoolException(
108-
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
112+
String message,
113+
Throwable cause,
114+
boolean enableSuppression,
115+
boolean writableStackTrace) {
109116
super(message, cause, enableSuppression, writableStackTrace);
110117
}
111118
}
@@ -116,6 +123,9 @@ private LeakDetectorHeapByteBufferPoolException(
116123
public static final class ByteBufferAllocationStacktraceException
117124
extends LeakDetectorHeapByteBufferPoolException {
118125

126+
/**
127+
* Single stack trace instance to use when DEBUG is not enabled.
128+
*/
119129
private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE =
120130
new ByteBufferAllocationStacktraceException(false);
121131

@@ -134,9 +144,12 @@ private ByteBufferAllocationStacktraceException() {
134144
super("Allocation stacktrace of the first ByteBuffer:");
135145
}
136146

147+
/**
148+
* Private constructor to for the singleton {@link #WITHOUT_STACKTRACE},
149+
* telling develoers how to see a trace per buffer.
150+
*/
137151
private ByteBufferAllocationStacktraceException(boolean unused) {
138-
super(
139-
"Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for full stack traces",
152+
super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces",
140153
null,
141154
false,
142155
false);
@@ -147,20 +160,36 @@ private ByteBufferAllocationStacktraceException(boolean unused) {
147160
* Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the
148161
* buffer to release was not in the hash map.
149162
*/
150-
public static final class ReleasingUnallocatedByteBufferException extends LeakDetectorHeapByteBufferPoolException {
163+
public static final class ReleasingUnallocatedByteBufferException
164+
extends LeakDetectorHeapByteBufferPoolException {
151165

152-
private ReleasingUnallocatedByteBufferException() {
153-
super("Releasing a ByteBuffer instance that is not allocated by this buffer pool or already been released");
166+
private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
167+
super(String.format("Releasing a ByteBuffer instance that is not allocated"
168+
+ " by this buffer pool or already been released: %s size %d", b, b.capacity()));
154169
}
155170
}
156171

157172
/**
158-
* Exception raised in {@link TrackingByteBufferPool#close()} if there was an unreleased buffer.
173+
* Exception raised in {@link TrackingByteBufferPool#close()} if there
174+
* was an unreleased buffer.
159175
*/
160-
public static class LeakedByteBufferException extends LeakDetectorHeapByteBufferPoolException {
176+
public static final class LeakedByteBufferException
177+
extends LeakDetectorHeapByteBufferPoolException {
178+
179+
private final int count;
161180

162181
private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) {
163-
super(count + " ByteBuffer object(s) is/are remained unreleased after closing this buffer pool.", e);
182+
super(count + " ByteBuffer object(s) is/are remained unreleased"
183+
+ " after closing this buffer pool.", e);
184+
this.count = count;
185+
}
186+
187+
/**
188+
* Get the number of unreleased buffers.
189+
* @return number of unreleased buffers
190+
*/
191+
public int getCount() {
192+
return count;
164193
}
165194
}
166195

@@ -170,13 +199,28 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
170199
* The key maps by the object id of the buffer, and refers to either a common stack trace
171200
* or one dynamically created for each allocation.
172201
*/
173-
private final Map<Key, ByteBufferAllocationStacktraceException> allocated = new HashMap<>();
202+
private final Map<Key, ByteBufferAllocationStacktraceException> allocated =
203+
new HashMap<>();
174204

175205
/**
176206
* Wrapped buffer pool.
177207
*/
178208
private final ByteBufferPool allocator;
179209

210+
/**
211+
* Number of buffer allocations.
212+
* <p>
213+
* This is incremented in {@link #getBuffer(boolean, int)}.
214+
*/
215+
private final AtomicInteger bufferAllocations = new AtomicInteger();
216+
217+
/**
218+
* Number of buffer releases.
219+
* <p>
220+
* This is incremented in {@link #putBuffer(ByteBuffer)}.
221+
*/
222+
private final AtomicInteger bufferReleases = new AtomicInteger();
223+
180224
/**
181225
* private constructor.
182226
* @param allocator pool allocator.
@@ -185,34 +229,87 @@ private TrackingByteBufferPool(ByteBufferPool allocator) {
185229
this.allocator = allocator;
186230
}
187231

232+
public int getBufferAllocations() {
233+
return bufferAllocations.get();
234+
}
235+
236+
public int getBufferReleases() {
237+
return bufferReleases.get();
238+
}
239+
240+
/**
241+
* Get a buffer from the pool.
242+
* <p>
243+
* This increments the {@link #bufferAllocations} counter and stores the
244+
* singleron or local allocation stack trace in the {@link #allocated} map.
245+
* @param direct whether to allocate a direct buffer or not
246+
* @param size size of the buffer to allocate
247+
* @return a ByteBuffer instance
248+
*/
188249
@Override
189-
public ByteBuffer getBuffer(final boolean direct, final int size) {
250+
public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
251+
bufferAllocations.incrementAndGet();
190252
ByteBuffer buffer = allocator.getBuffer(direct, size);
191-
final ByteBufferAllocationStacktraceException ex = ByteBufferAllocationStacktraceException.create();
253+
final ByteBufferAllocationStacktraceException ex =
254+
ByteBufferAllocationStacktraceException.create();
192255
final Key key = new Key(buffer);
193256
allocated.put(key, ex);
194257
LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer, ex);
195258
return buffer;
196259
}
197260

261+
/**
262+
* Release a buffer back to the pool.
263+
* <p>
264+
* This increments the {@link #bufferReleases} counter and removes the
265+
* buffer from the {@link #allocated} map.
266+
* <p>
267+
* If the buffer was not allocated by this pool, it throws
268+
* {@link ReleasingUnallocatedByteBufferException}.
269+
*
270+
* @param b buffer to release
271+
* @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool
272+
*/
198273
@Override
199-
public void putBuffer(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
274+
public synchronized void putBuffer(ByteBuffer b)
275+
throws ReleasingUnallocatedByteBufferException {
276+
277+
bufferReleases.incrementAndGet();
200278
Objects.requireNonNull(b);
201279
final Key key = new Key(b);
202280
LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b);
203281
if (allocated.remove(key) == null) {
204-
throw new ReleasingUnallocatedByteBufferException();
282+
throw new ReleasingUnallocatedByteBufferException(b);
205283
}
206284
allocator.putBuffer(b);
207285
// Clearing the buffer so subsequent access would probably generate errors
208286
b.clear();
209287
}
210288

289+
/**
290+
* Check if the buffer is in the pool.
291+
* @param b buffer
292+
* @return true if the buffer is in the pool
293+
*/
294+
public boolean containsBuffer(ByteBuffer b) {
295+
Objects.requireNonNull(b);
296+
final Key key = new Key(b);
297+
return allocated.containsKey(key);
298+
}
299+
300+
/**
301+
* Get the number of allocated buffers.
302+
* @return number of allocated buffers
303+
*/
304+
public int size() {
305+
return allocated.size();
306+
}
307+
211308
/**
212309
* Expect all buffers to be released -if not, log unreleased ones
213310
* and then raise an exception with the stack trace of the first
214311
* unreleased buffer.
215-
* @throws LeakedByteBufferException if at least one was unsued.
312+
* @throws LeakedByteBufferException if at least one buffer was not released
216313
*/
217314
@Override
218315
public void close() throws LeakedByteBufferException {

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.fs.FileStatus;
4545
import org.apache.hadoop.fs.FileSystem;
4646
import org.apache.hadoop.fs.Path;
47+
import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
4748
import org.apache.hadoop.io.ElasticByteBufferPool;
4849
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
4950
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -52,13 +53,16 @@
5253
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
5354
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
5455
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
56+
import static org.apache.hadoop.fs.StreamCapabilities.VECTOREDIO_BUFFERS_SLICED;
5557
import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
5658
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
5759
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
5860
import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
5961
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
6062
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
61-
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
63+
import static org.apache.hadoop.io.Sizes.S_128K;
64+
import static org.apache.hadoop.io.Sizes.S_4K;
65+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
6266
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
6367
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
6468

@@ -74,7 +78,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
7478
private static final Logger LOG =
7579
LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
7680

77-
public static final int DATASET_LEN = 64 * 1024;
81+
public static final int DATASET_LEN = S_128K;
7882
protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
7983
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
8084

@@ -91,6 +95,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
9195

9296
private final String bufferType;
9397

98+
private final boolean isDirect;
99+
94100
/**
95101
* Path to the vector file.
96102
*/
@@ -110,7 +116,7 @@ public static List<String> params() {
110116

111117
protected AbstractContractVectoredReadTest(String bufferType) {
112118
this.bufferType = bufferType;
113-
final boolean isDirect = !"array".equals(bufferType);
119+
this.isDirect = !"array".equals(bufferType);
114120
this.allocate = size -> pool.getBuffer(isDirect, size);
115121
}
116122

@@ -619,4 +625,60 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead(
619625
});
620626
}
621627
}
628+
629+
@Test
630+
public void testBufferSlicing() throws Throwable {
631+
describe("Test buffer slicing behavior in vectored IO");
632+
633+
final int numBuffers = 8;
634+
final int bufferSize = S_4K;
635+
long offset = 0;
636+
final List<FileRange> fileRanges = new ArrayList<>();
637+
for (int i = 0; i < numBuffers; i++) {
638+
fileRanges.add(FileRange.createFileRange(offset, bufferSize));
639+
// increment and add a non-binary-aligned gap, so as to force
640+
// offsets to be misaligned with possible page sizes.
641+
offset += bufferSize + 4000;
642+
}
643+
TrackingByteBufferPool pool = TrackingByteBufferPool.wrap(getPool());
644+
int unknownBuffers = 0;
645+
boolean slicing;
646+
try (FSDataInputStream in = openVectorFile()) {
647+
slicing = in.hasCapability(VECTOREDIO_BUFFERS_SLICED);
648+
LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in);
649+
in.readVectored(fileRanges, s -> pool.getBuffer(isDirect, s), pool::putBuffer);
650+
651+
// check that all buffers are from the the pool, unless they are sliced.
652+
for (FileRange res : fileRanges) {
653+
CompletableFuture<ByteBuffer> data = res.getData();
654+
ByteBuffer buffer = awaitFuture(data);
655+
Assertions.assertThat(buffer)
656+
.describedAs("Buffer must not be null")
657+
.isNotNull();
658+
Assertions.assertThat(slicing || pool.containsBuffer(buffer))
659+
.describedAs("Buffer must be from the pool")
660+
.isTrue();
661+
try {
662+
pool.putBuffer(buffer);
663+
} catch (TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) {
664+
// this can happen if the buffer was sliced, as it is not in the pool.
665+
if (!slicing) {
666+
throw e;
667+
}
668+
LOG.info("Sliced buffer detected: {}", buffer);
669+
unknownBuffers++;
670+
}
671+
}
672+
}
673+
try {
674+
pool.close();
675+
} catch (TrackingByteBufferPool.LeakedByteBufferException e) {
676+
if (!slicing) {
677+
throw e;
678+
}
679+
LOG.info("Slicing is enabled; we saw leaked buffers: {} after {} releases of unknown bufferfs",
680+
e.getCount(), unknownBuffers);
681+
}
682+
683+
}
622684
}

0 commit comments

Comments
 (0)