48
48
#include " memtier_benchmark.h"
49
49
#include " obj_gen.h"
50
50
#include " shard_connection.h"
51
+ #include " crc16_slottable.h"
51
52
52
53
#define KEY_INDEX_QUEUE_MAX_SIZE 1000000
53
54
@@ -100,24 +101,13 @@ static inline uint16_t crc16(const char *buf, size_t len) {
100
101
return crc;
101
102
}
102
103
103
- static uint32_t calc_hslot_crc16_cluster (const char *str, size_t length)
104
- {
105
- uint32_t rv = (uint32_t ) crc16 (str, length) & MAX_CLUSTER_HSLOT;
106
- return rv;
107
- }
108
-
109
104
// /////////////////////////////////////////////////////////////////////////////////////////////////////
110
105
111
106
cluster_client::cluster_client (client_group* group) : client(group)
112
107
{
113
108
}
114
109
115
110
cluster_client::~cluster_client () {
116
- for (unsigned int i = 0 ; i < m_key_index_pools.size (); i++) {
117
- key_index_pool* key_idx_pool = m_key_index_pools[i];
118
- delete key_idx_pool;
119
- }
120
- m_key_index_pools.clear ();
121
111
}
122
112
123
113
int cluster_client::connect (void ) {
@@ -128,11 +118,6 @@ int cluster_client::connect(void) {
128
118
// set main connection to send 'CLUSTER SLOTS' command
129
119
sc->set_cluster_slots ();
130
120
131
- // create key index pool for main connection
132
- key_index_pool* key_idx_pool = new key_index_pool;
133
- m_key_index_pools.push_back (key_idx_pool);
134
- assert (m_connections.size () == m_key_index_pools.size ());
135
-
136
121
// continue with base class
137
122
client::connect ();
138
123
@@ -166,22 +151,10 @@ shard_connection* cluster_client::create_shard_connection(abstract_protocol* abs
166
151
167
152
m_connections.push_back (sc);
168
153
169
- // create key index pool
170
- key_index_pool* key_idx_pool = new key_index_pool;
171
- assert (key_idx_pool != NULL );
172
-
173
- m_key_index_pools.push_back (key_idx_pool);
174
- assert (m_connections.size () == m_key_index_pools.size ());
175
-
176
154
return sc;
177
155
}
178
156
179
157
bool cluster_client::connect_shard_connection (shard_connection* sc, char * address, char * port) {
180
- // empty key index queue
181
- if (m_key_index_pools[sc->get_id ()]->size ()) {
182
- key_index_pool empty_queue;
183
- std::swap (*m_key_index_pools[sc->get_id ()], empty_queue);
184
- }
185
158
186
159
// save address and port
187
160
sc->set_address_port (address, port);
@@ -224,9 +197,12 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
224
197
*/
225
198
unsigned long prev_connections_size = m_connections.size ();
226
199
std::vector<bool > close_sc (prev_connections_size, true );
200
+ for (unsigned int i = 0 ; i < MAX_SLOTS; i++) {
201
+ m_conn_to_init_slot[i] = UINT16_MAX;
202
+ }
227
203
228
204
// run over response and create connections
229
- for (unsigned int i= 0 ; i< r->get_mbulk_value ()->mbulks_elements .size (); i++) {
205
+ for (unsigned int i = 0 ; i < r->get_mbulk_value ()->mbulks_elements .size (); i++) {
230
206
// create connection
231
207
mbulk_size_el* shard = r->get_mbulk_value ()->mbulks_elements [i]->as_mbulk_size ();
232
208
@@ -273,17 +249,26 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
273
249
connect_shard_connection (sc, addr, port);
274
250
}
275
251
276
- // update range
252
+ unsigned int sc_id = sc->get_id ();
253
+ // Set the initial slot for this shard connection
254
+ if (m_conn_to_init_slot[sc_id] == UINT16_MAX) {
255
+ m_conn_to_init_slot[sc_id] = min_slot;
256
+ }
277
257
for (int j = min_slot; j <= max_slot; j++) {
278
- m_slot_to_shard[j] = sc->get_id ();
258
+ if (j < max_slot) {
259
+ m_slot_lists[j] = j+1 ;
260
+ } else {
261
+ // Close the loop - point the last index to the first one owned by the shard connection
262
+ m_slot_lists[j] = m_conn_to_init_slot[sc_id];
263
+ }
279
264
}
280
265
281
266
free (addr);
282
267
free (port);
283
268
}
284
269
285
270
// check if some connections left with no slots, and need to be closed
286
- for (unsigned int i= 0 ; i < prev_connections_size; i++) {
271
+ for (unsigned int i = 0 ; i < prev_connections_size; i++) {
287
272
if ((close_sc[i] == true ) &&
288
273
(m_connections[i]->get_connection_state () != conn_disconnected)) {
289
274
@@ -299,8 +284,7 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
299
284
300
285
// don't exceed requests
301
286
if (m_config->requests ) {
302
- if (m_key_index_pools[conn_id]->empty () &&
303
- m_reqs_generated >= m_config->requests ) {
287
+ if (m_reqs_generated >= m_config->requests ) {
304
288
return true ;
305
289
}
306
290
}
@@ -309,53 +293,13 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
309
293
}
310
294
311
295
bool cluster_client::get_key_for_conn (unsigned int conn_id, int iter, unsigned long long * key_index) {
312
- // first check if we already have key in pool
313
- if (!m_key_index_pools[conn_id]->empty ()) {
314
- *key_index = m_key_index_pools[conn_id]->front ();
315
- m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s%llu" , m_obj_gen->get_key_prefix (), *key_index);
316
-
317
- m_key_index_pools[conn_id]->pop ();
318
- return true ;
319
- }
320
296
321
- // keep generate key till it match for this connection, or requests reached
322
- while (true ) {
323
- // generate key
324
- *key_index = m_obj_gen->get_key_index (iter);
325
- m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s%llu" , m_obj_gen->get_key_prefix (), *key_index);
326
-
327
- unsigned int hslot = calc_hslot_crc16_cluster (m_key_buffer, m_key_len);
328
-
329
- // check if the key match for this connection
330
- if (m_slot_to_shard[hslot] == conn_id) {
331
- m_reqs_generated++;
332
- return true ;
333
- }
334
-
335
- // handle key for other connection
336
- unsigned int other_conn_id = m_slot_to_shard[hslot];
337
-
338
- // in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated
339
- if (m_connections[other_conn_id]->get_connection_state () == conn_disconnected) {
340
- m_connections[conn_id]->set_cluster_slots ();
341
- return false ;
342
- }
343
-
344
- // in case connection is during cluster slots command, his slots mapping not relevant
345
- if (m_connections[other_conn_id]->get_cluster_slots_state () != setup_done)
346
- continue ;
347
-
348
- // store key for other connection, if queue is not full
349
- key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id];
350
- if (key_idx_pool->size () < KEY_INDEX_QUEUE_MAX_SIZE) {
351
- key_idx_pool->push (*key_index);
352
- m_reqs_generated++;
353
- }
354
-
355
- // don't exceed requests
356
- if (m_config->requests > 0 && m_reqs_generated >= m_config->requests )
357
- return false ;
358
- }
297
+ *key_index = m_obj_gen->get_key_index (iter);
298
+ m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s{%s}%llu" ,
299
+ m_obj_gen->get_key_prefix (), crc16_slot_table[m_conn_to_init_slot[conn_id]], *key_index);
300
+ m_conn_to_init_slot[conn_id] = m_slot_lists[m_conn_to_init_slot[conn_id]];
301
+ m_reqs_generated++;
302
+ return true ;
359
303
}
360
304
361
305
// This function could use some urgent TLC -- but we need to do it without altering the behavior
@@ -432,10 +376,6 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
432
376
if (m_connections[conn_id]->get_cluster_slots_state () != setup_done)
433
377
return ;
434
378
435
- // queue may stored uncorrected mapping indexes, empty them
436
- key_index_pool empty_queue;
437
- std::swap (*m_key_index_pools[conn_id], empty_queue);
438
-
439
379
// set connection to send 'CLUSTER SLOTS' command
440
380
m_connections[conn_id]->set_cluster_slots ();
441
381
}
0 commit comments