(Grav GitSync) Automatic Commit from smokephil

This commit is contained in:
smokephil 2024-03-22 22:42:22 +01:00 committed by GitSync
commit 4267db646d
2765 changed files with 462171 additions and 0 deletions

View file

@ -0,0 +1,34 @@
CHANGELOG
=========
5.4
---
* Deprecate not setting the `delete_after_ack` config option (or DSN parameter),
its default value will change to `true` in 6.0
5.3
---
* Add `rediss://` DSN scheme support for TLS protocol
* Deprecate TLS option, use `rediss://127.0.0.1` instead of `redis://127.0.0.1?tls=1`
* Add support for `\RedisCluster` instance in `Connection` constructor
* Add support for Redis Cluster in DSN
5.2.0
-----
* Added a `delete_after_reject` option to the DSN to allow control over message
deletion, similar to `delete_after_ack`.
* Added option `lazy` to delay connecting to Redis server until we first use it.
5.1.0
-----
* Introduced the Redis bridge.
* Added TLS option in the DSN. Example: `redis://127.0.0.1?tls=1`
* Deprecated use of invalid options
* Added ability to receive of old pending messages with new `redeliver_timeout`
and `claim_interval` options.
* Added a `delete_after_ack` option to the DSN as an alternative to
`stream_max_entries` to avoid leaking memory.

View file

@ -0,0 +1,19 @@
Copyright (c) 2018-2022 Fabien Potencier
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View file

@ -0,0 +1,12 @@
Redis Messenger
===============
Provides Redis integration for Symfony Messenger.
Resources
---------
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)

View file

