Skip to content

Commit ae8dfef

Browse files
authored
add credits (#27)
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 06daba6 commit ae8dfef

File tree

2 files changed

+39
-34
lines changed

2 files changed

+39
-34
lines changed

examples/getting_started/main.py

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# type: ignore
2-
2+
import threading
33

44
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55
AddressHelper,
@@ -78,8 +78,16 @@ def create_connection() -> Connection:
7878
return connection
7979

8080

81-
def main() -> None:
81+
def threaded_function(addr_queue):
82+
connection = create_connection()
83+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
84+
try:
85+
consumer.run()
86+
except KeyboardInterrupt:
87+
pass
88+
8289

90+
def main() -> None:
8391
exchange_name = "test-exchange"
8492
queue_name = "example-queue"
8593
routing_key = "routing-key"
@@ -91,72 +99,67 @@ def main() -> None:
9199
management = connection.management()
92100

93101
print("declaring exchange and queue")
94-
#management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
102+
# management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
95103

96104
management.declare_queue(
97105
StreamSpecification(name=queue_name)
98106
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
99107
)
100108

101109
print("binding queue to exchange")
102-
#bind_name = management.bind(
110+
# bind_name = management.bind(
103111
# BindingSpecification(
104112
# source_exchange=exchange_name,
105113
# destination_queue=queue_name,
106114
# binding_key=routing_key,
107115
# )
108-
#)
116+
# )
109117

110-
#addr = AddressHelper.exchange_address(exchange_name, routing_key)
118+
# addr = AddressHelper.exchange_address(exchange_name, routing_key)
111119

112120
addr_queue = AddressHelper.queue_address(queue_name)
113121

114-
print("create a publisher and publish a test message")
122+
thread = threading.Thread(target=threaded_function, args=(addr_queue,))
123+
thread.start()
124+
## press control + c to terminate the consumer
125+
126+
# print("create a publisher and publish a test message")
115127
publisher = connection.publisher(addr_queue)
116128

117-
print("purging the queue")
118-
#messages_purged = management.purge_queue(queue_name)
129+
# print("purging the queue")
130+
# messages_purged = management.purge_queue(queue_name)
119131

120-
#print("messages purged: " + str(messages_purged))
132+
# print("messages purged: " + str(messages_purged))
121133
# management.close()
122134

123135
# publish 10 messages
124136
for i in range(messages_to_publish):
125137
status = publisher.publish(Message(body="test"))
126-
#if status.ACCEPTED:
127-
# print("message accepted")
128-
#elif status.RELEASED:
129-
# print("message not routed")
130-
#elif status.REJECTED:
131-
# print("message not rejected")
132-
138+
# # if status.ACCEPTED:
139+
# # print("message accepted")
140+
# # elif status.RELEASED:
141+
# # print("message not routed")
142+
# # elif status.REJECTED:
143+
# # print("message not rejected")
144+
#
133145
publisher.close()
146+
#
147+
# print(
148+
# "create a consumer and consume the test message - press control + c to terminate to consume"
149+
# )
134150

135-
print(
136-
"create a consumer and consume the test message - press control + c to terminate to consume"
137-
)
138-
139-
stream_filter_options = StreamFilterOptions()
140-
stream_filter_options.offset(0)
141-
142-
consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=stream_filter_options)
143-
144-
try:
145-
consumer.run()
146-
except KeyboardInterrupt:
147-
pass
148-
151+
input("Press Enter to continue...")
149152
print("cleanup")
150-
consumer.close()
153+
# consumer.close()
151154
# once we finish consuming if we close the connection we need to create a new one
152155
# connection = create_connection()
153156
# management = connection.management()
154157

155158
print("unbind")
156-
#management.unbind(bind_name)
159+
# management.unbind(bind_name)
157160

158161
print("delete queue")
159-
#management.delete_queue(queue_name)
162+
# management.delete_queue(queue_name)
160163

161164
print("delete exchange")
162165
management.delete_exchange(exchange_name)

rabbitmq_amqp_python_client/consumer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,13 @@ def _create_receiver(self, addr: str) -> BlockingReceiver:
6060
receiver = self._conn.create_receiver(
6161
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
6262
)
63+
receiver.credit = 1
6364
else:
6465
print("stream option is not None")
6566
receiver = self._conn.create_receiver(
6667
addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler
6768
)
69+
receiver.credit = 1
6870

6971
return receiver
7072

0 commit comments

Comments
 (0)