Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,7 @@ $client->connect();
$channel = $client->channel();
$delivery = $channel->get('service.a.events', noAck: true);

var_dump($delivery?->body);
var_dump($delivery?->messageId);
var_dump($delivery?->correlationId);
var_dump($delivery?->contentEncoding);
var_dump($delivery?->message);
```

#### ack
Expand Down Expand Up @@ -673,7 +670,7 @@ declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\Delivery;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Channel;

$client = new Client(Config::default());
Expand All @@ -682,8 +679,8 @@ $client->connect();
$channel = $client->channel();

$channel->qos(prefetchCount: 1);
$consumerTag = $channel->consume(static function (Delivery $delivery, Channel $_): void {
var_dump($delivery->body);
$consumerTag = $channel->consume(static function (DeliveryMessage $delivery, Channel $_): void {
var_dump($delivery->message);
$delivery->ack();
}, queue: 'service.a.events');

Expand Down Expand Up @@ -717,7 +714,7 @@ Amp\async(static function () use ($deliveries): void {
});

foreach ($deliveries as $delivery) {
var_dump($delivery->body);
var_dump($delivery->message);
$delivery->ack();
}

Expand Down Expand Up @@ -753,7 +750,7 @@ Amp\async(static function () use ($deliveries): void {

try {
foreach ($deliveries as $delivery) {
var_dump($delivery->body);
var_dump($delivery->message);
$delivery->ack();
}
} catch (\Throwable $e) {
Expand Down Expand Up @@ -926,7 +923,7 @@ $channel = $client->channel();

Amp\async(static function () use ($channel): void {
foreach ($channel->returns as $return) {
var_dump("message '{$return->body}' was return from {$return->exchange}:{$return->routingKey}");
var_dump("message '{$return->message->body}' was returned from {$return->exchange}:{$return->routingKey}");
}
});

Expand Down
10 changes: 5 additions & 5 deletions src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public function publishBatch(array $publishMessages): PublishBatchConfirmation
/**
* @throws \Throwable
*/
public function get(string $queue = '', bool $noAck = false): ?Delivery
public function get(string $queue = '', bool $noAck = false): ?DeliveryMessage
{
static $permit = true;
if (!$permit) {
Expand All @@ -136,7 +136,7 @@ public function get(string $queue = '', bool $noAck = false): ?Delivery
/**
* @throws \Throwable
*/
public function ack(Delivery $delivery, bool $multiple = false): void
public function ack(DeliveryMessage $delivery, bool $multiple = false): void
{
$this->connection->writeFrame(Protocol\Method::basicAck(
channelId: $this->channelId,
Expand All @@ -148,7 +148,7 @@ public function ack(Delivery $delivery, bool $multiple = false): void
/**
* @throws \Throwable
*/
public function nack(Delivery $delivery, bool $multiple = false, bool $requeue = true): void
public function nack(DeliveryMessage $delivery, bool $multiple = false, bool $requeue = true): void
{
$this->connection->writeFrame(Protocol\Method::basicNack(
channelId: $this->channelId,
Expand All @@ -161,7 +161,7 @@ public function nack(Delivery $delivery, bool $multiple = false, bool $requeue =
/**
* @throws \Throwable
*/
public function reject(Delivery $delivery, bool $requeue = true): void
public function reject(DeliveryMessage $delivery, bool $requeue = true): void
{
$this->connection->writeFrame(Protocol\Method::basicReject(
channelId: $this->channelId,
Expand Down Expand Up @@ -201,7 +201,7 @@ public function qos(int $prefetchSize = 0, int $prefetchCount = 0, bool $global
}

/**
* @param callable(Delivery, self): void $callback
* @param callable(DeliveryMessage, self): void $callback
* @param array<string, mixed> $arguments
* @return non-empty-string Consumer tag
* @throws \Throwable
Expand Down
76 changes: 0 additions & 76 deletions src/Delivery.php

This file was deleted.

48 changes: 48 additions & 0 deletions src/DeliveryMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace Thesis\Amqp;

/**
* @api
* @phpstan-type Ack = \Closure(DeliveryMessage, bool): void
* @phpstan-type Nack = \Closure(DeliveryMessage, bool, bool): void
* @phpstan-type Reject = \Closure(DeliveryMessage, bool): void
*/
final class DeliveryMessage
{
/**
* @param Ack $ack
* @param Nack $nack
* @param Reject $reject
* @param non-negative-int $deliveryTag
*/
public function __construct(
private readonly \Closure $ack,
private readonly \Closure $nack,
private readonly \Closure $reject,
public readonly Message $message,
public readonly string $exchange = '',
public readonly string $routingKey = '',
public readonly int $deliveryTag = 0,
public readonly string $consumerTag = '',
public readonly bool $redelivered = false,
public readonly bool $returned = false,
) {}

public function ack(bool $multiple = false): void
{
($this->ack)($this, $multiple);
}

public function nack(bool $multiple = false, bool $requeue = true): void
{
($this->nack)($this, $multiple, $requeue);
}

public function reject(bool $requeue = true): void
{
($this->reject)($this, $requeue);
}
}
6 changes: 3 additions & 3 deletions src/Internal/Delivery/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Thesis\Amqp\Internal\Delivery;

use Thesis\Amqp\Channel;
use Thesis\Amqp\Delivery;
use Thesis\Amqp\DeliveryMessage;

/**
* @internal
* @phpstan-type Listener = callable(Delivery, Channel): void
* @phpstan-type Listener = callable(DeliveryMessage, Channel): void
*/
final class Consumer
{
Expand Down Expand Up @@ -47,7 +47,7 @@ private function __construct(

private function run(): void
{
$this->supervisor->addConsumeListener(function (Delivery $delivery): void {
$this->supervisor->addConsumeListener(function (DeliveryMessage $delivery): void {
$consumer = $this->consumers[$delivery->consumerTag] ?? null;
if ($consumer !== null) {
$consumer($delivery, $this->channel);
Expand Down
41 changes: 22 additions & 19 deletions src/Internal/Delivery/DeliverySupervisor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
namespace Thesis\Amqp\Internal\Delivery;

use Thesis\Amqp\Channel;
use Thesis\Amqp\Delivery;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Internal\Hooks;
use Thesis\Amqp\Internal\Protocol\Frame;
use Thesis\Amqp\Message;

/**
* @internal
* @phpstan-type ConsumeListener = callable(Delivery): void
* @phpstan-type ReturnListener = callable(Delivery): void
* @phpstan-type GetListener = callable(null|Delivery): void
* @phpstan-type ConsumeListener = callable(DeliveryMessage): void
* @phpstan-type ReturnListener = callable(DeliveryMessage): void
* @phpstan-type GetListener = callable(null|DeliveryMessage): void
*/
final class DeliverySupervisor
{
Expand Down Expand Up @@ -175,29 +176,31 @@ private function runListeners(): void
// You cannot call ack/nack/reject on a returned message.
$noAction = static function (): void {};

$delivery = new Delivery(
$delivery = new DeliveryMessage(
ack: $this->return !== null ? $noAction : $this->channel->ack(...),
nack: $this->return !== null ? $noAction : $this->channel->nack(...),
reject: $this->return !== null ? $noAction : $this->channel->reject(...),
body: $this->message,
message: new Message(
body: $this->message,
headers: $this->header->properties->headers,
contentType: $this->header->properties->contentType,
contentEncoding: $this->header->properties->contentEncoding,
deliveryMode: $this->header->properties->deliveryMode,
priority: $this->header->properties->priority,
correlationId: $this->header->properties->correlationId,
replyTo: $this->header->properties->replyTo,
expiration: $this->header->properties->expiration,
messageId: $this->header->properties->messageId,
timestamp: $this->header->properties->timestamp,
type: $this->header->properties->type,
userId: $this->header->properties->userId,
appId: $this->header->properties->appId,
),
exchange: $this->delivery->exchange ?? $this->get->exchange ?? $this->return->exchange ?? '',
routingKey: $this->delivery->routingKey ?? $this->get->routingKey ?? $this->return->routingKey ?? '',
headers: $this->header->properties->headers,
deliveryTag: $this->delivery->deliveryTag ?? $this->get->deliveryTag ?? 0,
consumerTag: $this->delivery->consumerTag ?? '',
redelivered: $this->delivery->redelivered ?? $this->get->redelivered ?? false,
contentType: $this->header->properties->contentType,
contentEncoding: $this->header->properties->contentEncoding,
deliveryMode: $this->header->properties->deliveryMode,
priority: $this->header->properties->priority,
correlationId: $this->header->properties->correlationId,
replyTo: $this->header->properties->replyTo,
expiration: $this->header->properties->expiration,
messageId: $this->header->properties->messageId,
timestamp: $this->header->properties->timestamp,
type: $this->header->properties->type,
userId: $this->header->properties->userId,
appId: $this->header->properties->appId,
returned: $this->return !== null,
);

Expand Down
10 changes: 5 additions & 5 deletions src/Internal/Delivery/Receiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Thesis\Amqp\Internal\Delivery;

use Amp\Pipeline;
use Thesis\Amqp\Delivery;
use Thesis\Amqp\DeliveryMessage;

/**
* @internal
Expand All @@ -20,7 +20,7 @@ public static function create(DeliverySupervisor $supervisor): self
return $receiver;
}

public function receive(): ?Delivery
public function receive(): ?DeliveryMessage
{
if (!$this->iterator->continue()) {
return null;
Expand All @@ -29,16 +29,16 @@ public function receive(): ?Delivery
return $this->iterator->getValue();
}

/** @var Pipeline\ConcurrentIterator<null|Delivery> */
/** @var Pipeline\ConcurrentIterator<null|DeliveryMessage> */
private Pipeline\ConcurrentIterator $iterator;

/** @var Pipeline\Queue<null|Delivery> */
/** @var Pipeline\Queue<null|DeliveryMessage> */
private Pipeline\Queue $queue;

private function __construct(
private readonly DeliverySupervisor $supervisor,
) {
/** @var Pipeline\Queue<null|Delivery> $queue */
/** @var Pipeline\Queue<null|DeliveryMessage> $queue */
$queue = new Pipeline\Queue(bufferSize: 1);

$this->queue = $queue;
Expand Down
10 changes: 5 additions & 5 deletions src/Iterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* @api
* @template-implements \IteratorAggregate<array-key, Delivery>
* @template-implements \IteratorAggregate<array-key, DeliveryMessage>
*/
final class Iterator implements \IteratorAggregate
{
Expand All @@ -19,18 +19,18 @@ final class Iterator implements \IteratorAggregate
*/
public static function buffered(string $consumerTag, Channel $channel, int $size): self
{
/** @var Pipeline\Queue<Delivery> $queue */
/** @var Pipeline\Queue<DeliveryMessage> $queue */
$queue = new Pipeline\Queue(bufferSize: $size);

return new self($queue, $channel, $consumerTag);
}

/** @var Pipeline\ConcurrentIterator<Delivery> */
/** @var Pipeline\ConcurrentIterator<DeliveryMessage> */
private readonly Pipeline\ConcurrentIterator $iterator;

/**
* @internal
* @param Pipeline\Queue<Delivery> $queue
* @param Pipeline\Queue<DeliveryMessage> $queue
* @param non-empty-string $consumerTag
*/
private function __construct(
Expand All @@ -44,7 +44,7 @@ private function __construct(
/**
* @internal
*/
public function push(Delivery $delivery): void
public function push(DeliveryMessage $delivery): void
{
$this->queue->push($delivery);
}
Expand Down
Loading