Skip to content

Commit 6ba0c2e

Browse files
committed
adding oauth example
1 parent 797b6f3 commit 6ba0c2e

File tree

2 files changed

+222
-1
lines changed

2 files changed

+222
-1
lines changed

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ Client examples
33
- [Getting started](./getting_started/getting_started.py) - Producer and Consumer example without reconnection
44
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
55
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
6-
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
6+
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
7+
- [Oauth](./oauth/oauth.py) - Connection through Oauth token

examples/oauth/oaut.py

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
# type: ignore
2+
3+
4+
import base64
5+
from datetime import datetime, timedelta
6+
7+
import jwt
8+
9+
from rabbitmq_amqp_python_client import ( # PosixSSlConfigurationContext,; PosixClientCert,
10+
AddressHelper,
11+
AMQPMessagingHandler,
12+
Connection,
13+
Environment,
14+
Event,
15+
ExchangeSpecification,
16+
ExchangeToQueueBindingSpecification,
17+
Message,
18+
OAuth2Options,
19+
OutcomeState,
20+
QuorumQueueSpecification,
21+
)
22+
23+
MESSAGES_TO_PUBLISH = 100
24+
25+
26+
class MyMessageHandler(AMQPMessagingHandler):
27+
28+
def __init__(self):
29+
super().__init__()
30+
self._count = 0
31+
32+
def on_amqp_message(self, event: Event):
33+
print("received message: " + str(event.message.body))
34+
35+
# accepting
36+
self.delivery_context.accept(event)
37+
38+
# in case of rejection (+eventually deadlettering)
39+
# self.delivery_context.discard(event)
40+
41+
# in case of requeuing
42+
# self.delivery_context.requeue(event)
43+
44+
# annotations = {}
45+
# annotations[symbol('x-opt-string')] = 'x-test1'
46+
# in case of requeuing with annotations added
47+
# self.delivery_context.requeue_with_annotations(event, annotations)
48+
49+
# in case of rejection with annotations added
50+
# self.delivery_context.discard_with_annotations(event)
51+
52+
print("count " + str(self._count))
53+
54+
self._count = self._count + 1
55+
56+
if self._count == MESSAGES_TO_PUBLISH:
57+
print("closing receiver")
58+
# if you want you can add cleanup operations here
59+
60+
def on_connection_closed(self, event: Event):
61+
# if you want you can add cleanup operations here
62+
print("connection closed")
63+
64+
def on_link_closed(self, event: Event) -> None:
65+
# if you want you can add cleanup operations here
66+
print("link closed")
67+
68+
69+
def create_connection(environment: Environment) -> Connection:
70+
connection = environment.connection()
71+
# in case of SSL enablement
72+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
73+
# client_cert = ".ci/certs/client_certificate.pem"
74+
# client_key = ".ci/certs/client_key.pem"
75+
# connection = Connection(
76+
# "amqps://guest:guest@localhost:5671/",
77+
# ssl_context=PosixSslConfigurationContext(
78+
# ca_cert=ca_cert_file,
79+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
80+
# ),
81+
# )
82+
connection.dial()
83+
84+
return connection
85+
86+
87+
def main() -> None:
88+
89+
exchange_name = "test-exchange"
90+
queue_name = "example-queue"
91+
routing_key = "routing-key"
92+
93+
print("connection to amqp server")
94+
oaut_token = token(
95+
datetime.now() + timedelta(milliseconds=2500),
96+
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH",
97+
)
98+
environment = Environment(
99+
uri="amqp://localhost:5672", oauth2_options=OAuth2Options(token=oaut_token)
100+
)
101+
connection = create_connection(environment)
102+
103+
# you can refresh the oaut token with the connection api
104+
oaut_token = token(
105+
datetime.now() + timedelta(milliseconds=10000),
106+
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH",
107+
)
108+
109+
connection.refresh_token(
110+
oaut_token,
111+
)
112+
113+
management = connection.management()
114+
115+
print("declaring exchange and queue")
116+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
117+
118+
management.declare_queue(
119+
QuorumQueueSpecification(name=queue_name)
120+
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
121+
)
122+
123+
print("binding queue to exchange")
124+
bind_name = management.bind(
125+
ExchangeToQueueBindingSpecification(
126+
source_exchange=exchange_name,
127+
destination_queue=queue_name,
128+
binding_key=routing_key,
129+
)
130+
)
131+
132+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
133+
134+
addr_queue = AddressHelper.queue_address(queue_name)
135+
136+
print("create a publisher and publish a test message")
137+
publisher = connection.publisher(addr)
138+
139+
print("purging the queue")
140+
messages_purged = management.purge_queue(queue_name)
141+
142+
print("messages purged: " + str(messages_purged))
143+
# management.close()
144+
145+
# publish 10 messages
146+
for i in range(MESSAGES_TO_PUBLISH):
147+
print("publishing")
148+
status = publisher.publish(Message(body="test"))
149+
if status.remote_state == OutcomeState.ACCEPTED:
150+
print("message accepted")
151+
elif status.remote_state == OutcomeState.RELEASED:
152+
print("message not routed")
153+
elif status.remote_state == OutcomeState.REJECTED:
154+
print("message not rejected")
155+
156+
publisher.close()
157+
158+
print(
159+
"create a consumer and consume the test message - press control + c to terminate to consume"
160+
)
161+
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())
162+
163+
try:
164+
consumer.run()
165+
except KeyboardInterrupt:
166+
pass
167+
168+
print("cleanup")
169+
consumer.close()
170+
# once we finish consuming if we close the connection we need to create a new one
171+
# connection = create_connection()
172+
management = connection.management()
173+
174+
print("unbind")
175+
management.unbind(bind_name)
176+
177+
print("delete queue")
178+
management.delete_queue(queue_name)
179+
180+
print("delete exchange")
181+
management.delete_exchange(exchange_name)
182+
183+
print("closing connections")
184+
management.close()
185+
print("after management closing")
186+
environment.close()
187+
print("after connection closing")
188+
189+
190+
def token(duration: datetime, token: str) -> str:
191+
# Decode the base64 key
192+
decoded_key = base64.b64decode(token)
193+
194+
# Define the claims
195+
claims = {
196+
"iss": "unit_test",
197+
"aud": "rabbitmq",
198+
"exp": int(duration.timestamp()),
199+
"scope": ["rabbitmq.configure:*/*", "rabbitmq.write:*/*", "rabbitmq.read:*/*"],
200+
"random": random_string(6),
201+
}
202+
203+
# Create the token with the claims and sign it
204+
jwt_token = jwt.encode(
205+
claims, decoded_key, algorithm="HS256", headers={"kid": "token-key"}
206+
)
207+
208+
return jwt_token
209+
210+
211+
# Helper function to generate a random string (replace with your implementation)
212+
def random_string(length: int) -> str:
213+
import random
214+
import string
215+
216+
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
217+
218+
219+
if __name__ == "__main__":
220+
main()

0 commit comments

Comments
 (0)