An AMQP abstraction layer that lets you work with different adapters through a single API.
Supported adapters:
- php-amqplib (`phpamqplib` in the factory)
- php-amqp extension (`amqp` in the factory)
use AMQPAL\Adapter;
use AMQPAL\Options;
// Adapter configuration passed to the factory:
// - 'name' selects which adapter to build ('amqp' or 'phpamqplib');
// - 'options' holds the settings handed to that adapter (here: host,
//   credentials and virtual host of a local broker with guest access).
$options = [
'name' => 'amqp', // or phpamqplib
'options' => [
'host' => 'localhost',
'username' => 'guest',
'password' => 'guest',
'vhost' => '/'
]
];
// Build the adapter from the configuration above.
$factory = new Adapter\AdapterFactory();
$adapter = $factory->createAdapter($options);
// Obtain the adapter's connection and create a channel on it; the exchange
// and queue in the rest of this example are both created from this channel.
$connection = $adapter->getConnection();
$channel = $connection->createChannel();
/*
 * Creating exchange...
 *
 * createExchange() is shown with two alternative argument forms: an
 * Options\ExchangeOptions object, or a plain array with the same keys.
 * Both are listed for illustration only; pick one in real code.
 */
$exchangeOptions = new Options\ExchangeOptions([
'name' => 'exchange-name',
'type' => 'direct'
]);
$exchange = $channel->createExchange($exchangeOptions);
// or, equivalently, pass the options as a plain array:
$exchange = $channel->createExchange([
'name' => 'exchange-name',
'type' => 'direct'
]);
/*
 * Creating queue...
 *
 * createQueue() is shown with two alternative argument forms: an
 * Options\QueueOptions object, or a plain array with the same keys.
 * Both are listed for illustration only; pick one in real code.
 */
$queueOptions = new Options\QueueOptions([
'name' => 'queue-name',
]);
$queue = $channel->createQueue($queueOptions);
// or, equivalently, pass the options as a plain array:
$queue = $channel->createQueue([
'name' => 'queue-name',
]);
// Declare the queue, then bind it to the exchange by name. The name given
// here must match the 'name' used when creating the exchange ('exchange-name').
$queue->declareQueue();
$queue->bind('exchange-name');
// Publishing a message through the exchange...
$exchange->publish('my message in the queue');
// Get the next message in the queue (a single fetch, as opposed to the
// callback-driven consume() shown below)...
$message = $queue->get();
// ...or consume the queue with a callback. The callback receives each
// delivered message together with the queue it came from.
$callback = function (Adapter\Message $message, Adapter\QueueInterface $queue) {
// Acknowledge the message, identified by its delivery tag...
$queue->ack($message->getDeliveryTag());
// Return false to stop consuming; this example stops after the first message.
return false;
};
// Set the channel QoS so that just one message is fetched at a time.
// NOTE(review): the first argument is passed as null - presumably the prefetch
// size, with 1 being the prefetch count; confirm against the channel API.
$channel->setQos(null, 1);
// Start consuming with the callback defined above.
$queue->consume($callback); // This is a blocking function