@@ -196,9 +196,12 @@ def _get_commands_by_node(self, cmds):
             master_node = self.connection_pool.get_node_by_slot(slot)
 
+            # for the same master_node, always pick the same proxy_node, so that
+            # as many commands as possible are grouped per node
             if master_node['name'] in proxy_node_by_master:
                 node = proxy_node_by_master[master_node['name']]
             else:
+                # TODO: decide whether to use replicas based on whether the command is read-only
                 node = self.connection_pool.get_node_by_slot(slot, self.read_from_replicas)
                 proxy_node_by_master[master_node['name']] = node
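
For illustration, here is a minimal, self-contained sketch of the grouping idea in this hunk: commands whose slots resolve to the same master are all routed through one chosen node, so they can be batched into a single pipeline per node. The `pool.nodes.keyslot` helper and `c.args[1]` (first key) below are assumptions modeled on redis-py-cluster conventions, not code taken from this fork.

    # sketch only: pool.nodes.keyslot and c.args[1] (first key) are assumed
    def group_by_proxy(cmds, pool, read_from_replicas=False):
        proxy_node_by_master = {}  # master name -> node chosen for this batch
        grouped = {}               # node name -> commands to pipeline there
        for c in cmds:
            slot = pool.nodes.keyslot(c.args[1])
            master_node = pool.get_node_by_slot(slot)
            # reuse one node per master so its commands batch together
            if master_node['name'] in proxy_node_by_master:
                node = proxy_node_by_master[master_node['name']]
            else:
                node = pool.get_node_by_slot(slot, read_from_replicas)
                proxy_node_by_master[master_node['name']] = node
            grouped.setdefault(node['name'], []).append(c)
        return grouped
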
@@ -216,7 +219,7 @@ def _get_commands_by_node(self, cmds):
             nodes[node_name].append(c)
 
-        return nodes
+        return nodes, connection_by_node
 
     def _execute_single_command(self, cmd):
         try:
@@ -234,43 +237,69 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
         """
         # the first time sending the commands we send all of the commands that were queued up.
         # if we have to run through it again, we only retry the commands that failed.
-        attempt = sorted(stack, key=lambda x: x.position)
-
-        # build a list of node objects based on node names we need to
-        nodes = self._get_commands_by_node(attempt)
-
-        # send the commands in sequence.
-        # we write to all the open sockets for each node first, before reading anything
-        # this allows us to flush all the requests out across the network essentially in parallel
-        # so that we can read them all in parallel as they come back.
-        # we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.
-        node_commands = nodes.values()
-        events = []
-        for n in node_commands:
-            events.append(gevent.spawn(self._execute_node_commands, n))
-
-        gevent.joinall(events)
-
-        # release all of the redis connections we allocated earlier back into the connection pool.
-        # we used to do this step as part of a try/finally block, but it is really dangerous to
-        # release connections back into the pool if for some reason the socket has data still left in it
-        # from a previous operation. The write and read operations already have try/catch around them for
-        # all known types of errors including connection and socket level errors.
-        # So if we hit an exception, something really bad happened and putting any of
-        # these connections back into the pool is a very bad idea.
-        # the socket might have unread buffer still sitting in it, and then the
-        # next time we read from it we pass the buffered result back from a previous
-        # command and every single request after to that connection will always get
-        # a mismatched result. (not just theoretical, I saw this happen on production x.x).
-        for conn in connection_by_node.values():
-            self.connection_pool.release(conn)
+        cmds = sorted(stack, key=lambda x: x.position)
+
+        max_redirects = 5
+        cur_attempt = 0
+
+        while cur_attempt < max_redirects:
+
+            # build a list of node objects based on the node names we need to send commands to
+            nodes, connection_by_node = self._get_commands_by_node(cmds)
+
+            # send the commands in sequence.
+            # we write to all the open sockets for each node first, before reading anything
+            # this allows us to flush all the requests out across the network essentially in parallel
+            # so that we can read them all in parallel as they come back.
+            # we don't multiplex on the sockets as they come available, but that shouldn't make too much difference.
+
+            # duke-cliff: I think it is still faster to run the per-node commands in parallel with gevent:
+            # the socket I/O is non-blocking, but serialization/deserialization was done serially before this change
+            node_commands = nodes.values()
+            events = []
+            for n in node_commands:
+                events.append(gevent.spawn(self._execute_node_commands, n))
+
+            gevent.joinall(events)
+
+            # release all of the redis connections we allocated earlier back into the connection pool.
+            # we used to do this step as part of a try/finally block, but it is really dangerous to
+            # release connections back into the pool if for some reason the socket has data still left in it
+            # from a previous operation. The write and read operations already have try/catch around them for
+            # all known types of errors including connection and socket level errors.
+            # So if we hit an exception, something really bad happened and putting any of
+            # these connections back into the pool is a very bad idea.
+            # the socket might have unread buffer still sitting in it, and then the
+            # next time we read from it we pass the buffered result back from a previous
+            # command and every single request after to that connection will always get
+            # a mismatched result. (not just theoretical, I saw this happen on production x.x).
+            for conn in connection_by_node.values():
+                self.connection_pool.release(conn)
+
+            # regroup commands that got a MOVED reply and retry them as a pipeline (stacked commands)
+            # rather than one at a time; this improves pipeline performance a lot
+            moved_cmds = []
+            for c in cmds:
+                if isinstance(c.result, MovedError):
+                    e = c.result
+                    node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
+                    self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
+
+                    moved_cmds.append(c)
+
+            if moved_cmds:
+                cur_attempt += 1
+                cmds = sorted(moved_cmds, key=lambda x: x.position)
+                continue
+
+            break
 
         # if the response isn't an exception it is a valid response from the node
         # we're all done with that command, YAY!
         # if we have more commands to attempt, we've run into problems.
         # collect all the commands we are allowed to retry.
         # (MOVED, ASK, or connection errors or timeout errors)
-        attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
+        attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
         if attempt and allow_redirections:
             # RETRY MAGIC HAPPENS HERE!
             # send these remaining commands one at a time using `execute_command`
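
A MOVED reply means the hash slot has migrated to a different master; the error carries the slot and the new owner's address, so the client can repoint its slot table and retry. Here is a runnable toy model of the bounded redirect loop added above; `MovedError` is a stand-in class and `send_batch`/`move_slot` are injected placeholders, not this fork's real methods.

    class MovedError(Exception):  # stand-in for the cluster client's MOVED exception
        def __init__(self, slot_id, host, port):
            self.slot_id, self.host, self.port = slot_id, host, port

    def send_with_redirects(cmds, send_batch, move_slot, max_redirects=5):
        for _ in range(max_redirects):
            send_batch(cmds)  # fills c.result for every command
            moved = [c for c in cmds if isinstance(c.result, MovedError)]
            if not moved:
                break
            for c in moved:
                e = c.result
                move_slot(e.slot_id, e.host, e.port)  # repoint the local slot table
            # retry only the redirected commands, again batched per node
            cmds = sorted(moved, key=lambda c: c.position)
        return cmds
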
@@ -288,14 +317,19 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
             # flag to rebuild the slots table from scratch. So MOVED errors should
             # correct themselves fairly quickly.
 
-            log.debug("pipeline has failed commands: {}".format([c.result for c in attempt]))
+            # with the redirect retries above, I rarely see this slow mode happen anymore
+            log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt]))
 
             self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
+
+            # even in slow mode, we can use gevent to run the commands in parallel
             events = []
             for c in attempt:
                 events.append(gevent.spawn(self._execute_single_command, c))
+
             gevent.joinall(events)
 
+
         # turn the response back into a simple flat array that corresponds
         # to the sequence of commands issued in the stack in pipeline.execute()
         response = [c.result for c in sorted(stack, key=lambda x: x.position)]
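
Both the fast path and this slow mode rely on the same gevent fan-out shape: spawn one greenlet per unit of work, then join them all. A minimal runnable sketch of the pattern; the `worker` argument is a placeholder for methods like `_execute_node_commands` or `_execute_single_command`.

    import gevent

    def fan_out(items, worker):
        # one greenlet per item; joinall blocks until all finish, and each
        # greenlet keeps its own result (or exception) on the greenlet object
        events = [gevent.spawn(worker, item) for item in items]
        gevent.joinall(events)
        return [e.value for e in events]

    # e.g. results = fan_out(node_commands, execute_node_commands)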