(Grav GitSync) Automatic Commit from smokephil

This commit is contained in:
smokephil 2025-01-17 09:13:21 +01:00 committed by GitSync
parent 88e07926d9
commit 9d9feba04b
144 changed files with 7311 additions and 5467 deletions

View file

@ -306,19 +306,21 @@ class Connection
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
}
if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
return;
}
return;
}
$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
$this->publishOnExchange(
$this->exchange(),
$body,
$this->getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp
);
});
}
/**
@ -570,13 +572,18 @@ class Connection
private function clearWhenDisconnected(): void
{
if (!$this->channel()->isConnected()) {
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
$this->clear();
}
}
private function clear(): void
{
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
}
private function getDefaultPublishRoutingKey(): ?string
{
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
@ -593,6 +600,25 @@ class Connection
{
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
}
private function withConnectionExceptionRetry(callable $callable): void
{
$maxRetries = 3;
$retries = 0;
retry:
try {
$callable();
} catch (\AMQPConnectionException $e) {
if (++$retries <= $maxRetries) {
$this->clear();
goto retry;
}
throw $e;
}
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\Connection::class, false)) {