391 lines
10 KiB
PHP
391 lines
10 KiB
PHP
<?php
|
|
namespace Codeception\Module;
|
|
|
|
use Codeception\Exception\ModuleException as ModuleException;
|
|
use Codeception\Lib\Interfaces\RequiresPackage;
|
|
use Codeception\Module as CodeceptionModule;
|
|
use Codeception\TestInterface;
|
|
use Exception;
|
|
use PhpAmqpLib\Channel\AMQPChannel;
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
|
/**
|
|
* This module interacts with message broker software that implements
|
|
* the Advanced Message Queuing Protocol (AMQP) standard. For example, RabbitMQ (tested).
|
|
*
|
|
* <div class="alert alert-info">
|
|
* To use this module with Composer you need <em>"php-amqplib/php-amqplib": "~2.4"</em> package.
|
|
* </div>
|
|
*
|
|
* ## Config
|
|
*
|
|
* * host: localhost - host to connect
|
|
* * username: guest - username to connect
|
|
* * password: guest - password to connect
|
|
* * vhost: '/' - vhost to connect
|
|
* * cleanup: true - defined queues will be purged before running every test.
|
|
* * queues: [mail, twitter] - queues to cleanup
|
|
* * single_channel - create and use only one channel during test execution
|
|
*
|
|
* ### Example
|
|
*
|
|
* modules:
|
|
* enabled:
|
|
* - AMQP:
|
|
* host: 'localhost'
|
|
* port: '5672'
|
|
* username: 'guest'
|
|
* password: 'guest'
|
|
* vhost: '/'
|
|
* queues: [queue1, queue2]
|
|
* single_channel: false
|
|
*
|
|
* ## Public Properties
|
|
*
|
|
* * connection - AMQPStreamConnection - current connection
|
|
*/
|
|
class AMQP extends CodeceptionModule implements RequiresPackage
|
|
{
|
|
protected $config = [
|
|
'host' => 'localhost',
|
|
'username' => 'guest',
|
|
'password' => 'guest',
|
|
'port' => '5672',
|
|
'vhost' => '/',
|
|
'cleanup' => true,
|
|
'single_channel' => false
|
|
];
|
|
|
|
/**
|
|
* @var AMQPStreamConnection
|
|
*/
|
|
public $connection;
|
|
|
|
/**
|
|
* @var int
|
|
*/
|
|
protected $channelId;
|
|
|
|
protected $requiredFields = ['host', 'username', 'password', 'vhost'];
|
|
|
|
public function _requires()
|
|
{
|
|
return ['PhpAmqpLib\Connection\AMQPStreamConnection' => '"php-amqplib/php-amqplib": "~2.4"'];
|
|
}
|
|
|
|
public function _initialize()
|
|
{
|
|
$host = $this->config['host'];
|
|
$port = $this->config['port'];
|
|
$username = $this->config['username'];
|
|
$password = $this->config['password'];
|
|
$vhost = $this->config['vhost'];
|
|
|
|
try {
|
|
$this->connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
|
|
} catch (Exception $e) {
|
|
throw new ModuleException(__CLASS__, $e->getMessage() . ' while establishing connection to MQ server');
|
|
}
|
|
}
|
|
|
|
public function _before(TestInterface $test)
|
|
{
|
|
if ($this->config['cleanup']) {
|
|
$this->cleanup();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Sends message to exchange by sending exchange name, message
|
|
* and (optionally) a routing key
|
|
*
|
|
* ``` php
|
|
* <?php
|
|
* $I->pushToExchange('exchange.emails', 'thanks');
|
|
* $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'));
|
|
* $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'), 'severity');
|
|
* ?>
|
|
* ```
|
|
*
|
|
* @param string $exchange
|
|
* @param string|\PhpAmqpLib\Message\AMQPMessage $message
|
|
* @param string $routing_key
|
|
*/
|
|
public function pushToExchange($exchange, $message, $routing_key = null)
|
|
{
|
|
$message = $message instanceof AMQPMessage
|
|
? $message
|
|
: new AMQPMessage($message);
|
|
$this->getChannel()->basic_publish($message, $exchange, $routing_key);
|
|
}
|
|
|
|
/**
|
|
* Sends message to queue
|
|
*
|
|
* ``` php
|
|
* <?php
|
|
* $I->pushToQueue('queue.jobs', 'create user');
|
|
* $I->pushToQueue('queue.jobs', new AMQPMessage('create'));
|
|
* ?>
|
|
* ```
|
|
*
|
|
* @param string $queue
|
|
* @param string|\PhpAmqpLib\Message\AMQPMessage $message
|
|
*/
|
|
public function pushToQueue($queue, $message)
|
|
{
|
|
$message = $message instanceof AMQPMessage
|
|
? $message
|
|
: new AMQPMessage($message);
|
|
|
|
$this->getChannel()->queue_declare($queue);
|
|
$this->getChannel()->basic_publish($message, '', $queue);
|
|
}
|
|
|
|
/**
|
|
* Declares an exchange
|
|
*
|
|
* This is an alias of method `exchange_declare` of `PhpAmqpLib\Channel\AMQPChannel`.
|
|
*
|
|
* ```php
|
|
* <?php
|
|
* $I->declareExchange(
|
|
* 'nameOfMyExchange', // exchange name
|
|
* 'topic' // exchange type
|
|
* )
|
|
* ```
|
|
*
|
|
* @param string $exchange
|
|
* @param string $type
|
|
* @param bool $passive
|
|
* @param bool $durable
|
|
* @param bool $auto_delete
|
|
* @param bool $internal
|
|
* @param bool $nowait
|
|
* @param array $arguments
|
|
* @param int $ticket
|
|
* @return mixed|null
|
|
*/
|
|
public function declareExchange(
|
|
$exchange,
|
|
$type,
|
|
$passive = false,
|
|
$durable = false,
|
|
$auto_delete = true,
|
|
$internal = false,
|
|
$nowait = false,
|
|
$arguments = null,
|
|
$ticket = null
|
|
) {
|
|
return $this->getChannel()->exchange_declare(
|
|
$exchange,
|
|
$type,
|
|
$passive,
|
|
$durable,
|
|
$auto_delete,
|
|
$internal,
|
|
$nowait,
|
|
$arguments,
|
|
$ticket
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Declares queue, creates if needed
|
|
*
|
|
* This is an alias of method `queue_declare` of `PhpAmqpLib\Channel\AMQPChannel`.
|
|
*
|
|
* ```php
|
|
* <?php
|
|
* $I->declareQueue(
|
|
* 'nameOfMyQueue', // exchange name
|
|
* )
|
|
* ```
|
|
*
|
|
* @param string $queue
|
|
* @param bool $passive
|
|
* @param bool $durable
|
|
* @param bool $exclusive
|
|
* @param bool $auto_delete
|
|
* @param bool $nowait
|
|
* @param array $arguments
|
|
* @param int $ticket
|
|
* @return mixed|null
|
|
*/
|
|
public function declareQueue(
|
|
$queue = '',
|
|
$passive = false,
|
|
$durable = false,
|
|
$exclusive = false,
|
|
$auto_delete = true,
|
|
$nowait = false,
|
|
$arguments = null,
|
|
$ticket = null
|
|
) {
|
|
return $this->getChannel()->queue_declare(
|
|
$queue,
|
|
$passive,
|
|
$durable,
|
|
$exclusive,
|
|
$auto_delete,
|
|
$nowait,
|
|
$arguments,
|
|
$ticket
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Binds a queue to an exchange
|
|
*
|
|
* This is an alias of method `queue_bind` of `PhpAmqpLib\Channel\AMQPChannel`.
|
|
*
|
|
* ```php
|
|
* <?php
|
|
* $I->bindQueueToExchange(
|
|
* 'nameOfMyQueueToBind', // name of the queue
|
|
* 'transactionTracking.transaction', // exchange name to bind to
|
|
* 'your.routing.key' // Optionally, provide a binding key
|
|
* )
|
|
* ```
|
|
*
|
|
* @param string $queue
|
|
* @param string $exchange
|
|
* @param string $routing_key
|
|
* @param bool $nowait
|
|
* @param array $arguments
|
|
* @param int $ticket
|
|
* @return mixed|null
|
|
*/
|
|
public function bindQueueToExchange(
|
|
$queue,
|
|
$exchange,
|
|
$routing_key = '',
|
|
$nowait = false,
|
|
$arguments = null,
|
|
$ticket = null
|
|
) {
|
|
return $this->getChannel()->queue_bind(
|
|
$queue,
|
|
$exchange,
|
|
$routing_key,
|
|
$nowait,
|
|
$arguments,
|
|
$ticket
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Checks if message containing text received.
|
|
*
|
|
* **This method drops message from queue**
|
|
* **This method will wait for message. If none is sent the script will stuck**.
|
|
*
|
|
* ``` php
|
|
* <?php
|
|
* $I->pushToQueue('queue.emails', 'Hello, davert');
|
|
* $I->seeMessageInQueueContainsText('queue.emails','davert');
|
|
* ?>
|
|
* ```
|
|
*
|
|
* @param string $queue
|
|
* @param string $text
|
|
*/
|
|
public function seeMessageInQueueContainsText($queue, $text)
|
|
{
|
|
$msg = $this->getChannel()->basic_get($queue);
|
|
if (!$msg) {
|
|
$this->fail("Message was not received");
|
|
}
|
|
if (!$msg instanceof AMQPMessage) {
|
|
$this->fail("Received message is not format of AMQPMessage");
|
|
}
|
|
$this->debugSection("Message", $msg->body);
|
|
$this->assertContains($text, $msg->body);
|
|
}
|
|
|
|
/**
|
|
* Takes last message from queue.
|
|
*
|
|
* ``` php
|
|
* <?php
|
|
* $message = $I->grabMessageFromQueue('queue.emails');
|
|
* ?>
|
|
* ```
|
|
*
|
|
* @param string $queue
|
|
* @return \PhpAmqpLib\Message\AMQPMessage
|
|
*/
|
|
public function grabMessageFromQueue($queue)
|
|
{
|
|
$message = $this->getChannel()->basic_get($queue);
|
|
return $message;
|
|
}
|
|
|
|
/**
|
|
* Purge a specific queue defined in config.
|
|
*
|
|
* ``` php
|
|
* <?php
|
|
* $I->purgeQueue('queue.emails');
|
|
* ?>
|
|
* ```
|
|
*
|
|
* @param string $queueName
|
|
*/
|
|
public function purgeQueue($queueName = '')
|
|
{
|
|
if (! in_array($queueName, $this->config['queues'])) {
|
|
throw new ModuleException(__CLASS__, "'$queueName' doesn't exist in queues config list");
|
|
}
|
|
|
|
$this->getChannel()->queue_purge($queueName, true);
|
|
}
|
|
|
|
/**
|
|
* Purge all queues defined in config.
|
|
*
|
|
* ``` php
|
|
* <?php
|
|
* $I->purgeAllQueues();
|
|
* ?>
|
|
* ```
|
|
*/
|
|
public function purgeAllQueues()
|
|
{
|
|
$this->cleanup();
|
|
}
|
|
|
|
/**
|
|
* @return \PhpAmqpLib\Channel\AMQPChannel
|
|
*/
|
|
protected function getChannel()
|
|
{
|
|
if ($this->config['single_channel'] && $this->channelId === null) {
|
|
$this->channelId = $this->connection->get_free_channel_id();
|
|
}
|
|
return $this->connection->channel($this->channelId);
|
|
}
|
|
|
|
protected function cleanup()
|
|
{
|
|
if (!isset($this->config['queues'])) {
|
|
throw new ModuleException(__CLASS__, "please set queues for cleanup");
|
|
}
|
|
if (!$this->connection) {
|
|
return;
|
|
}
|
|
foreach ($this->config['queues'] as $queue) {
|
|
try {
|
|
$this->getChannel()->queue_purge($queue);
|
|
} catch (AMQPProtocolChannelException $e) {
|
|
// ignore if exchange/queue doesn't exist and rethrow exception if it's something else
|
|
if ($e->getCode() !== 404) {
|
|
throw $e;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|