Skip to content

Commit 79b7e7d

Browse files
authored
Merge pull request #2 from sportstech/pubsub
Pubsub implementation
2 parents df58bee + 347aa0f commit 79b7e7d

File tree

13 files changed

+443
-76
lines changed

13 files changed

+443
-76
lines changed

README.md

Lines changed: 128 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ Currently supports:
99

1010
# Installation instructions
1111

12-
For instaling graphql-ws, just run this command in your shell
12+
For installing graphql-ws, just run this command in your shell
1313

1414
```bash
1515
pip install graphql-ws
1616
```
1717

18-
## Examples
18+
## Subscription Server
1919

2020
### aiohttp
2121

@@ -63,90 +63,167 @@ async def subscriptions(request, ws):
6363

6464
app.run(host="0.0.0.0", port=8000)
6565
```
66-
67-
And then, plug into a subscribable schema:
66+
### Gevent
67+
For setting up, just plug into your Gevent server.
6868

6969
```python
70-
import asyncio
71-
import graphene
70+
subscription_server = GeventSubscriptionServer(schema)
71+
app.app_protocol = lambda environ_path_info: 'graphql-ws'
7272

73+
@sockets.route('/subscriptions')
74+
def echo_socket(ws):
75+
subscription_server.handle(ws)
76+
return []
77+
```
78+
### Django (with channels)
7379

74-
class Query(graphene.ObjectType):
75-
base = graphene.String()
80+
First `pip install channels` and add it to your django apps
7681

82+
Then add the following to your settings.py
7783

78-
class Subscription(graphene.ObjectType):
79-
count_seconds = graphene.Float(up_to=graphene.Int())
84+
```python
85+
CHANNELS_WS_PROTOCOLS = ["graphql-ws", ]
86+
CHANNEL_LAYERS = {
87+
"default": {
88+
"BACKEND": "asgiref.inmemory.ChannelLayer",
89+
"ROUTING": "django_subscriptions.urls.channel_routing",
90+
},
8091

81-
async def resolve_count_seconds(root, info, up_to):
82-
for i in range(up_to):
83-
yield i
84-
await asyncio.sleep(1.)
85-
yield up_to
92+
}
93+
```
94+
95+
Add the channel routes to your Django server.
8696

97+
```python
98+
from channels.routing import route_class
99+
from graphql_ws.django_channels import GraphQLSubscriptionConsumer
87100

88-
schema = graphene.Schema(query=Query, subscription=Subscription)
101+
channel_routing = [
102+
route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"),
103+
]
89104
```
90105

91-
You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp
106+
## Publish-Subscribe
107+
Included are several publish-subscribe (pubsub) classes for hooking
108+
up your mutations to your subscriptions. When a client makes a
109+
subscription, the pubsub can be used to map from one subscription name
110+
to one or more channel names to subscribe to the right channels.
111+
The subscription query will be re-run every time something is
112+
published to one of these channels. Using these classes, a
113+
subscription is just the result of a mutation.
92114

93-
### Gevent
115+
### Asyncio
94116

95-
For setting up, just plug into your Gevent server.
117+
There are two pubsub classes for asyncio, one that is in-memory and the other
118+
that utilizes Redis (for production), via the [aredis](https://github.com/NoneGG/aredis) libary, which
119+
is a asynchronous port of the excellent [redis-py](https://github.com/andymccurdy/redis-py) library.
120+
121+
The schema for asyncio would look something like this below:
96122

97123
```python
98-
subscription_server = GeventSubscriptionServer(schema)
99-
app.app_protocol = lambda environ_path_info: 'graphql-ws'
124+
import asyncio
125+
import graphene
100126

