-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathreconnection_example_async.py
More file actions
192 lines (149 loc) · 6.85 KB
/
reconnection_example_async.py
File metadata and controls
192 lines (149 loc) · 6.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# type: ignore
import asyncio
import signal
from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
AsyncEnvironment,
ConnectionClosed,
Converter,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Message,
QuorumQueueSpecification,
)
# Size of the example run: main() publishes this many messages and the
# message handler announces completion once it has received as many.
MESSAGES_TO_PUBLISH = 50_000
class MyMessageHandler(AMQPMessagingHandler):
    """Consumer-side handler that accepts every message and prints progress.

    Keeps a running count of accepted messages, prints one progress line per
    completed batch of 1000 messages, and prints a final line once
    ``MESSAGES_TO_PUBLISH`` messages have been received.
    """

    def __init__(self):
        super().__init__()
        # Number of messages accepted so far by this handler instance.
        self._count = 0

    def on_amqp_message(self, event: Event) -> None:
        """Accept the message carried by *event* and report progress."""
        # accepting
        self.delivery_context.accept(event)

        # in case of rejection (+eventually deadlettering)
        # self.delivery_context.discard(event)

        # in case of requeuing
        # self.delivery_context.requeue(event)

        # annotations = {}
        # annotations[symbol('x-opt-string')] = 'x-test1'
        # in case of requeuing with annotations added
        # self.delivery_context.requeue_with_annotations(event, annotations)

        # in case of rejection with annotations added
        # self.delivery_context.discard_with_annotations(event)

        self._count += 1

        # Report only after the counter has been bumped, so the line is
        # printed once a full batch of 1000 has really been received (and
        # not on the very first message, when the count is still 0).
        if self._count % 1000 == 0:
            print(
                "received 1000 messages: "
                + Converter.bytes_to_string(event.message.body)
            )

        if self._count == MESSAGES_TO_PUBLISH:
            print("received all messages")

    def on_connection_closed(self, event: Event) -> None:
        """Print a notice when the connection is closed."""
        # if you want you can add cleanup operations here
        print("connection closed")

    def on_link_closed(self, event: Event) -> None:
        """Print a notice when the link is closed."""
        # if you want you can add cleanup operations here
        print("link closed")
async def _publish_all(publisher) -> None:
    """Publish ``MESSAGES_TO_PUBLISH`` test messages through *publisher*.

    Each message is retried in place until it is published, so a
    ``ConnectionClosed`` raised by ``publish`` never causes a message to be
    skipped and never causes already-published messages to be sent again.
    """
    for published in range(1, MESSAGES_TO_PUBLISH + 1):
        while True:
            try:
                await publisher.publish(
                    Message(body=Converter.string_to_bytes("test"))
                )
                break
            except ConnectionClosed:
                # NOTE(review): retrying on the same publisher assumes the
                # client re-establishes the connection underneath it, as the
                # original example did - confirm against the library docs.
                print("publisher connection closed, retrying...")
                await asyncio.sleep(1)

        # Progress is reported after the publish succeeded, once per
        # completed batch of 1000 messages.
        if published % 1000 == 0:
            print("published 1000 messages...")

    print("all messages published successfully")


async def _consume_until_interrupted(consumer) -> None:
    """Run *consumer* until Ctrl+C, restarting it after ``ConnectionClosed``.

    A SIGINT handler sets a stop event. The loop waits on BOTH the consumer
    task and the stop event, so an exception raised inside ``consumer.run()``
    is observed as soon as the task ends instead of only after Ctrl+C.
    The signal handler is always removed on exit.
    """
    stop_event = asyncio.Event()

    def handle_sigint():
        print("\nCtrl+C detected, stopping consumer gracefully...")
        stop_event.set()

    # Register signal handler
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, handle_sigint)

    # A single waiter task for the stop event, shared by every iteration.
    stop_waiter = asyncio.create_task(stop_event.wait())

    try:
        while True:
            # Run consumer in background
            consumer_task = asyncio.create_task(consumer.run())
            try:
                # Wake up on Ctrl+C or when the consumer task ends by itself.
                await asyncio.wait(
                    {consumer_task, stop_waiter},
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if stop_event.is_set():
                    # Stop consumer gracefully
                    print("Stopping consumer...")
                    await consumer.stop_processing()

                    # Wait for task to complete
                    try:
                        await asyncio.wait_for(consumer_task, timeout=3.0)
                    except asyncio.TimeoutError:
                        print("Consumer task timed out")
                    break

                # The consumer task finished without a stop request:
                # awaiting it re-raises whatever exception ended it.
                await consumer_task
                break

            except ConnectionClosed:
                if stop_event.is_set():
                    # Shutting down anyway; do not start a new consumer run.
                    break
                print("consumer connection closed, reconnecting...")
                await asyncio.sleep(1)
                continue
            except Exception as e:
                print("consumer exited for exception " + str(e))
                break
    finally:
        stop_waiter.cancel()
        loop.remove_signal_handler(signal.SIGINT)


async def main() -> None:
    """Run the reconnection example end to end.

    Declares an exchange and a quorum queue, binds them, purges the queue,
    publishes ``MESSAGES_TO_PUBLISH`` messages, consumes them with
    ``MyMessageHandler`` until Ctrl+C, and finally removes the binding,
    the queue and the exchange.
    """
    exchange_name = "test-exchange"
    queue_name = "example-queue"
    routing_key = "routing-key"

    print("connection to amqp server")

    async with AsyncEnvironment(
        uri="amqp://guest:guest@localhost:5672/"
    ) as environment:
        async with await environment.connection() as connection:
            async with await connection.management() as management:
                print("declaring exchange and queue")
                await management.declare_exchange(
                    ExchangeSpecification(name=exchange_name)
                )
                await management.declare_queue(
                    QuorumQueueSpecification(name=queue_name)
                    # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
                )

                print("binding queue to exchange")
                bind_name = await management.bind(
                    ExchangeToQueueBindingSpecification(
                        source_exchange=exchange_name,
                        destination_queue=queue_name,
                        binding_key=routing_key,
                    )
                )

                addr = AddressHelper.exchange_address(exchange_name, routing_key)
                addr_queue = AddressHelper.queue_address(queue_name)

                print("create a publisher and publish messages")
                async with await connection.publisher(addr) as publisher:
                    print("purging the queue")
                    messages_purged = await management.purge_queue(queue_name)
                    print("messages purged: " + str(messages_purged))

                    # publishing messages with reconnection handling
                    await _publish_all(publisher)

                print(
                    "create a consumer and consume messages - press control + c to terminate"
                )
                handler = MyMessageHandler()

                async with await connection.consumer(
                    addr_queue, message_handler=handler
                ) as consumer:
                    await _consume_until_interrupted(consumer)

                print("cleanup")
                print("unbind")
                await management.unbind(bind_name)

                print("delete queue")
                await management.delete_queue(queue_name)

                print("delete exchange")
                await management.delete_exchange(exchange_name)
if __name__ == "__main__":
    # Run the example only when this file is executed as a script.
    asyncio.run(main())