Skip to content

Commit 1995ed5

Browse files
ShuffleReadMetricsReporter
1 parent 1c671ca commit 1995ed5

File tree

6 files changed

+112
-19
lines changed

6 files changed

+112
-19
lines changed

docs/configuration-properties.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ Must be a positive integer (above `0`)
629629

630630
**spark.shuffle.push.enabled**
631631

632-
Enables push-based shuffle on the client side
632+
Enables [Push-Based Shuffle](./push-based-shuffle.md) on the client side
633633

634634
Default: `false`
635635

docs/shuffle/BlockStoreShuffleReader.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# BlockStoreShuffleReader
22

3-
`BlockStoreShuffleReader` is a [ShuffleReader](ShuffleReader.md).
3+
`BlockStoreShuffleReader[K, C]` is a [ShuffleReader](ShuffleReader.md) of `K` keys and `C` values.
44

55
## Creating Instance
66

@@ -9,7 +9,7 @@
99
* <span id="handle"> [BaseShuffleHandle](BaseShuffleHandle.md)
1010
* <span id="blocksByAddress"> [Block](../storage/BlockId.md)s by [Address](../storage/BlockManagerId.md) (`Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]`)
1111
* <span id="context"> [TaskContext](../scheduler/TaskContext.md)
12-
* <span id="readMetrics"> `ShuffleReadMetricsReporter`
12+
* <span id="readMetrics"> [ShuffleReadMetricsReporter](ShuffleReadMetricsReporter.md)
1313
* <span id="serializerManager"> [SerializerManager](../serializer/SerializerManager.md)
1414
* <span id="blockManager"> [BlockManager](../storage/BlockManager.md)
1515
* <span id="mapOutputTracker"> [MapOutputTracker](../scheduler/MapOutputTracker.md)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# ShuffleReadMetricsReporter
2+
3+
`ShuffleReadMetricsReporter` is an abstraction of [reporters](#implementations) that allow tracking the following **Shuffle Read Metrics** (for each shuffle):
4+
5+
Shuffle Read Metric | When used
6+
-|-
7+
Corrupt Merged Block Chunks | FIXME
8+
Fetch Wait Time | FIXME
9+
Local Blocks Fetched | FIXME
10+
Local Bytes Read | FIXME
11+
Merged Local Blocks Fetched | FIXME
12+
Merged Local Bytes Read | FIXME
13+
Merged Local Chunks Fetched | FIXME
14+
Merged Fetch Fallback Count | FIXME
15+
Merged Remote Blocks Fetched | FIXME
16+
Merged Remote Chunks Fetched | FIXME
17+
Merged Remote Bytes Read | FIXME
18+
Merged Remote Requests Duration | FIXME
19+
Remote Blocks Fetched | [ShuffleBlockFetcherIterator](../storage/ShuffleBlockFetcherIterator.md#shuffleRemoteMetricsUpdate) for [ShuffleBlockChunkId](../storage/BlockId.md#ShuffleBlockChunkId)s
20+
Remote Bytes Read | FIXME
21+
Remote Bytes Read To Disk | FIXME
22+
Remote Requests Duration | FIXME
23+
Total Records Read | FIXME
24+
25+
`ShuffleReadMetricsReporter` is used to create the following:
26+
27+
* [BlockStoreShuffleReader](BlockStoreShuffleReader.md#readMetrics)
28+
* `PushBasedFetchHelper`
29+
* [ShuffleBlockFetcherIterator](../storage/ShuffleBlockFetcherIterator.md#shuffleMetrics)
30+
31+
## Implementations
32+
33+
* `TempShuffleReadMetrics`

docs/storage/BlockId.md

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
## Contract
66

7-
### <span id="name"><span id="toString"> Name
7+
### <span id="toString"> Name { #name }
88

99
```scala
1010
name: String
@@ -24,7 +24,9 @@ Used when:
2424
??? note "Sealed Abstract Class"
2525
`BlockId` is a Scala **sealed abstract class** which means that all of the implementations are in the same compilation unit (a single file).
2626

27-
### <span id="BroadcastBlockId"> BroadcastBlockId
27+
Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#sealed).
28+
29+
### BroadcastBlockId { #BroadcastBlockId }
2830

2931
`BlockId` for [broadcast variable](../broadcast-variables/index.md) blocks:
3032

@@ -40,7 +42,15 @@ Used when:
4042
* `SerializerManager` is requested to [shouldCompress](../serializer/SerializerManager.md#shouldCompress)
4143
* `AppStatusListener` is requested to [onBlockUpdated](../status/AppStatusListener.md#onBlockUpdated)
4244

43-
### <span id="RDDBlockId"> RDDBlockId
45+
### CacheId { #CacheId }
46+
47+
`BlockId` for...FIXME
48+
49+
### PythonStreamBlockId { #PythonStreamBlockId }
50+
51+
`BlockId` for...FIXME
52+
53+
### RDDBlockId { #RDDBlockId }
4454

4555
`BlockId` for [RDD](../rdd/RDD.md) partitions:
4656

@@ -60,9 +70,22 @@ Used when:
6070

6171
[Compressed](../serializer/SerializerManager.md#shouldCompress) when [spark.rdd.compress](../configuration-properties.md#spark.rdd.compress) configuration property is enabled
6272

63-
### <span id="ShuffleBlockBatchId"> ShuffleBlockBatchId
73+
### ShuffleBlockBatchId { #ShuffleBlockBatchId }
74+
75+
`BlockId` for...FIXME
6476

65-
### <span id="ShuffleBlockId"> ShuffleBlockId
77+
### ShuffleBlockChunkId { #ShuffleBlockChunkId }
78+
79+
`BlockId` for shuffle block chunks in [Push-Based Shuffle](../push-based-shuffle.md):
80+
81+
* `shuffleId` identifier
82+
* `shuffleMergeId` identifier
83+
* `reduceId` identifier
84+
* `chunkId` identifier
85+
86+
Uses `shuffleChunk_[shuffleId]_[shuffleMergeId]_[reduceId]_[chunkId]` pattern for the [name](#name)
87+
88+
### ShuffleBlockId { #ShuffleBlockId }
6689

6790
`BlockId` for shuffle blocks:
6891

@@ -83,13 +106,41 @@ Used when:
83106

84107
[Compressed](../serializer/SerializerManager.md#shouldCompress) when [spark.shuffle.compress](../configuration-properties.md#spark.shuffle.compress) configuration property is enabled
85108

86-
### <span id="ShuffleDataBlockId"> ShuffleDataBlockId
109+
### ShuffleChecksumBlockId { #ShuffleChecksumBlockId }
110+
111+
`BlockId` for...FIXME
112+
113+
### ShuffleDataBlockId { #ShuffleDataBlockId }
114+
115+
`BlockId` for...FIXME
87116

88-
### <span id="ShuffleIndexBlockId"> ShuffleIndexBlockId
117+
### ShuffleIndexBlockId { #ShuffleIndexBlockId }
89118

90-
### <span id="StreamBlockId"> StreamBlockId
119+
`BlockId` for...FIXME
91120

92-
`BlockId` for ...FIXME:
121+
### ShuffleMergedBlockId { #ShuffleMergedBlockId }
122+
123+
`BlockId` for...FIXME
124+
125+
### ShuffleMergedDataBlockId { #ShuffleMergedDataBlockId }
126+
127+
`BlockId` for...FIXME
128+
129+
### ShuffleMergedIndexBlockId { #ShuffleMergedIndexBlockId }
130+
131+
`BlockId` for...FIXME
132+
133+
### ShuffleMergedMetaBlockId { #ShuffleMergedMetaBlockId }
134+
135+
`BlockId` for...FIXME
136+
137+
### ShufflePushBlockId { #ShufflePushBlockId }
138+
139+
`BlockId` for...FIXME
140+
141+
### StreamBlockId { #StreamBlockId }
142+
143+
`BlockId` for...FIXME:
93144

94145
* `streamId`
95146
* `uniqueId`
@@ -102,15 +153,23 @@ input-[streamId]-[uniqueId]
102153

103154
Used in Spark Streaming
104155

105-
### <span id="TaskResultBlockId"> TaskResultBlockId
156+
### TaskResultBlockId { #TaskResultBlockId }
157+
158+
`BlockId` for...FIXME
159+
160+
### TempLocalBlockId { #TempLocalBlockId }
161+
162+
`BlockId` for...FIXME
163+
164+
### TempShuffleBlockId { #TempShuffleBlockId }
106165

107-
### <span id="TempLocalBlockId"> TempLocalBlockId
166+
`BlockId` for...FIXME
108167

109-
### <span id="TempShuffleBlockId"> TempShuffleBlockId
168+
### TestBlockId { #TestBlockId }
110169

111-
### <span id="TestBlockId"> TestBlockId
170+
`BlockId` for...FIXME
112171

113-
## <span id="apply"> Creating BlockId by Name
172+
## Creating BlockId by Name { #apply }
114173

115174
```scala
116175
apply(

docs/storage/ShuffleBlockFetcherIterator.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* <span id="maxReqSizeShuffleToMem"> [spark.network.maxRemoteBlockSizeFetchToMem](../configuration-properties.md#spark.network.maxRemoteBlockSizeFetchToMem)
2424
* <span id="detectCorrupt"> [spark.shuffle.detectCorrupt](../configuration-properties.md#spark.shuffle.detectCorrupt)
2525
* <span id="detectCorruptUseExtraMemory"> [spark.shuffle.detectCorrupt.useExtraMemory](../configuration-properties.md#spark.shuffle.detectCorrupt.useExtraMemory)
26-
* <span id="shuffleMetrics"> `ShuffleReadMetricsReporter`
26+
* <span id="shuffleMetrics"> [ShuffleReadMetricsReporter](../shuffle/ShuffleReadMetricsReporter.md)
2727
* <span id="doBatchFetch"> `doBatchFetch` flag
2828

2929
While being created, `ShuffleBlockFetcherIterator` [initializes itself](#initialize).
@@ -185,7 +185,7 @@ next(): (BlockId, InputStream)
185185

186186
`next` takes (and removes) the head of the [results](#results) queue.
187187

188-
`next` requests the [ShuffleReadMetricsReporter](#shuffleMetrics) to `incFetchWaitTime`.
188+
`next` requests the [ShuffleReadMetricsReporter](#shuffleMetrics) to [incFetchWaitTime](../shuffle/ShuffleReadMetricsReporter.md#incFetchWaitTime).
189189

190190
`next`...FIXME
191191

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ nav:
184184
- ShuffleExternalSorter: shuffle/ShuffleExternalSorter.md
185185
- ShuffleInMemorySorter: shuffle/ShuffleInMemorySorter.md
186186
- ShuffleManager: shuffle/ShuffleManager.md
187+
- shuffle/ShuffleReadMetricsReporter.md
187188
- ShuffleWriteMetricsReporter: shuffle/ShuffleWriteMetricsReporter.md
188189
- ShuffleWriteProcessor: shuffle/ShuffleWriteProcessor.md
189190
- ShuffleWriter: shuffle/ShuffleWriter.md

0 commit comments

Comments
 (0)