101-
@sockets.route('/subscriptions')
102-
def echo_socket(ws):
103-
subscription_server.handle(ws)
104-
return []
127+
from graphql_ws.pubsub import AsyncioPubsub
128+
129+
# create a new pubsub object; this class is in-memory and does
130+
# not utilze Redis
131+
pubsub = AsyncioPubsub()
132+
133+
134+
class MutationExample(graphene.Mutation):
135+
class Arguments:
136+
input_text = graphene.String()
137+
138+
output_text = graphene.String()
139+
140+
async def mutate(self, info, input_text):
141+
# publish to the pubsub object before returning mutation
142+
await pubsub.publish('BASE', input_text)
143+
return MutationExample(output_text=input_text)
144+
145+
146+
class Mutations(graphene.ObjectType):
147+
mutation_example = MutationExample.Field()
148+
149+
150+
class Subscription(graphene.ObjectType):
151+
mutation_example = graphene.String()
152+
153+
async def resolve_mutation_example(root, info):
154+
try:
155+
# pubsub subscribe_to_channel method returns
156+
# subscription id and an asyncio.Queue
157+
sub_id, q = pubsub.subscribe_to_channel('BASE')
158+
while True:
159+
payload = await q.get()
160+
yield payload
161+
except asyncio.CancelledError:
162+
# unsubscribe subscription id from channel
163+
# when coroutine is cancelled
164+
pubsub.unsubscribe('BASE', sub_id)
165+
166+
schema = graphene.Schema(mutation=Mutations,
167+
subscription=Subscription)
105168
```
106169

107-
And then, plug into a subscribable schema:
170+
You can see a full asyncio example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp
171+
172+
### Gevent
173+
174+
There are two pubsub classes for Gevent as well, one that is
175+
in-memory and the other that utilizes Redis (for production), via
176+
[redis-py](https://github.com/andymccurdy/redis-py).
177+
178+
Finally, plug into a subscribable schema:
108179

109180
```python
110181
import graphene
182+
183+
from graphql_ws.pubsub import GeventRxRedisPubsub
111184
from rx import Observable
112185

186+
# create a new pubsub object; in the case you'll need to
187+
# be running a redis-server instance in a separate process
188+
pubsub = GeventRxRedisPubsub()
113189

114-
class Query(graphene.ObjectType):
115-
base = graphene.String()
116190

191+
class MutationExample(graphene.Mutation):
192+
class Arguments:
193+
input_text = graphene.String()
117194

118-
class Subscription(graphene.ObjectType):
119-
count_seconds = graphene.Float(up_to=graphene.Int())
195+
output_text = graphene.String()
120196

121-
async def resolve_count_seconds(root, info, up_to=5):
122-
return Observable.interval(1000)\
123-
.map(lambda i: "{0}".format(i))\
124-
.take_while(lambda i: int(i) <= up_to)
197+
def mutate(self, info, input_text):
198+
# publish to the pubsub before returning mutation
199+
pubsub.publish('BASE', input_text)
200+
return MutationExample(output_text=input_text)
125201

126202

127-
schema = graphene.Schema(query=Query, subscription=Subscription)
128-
```
203+
class Mutations(graphene.ObjectType):
204+
mutation_example = MutationExample.Field()
129205

130-
You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent
131206

207+
class Subscription(graphene.ObjectType):
208+
mutation_example = graphene.String()
132209

133-
### Django Channels
210+
def resolve_mutation_example(root, info):
211+
# pubsub subscribe_to_channel method returns an observable
212+
# when observable is disposed of, the subscription will
213+
# be cleaned up and unsubscribed from
214+
return pubsub.subscribe_to_channel('BASE')\
215+
.map(lambda i: "{0}".format(i))
134216

135217

136-
First `pip install channels` and it to your django apps
218+
schema = graphene.Schema(mutation=Mutations,
219+
subscription=Subscription)
220+
```
137221

138-
Then add the following to your settings.py
222+
You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent
139223

140-
```python
141-
CHANNELS_WS_PROTOCOLS = ["graphql-ws", ]
142-
CHANNEL_LAYERS = {
143-
"default": {
144-
"BACKEND": "asgiref.inmemory.ChannelLayer",
145-
"ROUTING": "django_subscriptions.urls.channel_routing",
146-
},
147224

148-
}
149-
```
225+
### Django (with channels)
226+
150227

151228
Setup your graphql schema
152229

@@ -167,8 +244,8 @@ class Subscription(graphene.ObjectType):
167244

168245

