193 lines
12 KiB
YAML
193 lines
12 KiB
YAML
name: AmqpExtIntegrationTest
|
|
class_comment: '# * @requires extension amqp
|
|
|
|
# *
|
|
|
|
# * @group integration'
|
|
dependencies:
|
|
- name: TestCase
|
|
type: class
|
|
source: PHPUnit\Framework\TestCase
|
|
- name: DummyMessage
|
|
type: class
|
|
source: Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage
|
|
- name: AmqpReceivedStamp
|
|
type: class
|
|
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp
|
|
- name: AmqpReceiver
|
|
type: class
|
|
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver
|
|
- name: AmqpSender
|
|
type: class
|
|
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender
|
|
- name: AmqpStamp
|
|
type: class
|
|
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp
|
|
- name: Connection
|
|
type: class
|
|
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection
|
|
- name: Envelope
|
|
type: class
|
|
source: Symfony\Component\Messenger\Envelope
|
|
- name: DelayStamp
|
|
type: class
|
|
source: Symfony\Component\Messenger\Stamp\DelayStamp
|
|
- name: RedeliveryStamp
|
|
type: class
|
|
source: Symfony\Component\Messenger\Stamp\RedeliveryStamp
|
|
- name: ReceiverInterface
|
|
type: class
|
|
source: Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface
|
|
- name: Serializer
|
|
type: class
|
|
source: Symfony\Component\Messenger\Transport\Serialization\Serializer
|
|
- name: SerializerInterface
|
|
type: class
|
|
source: Symfony\Component\Messenger\Transport\Serialization\SerializerInterface
|
|
- name: PhpProcess
|
|
type: class
|
|
source: Symfony\Component\Process\PhpProcess
|
|
- name: Process
|
|
type: class
|
|
source: Symfony\Component\Process\Process
|
|
- name: SerializerComponent
|
|
type: class
|
|
source: Symfony\Component\Serializer
|
|
- name: JsonEncoder
|
|
type: class
|
|
source: Symfony\Component\Serializer\Encoder\JsonEncoder
|
|
- name: ArrayDenormalizer
|
|
type: class
|
|
source: Symfony\Component\Serializer\Normalizer\ArrayDenormalizer
|
|
- name: ObjectNormalizer
|
|
type: class
|
|
source: Symfony\Component\Serializer\Normalizer\ObjectNormalizer
|
|
properties: []
|
|
methods:
|
|
- name: receiveEnvelopes
|
|
visibility: private
|
|
parameters:
|
|
- name: receiver
|
|
- name: timeout
|
|
comment: "# * @requires extension amqp\n# *\n# * @group integration\n# */\n# class\
|
|
\ AmqpExtIntegrationTest extends TestCase\n# {\n# protected function setUp():\
|
|
\ void\n# {\n# parent::setUp();\n# \n# if (!getenv('MESSENGER_AMQP_DSN')) {\n\
|
|
# $this->markTestSkipped('The \"MESSENGER_AMQP_DSN\" environment variable is required.');\n\
|
|
# }\n# }\n# \n# public function testItSendsAndReceivesMessages()\n# {\n# $serializer\
|
|
\ = $this->createSerializer();\n# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n\
|
|
# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection,\
|
|
\ $serializer);\n# $receiver = new AmqpReceiver($connection, $serializer);\n#\
|
|
\ \n# $sender->send($first = new Envelope(new DummyMessage('First')));\n# $sender->send($second\
|
|
\ = new Envelope(new DummyMessage('Second')));\n# \n# $envelopes = iterator_to_array($receiver->get());\n\
|
|
# $this->assertCount(1, $envelopes);\n# /** @var Envelope $envelope */\n# $envelope\
|
|
\ = $envelopes[0];\n# $this->assertEquals($first->getMessage(), $envelope->getMessage());\n\
|
|
# $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));\n\
|
|
# \n# $envelopes = iterator_to_array($receiver->get());\n# $this->assertCount(1,\
|
|
\ $envelopes);\n# /** @var Envelope $envelope */\n# $envelope = $envelopes[0];\n\
|
|
# $this->assertEquals($second->getMessage(), $envelope->getMessage());\n# \n#\
|
|
\ $this->assertEmpty(iterator_to_array($receiver->get()));\n# }\n# \n# public\
|
|
\ function testRetryAndDelay()\n# {\n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n\
|
|
# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection);\n\
|
|
# $receiver = new AmqpReceiver($connection);\n# \n# // send a first message\n\
|
|
# $sender->send($first = new Envelope(new DummyMessage('First')));\n# \n# // receive\
|
|
\ it immediately and imitate a redeliver with 2 second delay\n# $envelopes = iterator_to_array($receiver->get());\n\
|
|
# /** @var Envelope $envelope */\n# $envelope = $envelopes[0];\n# $newEnvelope\
|
|
\ = $envelope\n# ->with(new DelayStamp(2000))\n# ->with(new RedeliveryStamp(1));\n\
|
|
# $sender->send($newEnvelope);\n# $receiver->ack($envelope);\n# \n# // send a\
|
|
\ 2nd message with a shorter delay and custom routing key\n# $customRoutingKeyMessage\
|
|
\ = new DummyMessage('custom routing key');\n# $envelopeCustomRoutingKey = new\
|
|
\ Envelope($customRoutingKeyMessage, [\n# new DelayStamp(1000),\n# new AmqpStamp('my_custom_routing_key'),\n\
|
|
# ]);\n# $sender->send($envelopeCustomRoutingKey);\n# \n# // wait for next message\
|
|
\ (but max at 3 seconds)\n# $startTime = microtime(true);\n# $envelopes = $this->receiveEnvelopes($receiver,\
|
|
\ 3);\n# \n# // duration should be about 1 second\n# $this->assertApproximateDuration($startTime,\
|
|
\ 1);\n# \n# // this should be the custom routing key message first\n# $this->assertCount(1,\
|
|
\ $envelopes);\n# /* @var Envelope $envelope */\n# $receiver->ack($envelopes[0]);\n\
|
|
# $this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());\n\
|
|
# \n# // wait for final message (but max at 3 seconds)\n# $envelopes = $this->receiveEnvelopes($receiver,\
|
|
\ 3);\n# // duration should be about 2 seconds\n# $this->assertApproximateDuration($startTime,\
|
|
\ 2);\n# \n# /* @var RedeliveryStamp|null $retryStamp */\n# // verify the stamp\
|
|
\ still exists from the last send\n# $this->assertCount(1, $envelopes);\n# $retryStamp\
|
|
\ = $envelopes[0]->last(RedeliveryStamp::class);\n# $this->assertNotNull($retryStamp);\n\
|
|
# $this->assertSame(1, $retryStamp->getRetryCount());\n# \n# $receiver->ack($envelope);\n\
|
|
# }\n# \n# public function testRetryAffectsOnlyOriginalQueue()\n# {\n# $connection\
|
|
\ = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [\n# 'exchange' => [\n#\
|
|
\ 'name' => 'messages_topic',\n# 'type' => 'topic',\n# 'default_publish_routing_key'\
|
|
\ => 'topic_routing_key',\n# ],\n# 'queues' => [\n# 'A' => ['binding_keys' =>\
|
|
\ ['topic_routing_key']],\n# 'B' => ['binding_keys' => ['topic_routing_key']],\n\
|
|
# ],\n# ]);\n# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender\
|
|
\ = new AmqpSender($connection);\n# $receiver = new AmqpReceiver($connection);\n\
|
|
# \n# // initial delivery: should receive in both queues\n# $sender->send(new\
|
|
\ Envelope(new DummyMessage('Payload')));\n# \n# $receivedEnvelopes = $this->receiveWithQueueName($receiver);\n\
|
|
# $this->assertCount(2, $receivedEnvelopes);\n# $this->assertArrayHasKey('A',\
|
|
\ $receivedEnvelopes);\n# $this->assertArrayHasKey('B', $receivedEnvelopes);\n\
|
|
# \n# // retry: should receive in only \"A\" queue\n# $retryEnvelope = $receivedEnvelopes['A']\n\
|
|
# ->with(new DelayStamp(10))\n# ->with(new RedeliveryStamp(1));\n# $sender->send($retryEnvelope);\n\
|
|
# \n# $retriedEnvelopes = $this->receiveWithQueueName($receiver);\n# $this->assertCount(1,\
|
|
\ $retriedEnvelopes);\n# $this->assertArrayHasKey('A', $retriedEnvelopes);\n#\
|
|
\ }\n# \n# public function testItReceivesSignals()\n# {\n# $serializer = $this->createSerializer();\n\
|
|
# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n# $connection->setup();\n\
|
|
# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection, $serializer);\n\
|
|
# $sender->send(new Envelope(new DummyMessage('Hello')));\n# \n# $amqpReadTimeout\
|
|
\ = 30;\n# $dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;\n\
|
|
# $process = new PhpProcess(file_get_contents(__DIR__.'/../Fixtures/long_receiver.php'),\
|
|
\ null, [\n# 'COMPONENT_ROOT' => __DIR__.'/../../',\n# 'DSN' => $dsn,\n# ]);\n\
|
|
# \n# $process->start();\n# \n# $this->waitForOutput($process, $expectedOutput\
|
|
\ = \"Receiving messages...\\n\");\n# \n# $signalTime = microtime(true);\n# $timedOutTime\
|
|
\ = time() + 10;\n# \n# // wait for worker started and registered the signal handler\n\
|
|
# usleep(100 * 1000); // 100ms\n# \n# // immediately after the process has started\
|
|
\ \"booted\", kill it\n# $process->signal(15);\n# \n# while ($process->isRunning()\
|
|
\ && time() < $timedOutTime) {\n# usleep(100 * 1000); // 100ms\n# }\n# \n# //\
|
|
\ make sure the process exited, after consuming only the 1 message\n# $this->assertFalse($process->isRunning());\n\
|
|
# $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);\n# $this->assertSame($expectedOutput.<<<'TXT'\n\
|
|
# Get envelope with message: Symfony\\Component\\Messenger\\Bridge\\Amqp\\Tests\\\
|
|
Fixtures\\DummyMessage\n# with stamps: [\n# \"Symfony\\\\Component\\\\Messenger\\\
|
|
\\Stamp\\\\SerializedMessageStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\\\
|
|
Bridge\\\\Amqp\\\\Transport\\\\AmqpReceivedStamp\",\n# \"Symfony\\\\Component\\\
|
|
\\Messenger\\\\Stamp\\\\ReceivedStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\
|
|
\\Stamp\\\\ConsumedByWorkerStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\\\
|
|
Stamp\\\\AckStamp\"\n# ]\n# Done.\n# \n# TXT\n# , $process->getOutput());\n# }\n\
|
|
# \n# public function testItCountsMessagesInQueue()\n# {\n# $serializer = $this->createSerializer();\n\
|
|
# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n# $connection->setup();\n\
|
|
# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection, $serializer);\n\
|
|
# \n# $sender->send(new Envelope(new DummyMessage('First')));\n# $sender->send(new\
|
|
\ Envelope(new DummyMessage('Second')));\n# $sender->send(new Envelope(new DummyMessage('Third')));\n\
|
|
# \n# sleep(1); // give amqp a moment to have the messages ready\n# $this->assertSame(3,\
|
|
\ $connection->countMessagesInQueues());\n# }\n# \n# private function waitForOutput(Process\
|
|
\ $process, string $output, $timeoutInSeconds = 10)\n# {\n# $timedOutTime = time()\
|
|
\ + $timeoutInSeconds;\n# \n# while (time() < $timedOutTime) {\n# if (str_starts_with($process->getOutput(),\
|
|
\ $output)) {\n# return;\n# }\n# \n# usleep(100 * 1000); // 100ms\n# }\n# \n#\
|
|
\ throw new \\RuntimeException('Expected output never arrived. Got \"'.$process->getOutput().'\"\
|
|
\ instead.');\n# }\n# \n# private function createSerializer(): SerializerInterface\n\
|
|
# {\n# return new Serializer(\n# new SerializerComponent\\Serializer([new ObjectNormalizer(),\
|
|
\ new ArrayDenormalizer()], ['json' => new JsonEncoder()])\n# );\n# }\n# \n# private\
|
|
\ function assertApproximateDuration($startTime, int $expectedDuration)\n# {\n\
|
|
# $actualDuration = microtime(true) - $startTime;\n# \n# if (method_exists($this,\
|
|
\ 'assertEqualsWithDelta')) {\n# $this->assertEqualsWithDelta($expectedDuration,\
|
|
\ $actualDuration, .5, 'Duration was not within expected range');\n# } else {\n\
|
|
# $this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within\
|
|
\ expected range', .5);\n# }\n# }\n# \n# /**\n# * @return Envelope[]"
|
|
- name: receiveWithQueueName
|
|
visibility: private
|
|
parameters:
|
|
- name: receiver
|
|
comment: null
|
|
traits:
|
|
- PHPUnit\Framework\TestCase
|
|
- Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage
|
|
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp
|
|
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver
|
|
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender
|
|
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp
|
|
- Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection
|
|
- Symfony\Component\Messenger\Envelope
|
|
- Symfony\Component\Messenger\Stamp\DelayStamp
|
|
- Symfony\Component\Messenger\Stamp\RedeliveryStamp
|
|
- Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface
|
|
- Symfony\Component\Messenger\Transport\Serialization\Serializer
|
|
- Symfony\Component\Messenger\Transport\Serialization\SerializerInterface
|
|
- Symfony\Component\Process\PhpProcess
|
|
- Symfony\Component\Process\Process
|
|
- Symfony\Component\Serializer\Encoder\JsonEncoder
|
|
- Symfony\Component\Serializer\Normalizer\ArrayDenormalizer
|
|
- Symfony\Component\Serializer\Normalizer\ObjectNormalizer
|
|
interfaces: []
|