Skip to content

Commit 619d566

Browse files
committed
refactor parts of ShardsPool code
1 parent bd6234e commit 619d566

File tree

6 files changed

+72
-71
lines changed

6 files changed

+72
-71
lines changed

src/sw/redis++/connection_pool.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,6 @@ bool ConnectionPool::_need_reconnect(const Connection &connection) {
149149
return false;
150150
}
151151

152-
ConnectionPoolGuard::ConnectionPoolGuard(ConnectionPool &pool, Connection &connection) :
153-
_pool(pool),
154-
_connection(connection) {}
155-
156-
ConnectionPoolGuard::~ConnectionPoolGuard() {
157-
_pool.release(std::move(_connection));
158-
}
159-
160152
}
161153

162154
}

src/sw/redis++/connection_pool.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,6 @@ class ConnectionPool {
8282
std::condition_variable _cv;
8383
};
8484

85-
class ConnectionPoolGuard {
86-
public:
87-
ConnectionPoolGuard(ConnectionPool &pool, Connection &connection);
88-
~ConnectionPoolGuard();
89-
90-
private:
91-
ConnectionPool &_pool;
92-
Connection &_connection;
93-
};
94-
9585
}
9686

9787
}

src/sw/redis++/redis.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,20 @@ class Redis {
975975
long long publish(const StringView &channel, const StringView &message);
976976

977977
private:
978+
class ConnectionPoolGuard {
979+
public:
980+
ConnectionPoolGuard(ConnectionPool &pool,
981+
Connection &connection) : _pool(pool), _connection(connection) {}
982+
983+
~ConnectionPoolGuard() {
984+
_pool.release(std::move(_connection));
985+
}
986+
987+
private:
988+
ConnectionPool &_pool;
989+
Connection &_connection;
990+
};
991+
978992
template <typename Cmd, typename ...Args>
979993
ReplyUPtr _score_command(std::true_type, Cmd cmd, Args &&... args);
980994

src/sw/redis++/redis_cluster.hpp

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -842,15 +842,9 @@ template <typename Cmd, typename ...Args>
842842
ReplyUPtr RedisCluster::_command(Cmd cmd, const StringView &key, Args &&...args) {
843843
for (auto idx = 0; idx < 2; ++idx) {
844844
try {
845-
auto pool = _pool.fetch(key);
845+
auto guarded_connection = _pool.fetch(key);
846846

847-
assert(bool(pool));
848-
849-
auto connection = pool->fetch();
850-
851-
ConnectionPoolGuard guard(*pool, connection);
852-
853-
return _command(cmd, connection, std::forward<Args>(args)...);
847+
return _command(cmd, guarded_connection.connection(), std::forward<Args>(args)...);
854848
} catch (const ClosedError &err) {
855849
// Node might be removed.
856850
// 1. Get up-to-date slot mapping to check if the node still exists.
@@ -863,15 +857,8 @@ ReplyUPtr RedisCluster::_command(Cmd cmd, const StringView &key, Args &&...args)
863857
// Slot mapping has been changed, update it and try again.
864858
_pool.update();
865859
} catch (const AskError &err) {
866-
auto pool = _pool.fetch(err.node());
867-
868-
assert(bool(pool));
869-
870-
auto connection = pool->fetch();
871-
872-
assert(!connection.broken());
873-
874-
ConnectionPoolGuard guard(*pool, connection);
860+
auto guarded_connection = _pool.fetch(err.node());
861+
auto &connection = guarded_connection.connection();
875862

876863
// 1. send ASKING command.
877864
_asking(connection);

src/sw/redis++/shards_pool.cpp

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -57,50 +57,41 @@ ShardsPool& ShardsPool::operator=(ShardsPool &&that) {
5757
return *this;
5858
}
5959

60-
ConnectionPoolSPtr ShardsPool::fetch(const StringView &key) {
60+
GuardedConnection ShardsPool::fetch(const StringView &key) {
6161
auto slot = _slot(key);
6262

6363
return _fetch(slot);
6464
}
6565

66-
ConnectionPoolSPtr ShardsPool::fetch() {
66+
GuardedConnection ShardsPool::fetch() {
6767
auto slot = _slot();
6868

6969
return _fetch(slot);
7070
}
7171

72-
ConnectionPoolSPtr ShardsPool::fetch(const Node &node) {
72+
GuardedConnection ShardsPool::fetch(const Node &node) {
7373
std::lock_guard<std::mutex> lock(_mutex);
7474

75-
auto iter = _pool.find(node);
76-
if (iter == _pool.end()) {
75+
auto iter = _pools.find(node);
76+
if (iter == _pools.end()) {
7777
// Node doesn't exist, and it should be a newly created node.
7878
// So add a new connection pool.
7979
iter = _add_node(node);
8080
}
8181

82-
assert(iter != _pool.end());
82+
assert(iter != _pools.end());
8383

84-
return iter->second;
84+
return GuardedConnection(iter->second);
8585
}
8686

8787
void ShardsPool::update() {
8888
// My might send command to a removed node.
8989
// Try at most 3 times.
9090
for (auto idx = 0; idx < 3; ++idx) {
9191
try {
92-
// Randomly pick a connection pool.
93-
auto pool = fetch();
94-
95-
assert(bool(pool));
96-
97-
auto connection = pool->fetch();
98-
99-
assert(!connection.broken());
100-
101-
ConnectionPoolGuard guard(*pool, connection);
102-
103-
auto shards = _cluster_slots(connection);
92+
// Randomly pick a connection.
93+
auto guarded_connection = fetch();
94+
auto shards = _cluster_slots(guarded_connection.connection());
10495

10596
std::unordered_set<Node, NodeHash> nodes;
10697
for (const auto &shard : shards) {
@@ -114,10 +105,10 @@ void ShardsPool::update() {
114105
_shards = std::move(shards);
115106

116107
// Remove non-existent nodes.
117-
for (auto iter = _pool.begin(); iter != _pool.end(); ) {
108+
for (auto iter = _pools.begin(); iter != _pools.end(); ) {
118109
if (nodes.find(iter->first) == nodes.end()) {
119110
// Node has been removed.
120-
_pool.erase(iter++);
111+
_pools.erase(iter++);
121112
} else {
122113
++iter;
123114
}
@@ -126,7 +117,7 @@ void ShardsPool::update() {
126117
// Add connection pool for new nodes.
127118
// In fact, connections will be created lazily.
128119
for (const auto &node : nodes) {
129-
if (_pool.find(node) == _pool.end()) {
120+
if (_pools.find(node) == _pools.end()) {
130121
_add_node(node);
131122
}
132123
}
@@ -145,7 +136,7 @@ void ShardsPool::_move(ShardsPool &&that) {
145136
_pool_opts = that._pool_opts;
146137
_connection_opts = that._connection_opts;
147138
_shards = std::move(that._shards);
148-
_pool = std::move(that._pool);
139+
_pools = std::move(that._pools);
149140
}
150141

151142
void ShardsPool::_init_pool(const Shards &shards) {
@@ -268,7 +259,7 @@ Slot ShardsPool::_slot() const {
268259
return uniform_dist(engine);
269260
}
270261

271-
ConnectionPoolSPtr ShardsPool::_fetch(Slot slot) {
262+
GuardedConnection ShardsPool::_fetch(Slot slot) {
272263
std::lock_guard<std::mutex> lock(_mutex);
273264

274265
auto shards_iter = _shards.lower_bound(SlotRange{slot, slot});
@@ -278,20 +269,20 @@ ConnectionPoolSPtr ShardsPool::_fetch(Slot slot) {
278269

279270
const auto &node = shards_iter->second;
280271

281-
auto node_iter = _pool.find(node);
282-
if (node_iter == _pool.end()) {
272+
auto node_iter = _pools.find(node);
273+
if (node_iter == _pools.end()) {
283274
throw Error("Slot is NOT covered: " + std::to_string(slot));
284275
}
285276

286-
return node_iter->second;
277+
return GuardedConnection(node_iter->second);
287278
}
288279

289280
auto ShardsPool::_add_node(const Node &node) -> NodeMap::iterator {
290281
auto opts = _connection_opts;
291282
opts.host = node.host;
292283
opts.port = node.port;
293284

294-
return _pool.emplace(node, std::make_shared<ConnectionPool>(_pool_opts, opts)).first;
285+
return _pools.emplace(node, std::make_shared<ConnectionPool>(_pool_opts, opts)).first;
295286
}
296287

297288
}

src/sw/redis++/shards_pool.h

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#ifndef SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H
1818
#define SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H
1919

20+
#include <cassert>
2021
#include <unordered_map>
2122
#include <string>
2223
#include <random>
@@ -31,6 +32,32 @@ namespace redis {
3132

3233
using ConnectionPoolSPtr = std::shared_ptr<ConnectionPool>;
3334

35+
class GuardedConnection {
36+
public:
37+
GuardedConnection(const ConnectionPoolSPtr &pool) : _pool(pool),
38+
_connection(_pool->fetch()) {
39+
assert(!_connection.broken());
40+
}
41+
42+
GuardedConnection(const GuardedConnection &) = delete;
43+
GuardedConnection& operator=(const GuardedConnection &) = delete;
44+
45+
GuardedConnection(GuardedConnection &&) = default;
46+
GuardedConnection& operator=(GuardedConnection &&) = default;
47+
48+
~GuardedConnection() {
49+
_pool->release(std::move(_connection));
50+
}
51+
52+
Connection& connection() {
53+
return _connection;
54+
}
55+
56+
private:
57+
ConnectionPoolSPtr _pool;
58+
Connection _connection;
59+
};
60+
3461
class ShardsPool {
3562
public:
3663
ShardsPool() = default;
@@ -46,14 +73,14 @@ class ShardsPool {
4673
ShardsPool(const ConnectionPoolOptions &pool_opts,
4774
const ConnectionOptions &connection_opts);
4875

49-
// Fetch a connection pool by key.
50-
ConnectionPoolSPtr fetch(const StringView &key);
76+
// Fetch a connection by key.
77+
GuardedConnection fetch(const StringView &key);
5178

52-
// Randomly pick a connection pool.
53-
ConnectionPoolSPtr fetch();
79+
// Randomly pick a connection.
80+
GuardedConnection fetch();
5481

55-
// Fetch a connection pool by node.
56-
ConnectionPoolSPtr fetch(const Node &node);
82+
// Fetch a connection by node.
83+
GuardedConnection fetch(const Node &node);
5784

5885
void update();
5986

@@ -76,7 +103,7 @@ class ShardsPool {
76103
// Randomly pick a slot.
77104
std::size_t _slot() const;
78105

79-
ConnectionPoolSPtr _fetch(Slot slot);
106+
GuardedConnection _fetch(Slot slot);
80107

81108
using NodeMap = std::unordered_map<Node, ConnectionPoolSPtr, NodeHash>;
82109

@@ -88,7 +115,7 @@ class ShardsPool {
88115

89116
Shards _shards;
90117

91-
NodeMap _pool;
118+
NodeMap _pools;
92119

93120
std::mutex _mutex;
94121

0 commit comments

Comments
 (0)