api/symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.yaml

194 lines
12 KiB
YAML
Raw Normal View History

2024-09-26 09:03:21 +00:00
name: AmqpExtIntegrationTest
class_comment: '# * @requires extension amqp
# *
# * @group integration'
dependencies:
- name: TestCase
type: class
source: PHPUnit\Framework\TestCase
- name: DummyMessage
type: class
source: Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage
- name: AmqpReceivedStamp
type: class
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp
- name: AmqpReceiver
type: class
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver
- name: AmqpSender
type: class
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender
- name: AmqpStamp
type: class
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp
- name: Connection
type: class
source: Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection
- name: Envelope
type: class
source: Symfony\Component\Messenger\Envelope
- name: DelayStamp
type: class
source: Symfony\Component\Messenger\Stamp\DelayStamp
- name: RedeliveryStamp
type: class
source: Symfony\Component\Messenger\Stamp\RedeliveryStamp
- name: ReceiverInterface
type: class
source: Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface
- name: Serializer
type: class
source: Symfony\Component\Messenger\Transport\Serialization\Serializer
- name: SerializerInterface
type: class
source: Symfony\Component\Messenger\Transport\Serialization\SerializerInterface
- name: PhpProcess
type: class
source: Symfony\Component\Process\PhpProcess
- name: Process
type: class
source: Symfony\Component\Process\Process
- name: SerializerComponent
type: class
source: Symfony\Component\Serializer
- name: JsonEncoder
type: class
source: Symfony\Component\Serializer\Encoder\JsonEncoder
- name: ArrayDenormalizer
type: class
source: Symfony\Component\Serializer\Normalizer\ArrayDenormalizer
- name: ObjectNormalizer
type: class
source: Symfony\Component\Serializer\Normalizer\ObjectNormalizer
properties: []
methods:
- name: receiveEnvelopes
visibility: private
parameters:
- name: receiver
- name: timeout
comment: "# * @requires extension amqp\n# *\n# * @group integration\n# */\n# class\
\ AmqpExtIntegrationTest extends TestCase\n# {\n# protected function setUp():\
\ void\n# {\n# parent::setUp();\n# \n# if (!getenv('MESSENGER_AMQP_DSN')) {\n\
# $this->markTestSkipped('The \"MESSENGER_AMQP_DSN\" environment variable is required.');\n\
# }\n# }\n# \n# public function testItSendsAndReceivesMessages()\n# {\n# $serializer\
\ = $this->createSerializer();\n# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n\
# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection,\
\ $serializer);\n# $receiver = new AmqpReceiver($connection, $serializer);\n#\
\ \n# $sender->send($first = new Envelope(new DummyMessage('First')));\n# $sender->send($second\
\ = new Envelope(new DummyMessage('Second')));\n# \n# $envelopes = iterator_to_array($receiver->get());\n\
# $this->assertCount(1, $envelopes);\n# /** @var Envelope $envelope */\n# $envelope\
\ = $envelopes[0];\n# $this->assertEquals($first->getMessage(), $envelope->getMessage());\n\
# $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));\n\
# \n# $envelopes = iterator_to_array($receiver->get());\n# $this->assertCount(1,\
\ $envelopes);\n# /** @var Envelope $envelope */\n# $envelope = $envelopes[0];\n\
# $this->assertEquals($second->getMessage(), $envelope->getMessage());\n# \n#\
\ $this->assertEmpty(iterator_to_array($receiver->get()));\n# }\n# \n# public\
\ function testRetryAndDelay()\n# {\n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n\
# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection);\n\
# $receiver = new AmqpReceiver($connection);\n# \n# // send a first message\n\
# $sender->send($first = new Envelope(new DummyMessage('First')));\n# \n# // receive\
\ it immediately and imitate a redeliver with 2 second delay\n# $envelopes = iterator_to_array($receiver->get());\n\
# /** @var Envelope $envelope */\n# $envelope = $envelopes[0];\n# $newEnvelope\
\ = $envelope\n# ->with(new DelayStamp(2000))\n# ->with(new RedeliveryStamp(1));\n\
# $sender->send($newEnvelope);\n# $receiver->ack($envelope);\n# \n# // send a\
\ 2nd message with a shorter delay and custom routing key\n# $customRoutingKeyMessage\
\ = new DummyMessage('custom routing key');\n# $envelopeCustomRoutingKey = new\
\ Envelope($customRoutingKeyMessage, [\n# new DelayStamp(1000),\n# new AmqpStamp('my_custom_routing_key'),\n\
# ]);\n# $sender->send($envelopeCustomRoutingKey);\n# \n# // wait for next message\
\ (but max at 3 seconds)\n# $startTime = microtime(true);\n# $envelopes = $this->receiveEnvelopes($receiver,\
\ 3);\n# \n# // duration should be about 1 second\n# $this->assertApproximateDuration($startTime,\
\ 1);\n# \n# // this should be the custom routing key message first\n# $this->assertCount(1,\
\ $envelopes);\n# /* @var Envelope $envelope */\n# $receiver->ack($envelopes[0]);\n\
# $this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());\n\
# \n# // wait for final message (but max at 3 seconds)\n# $envelopes = $this->receiveEnvelopes($receiver,\
\ 3);\n# // duration should be about 2 seconds\n# $this->assertApproximateDuration($startTime,\
\ 2);\n# \n# /* @var RedeliveryStamp|null $retryStamp */\n# // verify the stamp\
\ still exists from the last send\n# $this->assertCount(1, $envelopes);\n# $retryStamp\
\ = $envelopes[0]->last(RedeliveryStamp::class);\n# $this->assertNotNull($retryStamp);\n\
# $this->assertSame(1, $retryStamp->getRetryCount());\n# \n# $receiver->ack($envelope);\n\
# }\n# \n# public function testRetryAffectsOnlyOriginalQueue()\n# {\n# $connection\
\ = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [\n# 'exchange' => [\n#\
\ 'name' => 'messages_topic',\n# 'type' => 'topic',\n# 'default_publish_routing_key'\
\ => 'topic_routing_key',\n# ],\n# 'queues' => [\n# 'A' => ['binding_keys' =>\
\ ['topic_routing_key']],\n# 'B' => ['binding_keys' => ['topic_routing_key']],\n\
# ],\n# ]);\n# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender\
\ = new AmqpSender($connection);\n# $receiver = new AmqpReceiver($connection);\n\
# \n# // initial delivery: should receive in both queues\n# $sender->send(new\
\ Envelope(new DummyMessage('Payload')));\n# \n# $receivedEnvelopes = $this->receiveWithQueueName($receiver);\n\
# $this->assertCount(2, $receivedEnvelopes);\n# $this->assertArrayHasKey('A',\
\ $receivedEnvelopes);\n# $this->assertArrayHasKey('B', $receivedEnvelopes);\n\
# \n# // retry: should receive in only \"A\" queue\n# $retryEnvelope = $receivedEnvelopes['A']\n\
# ->with(new DelayStamp(10))\n# ->with(new RedeliveryStamp(1));\n# $sender->send($retryEnvelope);\n\
# \n# $retriedEnvelopes = $this->receiveWithQueueName($receiver);\n# $this->assertCount(1,\
\ $retriedEnvelopes);\n# $this->assertArrayHasKey('A', $retriedEnvelopes);\n#\
\ }\n# \n# public function testItReceivesSignals()\n# {\n# $serializer = $this->createSerializer();\n\
# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n# $connection->setup();\n\
# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection, $serializer);\n\
# $sender->send(new Envelope(new DummyMessage('Hello')));\n# \n# $amqpReadTimeout\
\ = 30;\n# $dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;\n\
# $process = new PhpProcess(file_get_contents(__DIR__.'/../Fixtures/long_receiver.php'),\
\ null, [\n# 'COMPONENT_ROOT' => __DIR__.'/../../',\n# 'DSN' => $dsn,\n# ]);\n\
# \n# $process->start();\n# \n# $this->waitForOutput($process, $expectedOutput\
\ = \"Receiving messages...\\n\");\n# \n# $signalTime = microtime(true);\n# $timedOutTime\
\ = time() + 10;\n# \n# // wait for worker started and registered the signal handler\n\
# usleep(100 * 1000); // 100ms\n# \n# // immediately after the process has started\
\ \"booted\", kill it\n# $process->signal(15);\n# \n# while ($process->isRunning()\
\ && time() < $timedOutTime) {\n# usleep(100 * 1000); // 100ms\n# }\n# \n# //\
\ make sure the process exited, after consuming only the 1 message\n# $this->assertFalse($process->isRunning());\n\
# $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);\n# $this->assertSame($expectedOutput.<<<'TXT'\n\
# Get envelope with message: Symfony\\Component\\Messenger\\Bridge\\Amqp\\Tests\\\
Fixtures\\DummyMessage\n# with stamps: [\n# \"Symfony\\\\Component\\\\Messenger\\\
\\Stamp\\\\SerializedMessageStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\\\
Bridge\\\\Amqp\\\\Transport\\\\AmqpReceivedStamp\",\n# \"Symfony\\\\Component\\\
\\Messenger\\\\Stamp\\\\ReceivedStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\
\\Stamp\\\\ConsumedByWorkerStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\\\
Stamp\\\\AckStamp\"\n# ]\n# Done.\n# \n# TXT\n# , $process->getOutput());\n# }\n\
# \n# public function testItCountsMessagesInQueue()\n# {\n# $serializer = $this->createSerializer();\n\
# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n# $connection->setup();\n\
# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection, $serializer);\n\
# \n# $sender->send(new Envelope(new DummyMessage('First')));\n# $sender->send(new\
\ Envelope(new DummyMessage('Second')));\n# $sender->send(new Envelope(new DummyMessage('Third')));\n\
# \n# sleep(1); // give amqp a moment to have the messages ready\n# $this->assertSame(3,\
\ $connection->countMessagesInQueues());\n# }\n# \n# private function waitForOutput(Process\
\ $process, string $output, $timeoutInSeconds = 10)\n# {\n# $timedOutTime = time()\
\ + $timeoutInSeconds;\n# \n# while (time() < $timedOutTime) {\n# if (str_starts_with($process->getOutput(),\
\ $output)) {\n# return;\n# }\n# \n# usleep(100 * 1000); // 100ms\n# }\n# \n#\
\ throw new \\RuntimeException('Expected output never arrived. Got \"'.$process->getOutput().'\"\
\ instead.');\n# }\n# \n# private function createSerializer(): SerializerInterface\n\
# {\n# return new Serializer(\n# new SerializerComponent\\Serializer([new ObjectNormalizer(),\
\ new ArrayDenormalizer()], ['json' => new JsonEncoder()])\n# );\n# }\n# \n# private\
\ function assertApproximateDuration($startTime, int $expectedDuration)\n# {\n\
# $actualDuration = microtime(true) - $startTime;\n# \n# if (method_exists($this,\
\ 'assertEqualsWithDelta')) {\n# $this->assertEqualsWithDelta($expectedDuration,\
\ $actualDuration, .5, 'Duration was not within expected range');\n# } else {\n\
# $this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within\
\ expected range', .5);\n# }\n# }\n# \n# /**\n# * @return Envelope[]"
- name: receiveWithQueueName
visibility: private
parameters:
- name: receiver
comment: null
traits:
- PHPUnit\Framework\TestCase
- Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender
- Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp
- Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection
- Symfony\Component\Messenger\Envelope
- Symfony\Component\Messenger\Stamp\DelayStamp
- Symfony\Component\Messenger\Stamp\RedeliveryStamp
- Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface
- Symfony\Component\Messenger\Transport\Serialization\Serializer
- Symfony\Component\Messenger\Transport\Serialization\SerializerInterface
- Symfony\Component\Process\PhpProcess
- Symfony\Component\Process\Process
- Symfony\Component\Serializer\Encoder\JsonEncoder
- Symfony\Component\Serializer\Normalizer\ArrayDenormalizer
- Symfony\Component\Serializer\Normalizer\ObjectNormalizer
interfaces: []