@ -0,0 +1,596 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;
/**
* A Redis connection.
*
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
* @author Robin Chalas <robin.chalas@gmail.com>
*
* @internal
* @final
*/
class Connection
{
private const DEFAULT_OPTIONS = [
'stream' => 'messages',
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
'delete_after_ack' => false,
'delete_after_reject' => true,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
'tls' => false,
'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
'lazy' => false,
'auth' => null,
'serializer' => \Redis::SERIALIZER_PHP,
];
private $connection;
private $stream;
private $queue;
private $group;
private $consumer;
private $autoSetup;
private $maxEntries;
private $redeliverTimeout;
private $nextClaim = 0.0;
private $claimInterval;
private $deleteAfterAck;
private $deleteAfterReject;
private $couldHavePendingMessages = true;
/**
* @param \Redis|\RedisCluster|null $redis
*/
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], $redis = null)
{
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
}
$host = $connectionCredentials['host'] ?? '127.0.0.1';
$port = $connectionCredentials['port'] ?? 6379;
$serializer = $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP;
$dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex'];
$auth = $connectionCredentials['auth'] ?? null;
if ('' === $auth) {
$auth = null;
}
$lazy = $configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy'];
if (\is_array($host) || $redis instanceof \RedisCluster) {
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
$initializer = static function ($redis) use ($hosts, $auth, $serializer) {
return self::initializeRedisCluster($redis, $hosts, $auth, $serializer);
};
$redis = $lazy ? new RedisClusterProxy($redis, $initializer) : $initializer($redis);
} else {
$redis = $redis ?? new \Redis();
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
return self::initializeRedis($redis, $host, $port, $auth, $serializer, $dbIndex);
};
$redis = $lazy ? new RedisProxy($redis, $initializer) : $initializer($redis);
}
$this->connection = $redis;
foreach (['stream', 'group', 'consumer'] as $key) {
if (isset($configuration[$key]) && '' === $configuration[$key]) {
throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
}
}
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
$this->deleteAfterReject = $configuration['delete_after_reject'] ?? self::DEFAULT_OPTIONS['delete_after_reject'];
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
$this->claimInterval = ($configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval']) / 1000;
}
/**
* @param string|string[]|null $auth
*/
private static function initializeRedis(\Redis $redis, string $host, int $port, $auth, int $serializer, int $dbIndex): \Redis
{
$redis->connect($host, $port);
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
if (null !== $auth && !$redis->auth($auth)) {
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
}
if ($dbIndex && !$redis->select($dbIndex)) {
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
}
return $redis;
}
/**
* @param string|string[]|null $auth
*/
private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, $auth, int $serializer): \RedisCluster
{
if (null === $redis) {
$redis = new \RedisCluster(null, $hosts, 0.0, 0.0, false, $auth);
}
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
return $redis;
}
/**
* @param \Redis|\RedisCluster|null $redis
*/
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
{
if (false === strpos($dsn, ',')) {
$parsedUrl = self::parseDsn($dsn, $redisOptions);
} else {
$dsns = explode(',', $dsn);
$parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
return self::parseDsn($dsn, $redisOptions);
}, $dsns);
// Merge all the URLs, the last one overrides the previous ones
$parsedUrl = array_merge(...$parsedUrls);
// Regroup all the hosts in an array interpretable by RedisCluster
$parsedUrl['host'] = array_map(function ($parsedUrl, $dsn) {
if (!isset($parsedUrl['host'])) {
throw new InvalidArgumentException(sprintf('Missing host in DSN part "%s", it must be defined when using Redis Cluster.', $dsn));
}
return $parsedUrl['host'].':'.($parsedUrl['port'] ?? 6379);
}, $parsedUrls, $dsns);
}
self::validateOptions($redisOptions);
$autoSetup = null;
if (\array_key_exists('auto_setup', $redisOptions)) {
$autoSetup = filter_var($redisOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['auto_setup']);
}
$maxEntries = null;
if (\array_key_exists('stream_max_entries', $redisOptions)) {
$maxEntries = filter_var($redisOptions['stream_max_entries'], \FILTER_VALIDATE_INT);
unset($redisOptions['stream_max_entries']);
}
$deleteAfterAck = null;
if (\array_key_exists('delete_after_ack', $redisOptions)) {
$deleteAfterAck = filter_var($redisOptions['delete_after_ack'], \FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['delete_after_ack']);
} else {
trigger_deprecation('symfony/redis-messenger', '5.4', 'Not setting the "delete_after_ack" boolean option explicitly is deprecated, its default value will change to true in 6.0.');
}
$deleteAfterReject = null;
if (\array_key_exists('delete_after_reject', $redisOptions)) {
$deleteAfterReject = filter_var($redisOptions['delete_after_reject'], \FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['delete_after_reject']);
}
$dbIndex = null;
if (\array_key_exists('dbindex', $redisOptions)) {
$dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT);
unset($redisOptions['dbindex']);
}
$tls = 'rediss' === $parsedUrl['scheme'];
if (\array_key_exists('tls', $redisOptions)) {
trigger_deprecation('symfony/redis-messenger', '5.3', 'Providing "tls" parameter is deprecated, use "rediss://" DSN scheme instead');
$tls = filter_var($redisOptions['tls'], \FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['tls']);
}
$redeliverTimeout = null;
if (\array_key_exists('redeliver_timeout', $redisOptions)) {
$redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], \FILTER_VALIDATE_INT);
unset($redisOptions['redeliver_timeout']);
}
$claimInterval = null;
if (\array_key_exists('claim_interval', $redisOptions)) {
$claimInterval = filter_var($redisOptions['claim_interval'], \FILTER_VALIDATE_INT);
unset($redisOptions['claim_interval']);
}
$configuration = [
'stream' => $redisOptions['stream'] ?? null,
'group' => $redisOptions['group'] ?? null,
'consumer' => $redisOptions['consumer'] ?? null,
'lazy' => $redisOptions['lazy'] ?? self::DEFAULT_OPTIONS['lazy'],
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'delete_after_ack' => $deleteAfterAck,
'delete_after_reject' => $deleteAfterReject,
'dbindex' => $dbIndex,
'redeliver_timeout' => $redeliverTimeout,
'claim_interval' => $claimInterval,
];
if (isset($parsedUrl['host'])) {
$pass = '' !== ($parsedUrl['pass'] ?? '') ? urldecode($parsedUrl['pass']) : null;
$user = '' !== ($parsedUrl['user'] ?? '') ? urldecode($parsedUrl['user']) : null;
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
// See: https://github.com/phpredis/phpredis/#auth
'auth' => $redisOptions['auth'] ?? (null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user)),
];
$pathParts = explode('/', rtrim($parsedUrl['path'] ?? '', '/'));
$configuration['stream'] = $pathParts[1] ?? $configuration['stream'];
$configuration['group'] = $pathParts[2] ?? $configuration['group'];
$configuration['consumer'] = $pathParts[3] ?? $configuration['consumer'];
if ($tls) {
$connectionCredentials['host'] = 'tls://'.$connectionCredentials['host'];
}
} else {
$connectionCredentials = [
'host' => $parsedUrl['path'],
'port' => 0,
];
}
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
}
private static function parseDsn(string $dsn, array &$redisOptions): array
{
$url = $dsn;
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
$url = str_replace($scheme.':', 'file:', $dsn);
}
if (false === $parsedUrl = parse_url($url)) {
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $dsnOptions);
$redisOptions = array_merge($redisOptions, $dsnOptions);
}
return $parsedUrl;
}
private static function validateOptions(array $options): void
{
$availableOptions = array_keys(self::DEFAULT_OPTIONS);
if (0 < \count($invalidOptions = array_diff(array_keys($options), $availableOptions))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid option(s) "%s" passed to the Redis Messenger transport. Passing invalid options is deprecated.', implode('", "', $invalidOptions));
}
}
private function claimOldPendingMessages()
{
try {
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
// https://github.com/antirez/redis/issues/6256
$pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 1);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$claimableIds = [];
foreach ($pendingMessages as $pendingMessage) {
if ($pendingMessage[1] === $this->consumer) {
$this->couldHavePendingMessages = true;
return;
}
if ($pendingMessage[2] >= $this->redeliverTimeout) {
$claimableIds[] = $pendingMessage[0];
}
}
if (\count($claimableIds) > 0) {
try {
$this->connection->xclaim(
$this->stream,
$this->group,
$this->consumer,
$this->redeliverTimeout,
$claimableIds,
['JUSTID']
);
$this->couldHavePendingMessages = true;
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
$this->nextClaim = microtime(true) + $this->claimInterval;
}
public function get(): ?array
{
if ($this->autoSetup) {
$this->setup();
}
$now = microtime();
$now = substr($now, 11).substr($now, 2, 3);
$queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now);
while ($queuedMessageCount--) {
if (!$message = $this->rawCommand('ZPOPMIN', 1)) {
break;
}
[$queuedMessage, $expiry] = $message;
if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
// if a future-placed message is popped because of a race condition with
// another running consumer, the message is readded to the queue
if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
throw new TransportException('Could not add a message to the redis stream.');
}
break;
}
$decodedQueuedMessage = json_decode($queuedMessage, true);
$this->add(\array_key_exists('body', $decodedQueuedMessage) ? $decodedQueuedMessage['body'] : $queuedMessage, $decodedQueuedMessage['headers'] ?? [], 0);
}
if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
$this->claimOldPendingMessages();
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {
$messageId = '0'; // will receive consumers pending messages
}
try {
$messages = $this->connection->xreadgroup(
$this->group,
$this->consumer,
[$this->stream => $messageId],
1
);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (false === $messages) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
}
if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
$this->couldHavePendingMessages = false;
// No pending messages so get a new one
return $this->get();
}
foreach ($messages[$this->stream] ?? [] as $key => $message) {
return [
'id' => $key,
'data' => $message,
];
}
return null;
}
public function ack(string $id): void
{
try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
if ($this->deleteAfterAck) {
$acknowledged = $this->connection->xdel($this->stream, [$id]);
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (!$acknowledged) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
}
}
public function reject(string $id): void
{
try {
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
if ($this->deleteAfterReject) {
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (!$deleted) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
}
}
public function add(string $body, array $headers, int $delayInMs = 0): void
{
if ($this->autoSetup) {
$this->setup();
}
try {
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
$message = json_encode([
'body' => $body,
'headers' => $headers,
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
'uniqid' => uniqid('', true),
]);
if (false === $message) {
throw new TransportException(json_last_error_msg());
}
$now = explode(' ', microtime(), 2);
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
if (3 < \strlen($now[0])) {
$now[1] += substr($now[0], 0, -3);
$now[0] = substr($now[0], -3);
if (\is_float($now[1])) {
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
}
}
$added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
} else {
$message = json_encode([
'body' => $body,
'headers' => $headers,
]);
if (false === $message) {
throw new TransportException(json_last_error_msg());
}
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
}
}
} catch (\RedisException $e) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? $e->getMessage(), 0, $e);
}
if (!$added) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
}
}
public function setup(): void
{
try {
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
// group might already exist, ignore
if ($this->connection->getLastError()) {
$this->connection->clearLastError();
}
if ($this->deleteAfterAck || $this->deleteAfterReject) {
$groups = $this->connection->xinfo('GROUPS', $this->stream);
if (
// support for Redis extension version 5+
(\is_array($groups) && 1 < \count($groups))
// support for Redis extension version 4.x
|| (\is_string($groups) && substr_count($groups, '"name"'))
) {
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject cannot be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
}
}
$this->autoSetup = false;
}
private function getCurrentTimeInMilliseconds(): int
{
return (int) (microtime(true) * 1000);
}
public function cleanup(): void
{
static $unlink = true;
if ($unlink) {
try {
$unlink = false !== $this->connection->unlink($this->stream, $this->queue);
} catch (\Throwable $e) {
$unlink = false;
}
}
if (!$unlink) {
$this->connection->del($this->stream, $this->queue);
}
}
/**
* @return mixed
*/
private function rawCommand(string $command, ...$arguments)
{
try {
if ($this->connection instanceof \RedisCluster || $this->connection instanceof RedisClusterProxy) {
$result = $this->connection->rawCommand($this->queue, $command, $this->queue, ...$arguments);
} else {
$result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (false === $result) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
}
return $result;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\Connection::class, false)) {
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\RedisExt\Connection::class);
}

