Skip to content

Commit f34aeda

Browse files
authored
refactor: Shuffle read runtime stats rename (#26432)
Summary: There is only one partition read source for each mapper job, so we can always report the max size to fetch from the source.

Differential Revision: D85500999

```
== NO RELEASE NOTE ==
```
1 parent 52132d8 commit f34aeda

File tree

3 files changed

+68
-61
lines changed

3 files changed

+68
-61
lines changed

presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
#include "presto_cpp/main/operators/ShuffleRead.h"
1515
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
1616
#include "velox/common/Casts.h"
17-
#include "velox/exec/Exchange.h"
1817
#include "velox/row/CompactRow.h"
1918

2019
using namespace facebook::velox::exec;
@@ -25,61 +24,30 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
2524
return obj["id"].asString();
2625
}
2726

28-
namespace {
29-
class ShuffleRead : public Exchange {
30-
public:
31-
ShuffleRead(
32-
int32_t operatorId,
33-
DriverCtx* ctx,
34-
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
35-
std::shared_ptr<ExchangeClient> exchangeClient)
36-
: Exchange(
37-
operatorId,
38-
ctx,
39-
std::make_shared<core::ExchangeNode>(
40-
shuffleReadNode->id(),
41-
shuffleReadNode->outputType(),
42-
VectorSerde::Kind::kCompactRow),
43-
exchangeClient,
44-
"ShuffleRead") {
45-
initStats();
46-
}
47-
48-
RowVectorPtr getOutput() override;
49-
50-
void close() override;
51-
52-
protected:
53-
VectorSerde* getSerde() override {
54-
VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde");
55-
}
56-
57-
private:
58-
static inline const std::string kShuffleDecodeTime{"shuffleDecodeWallNanos"};
59-
static inline const std::string kShuffleNumBatchesPerRead{
60-
"shuffleNumBatchesPerRead"};
61-
static inline const std::string kShuffleNumBatches{"shuffleNumBatches"};
62-
63-
void initStats();
64-
65-
void resetOutputState();
66-
67-
int64_t numBatches_{0};
68-
std::unordered_map<std::string, velox::RuntimeMetric> runtimeStats_;
69-
70-
size_t nextRow_{0};
71-
size_t nextPage_{0};
72-
// Reusable buffers.
73-
std::vector<std::string_view> rows_;
74-
std::vector<size_t> pageRows_;
75-
};
27+
ShuffleRead::ShuffleRead(
28+
int32_t operatorId,
29+
DriverCtx* ctx,
30+
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
31+
std::shared_ptr<ExchangeClient> exchangeClient)
32+
: Exchange(
33+
operatorId,
34+
ctx,
35+
std::make_shared<core::ExchangeNode>(
36+
shuffleReadNode->id(),
37+
shuffleReadNode->outputType(),
38+
VectorSerde::Kind::kCompactRow),
39+
exchangeClient,
40+
"ShuffleRead") {
41+
initStats();
42+
}
7643

7744
void ShuffleRead::initStats() {
7845
VELOX_CHECK(runtimeStats_.empty());
7946
runtimeStats_.insert(
8047
std::pair{kShuffleDecodeTime, velox::RuntimeCounter::Unit::kNanos});
8148
runtimeStats_.insert(
82-
std::pair{kShuffleNumBatchesPerRead, velox::RuntimeCounter::Unit::kNone});
49+
std::pair{
50+
kShufflePagesPerInputBatch, velox::RuntimeCounter::Unit::kNone});
8351
}
8452

8553
void ShuffleRead::resetOutputState() {
@@ -122,8 +90,8 @@ RowVectorPtr ShuffleRead::getOutput() {
12290
}
12391
}
12492
if (!currentPages_.empty()) {
125-
runtimeStats_[kShuffleNumBatchesPerRead].addValue(currentPages_.size());
126-
numBatches_ += currentPages_.size();
93+
runtimeStats_[kShufflePagesPerInputBatch].addValue(currentPages_.size());
94+
++numInputBatches_;
12795
}
12896
}
12997
VELOX_CHECK_LE(nextRow_, rows_.size());
@@ -177,12 +145,11 @@ void ShuffleRead::close() {
177145
}
178146
lockedStats->runtimeStats[name] = metric;
179147
}
180-
if (numBatches_ != 0) {
148+
if (numInputBatches_ != 0) {
181149
lockedStats->addRuntimeStat(
182-
kShuffleNumBatches, RuntimeCounter(numBatches_));
150+
kShuffleInputBatches, RuntimeCounter(numInputBatches_));
183151
}
184152
}
185-
} // namespace
186153

