201 lines
9.2 KiB
YAML
201 lines
9.2 KiB
YAML
name: Connection
|
|
class_comment: '# * A Redis connection.
|
|
|
|
# *
|
|
|
|
# * @author Alexander Schranz <alexander@sulu.io>
|
|
|
|
# * @author Antoine Bluchet <soyuka@gmail.com>
|
|
|
|
# * @author Robin Chalas <robin.chalas@gmail.com>
|
|
|
|
# *
|
|
|
|
# * @internal
|
|
|
|
# *
|
|
|
|
# * @final'
|
|
dependencies:
|
|
- name: Relay
|
|
type: class
|
|
source: Relay\Relay
|
|
- name: Sentinel
|
|
type: class
|
|
source: Relay\Sentinel
|
|
- name: InvalidArgumentException
|
|
type: class
|
|
source: Symfony\Component\Messenger\Exception\InvalidArgumentException
|
|
- name: LogicException
|
|
type: class
|
|
source: Symfony\Component\Messenger\Exception\LogicException
|
|
- name: TransportException
|
|
type: class
|
|
source: Symfony\Component\Messenger\Exception\TransportException
|
|
properties: []
|
|
methods:
|
|
- name: initializeRedis
|
|
visibility: private
|
|
parameters:
|
|
- name: redis
|
|
- name: host
|
|
- name: port
|
|
- name: auth
|
|
- name: params
|
|
comment: "# * A Redis connection.\n# *\n# * @author Alexander Schranz <alexander@sulu.io>\n\
|
|
# * @author Antoine Bluchet <soyuka@gmail.com>\n# * @author Robin Chalas <robin.chalas@gmail.com>\n\
|
|
# *\n# * @internal\n# *\n# * @final\n# */\n# class Connection\n# {\n# private\
|
|
\ const DEFAULT_OPTIONS = [\n# 'host' => '127.0.0.1',\n# 'port' => 6379,\n# 'stream'\
|
|
\ => 'messages',\n# 'group' => 'symfony',\n# 'consumer' => 'consumer',\n# 'auto_setup'\
|
|
\ => true,\n# 'delete_after_ack' => true,\n# 'delete_after_reject' => true,\n\
|
|
# 'stream_max_entries' => 0, // any value higher than 0 defines an approximate\
|
|
\ maximum number of stream entries\n# 'dbindex' => 0,\n# 'redeliver_timeout' =>\
|
|
\ 3600, // Timeout before redeliver messages still in pending state (seconds)\n\
|
|
# 'claim_interval' => 60000, // Interval by which pending/abandoned messages should\
|
|
\ be checked\n# 'lazy' => false,\n# 'auth' => null,\n# 'serializer' => 1, // see\
|
|
\ \\Redis::SERIALIZER_PHP,\n# 'sentinel_master' => null, // String, master to\
|
|
\ look for (optional, default is NULL meaning Sentinel support is disabled)\n\
|
|
# 'redis_sentinel' => null, // String, alias for 'sentinel_master'\n# 'timeout'\
|
|
\ => 0.0, // Float, value in seconds (optional, default is 0 meaning unlimited)\n\
|
|
# 'read_timeout' => 0.0, // Float, value in seconds (optional, default is 0 meaning\
|
|
\ unlimited)\n# 'retry_interval' => 0, // Int, value in milliseconds (optional,\
|
|
\ default is 0)\n# 'persistent_id' => null, // String, persistent connection id\
|
|
\ (optional, default is NULL meaning not persistent)\n# 'ssl' => null, // see\
|
|
\ https://php.net/context.ssl\n# ];\n# \n# private \\Redis|Relay|\\RedisCluster|\\\
|
|
Closure $redis;\n# private string $stream;\n# private string $queue;\n# private\
|
|
\ string $group;\n# private string $consumer;\n# private bool $autoSetup;\n# private\
|
|
\ int $maxEntries;\n# private int $redeliverTimeout;\n# private float $nextClaim\
|
|
\ = 0.0;\n# private float $claimInterval;\n# private bool $deleteAfterAck;\n#\
|
|
\ private bool $deleteAfterReject;\n# private bool $couldHavePendingMessages =\
|
|
\ true;\n# \n# public function __construct(array $options, \\Redis|Relay|\\RedisCluster|null\
|
|
\ $redis = null)\n# {\n# if (version_compare(phpversion('redis'), '4.3.0', '<'))\
|
|
\ {\n# throw new LogicException('The redis transport requires php-redis 4.3.0\
|
|
\ or higher.');\n# }\n# \n# $options += self::DEFAULT_OPTIONS;\n# $host = $options['host'];\n\
|
|
# $port = $options['port'];\n# $auth = $options['auth'];\n# \n# if (isset($options['redis_sentinel'])\
|
|
\ && isset($options['sentinel_master'])) {\n# throw new InvalidArgumentException('Cannot\
|
|
\ use both \"redis_sentinel\" and \"sentinel_master\" at the same time.');\n#\
|
|
\ }\n# \n# $sentinelMaster = $options['sentinel_master'] ?? $options['redis_sentinel']\
|
|
\ ?? null;\n# \n# if (null !== $sentinelMaster && !class_exists(\\RedisSentinel::class)\
|
|
\ && !class_exists(Sentinel::class)) {\n# throw new InvalidArgumentException('Redis\
|
|
\ Sentinel support requires ext-redis>=5.2, or ext-relay.');\n# }\n# \n# if (null\
|
|
\ !== $sentinelMaster && $redis instanceof \\RedisCluster) {\n# throw new InvalidArgumentException('Cannot\
|
|
\ configure Redis Sentinel and Redis Cluster instance at the same time.');\n#\
|
|
\ }\n# \n# $booleanStreamOptions = [\n# 'allow_self_signed',\n# 'capture_peer_cert',\n\
|
|
# 'capture_peer_cert_chain',\n# 'disable_compression',\n# 'SNI_enabled',\n# 'verify_peer',\n\
|
|
# 'verify_peer_name',\n# ];\n# \n# foreach ($options['ssl'] ?? [] as $streamOption\
|
|
\ => $value) {\n# if (\\in_array($streamOption, $booleanStreamOptions, true) &&\
|
|
\ \\is_string($value)) {\n# $options['ssl'][$streamOption] = filter_var($value,\
|
|
\ \\FILTER_VALIDATE_BOOL);\n# }\n# }\n# \n# if ((\\is_array($host) && null ===\
|
|
\ $sentinelMaster) || $redis instanceof \\RedisCluster) {\n# $hosts = \\is_string($host)\
|
|
\ ? [$host.':'.$port] : $host; // Always ensure we have an array\n# $this->redis\
|
|
\ = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);\n\
|
|
# } else {\n# $this->redis = static function () use ($redis, $sentinelMaster,\
|
|
\ $host, $port, $options, $auth) {\n# if (null !== $sentinelMaster) {\n# $sentinelClass\
|
|
\ = \\extension_loaded('redis') ? \\RedisSentinel::class : Sentinel::class;\n\
|
|
# $hostIndex = 0;\n# $hosts = \\is_array($host) ? $host : [['scheme' => 'tcp',\
|
|
\ 'host' => $host, 'port' => $port]];\n# do {\n# $host = $hosts[$hostIndex]['host'];\n\
|
|
# $port = $hosts[$hostIndex]['port'] ?? 0;\n# $tls = 'tls' === $hosts[$hostIndex]['scheme'];\n\
|
|
# $address = false;\n# \n# if (isset($hosts[$hostIndex]['host']) && $tls) {\n\
|
|
# $host = 'tls://'.$host;\n# }\n# \n# try {\n# if (\\extension_loaded('redis')\
|
|
\ && version_compare(phpversion('redis'), '6.0.0', '>=')) {\n# $params = [\n#\
|
|
\ 'host' => $host,\n# 'port' => $port,\n# 'connectTimeout' => $options['timeout'],\n\
|
|
# 'persistent' => $options['persistent_id'],\n# 'retryInterval' => $options['retry_interval'],\n\
|
|
# 'readTimeout' => $options['read_timeout'],\n# ];\n# \n# $sentinel = new \\RedisSentinel($params);\n\
|
|
# } else {\n# $sentinel = new $sentinelClass($host, $port, $options['timeout'],\
|
|
\ $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);\n\
|
|
# }\n# \n# if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) {\n\
|
|
# [$host, $port] = $address;\n# }\n# } catch (\\RedisException|\\Relay\\Exception\
|
|
\ $redisException) {\n# }\n# } while (++$hostIndex < \\count($hosts) && !$address);\n\
|
|
# \n# if (!$address) {\n# throw new InvalidArgumentException(\\sprintf('Failed\
|
|
\ to retrieve master information from sentinel \"%s\".', $sentinelMaster), previous:\
|
|
\ $redisException ?? null);\n# }\n# }\n# \n# return self::initializeRedis($redis\
|
|
\ ?? (\\extension_loaded('redis') ? new \\Redis() : new Relay()), $host, $port,\
|
|
\ $auth, $options);\n# };\n# }\n# \n# if (!$options['lazy']) {\n# $this->getRedis();\n\
|
|
# }\n# \n# foreach (['stream', 'group', 'consumer'] as $key) {\n# if ('' === $options[$key])\
|
|
\ {\n# throw new InvalidArgumentException(\\sprintf('\"%s\" should be configured,\
|
|
\ got an empty string.', $key));\n# }\n# }\n# \n# $this->stream = $options['stream'];\n\
|
|
# $this->group = $options['group'];\n# $this->consumer = $options['consumer'];\n\
|
|
# $this->queue = $this->stream.'__queue';\n# $this->autoSetup = $options['auto_setup'];\n\
|
|
# $this->maxEntries = $options['stream_max_entries'];\n# $this->deleteAfterAck\
|
|
\ = $options['delete_after_ack'];\n# $this->deleteAfterReject = $options['delete_after_reject'];\n\
|
|
# $this->redeliverTimeout = $options['redeliver_timeout'] * 1000;\n# $this->claimInterval\
|
|
\ = $options['claim_interval'] / 1000;\n# }\n# \n# /**\n# * @param string|string[]|null\
|
|
\ $auth"
|
|
- name: initializeRedisCluster
|
|
visibility: private
|
|
parameters:
|
|
- name: redis
|
|
- name: hosts
|
|
- name: auth
|
|
- name: params
|
|
comment: '# * @param string|string[]|null $auth'
|
|
- name: fromDsn
|
|
visibility: public
|
|
parameters:
|
|
- name: dsn
|
|
- name: options
|
|
default: '[]'
|
|
- name: redis
|
|
default: 'null'
|
|
comment: null
|
|
- name: parseDsn
|
|
visibility: private
|
|
parameters:
|
|
- name: dsn
|
|
- name: '&$options'
|
|
comment: null
|
|
- name: claimOldPendingMessages
|
|
visibility: private
|
|
parameters: []
|
|
comment: null
|
|
- name: get
|
|
visibility: public
|
|
parameters: []
|
|
comment: null
|
|
- name: ack
|
|
visibility: public
|
|
parameters:
|
|
- name: id
|
|
comment: null
|
|
- name: reject
|
|
visibility: public
|
|
parameters:
|
|
- name: id
|
|
comment: null
|
|
- name: add
|
|
visibility: public
|
|
parameters:
|
|
- name: body
|
|
- name: headers
|
|
- name: delayInMs
|
|
default: '0'
|
|
comment: null
|
|
- name: setup
|
|
visibility: public
|
|
parameters: []
|
|
comment: null
|
|
- name: cleanup
|
|
visibility: public
|
|
parameters: []
|
|
comment: null
|
|
- name: getMessageCount
|
|
visibility: public
|
|
parameters: []
|
|
comment: null
|
|
- name: rawCommand
|
|
visibility: private
|
|
parameters:
|
|
- name: command
|
|
- name: '...$arguments'
|
|
comment: null
|
|
- name: getRedis
|
|
visibility: private
|
|
parameters: []
|
|
comment: null
|
|
traits:
|
|
- Relay\Relay
|
|
- Relay\Sentinel
|
|
- Symfony\Component\Messenger\Exception\InvalidArgumentException
|
|
- Symfony\Component\Messenger\Exception\LogicException
|
|
- Symfony\Component\Messenger\Exception\TransportException
|
|
interfaces: []
|