name: AmqpExtIntegrationTest class_comment: '# * @requires extension amqp # * # * @group integration' dependencies: - name: TestCase type: class source: PHPUnit\Framework\TestCase - name: DummyMessage type: class source: Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage - name: AmqpReceivedStamp type: class source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp - name: AmqpReceiver type: class source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver - name: AmqpSender type: class source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender - name: AmqpStamp type: class source: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp - name: Connection type: class source: Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection - name: Envelope type: class source: Symfony\Component\Messenger\Envelope - name: DelayStamp type: class source: Symfony\Component\Messenger\Stamp\DelayStamp - name: RedeliveryStamp type: class source: Symfony\Component\Messenger\Stamp\RedeliveryStamp - name: ReceiverInterface type: class source: Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface - name: Serializer type: class source: Symfony\Component\Messenger\Transport\Serialization\Serializer - name: SerializerInterface type: class source: Symfony\Component\Messenger\Transport\Serialization\SerializerInterface - name: PhpProcess type: class source: Symfony\Component\Process\PhpProcess - name: Process type: class source: Symfony\Component\Process\Process - name: SerializerComponent type: class source: Symfony\Component\Serializer - name: JsonEncoder type: class source: Symfony\Component\Serializer\Encoder\JsonEncoder - name: ArrayDenormalizer type: class source: Symfony\Component\Serializer\Normalizer\ArrayDenormalizer - name: ObjectNormalizer type: class source: Symfony\Component\Serializer\Normalizer\ObjectNormalizer properties: [] methods: - name: receiveEnvelopes visibility: private parameters: - name: receiver - name: timeout comment: "# * @requires extension amqp\n# *\n# * @group integration\n# */\n# class\ \ AmqpExtIntegrationTest extends TestCase\n# {\n# protected function setUp():\ \ void\n# {\n# parent::setUp();\n# \n# if (!getenv('MESSENGER_AMQP_DSN')) {\n\ # $this->markTestSkipped('The \"MESSENGER_AMQP_DSN\" environment variable is required.');\n\ # }\n# }\n# \n# public function testItSendsAndReceivesMessages()\n# {\n# $serializer\ \ = $this->createSerializer();\n# \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n\ # $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection,\ \ $serializer);\n# $receiver = new AmqpReceiver($connection, $serializer);\n#\ \ \n# $sender->send($first = new Envelope(new DummyMessage('First')));\n# $sender->send($second\ \ = new Envelope(new DummyMessage('Second')));\n# \n# $envelopes = iterator_to_array($receiver->get());\n\ # $this->assertCount(1, $envelopes);\n# /** @var Envelope $envelope */\n# $envelope\ \ = $envelopes[0];\n# $this->assertEquals($first->getMessage(), $envelope->getMessage());\n\ # $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));\n\ # \n# $envelopes = iterator_to_array($receiver->get());\n# $this->assertCount(1,\ \ $envelopes);\n# /** @var Envelope $envelope */\n# $envelope = $envelopes[0];\n\ # $this->assertEquals($second->getMessage(), $envelope->getMessage());\n# \n#\ \ $this->assertEmpty(iterator_to_array($receiver->get()));\n# }\n# \n# public\ \ function testRetryAndDelay()\n# {\n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n\ # $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection);\n\ # $receiver = new AmqpReceiver($connection);\n# \n# // send a first message\n\ # $sender->send($first = new Envelope(new DummyMessage('First')));\n# \n# // receive\ \ it immediately and imitate a redeliver with 2 second delay\n# $envelopes = iterator_to_array($receiver->get());\n\ # /** @var Envelope $envelope */\n# $envelope = $envelopes[0];\n# $newEnvelope\ \ = $envelope\n# ->with(new DelayStamp(2000))\n# ->with(new RedeliveryStamp(1));\n\ # $sender->send($newEnvelope);\n# $receiver->ack($envelope);\n# \n# // send a\ \ 2nd message with a shorter delay and custom routing key\n# $customRoutingKeyMessage\ \ = new DummyMessage('custom routing key');\n# $envelopeCustomRoutingKey = new\ \ Envelope($customRoutingKeyMessage, [\n# new DelayStamp(1000),\n# new AmqpStamp('my_custom_routing_key'),\n\ # ]);\n# $sender->send($envelopeCustomRoutingKey);\n# \n# // wait for next message\ \ (but max at 3 seconds)\n# $startTime = microtime(true);\n# $envelopes = $this->receiveEnvelopes($receiver,\ \ 3);\n# \n# // duration should be about 1 second\n# $this->assertApproximateDuration($startTime,\ \ 1);\n# \n# // this should be the custom routing key message first\n# $this->assertCount(1,\ \ $envelopes);\n# /* @var Envelope $envelope */\n# $receiver->ack($envelopes[0]);\n\ # $this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());\n\ # \n# // wait for final message (but max at 3 seconds)\n# $envelopes = $this->receiveEnvelopes($receiver,\ \ 3);\n# // duration should be about 2 seconds\n# $this->assertApproximateDuration($startTime,\ \ 2);\n# \n# /* @var RedeliveryStamp|null $retryStamp */\n# // verify the stamp\ \ still exists from the last send\n# $this->assertCount(1, $envelopes);\n# $retryStamp\ \ = $envelopes[0]->last(RedeliveryStamp::class);\n# $this->assertNotNull($retryStamp);\n\ # $this->assertSame(1, $retryStamp->getRetryCount());\n# \n# $receiver->ack($envelope);\n\ # }\n# \n# public function testRetryAffectsOnlyOriginalQueue()\n# {\n# $connection\ \ = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [\n# 'exchange' => [\n#\ \ 'name' => 'messages_topic',\n# 'type' => 'topic',\n# 'default_publish_routing_key'\ \ => 'topic_routing_key',\n# ],\n# 'queues' => [\n# 'A' => ['binding_keys' =>\ \ ['topic_routing_key']],\n# 'B' => ['binding_keys' => ['topic_routing_key']],\n\ # ],\n# ]);\n# $connection->setup();\n# $connection->purgeQueues();\n# \n# $sender\ \ = new AmqpSender($connection);\n# $receiver = new AmqpReceiver($connection);\n\ # \n# // initial delivery: should receive in both queues\n# $sender->send(new\ \ Envelope(new DummyMessage('Payload')));\n# \n# $receivedEnvelopes = $this->receiveWithQueueName($receiver);\n\ # $this->assertCount(2, $receivedEnvelopes);\n# $this->assertArrayHasKey('A',\ \ $receivedEnvelopes);\n# $this->assertArrayHasKey('B', $receivedEnvelopes);\n\ # \n# // retry: should receive in only \"A\" queue\n# $retryEnvelope = $receivedEnvelopes['A']\n\ # ->with(new DelayStamp(10))\n# ->with(new RedeliveryStamp(1));\n# $sender->send($retryEnvelope);\n\ # \n# $retriedEnvelopes = $this->receiveWithQueueName($receiver);\n# $this->assertCount(1,\ \ $retriedEnvelopes);\n# $this->assertArrayHasKey('A', $retriedEnvelopes);\n#\ \ }\n# \n# public function testItReceivesSignals()\n# {\n# $serializer = $this->createSerializer();\n\ # \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n# $connection->setup();\n\ # $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection, $serializer);\n\ # $sender->send(new Envelope(new DummyMessage('Hello')));\n# \n# $amqpReadTimeout\ \ = 30;\n# $dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;\n\ # $process = new PhpProcess(file_get_contents(__DIR__.'/../Fixtures/long_receiver.php'),\ \ null, [\n# 'COMPONENT_ROOT' => __DIR__.'/../../',\n# 'DSN' => $dsn,\n# ]);\n\ # \n# $process->start();\n# \n# $this->waitForOutput($process, $expectedOutput\ \ = \"Receiving messages...\\n\");\n# \n# $signalTime = microtime(true);\n# $timedOutTime\ \ = time() + 10;\n# \n# // wait for worker started and registered the signal handler\n\ # usleep(100 * 1000); // 100ms\n# \n# // immediately after the process has started\ \ \"booted\", kill it\n# $process->signal(15);\n# \n# while ($process->isRunning()\ \ && time() < $timedOutTime) {\n# usleep(100 * 1000); // 100ms\n# }\n# \n# //\ \ make sure the process exited, after consuming only the 1 message\n# $this->assertFalse($process->isRunning());\n\ # $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);\n# $this->assertSame($expectedOutput.<<<'TXT'\n\ # Get envelope with message: Symfony\\Component\\Messenger\\Bridge\\Amqp\\Tests\\\ Fixtures\\DummyMessage\n# with stamps: [\n# \"Symfony\\\\Component\\\\Messenger\\\ \\Stamp\\\\SerializedMessageStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\\\ Bridge\\\\Amqp\\\\Transport\\\\AmqpReceivedStamp\",\n# \"Symfony\\\\Component\\\ \\Messenger\\\\Stamp\\\\ReceivedStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\ \\Stamp\\\\ConsumedByWorkerStamp\",\n# \"Symfony\\\\Component\\\\Messenger\\\\\ Stamp\\\\AckStamp\"\n# ]\n# Done.\n# \n# TXT\n# , $process->getOutput());\n# }\n\ # \n# public function testItCountsMessagesInQueue()\n# {\n# $serializer = $this->createSerializer();\n\ # \n# $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));\n# $connection->setup();\n\ # $connection->purgeQueues();\n# \n# $sender = new AmqpSender($connection, $serializer);\n\ # \n# $sender->send(new Envelope(new DummyMessage('First')));\n# $sender->send(new\ \ Envelope(new DummyMessage('Second')));\n# $sender->send(new Envelope(new DummyMessage('Third')));\n\ # \n# sleep(1); // give amqp a moment to have the messages ready\n# $this->assertSame(3,\ \ $connection->countMessagesInQueues());\n# }\n# \n# private function waitForOutput(Process\ \ $process, string $output, $timeoutInSeconds = 10)\n# {\n# $timedOutTime = time()\ \ + $timeoutInSeconds;\n# \n# while (time() < $timedOutTime) {\n# if (str_starts_with($process->getOutput(),\ \ $output)) {\n# return;\n# }\n# \n# usleep(100 * 1000); // 100ms\n# }\n# \n#\ \ throw new \\RuntimeException('Expected output never arrived. Got \"'.$process->getOutput().'\"\ \ instead.');\n# }\n# \n# private function createSerializer(): SerializerInterface\n\ # {\n# return new Serializer(\n# new SerializerComponent\\Serializer([new ObjectNormalizer(),\ \ new ArrayDenormalizer()], ['json' => new JsonEncoder()])\n# );\n# }\n# \n# private\ \ function assertApproximateDuration($startTime, int $expectedDuration)\n# {\n\ # $actualDuration = microtime(true) - $startTime;\n# \n# if (method_exists($this,\ \ 'assertEqualsWithDelta')) {\n# $this->assertEqualsWithDelta($expectedDuration,\ \ $actualDuration, .5, 'Duration was not within expected range');\n# } else {\n\ # $this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within\ \ expected range', .5);\n# }\n# }\n# \n# /**\n# * @return Envelope[]" - name: receiveWithQueueName visibility: private parameters: - name: receiver comment: null traits: - PHPUnit\Framework\TestCase - Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage - Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp - Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver - Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender - Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp - Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection - Symfony\Component\Messenger\Envelope - Symfony\Component\Messenger\Stamp\DelayStamp - Symfony\Component\Messenger\Stamp\RedeliveryStamp - Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface - Symfony\Component\Messenger\Transport\Serialization\Serializer - Symfony\Component\Messenger\Transport\Serialization\SerializerInterface - Symfony\Component\Process\PhpProcess - Symfony\Component\Process\Process - Symfony\Component\Serializer\Encoder\JsonEncoder - Symfony\Component\Serializer\Normalizer\ArrayDenormalizer - Symfony\Component\Serializer\Normalizer\ObjectNormalizer interfaces: []