init
This commit is contained in:
390
vendor/codeception/base/src/Codeception/Module/AMQP.php
vendored
Normal file
390
vendor/codeception/base/src/Codeception/Module/AMQP.php
vendored
Normal file
@@ -0,0 +1,390 @@
|
||||
<?php
|
||||
namespace Codeception\Module;
|
||||
|
||||
use Codeception\Exception\ModuleException as ModuleException;
|
||||
use Codeception\Lib\Interfaces\RequiresPackage;
|
||||
use Codeception\Module as CodeceptionModule;
|
||||
use Codeception\TestInterface;
|
||||
use Exception;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
|
||||
/**
|
||||
* This module interacts with message broker software that implements
|
||||
* the Advanced Message Queuing Protocol (AMQP) standard. For example, RabbitMQ (tested).
|
||||
*
|
||||
* <div class="alert alert-info">
|
||||
* To use this module with Composer you need <em>"php-amqplib/php-amqplib": "~2.4"</em> package.
|
||||
* </div>
|
||||
*
|
||||
* ## Config
|
||||
*
|
||||
* * host: localhost - host to connect
|
||||
* * username: guest - username to connect
|
||||
* * password: guest - password to connect
|
||||
* * vhost: '/' - vhost to connect
|
||||
* * cleanup: true - defined queues will be purged before running every test.
|
||||
* * queues: [mail, twitter] - queues to cleanup
|
||||
* * single_channel - create and use only one channel during test execution
|
||||
*
|
||||
* ### Example
|
||||
*
|
||||
* modules:
|
||||
* enabled:
|
||||
* - AMQP:
|
||||
* host: 'localhost'
|
||||
* port: '5672'
|
||||
* username: 'guest'
|
||||
* password: 'guest'
|
||||
* vhost: '/'
|
||||
* queues: [queue1, queue2]
|
||||
* single_channel: false
|
||||
*
|
||||
* ## Public Properties
|
||||
*
|
||||
* * connection - AMQPStreamConnection - current connection
|
||||
*/
|
||||
class AMQP extends CodeceptionModule implements RequiresPackage
|
||||
{
|
||||
protected $config = [
|
||||
'host' => 'localhost',
|
||||
'username' => 'guest',
|
||||
'password' => 'guest',
|
||||
'port' => '5672',
|
||||
'vhost' => '/',
|
||||
'cleanup' => true,
|
||||
'single_channel' => false
|
||||
];
|
||||
|
||||
/**
|
||||
* @var AMQPStreamConnection
|
||||
*/
|
||||
public $connection;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $channelId;
|
||||
|
||||
protected $requiredFields = ['host', 'username', 'password', 'vhost'];
|
||||
|
||||
public function _requires()
|
||||
{
|
||||
return ['PhpAmqpLib\Connection\AMQPStreamConnection' => '"php-amqplib/php-amqplib": "~2.4"'];
|
||||
}
|
||||
|
||||
public function _initialize()
|
||||
{
|
||||
$host = $this->config['host'];
|
||||
$port = $this->config['port'];
|
||||
$username = $this->config['username'];
|
||||
$password = $this->config['password'];
|
||||
$vhost = $this->config['vhost'];
|
||||
|
||||
try {
|
||||
$this->connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
|
||||
} catch (Exception $e) {
|
||||
throw new ModuleException(__CLASS__, $e->getMessage() . ' while establishing connection to MQ server');
|
||||
}
|
||||
}
|
||||
|
||||
public function _before(TestInterface $test)
|
||||
{
|
||||
if ($this->config['cleanup']) {
|
||||
$this->cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends message to exchange by sending exchange name, message
|
||||
* and (optionally) a routing key
|
||||
*
|
||||
* ``` php
|
||||
* <?php
|
||||
* $I->pushToExchange('exchange.emails', 'thanks');
|
||||
* $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'));
|
||||
* $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'), 'severity');
|
||||
* ?>
|
||||
* ```
|
||||
*
|
||||
* @param string $exchange
|
||||
* @param string|\PhpAmqpLib\Message\AMQPMessage $message
|
||||
* @param string $routing_key
|
||||
*/
|
||||
public function pushToExchange($exchange, $message, $routing_key = null)
|
||||
{
|
||||
$message = $message instanceof AMQPMessage
|
||||
? $message
|
||||
: new AMQPMessage($message);
|
||||
$this->getChannel()->basic_publish($message, $exchange, $routing_key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends message to queue
|
||||
*
|
||||
* ``` php
|
||||
* <?php
|
||||
* $I->pushToQueue('queue.jobs', 'create user');
|
||||
* $I->pushToQueue('queue.jobs', new AMQPMessage('create'));
|
||||
* ?>
|
||||
* ```
|
||||
*
|
||||
* @param string $queue
|
||||
* @param string|\PhpAmqpLib\Message\AMQPMessage $message
|
||||
*/
|
||||
public function pushToQueue($queue, $message)
|
||||
{
|
||||
$message = $message instanceof AMQPMessage
|
||||
? $message
|
||||
: new AMQPMessage($message);
|
||||
|
||||
$this->getChannel()->queue_declare($queue);
|
||||
$this->getChannel()->basic_publish($message, '', $queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Declares an exchange
|
||||
*
|
||||
* This is an alias of method `exchange_declare` of `PhpAmqpLib\Channel\AMQPChannel`.
|
||||
*
|
||||
* ```php
|
||||
* <?php
|
||||
* $I->declareExchange(
|
||||
* 'nameOfMyExchange', // exchange name
|
||||
* 'topic' // exchange type
|
||||
* )
|
||||
* ```
|
||||
*
|
||||
* @param string $exchange
|
||||
* @param string $type
|
||||
* @param bool $passive
|
||||
* @param bool $durable
|
||||
* @param bool $auto_delete
|
||||
* @param bool $internal
|
||||
* @param bool $nowait
|
||||
* @param array $arguments
|
||||
* @param int $ticket
|
||||
* @return mixed|null
|
||||
*/
|
||||
public function declareExchange(
|
||||
$exchange,
|
||||
$type,
|
||||
$passive = false,
|
||||
$durable = false,
|
||||
$auto_delete = true,
|
||||
$internal = false,
|
||||
$nowait = false,
|
||||
$arguments = null,
|
||||
$ticket = null
|
||||
) {
|
||||
return $this->getChannel()->exchange_declare(
|
||||
$exchange,
|
||||
$type,
|
||||
$passive,
|
||||
$durable,
|
||||
$auto_delete,
|
||||
$internal,
|
||||
$nowait,
|
||||
$arguments,
|
||||
$ticket
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Declares queue, creates if needed
|
||||
*
|
||||
* This is an alias of method `queue_declare` of `PhpAmqpLib\Channel\AMQPChannel`.
|
||||
*
|
||||
* ```php
|
||||
* <?php
|
||||
* $I->declareQueue(
|
||||
* 'nameOfMyQueue', // exchange name
|
||||
* )
|
||||
* ```
|
||||
*
|
||||
* @param string $queue
|
||||
* @param bool $passive
|
||||
* @param bool $durable
|
||||
* @param bool $exclusive
|
||||
* @param bool $auto_delete
|
||||
* @param bool $nowait
|
||||
* @param array $arguments
|
||||
* @param int $ticket
|
||||
* @return mixed|null
|
||||
*/
|
||||
public function declareQueue(
|
||||
$queue = '',
|
||||
$passive = false,
|
||||
$durable = false,
|
||||
$exclusive = false,
|
||||
$auto_delete = true,
|
||||
$nowait = false,
|
||||
$arguments = null,
|
||||
$ticket = null
|
||||
) {
|
||||
return $this->getChannel()->queue_declare(
|
||||
$queue,
|
||||
$passive,
|
||||
$durable,
|
||||
$exclusive,
|
||||
$auto_delete,
|
||||
$nowait,
|
||||
$arguments,
|
||||
$ticket
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Binds a queue to an exchange
|
||||
*
|
||||
* This is an alias of method `queue_bind` of `PhpAmqpLib\Channel\AMQPChannel`.
|
||||
*
|
||||
* ```php
|
||||
* <?php
|
||||
* $I->bindQueueToExchange(
|
||||
* 'nameOfMyQueueToBind', // name of the queue
|
||||
* 'transactionTracking.transaction', // exchange name to bind to
|
||||
* 'your.routing.key' // Optionally, provide a binding key
|
||||
* )
|
||||
* ```
|
||||
*
|
||||
* @param string $queue
|
||||
* @param string $exchange
|
||||
* @param string $routing_key
|
||||
* @param bool $nowait
|
||||
* @param array $arguments
|
||||
* @param int $ticket
|
||||
* @return mixed|null
|
||||
*/
|
||||
public function bindQueueToExchange(
|
||||
$queue,
|
||||
$exchange,
|
||||
$routing_key = '',
|
||||
$nowait = false,
|
||||
$arguments = null,
|
||||
$ticket = null
|
||||
) {
|
||||
return $this->getChannel()->queue_bind(
|
||||
$queue,
|
||||
$exchange,
|
||||
$routing_key,
|
||||
$nowait,
|
||||
$arguments,
|
||||
$ticket
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if message containing text received.
|
||||
*
|
||||
* **This method drops message from queue**
|
||||
* **This method will wait for message. If none is sent the script will stuck**.
|
||||
*
|
||||
* ``` php
|
||||
* <?php
|
||||
* $I->pushToQueue('queue.emails', 'Hello, davert');
|
||||
* $I->seeMessageInQueueContainsText('queue.emails','davert');
|
||||
* ?>
|
||||
* ```
|
||||
*
|
||||
* @param string $queue
|
||||
* @param string $text
|
||||
*/
|
||||
public function seeMessageInQueueContainsText($queue, $text)
|
||||
{
|
||||
$msg = $this->getChannel()->basic_get($queue);
|
||||
if (!$msg) {
|
||||
$this->fail("Message was not received");
|
||||
}
|
||||
if (!$msg instanceof AMQPMessage) {
|
||||
$this->fail("Received message is not format of AMQPMessage");
|
||||
}
|
||||
$this->debugSection("Message", $msg->body);
|
||||
$this->assertContains($text, $msg->body);
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes last message from queue.
|
||||
*
|
||||
* ``` php
|
||||
* <?php
|
||||
* $message = $I->grabMessageFromQueue('queue.emails');
|
||||
* ?>
|
||||
* ```
|
||||
*
|
||||
* @param string $queue
|
||||
* @return \PhpAmqpLib\Message\AMQPMessage
|
||||
*/
|
||||
public function grabMessageFromQueue($queue)
|
||||
{
|
||||
$message = $this->getChannel()->basic_get($queue);
|
||||
return $message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge a specific queue defined in config.
|
||||
*
|
||||
* ``` php
|
||||
* <?php
|
||||
* $I->purgeQueue('queue.emails');
|
||||
* ?>
|
||||
* ```
|
||||
*
|
||||
* @param string $queueName
|
||||
*/
|
||||
public function purgeQueue($queueName = '')
|
||||
{
|
||||
if (! in_array($queueName, $this->config['queues'])) {
|
||||
throw new ModuleException(__CLASS__, "'$queueName' doesn't exist in queues config list");
|
||||
}
|
||||
|
||||
$this->getChannel()->queue_purge($queueName, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge all queues defined in config.
|
||||
*
|
||||
* ``` php
|
||||
* <?php
|
||||
* $I->purgeAllQueues();
|
||||
* ?>
|
||||
* ```
|
||||
*/
|
||||
public function purgeAllQueues()
|
||||
{
|
||||
$this->cleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \PhpAmqpLib\Channel\AMQPChannel
|
||||
*/
|
||||
protected function getChannel()
|
||||
{
|
||||
if ($this->config['single_channel'] && $this->channelId === null) {
|
||||
$this->channelId = $this->connection->get_free_channel_id();
|
||||
}
|
||||
return $this->connection->channel($this->channelId);
|
||||
}
|
||||
|
||||
protected function cleanup()
|
||||
{
|
||||
if (!isset($this->config['queues'])) {
|
||||
throw new ModuleException(__CLASS__, "please set queues for cleanup");
|
||||
}
|
||||
if (!$this->connection) {
|
||||
return;
|
||||
}
|
||||
foreach ($this->config['queues'] as $queue) {
|
||||
try {
|
||||
$this->getChannel()->queue_purge($queue);
|
||||
} catch (AMQPProtocolChannelException $e) {
|
||||
// ignore if exchange/queue doesn't exist and rethrow exception if it's something else
|
||||
if ($e->getCode() !== 404) {
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user