(Grav GitSync) Automatic Commit from smokephil
This commit is contained in:
parent
ab8f386766
commit
54730a9480
251 changed files with 5008 additions and 8945 deletions
|
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
|
|||
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
|
||||
/**
|
||||
* An AMQP connection.
|
||||
|
|
@ -32,6 +33,9 @@ class Connection
|
|||
'x-message-ttl',
|
||||
];
|
||||
|
||||
/**
|
||||
* @see https://github.com/php-amqp/php-amqp/blob/master/amqp_connection_resource.h
|
||||
*/
|
||||
private const AVAILABLE_OPTIONS = [
|
||||
'host',
|
||||
'port',
|
||||
|
|
@ -53,6 +57,7 @@ class Connection
|
|||
'write_timeout',
|
||||
'confirm_timeout',
|
||||
'connect_timeout',
|
||||
'rpc_timeout',
|
||||
'cacert',
|
||||
'cert',
|
||||
'key',
|
||||
|
|
@ -102,7 +107,12 @@ class Connection
|
|||
*/
|
||||
private $amqpDelayExchange;
|
||||
|
||||
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $lastActivityTime = 0;
|
||||
|
||||
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, ?AmqpFactory $amqpFactory = null)
|
||||
{
|
||||
if (!\extension_loaded('amqp')) {
|
||||
throw new LogicException(sprintf('You cannot use the "%s" as the "amqp" extension is not installed.', __CLASS__));
|
||||
|
|
@ -166,26 +176,26 @@ class Connection
|
|||
* * verify: Enable or disable peer verification. If peer verification is enabled then the common name in the
|
||||
* server certificate must match the server name. Peer verification is enabled by default.
|
||||
*/
|
||||
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
|
||||
public static function fromDsn(string $dsn, array $options = [], ?AmqpFactory $amqpFactory = null): self
|
||||
{
|
||||
if (false === $parsedUrl = parse_url($dsn)) {
|
||||
if (false === $params = parse_url($dsn)) {
|
||||
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
|
||||
if (!\in_array($dsn, ['amqp://', 'amqps://'])) {
|
||||
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
|
||||
throw new InvalidArgumentException('The given AMQP DSN is invalid.');
|
||||
}
|
||||
|
||||
$parsedUrl = [];
|
||||
$params = [];
|
||||
}
|
||||
|
||||
$useAmqps = 0 === strpos($dsn, 'amqps://');
|
||||
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
|
||||
$pathParts = isset($params['path']) ? explode('/', trim($params['path'], '/')) : [];
|
||||
$exchangeName = $pathParts[1] ?? 'messages';
|
||||
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
|
||||
parse_str($params['query'] ?? '', $parsedQuery);
|
||||
$port = $useAmqps ? 5671 : 5672;
|
||||
|
||||
$amqpOptions = array_replace_recursive([
|
||||
'host' => $parsedUrl['host'] ?? 'localhost',
|
||||
'port' => $parsedUrl['port'] ?? $port,
|
||||
'host' => $params['host'] ?? 'localhost',
|
||||
'port' => $params['port'] ?? $port,
|
||||
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
|
||||
'exchange' => [
|
||||
'name' => $exchangeName,
|
||||
|
|
@ -194,12 +204,12 @@ class Connection
|
|||
|
||||
self::validateOptions($amqpOptions);
|
||||
|
||||
if (isset($parsedUrl['user'])) {
|
||||
$amqpOptions['login'] = urldecode($parsedUrl['user']);
|
||||
if (isset($params['user'])) {
|
||||
$amqpOptions['login'] = rawurldecode($params['user']);
|
||||
}
|
||||
|
||||
if (isset($parsedUrl['pass'])) {
|
||||
$amqpOptions['password'] = urldecode($parsedUrl['pass']);
|
||||
if (isset($params['pass'])) {
|
||||
$amqpOptions['password'] = rawurldecode($params['pass']);
|
||||
}
|
||||
|
||||
if (!isset($amqpOptions['queues'])) {
|
||||
|
|
@ -288,7 +298,7 @@ class Connection
|
|||
/**
|
||||
* @throws \AMQPException
|
||||
*/
|
||||
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
|
||||
public function publish(string $body, array $headers = [], int $delayInMs = 0, ?AmqpStamp $amqpStamp = null): void
|
||||
{
|
||||
$this->clearWhenDisconnected();
|
||||
|
||||
|
|
@ -324,7 +334,7 @@ class Connection
|
|||
/**
|
||||
* @throws \AMQPException
|
||||
*/
|
||||
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
|
||||
private function publishWithDelay(string $body, array $headers, int $delay, ?AmqpStamp $amqpStamp = null)
|
||||
{
|
||||
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
|
||||
$isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;
|
||||
|
|
@ -340,13 +350,15 @@ class Connection
|
|||
);
|
||||
}
|
||||
|
||||
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
|
||||
private function publishOnExchange(\AMQPExchange $exchange, string $body, ?string $routingKey = null, array $headers = [], ?AmqpStamp $amqpStamp = null)
|
||||
{
|
||||
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
|
||||
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
|
||||
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
|
||||
$attributes['timestamp'] = $attributes['timestamp'] ?? time();
|
||||
|
||||
$this->lastActivityTime = time();
|
||||
|
||||
$exchange->publish(
|
||||
$body,
|
||||
$routingKey,
|
||||
|
|
@ -401,7 +413,7 @@ class Connection
|
|||
// delete the delay queue 10 seconds after the message expires
|
||||
// publishing another message redeclares the queue which renews the lease
|
||||
'x-expires' => $delay + 10000,
|
||||
// message should be broadcasted to all consumers during delay, but to only one queue during retry
|
||||
// message should be broadcast to all consumers during delay, but to only one queue during retry
|
||||
// empty name is default direct exchange
|
||||
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
|
||||
// after being released from to DLX, make sure the original routing key will be used
|
||||
|
|
@ -445,12 +457,12 @@ class Connection
|
|||
|
||||
public function ack(\AMQPEnvelope $message, string $queueName): bool
|
||||
{
|
||||
return $this->queue($queueName)->ack($message->getDeliveryTag());
|
||||
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
|
||||
}
|
||||
|
||||
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
|
||||
{
|
||||
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
|
||||
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags) ?? true;
|
||||
}
|
||||
|
||||
public function setup(): void
|
||||
|
|
@ -505,11 +517,16 @@ class Connection
|
|||
static function (): bool {
|
||||
return false;
|
||||
},
|
||||
static function (): bool {
|
||||
return false;
|
||||
static function () {
|
||||
throw new TransportException('Message publication failed due to a negative acknowledgment (nack) from the broker.');
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
$this->lastActivityTime = time();
|
||||
} elseif (0 < ($this->connectionOptions['heartbeat'] ?? 0) && time() > $this->lastActivityTime + 2 * $this->connectionOptions['heartbeat']) {
|
||||
$disconnectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pdisconnect' : 'disconnect';
|
||||
$this->amqpChannel->getConnection()->{$disconnectMethod}();
|
||||
}
|
||||
|
||||
return $this->amqpChannel;
|
||||
|
|
@ -518,7 +535,7 @@ class Connection
|
|||
public function queue(string $queueName): \AMQPQueue
|
||||
{
|
||||
if (!isset($this->amqpQueues[$queueName])) {
|
||||
$queueConfig = $this->queuesOptions[$queueName];
|
||||
$queueConfig = $this->queuesOptions[$queueName] ?? [];
|
||||
|
||||
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
|
||||
$amqpQueue->setName($queueName);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue