Skip to content
This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Commit 80b4369

Browse files
committed
use gevent for pipeline execution
1 parent 018826d commit 80b4369

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

rediscluster/pipeline.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
# python std lib
44
import sys
5+
import logging
56

67
# rediscluster imports
78
from .client import RedisCluster
@@ -15,6 +16,10 @@
1516
from redis.exceptions import ConnectionError, RedisError, TimeoutError
1617
from redis._compat import imap, unicode
1718

19+
from gevent import monkey; monkey.patch_all()
20+
import gevent
21+
22+
log = logging.getLogger(__name__)
1823

1924
ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError)
2025

@@ -174,6 +179,11 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
174179
# If it fails the configured number of times then raise exception back to caller of this method
175180
raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
176181

182+
def _execute_node_commands(self, n):
183+
n.write()
184+
185+
n.read()
186+
177187
def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
178188
"""
179189
Send a bunch of cluster commands to the redis cluster.
@@ -227,11 +237,11 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
227237
# so that we can read them all in parallel as they come back.
228238
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.
229239
node_commands = nodes.values()
240+
events = []
230241
for n in node_commands:
231-
n.write()
242+
events.append(gevent.spawn(self._execute_node_commands, n))
232243

233-
for n in node_commands:
234-
n.read()
244+
gevent.joinall(events)
235245

236246
# release all of the redis connections we allocated earlier back into the connection pool.
237247
# we used to do this step as part of a try/finally block, but it is really dangerous to
@@ -269,6 +279,8 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
269279
# If a lot of commands have failed, we'll be setting the
270280
# flag to rebuild the slots table from scratch. So MOVED errors should
271281
# correct themselves fairly quickly.
282+
283+
log.debug("pipeline has failed commands: {}".format(attempt))
272284
self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
273285
for c in attempt:
274286
try:

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
redis>=3.0.0,<4.0.0
2+
gevent==1.5.0
3+
greenlet==0.4.16

setup.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
setup(
2222
name="redis-py-cluster-patched",
23-
version="2.1.0.999.8",
23+
version="2.1.0.999.9",
2424
description="Library for communicating with Redis Clusters. Built on top of redis-py lib",
2525
long_description=readme + '\n\n' + history,
2626
long_description_content_type="text/markdown",
@@ -30,10 +30,12 @@
3030
maintainer_email='Grokzen@gmail.com',
3131
packages=["rediscluster"],
3232
url='http://github.com/grokzen/redis-py-cluster',
33-
download_url="https://github.com/duke-cliff/redis-py-cluster/archive/2.1.0.999.8.tar.gz",
33+
download_url="https://github.com/duke-cliff/redis-py-cluster/archive/2.1.0.999.9.tar.gz",
3434
license='MIT',
3535
install_requires=[
36-
'redis>=3.0.0,<4.0.0'
36+
'redis>=3.0.0,<4.0.0',
37+
'gevent==1.5.0',
38+
'greenlet==0.4.16',
3739
],
3840
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4",
3941
extras_require={

0 commit comments

Comments
 (0)