-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathconsumeIterator.php
More file actions
43 lines (29 loc) · 951 Bytes
/
consumeIterator.php
File metadata and controls
43 lines (29 loc) · 951 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php
declare(strict_types=1);
require_once __DIR__ . '/../vendor/autoload.php';
use Revolt\EventLoop;
use Thesis\Amqp\Client;
use Thesis\Amqp\Config;
use Thesis\Amqp\Message;
$client = new Client(Config::default());
$channel = $client->channel();
$queue = $channel->queueDeclare(autoDelete: true);
$messageId = 0;
EventLoop::unreference(
EventLoop::repeat(0.5, static function () use ($queue, $channel, &$messageId): void {
++$messageId;
$channel->publish(new Message("Message#{$messageId}"), routingKey: $queue->name);
}),
);
$channel->qos(prefetchCount: 1);
$deliveries = $channel->consumeIterator($queue->name, size: 1);
Amp\async(static function () use ($deliveries): void {
Amp\trapSignal([\SIGINT, \SIGTERM]);
$deliveries->complete();
});
foreach ($deliveries as $delivery) {
dump($delivery->message->body);
$delivery->ack();
}
dump('Consumer cancelled by signal.');
$client->disconnect();