View file

@ -0,0 +1,42 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
/**
* Allow to delay connection to Redis Cluster.
*
* @author Johann Pardanaud <johann@pardanaud.com>
*
* @internal
*/
class RedisClusterProxy
{
private $redis;
private $initializer;
private $ready = false;
public function __construct(?\RedisCluster $redis, \Closure $initializer)
{
$this->redis = $redis;
$this->initializer = $initializer;
}
public function __call(string $method, array $args)
{
if (!$this->ready) {
$this->redis = $this->initializer->__invoke($this->redis);
$this->ready = true;
}
return $this->redis->{$method}(...$args);
}
}

View file

@ -0,0 +1,43 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
/**
* Allow to delay connection to Redis.
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
* @author Nicolas Grekas <p@tchwork.com>
*
* @internal
*/
class RedisProxy
{
private $redis;
private $initializer;
private $ready = false;
public function __construct(\Redis $redis, \Closure $initializer)
{
$this->redis = $redis;
$this->initializer = $initializer;
}
public function __call(string $method, array $args)
{
if (!$this->ready) {
$this->redis = $this->initializer->__invoke($this->redis);
$this->ready = true;
}
return $this->redis->{$method}(...$args);
}
}

View file

@ -0,0 +1,36 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
*/
class RedisReceivedStamp implements NonSendableStampInterface
{
private $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\RedisReceivedStamp::class, false)) {
class_alias(RedisReceivedStamp::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisReceivedStamp::class);
}

