@@ -80,6 +80,8 @@ def create_connection() -> Connection:
80
80
81
81
def threaded_function (addr_queue ):
82
82
connection = create_connection ()
83
+ offset_specification = StreamFilterOptions ()
84
+ offset_specification .offset (10 )
83
85
consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
84
86
try :
85
87
consumer .run ()
@@ -101,10 +103,10 @@ def main() -> None:
101
103
print ("declaring exchange and queue" )
102
104
# management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
103
105
104
- management .declare_queue (
105
- StreamSpecification (name = queue_name )
106
+ # management.declare_queue(
107
+ # StreamSpecification(name=queue_name)
106
108
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
107
- )
109
+ # )
108
110
109
111
print ("binding queue to exchange" )
110
112
# bind_name = management.bind(
@@ -124,7 +126,7 @@ def main() -> None:
124
126
## press control + c to terminate the consumer
125
127
126
128
# print("create a publisher and publish a test message")
127
- publisher = connection .publisher (addr_queue )
129
+ # publisher = connection.publisher(addr_queue)
128
130
129
131
# print("purging the queue")
130
132
# messages_purged = management.purge_queue(queue_name)
@@ -133,16 +135,16 @@ def main() -> None:
133
135
# management.close()
134
136
135
137
# publish 10 messages
136
- for i in range (messages_to_publish ):
137
- status = publisher .publish (Message (body = "test" ))
138
+ # for i in range(messages_to_publish):
139
+ # status = publisher.publish(Message(body="test"))
138
140
# # if status.ACCEPTED:
139
141
# # print("message accepted")
140
142
# # elif status.RELEASED:
141
143
# # print("message not routed")
142
144
# # elif status.REJECTED:
143
145
# # print("message not rejected")
144
146
#
145
- publisher .close ()
147
+ # publisher.close()
146
148
#
147
149
# print(
148
150
# "create a consumer and consume the test message - press control + c to terminate to consume"
0 commit comments