Skip to content

Commit 71142aa

Browse files
pubsub unit tests
1 parent 63f5ed3 commit 71142aa

File tree

4 files changed

+244
-41
lines changed

4 files changed

+244
-41
lines changed

socketio/kombu_manager.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import json
21
import pickle
32

4-
import six
53
try:
64
import kombu
75
except ImportError:
@@ -10,7 +8,7 @@
108
from .pubsub_manager import PubSubManager
119

1210

13-
class KombuManager(PubSubManager):
11+
class KombuManager(PubSubManager): # pragma: no cover
1412
"""Client manager that uses kombu for inter-process messaging.
1513
1614
This class implements a client manager backend for event sharing across
@@ -53,16 +51,4 @@ def _listen(self):
5351
while True:
5452
message = listen_queue.get(block=True)
5553
message.ack()
56-
data = None
57-
if isinstance(message.payload, six.binary_type):
58-
try:
59-
data = pickle.loads(message.payload)
60-
except:
61-
pass
62-
if data is None:
63-
try:
64-
data = json.loads(message.payload)
65-
except:
66-
pass
67-
if data:
68-
yield data
54+
yield message.payload

socketio/pubsub_manager.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from functools import partial
22
import uuid
33

4+
import json
5+
import pickle
6+
import six
7+
48
from .base_manager import BaseManager
59

610

@@ -18,6 +22,8 @@ class PubSubManager(BaseManager):
1822
:param channel: The channel name on which the server sends and receives
1923
notifications.
2024
"""
25+
name = 'pubsub'
26+
2127
def __init__(self, channel='socketio'):
2228
super(PubSubManager, self).__init__()
2329
self.channel = channel
@@ -40,6 +46,8 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
4046
"""
4147
namespace = namespace or '/'
4248
if callback is not None:
49+
if room is None:
50+
raise ValueError('Cannot use callback without a room set.')
4351
id = self._generate_ack_id(room, namespace, callback)
4452
callback = (room, namespace, id)
4553
else:
@@ -59,7 +67,7 @@ def _publish(self, data):
5967
support pub/sub backends.
6068
"""
6169
raise NotImplementedError('This method must be implemented in a '
62-
'subclass.')
70+
'subclass.') # pragma: no cover
6371

6472
def _listen(self):
6573
"""Return the next message published on the Socket.IO channel,
@@ -69,7 +77,7 @@ def _listen(self):
6977
support pub/sub backends.
7078
"""
7179
raise NotImplementedError('This method must be implemented in a '
72-
'subclass.')
80+
'subclass.') # pragma: no cover
7381

7482
def _handle_emit(self, message):
7583
# Events with callbacks are very tricky to handle across hosts
@@ -111,10 +119,24 @@ def _handle_close_room(self, message):
111119

112120
def _thread(self):
113121
for message in self._listen():
114-
if 'method' in message:
115-
if message['method'] == 'emit':
116-
self._handle_emit(message)
117-
elif message['method'] == 'callback':
118-
self._handle_callback(message)
119-
elif message['method'] == 'close_room':
120-
self._handle_close_room(message)
122+
data = None
123+
if isinstance(message, dict):
124+
data = message
125+
else:
126+
if isinstance(message, six.binary_type): # pragma: no cover
127+
try:
128+
data = pickle.loads(message)
129+
except:
130+
pass
131+
if data is None:
132+
try:
133+
data = json.loads(message)
134+
except:
135+
pass
136+
if data and 'method' in data:
137+
if data['method'] == 'emit':
138+
self._handle_emit(data)
139+
elif data['method'] == 'callback':
140+
self._handle_callback(data)
141+
elif data['method'] == 'close_room':
142+
self._handle_close_room(data)

socketio/redis_manager.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import json
21
import pickle
32

4-
import six
53
try:
64
import redis
75
except ImportError:
@@ -10,7 +8,7 @@
108
from .pubsub_manager import PubSubManager
119

1210