View file

@ -0,0 +1,102 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisReceiver implements ReceiverInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
$message = $this->connection->get();
if (null === $message) {
return [];
}
$redisEnvelope = json_decode($message['data']['message'] ?? '', true);
if (null === $redisEnvelope) {
return [];
}
try {
if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) {
$envelope = $this->serializer->decode([
'body' => $redisEnvelope['body'],
'headers' => $redisEnvelope['headers'],
]);
} else {
$envelope = $this->serializer->decode($redisEnvelope);
}
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($message['id']);
throw $exception;
}
return [$envelope->with(new RedisReceivedStamp($message['id']))];
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
$this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
}
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
{
/** @var RedisReceivedStamp|null $redisReceivedStamp */
$redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);
if (null === $redisReceivedStamp) {
throw new LogicException('No RedisReceivedStamp found on the Envelope.');
}
return $redisReceivedStamp;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver::class, false)) {
class_alias(RedisReceiver::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver::class);
}

View file

@ -0,0 +1,53 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisSender implements SenderInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
return $envelope;
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\RedisSender::class, false)) {
class_alias(RedisSender::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisSender::class);
}

View file

@ -0,0 +1,90 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransport implements TransportInterface, SetupableTransportInterface
{
private $serializer;
private $connection;
private $receiver;
private $sender;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return ($this->receiver ?? $this->getReceiver())->get();
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}
/**
* {@inheritdoc}
*/
public function setup(): void
{
$this->connection->setup();
}
private function getReceiver(): RedisReceiver
{
return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
}
private function getSender(): RedisSender
{
return $this->sender = new RedisSender($this->connection, $this->serializer);
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\RedisTransport::class, false)) {
class_alias(RedisTransport::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisTransport::class);
}

View file

@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Alexander Schranz <alexander@suluio>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
unset($options['transport_name']);
return new RedisTransport(Connection::fromDsn($dsn, $options), $serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'redis://') || 0 === strpos($dsn, 'rediss://');
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory::class, false)) {
class_alias(RedisTransportFactory::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory::class);
}

View file

@ -0,0 +1,34 @@
{
"name": "symfony/redis-messenger",
"type": "symfony-messenger-bridge",
"description": "Symfony Redis extension Messenger Bridge",
"keywords": [],
"homepage": "https://symfony.com",
"license": "MIT",
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"require": {
"php": ">=7.2.5",
"symfony/deprecation-contracts": "^2.1|^3",
"symfony/messenger": "^5.1|^6.0"
},
"require-dev": {
"symfony/property-access": "^4.4|^5.0|^6.0",
"symfony/serializer": "^4.4|^5.0|^6.0"
},
"autoload": {
"psr-4": { "Symfony\\Component\\Messenger\\Bridge\\Redis\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
"minimum-stability": "dev"
}