187154
folly::dynamic ShuffleReadNode::serialize() const {
188155
auto obj = PlanNode::serialize();

presto-native-execution/presto_cpp/main/operators/ShuffleRead.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#pragma once
1515

1616
#include "velox/core/PlanNode.h"
17+
#include "velox/exec/Exchange.h"
1718
#include "velox/exec/Operator.h"
1819

1920
namespace facebook::presto::operators {
@@ -90,6 +91,43 @@ class ShuffleReadNode : public velox::core::PlanNode {
9091
const velox::RowTypePtr outputType_;
9192
};
9293

94+
class ShuffleRead : public velox::exec::Exchange {
95+
public:
96+
ShuffleRead(
97+
int32_t operatorId,
98+
velox::exec::DriverCtx* ctx,
99+
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
100+
std::shared_ptr<velox::exec::ExchangeClient> exchangeClient);
101+
102+
velox::RowVectorPtr getOutput() override;
103+
104+
void close() override;
105+
106+
static inline const std::string kShuffleDecodeTime{"shuffleDecodeWallNanos"};
107+
static inline const std::string kShufflePagesPerInputBatch{
108+
"shuffleNumPagesPerInputBatch"};
109+
static inline const std::string kShuffleInputBatches{"shuffleInputBatches"};
110+
111+
protected:
112+
velox::VectorSerde* getSerde() override {
113+
VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde");
114+
}
115+
116+
private:
117+
void initStats();
118+
119+
void resetOutputState();
120+
121+
int64_t numInputBatches_{0};
122+
std::unordered_map<std::string, velox::RuntimeMetric> runtimeStats_;
123+
124+
size_t nextRow_{0};
125+
size_t nextPage_{0};
126+
// Reusable buffers.
127+
std::vector<std::string_view> rows_;
128+
std::vector<size_t> pageRows_;
129+
};
130+
93131
class ShuffleReadTranslator : public velox::exec::Operator::PlanNodeTranslator {
94132
public:
95133
std::unique_ptr<velox::exec::Operator> toOperator(

presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,21 +1470,23 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) {
14701470
taskCursor->task()->taskStats().pipelineStats[0].operatorStats[0];
14711471
const auto& runtimeStats = operatorStats.runtimeStats;
14721472

1473-
ASSERT_EQ(runtimeStats.count("shuffleDecodeWallNanos"), 1);
1474-
const auto& decodeTimeStat = runtimeStats.at("shuffleDecodeWallNanos");
1473+
ASSERT_EQ(runtimeStats.count(ShuffleRead::kShuffleDecodeTime), 1);
1474+
const auto& decodeTimeStat =
1475+
runtimeStats.at(ShuffleRead::kShuffleDecodeTime);
14751476
ASSERT_GT(decodeTimeStat.count, 0);
14761477
ASSERT_GT(decodeTimeStat.sum, 0);
14771478
ASSERT_EQ(velox::RuntimeCounter::Unit::kNanos, decodeTimeStat.unit);
14781479

1479-
ASSERT_EQ(runtimeStats.count("shuffleNumBatchesPerRead"), 1);
1480+
ASSERT_EQ(runtimeStats.count(ShuffleRead::kShufflePagesPerInputBatch), 1);
14801481
const auto& batchesPerReadStat =
1481-
runtimeStats.at("shuffleNumBatchesPerRead");
1482+
runtimeStats.at(ShuffleRead::kShufflePagesPerInputBatch);
14821483
ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, batchesPerReadStat.unit);
14831484
ASSERT_GT(batchesPerReadStat.count, 0);
14841485
ASSERT_GT(batchesPerReadStat.sum, 0);
14851486

1486-
ASSERT_EQ(runtimeStats.count("shuffleNumBatches"), 1);
1487-
const auto& numBatchesStat = runtimeStats.at("shuffleNumBatches");
1487+
ASSERT_EQ(runtimeStats.count(ShuffleRead::kShuffleInputBatches), 1);
1488+
const auto& numBatchesStat =
1489+
runtimeStats.at(ShuffleRead::kShuffleInputBatches);
14881490
ASSERT_GT(numBatchesStat.count, 0);
14891491
ASSERT_GT(numBatchesStat.sum, 0);
14901492
ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, numBatchesStat.unit);

0 commit comments

Comments
 (0)