13-
class RedisManager(PubSubManager):
11+
class RedisManager(PubSubManager): # pragma: no cover
1412
"""Redis based client manager.
1513
1614
This class implements a Redis backend for event sharing across multiple
@@ -48,17 +46,5 @@ def _listen(self):
4846
for message in self.pubsub.listen():
4947
if message['channel'] == channel and \
5048
message['type'] == 'message' and 'data' in message:
51-
data = None
52-
if isinstance(message['data'], six.binary_type):
53-
try:
54-
data = pickle.loads(message['data'])
55-
except:
56-
pass
57-
if data is None:
58-
try:
59-
data = json.loads(message['data'])
60-
except:
61-
pass
62-
if data:
63-
yield data
49+
yield message['data']
6450
self.pubsub.unsubscribe(self.channel)

tests/test_pubsub_manager.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
import functools
2+
import unittest
3+
4+
import six
5+
if six.PY3:
6+
from unittest import mock
7+
else:
8+
import mock
9+
10+
from socketio import base_manager
11+
from socketio import pubsub_manager
12+
13+
14+
class TestBaseManager(unittest.TestCase):
15+
def setUp(self):
16+
mock_server = mock.MagicMock()
17+
self.pm = pubsub_manager.PubSubManager()
18+
self.pm._publish = mock.MagicMock()
19+
self.pm.initialize(mock_server)
20+
21+
def test_default_init(self):
22+
self.assertEqual(self.pm.channel, 'socketio')
23+
self.assertEqual(len(self.pm.host_id), 32)
24+
self.pm.server.start_background_task.assert_called_once_with(
25+
self.pm._thread)
26+
27+
def test_custom_init(self):
28+
pubsub = pubsub_manager.PubSubManager(channel='foo')
29+
self.assertEqual(pubsub.channel, 'foo')
30+
self.assertEqual(len(pubsub.host_id), 32)
31+
32+
def test_emit(self):
33+
self.pm.emit('foo', 'bar')
34+
self.pm._publish.assert_called_once_with(
35+
{'method': 'emit', 'event': 'foo', 'data': 'bar',
36+
'namespace': '/', 'room': None, 'skip_sid': None,
37+
'callback': None})
38+
39+
def test_emit_with_namespace(self):
40+
self.pm.emit('foo', 'bar', namespace='/baz')
41+
self.pm._publish.assert_called_once_with(
42+
{'method': 'emit', 'event': 'foo', 'data': 'bar',
43+
'namespace': '/baz', 'room': None, 'skip_sid': None,
44+
'callback': None})
45+
46+
def test_emit_with_room(self):
47+
self.pm.emit('foo', 'bar', room='baz')
48+
self.pm._publish.assert_called_once_with(
49+
{'method': 'emit', 'event': 'foo', 'data': 'bar',
50+
'namespace': '/', 'room': 'baz', 'skip_sid': None,
51+
'callback': None})
52+
53+
def test_emit_with_skip_sid(self):
54+
self.pm.emit('foo', 'bar', skip_sid='baz')
55+
self.pm._publish.assert_called_once_with(
56+
{'method': 'emit', 'event': 'foo', 'data': 'bar',
57+
'namespace': '/', 'room': None, 'skip_sid': 'baz',
58+
'callback': None})
59+
60+
def test_emit_with_callback(self):
61+
with mock.patch.object(self.pm, '_generate_ack_id',
62+
return_value='123'):
63+
self.pm.emit('foo', 'bar', room='baz', callback='cb')
64+
self.pm._publish.assert_called_once_with(
65+
{'method': 'emit', 'event': 'foo', 'data': 'bar',
66+
'namespace': '/', 'room': 'baz', 'skip_sid': None,
67+
'callback': ('baz', '/', '123')})
68+
69+
def test_emit_with_callback_missing_room(self):
70+
with mock.patch.object(self.pm, '_generate_ack_id',
71+
return_value='123'):
72+
self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar',
73+
callback='cb')
74+
75+
def test_close_room(self):
76+
self.pm.close_room('foo')
77+
self.pm._publish.assert_called_once_with(
78+
{'method': 'close_room', 'room': 'foo', 'namespace': '/'})
79+
80+
def test_close_room_with_namespace(self):
81+
self.pm.close_room('foo', '/bar')
82+
self.pm._publish.assert_called_once_with(
83+
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar'})
84+
85+
def test_handle_emit(self):
86+
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
87+
self.pm._handle_emit({'event': 'foo', 'data': 'bar'})
88+
super_emit.assert_called_once_with('foo', 'bar', namespace=None,
89+
room=None, skip_sid=None,
90+
callback=None)
91+
92+
def test_handle_emit_with_namespace(self):
93+
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
94+
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
95+
'namespace': '/baz'})
96+
super_emit.assert_called_once_with('foo', 'bar', namespace='/baz',
97+
room=None, skip_sid=None,
98+
callback=None)
99+
100+
def test_handle_emiti_with_room(self):
101+
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
102+
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
103+
'room': 'baz'})
104+
super_emit.assert_called_once_with('foo', 'bar', namespace=None,
105+
room='baz', skip_sid=None,
106+
callback=None)
107+
108+
def test_handle_emit_with_skip_sid(self):
109+
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
110+
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
111+
'skip_sid': '123'})
112+
super_emit.assert_called_once_with('foo', 'bar', namespace=None,
113+
room=None, skip_sid='123',
114+
callback=None)
115+
116+
def test_handle_emit_with_callback(self):
117+
host_id = self.pm.host_id
118+
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
119+
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
120+
'namespace': '/baz',
121+
'callback': ('sid', '/baz', 123)})
122+
self.assertEqual(super_emit.call_count, 1)
123+
self.assertEqual(super_emit.call_args[0], ('foo', 'bar'))
124+
self.assertEqual(super_emit.call_args[1]['namespace'], '/baz')
125+
self.assertIsNone(super_emit.call_args[1]['room'])
126+
self.assertIsNone(super_emit.call_args[1]['skip_sid'])
127+
self.assertIsInstance(super_emit.call_args[1]['callback'],
128+
functools.partial)
129+
super_emit.call_args[1]['callback']('one', 2, 'three')
130+
self.pm._publish.assert_called_once_with(
131+
{'method': 'callback', 'host_id': host_id, 'sid': 'sid',
132+
'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')})
133+
134+
def test_handle_callback(self):
135+
host_id = self.pm.host_id
136+
with mock.patch.object(self.pm, 'trigger_callback') as trigger:
137+
self.pm._handle_callback({'method': 'callback',
138+
'host_id': host_id, 'sid': 'sid',
139+
'namespace': '/', 'id': 123,
140+
'args': ('one', 2)})
141+
trigger.assert_called_once_with('sid', '/', 123, ('one', 2))
142+
143+
def test_handle_callback_bad_host_id(self):
144+
with mock.patch.object(self.pm, 'trigger_callback') as trigger:
145+
self.pm._handle_callback({'method': 'callback',
146+
'host_id': 'bad', 'sid': 'sid',
147+
'namespace': '/', 'id': 123,
148+
'args': ('one', 2)})
149+
self.assertEqual(trigger.call_count, 0)
150+
151+
def test_handle_callback_missing_args(self):
152+
host_id = self.pm.host_id
153+
with mock.patch.object(self.pm, 'trigger_callback') as trigger:
154+
self.pm._handle_callback({'method': 'callback',
155+
'host_id': host_id, 'sid': 'sid',
156+
'namespace': '/', 'id': 123})
157+
self.pm._handle_callback({'method': 'callback',
158+
'host_id': host_id, 'sid': 'sid',
159+
'namespace': '/'})
160+
self.pm._handle_callback({'method': 'callback',
161+
'host_id': host_id, 'sid': 'sid'})
162+
self.pm._handle_callback({'method': 'callback',
163+
'host_id': host_id})
164+
self.assertEqual(trigger.call_count, 0)
165+
166+
def test_handle_close_room(self):
167+
with mock.patch.object(base_manager.BaseManager, 'close_room') \
168+
as super_close_room:
169+
self.pm._handle_close_room({'method': 'close_room',
170+
'room': 'foo'})
171+
super_close_room.assert_called_once_with(room='foo',
172+
namespace=None)
173+
174+
def test_handle_close_room_with_namespace(self):
175+
with mock.patch.object(base_manager.BaseManager, 'close_room') \
176+
as super_close_room:
177+
self.pm._handle_close_room({'method': 'close_room',
178+
'room': 'foo', 'namespace': '/bar'})
179+
super_close_room.assert_called_once_with(room='foo',
180+
namespace='/bar')
181+
182+
def test_background_thread(self):
183+
self.pm._handle_emit = mock.MagicMock()
184+
self.pm._handle_callback = mock.MagicMock()
185+
self.pm._handle_close_room = mock.MagicMock()
186+
187+
def messages():
188+
import pickle
189+
yield {'method': 'emit', 'value': 'foo'}
190+
yield {'missing': 'method'}
191+
yield '{"method": "callback", "value": "bar"}'
192+
yield {'method': 'bogus'}
193+
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
194+
yield 'bad json'
195+
yield b'bad pickled'
196+
raise KeyboardInterrupt
197+
198+
self.pm._listen = mock.MagicMock(side_effect=messages)
199+
try:
200+
self.pm._thread()
201+
except KeyboardInterrupt:
202+
pass
203+
204+
self.pm._handle_emit.assert_called_once_with(
205+
{'method': 'emit', 'value': 'foo'})
206+
self.pm._handle_callback.assert_called_once_with(
207+
{'method': 'callback', 'value': 'bar'})
208+
self.pm._handle_close_room.assert_called_once_with(
209+
{'method': 'close_room', 'value': 'baz'})

0 commit comments

Comments
 (0)