169246
def resolve_count_seconds(
170-
root,
171-
info,
247+
root,
248+
info,
172249
up_to=5
173250
):
174251
return Observable.interval(1000)\
@@ -192,14 +269,3 @@ GRAPHENE = {
192269
'SCHEMA': 'path.to.schema'
193270
}
194271
```
195-
196-
and finally add the channel routes
197-
198-
```python
199-
from channels.routing import route_class
200-
from graphql_ws.django_channels import GraphQLSubscriptionConsumer
201-
202-
channel_routing = [
203-
route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"),
204-
]
205-
```

examples/aiohttp/schema.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,32 @@
22
import asyncio
33
import graphene
44

5+
from graphql_ws.pubsub import AsyncioPubsub
6+
7+
pubsub = AsyncioPubsub()
8+
59

610
class Query(graphene.ObjectType):
711
base = graphene.String()
812

13+
async def resolve_base(root, info):
14+
return 'Hello World!'
15+
16+
17+
class MutationExample(graphene.Mutation):
18+
class Arguments:
19+
input_text = graphene.String()
20+
21+
output_text = graphene.String()
22+
23+
async def mutate(self, info, input_text):
24+
await pubsub.publish('BASE', input_text)
25+
return MutationExample(output_text=input_text)
26+
27+
28+
class Mutations(graphene.ObjectType):
29+
mutation_example = MutationExample.Field()
30+
931

1032
class RandomType(graphene.ObjectType):
1133
seconds = graphene.Int()
@@ -15,6 +37,16 @@ class RandomType(graphene.ObjectType):
1537
class Subscription(graphene.ObjectType):
1638
count_seconds = graphene.Float(up_to=graphene.Int())
1739
random_int = graphene.Field(RandomType)
40+
mutation_example = graphene.String()
41+
42+
async def resolve_mutation_example(root, info):
43+
try:
44+
sub_id, q = pubsub.subscribe_to_channel('BASE')
45+
while True:
46+
payload = await q.get()
47+
yield payload
48+
finally:
49+
pubsub.unsubscribe('BASE', sub_id)
1850

1951
async def resolve_count_seconds(root, info, up_to=5):
2052
for i in range(up_to):
@@ -31,4 +63,5 @@ async def resolve_random_int(root, info):
3163
i += 1
3264

3365

34-
schema = graphene.Schema(query=Query, subscription=Subscription)
66+
schema = graphene.Schema(query=Query, mutation=Mutations,
67+
subscription=Subscription)

examples/flask_gevent/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ def graphql_view():
2020

2121

2222
app.add_url_rule(
23-
'/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=False))
23+
'/graphql', view_func=GraphQLView.as_view('graphql', schema=schema,
24+
graphiql=False))
2425

2526
subscription_server = GeventSubscriptionServer(schema)
2627
app.app_protocol = lambda environ_path_info: 'graphql-ws'

examples/flask_gevent/schema.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,58 @@
1-
import random
21
import graphene
2+
import random
3+
4+
from graphql_ws.pubsub import GeventRxPubsub
35
from rx import Observable
46

7+
pubsub = GeventRxPubsub()
8+
59

610
class Query(graphene.ObjectType):
711
base = graphene.String()
812

13+
def resolve_base(root, info):
14+
return 'Hello World!'
15+
16+
17+
class MutationExample(graphene.Mutation):
18+
class Arguments:
19+
input_text = graphene.String()
20+
21+
output_text = graphene.String()
22+
23+
def mutate(self, info, input_text):
24+
pubsub.publish('BASE', input_text)
25+
return MutationExample(output_text=input_text)
26+
27+
28+
class Mutations(graphene.ObjectType):
29+
mutation_example = MutationExample.Field()
30+
931

1032
class RandomType(graphene.ObjectType):
1133
seconds = graphene.Int()
1234
random_int = graphene.Int()
1335

1436

1537
class Subscription(graphene.ObjectType):
16-
1738
count_seconds = graphene.Int(up_to=graphene.Int())
18-
1939
random_int = graphene.Field(RandomType)
40+
mutation_example = graphene.String()
41+
42+
def resolve_mutation_example(root, info):
43+
# subscribe_to_channel method returns an observable
44+
return pubsub.subscribe_to_channel('BASE')\
45+
.map(lambda i: "{0}".format(i))
2046

2147
def resolve_count_seconds(root, info, up_to=5):
2248
return Observable.interval(1000)\
2349
.map(lambda i: "{0}".format(i))\
2450
.take_while(lambda i: int(i) <= up_to)
2551

2652
def resolve_random_int(root, info):
27-
return Observable.interval(1000).map(lambda i: RandomType(seconds=i, random_int=random.randint(0, 500)))
53+
return Observable.interval(1000).map(
54+
lambda i: RandomType(seconds=i, random_int=random.randint(0, 500)))
2855

2956

30-
schema = graphene.Schema(query=Query, subscription=Subscription)
57+
schema = graphene.Schema(query=Query, mutation=Mutations,
58+
subscription=Subscription)

examples/flask_gevent/template.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,8 @@ def render_graphiql():
117117
</script>
118118
</body>
119119
</html>''').substitute(
120-
GRAPHIQL_VERSION='0.11.7',
120+
GRAPHIQL_VERSION='0.10.2',
121121
SUBSCRIPTIONS_TRANSPORT_VERSION='0.7.0',
122122
subscriptionsEndpoint='ws://localhost:5000/subscriptions',
123-
# subscriptionsEndpoint='ws://localhost:5000/',
124123
endpointURL='/graphql',
125124
)

0 commit comments

Comments
 (0)