Asynchronous prooph messages via Amazon AWS SQS

Did you know that you can easily switch to asynchronous prooph messages for your commands, events and even queries? This blog post shows how to use the async switch to produce asynchronous messages via Amazon AWS Simple Queue Service (SQS). If you are not familiar with the prooph components, here is a short explanation. The prooph components are CQRS and Event Sourcing packages for PHP. They are enterprise ready, work with every PHP application, have support for the most famous PHP web frameworks (Zend, Symfony, Laravel) and, of course, play well with microservices, too! I recommend trying them out.

Enable prooph async switch

To enable the prooph async switch, add the following config definition to your prooph config file for your specific service bus. This example illustrates it for the event bus. Don't forget to register a factory for the message producer in your favorite dependency injection container, so that the class name used as async_switch can be resolved.

<?php

declare(strict_types=1);

// prooph array config file
return [
    'prooph' => [
        'service_bus' => [
            'event_bus' => [
                'plugins' => [
                    \Prooph\ServiceBus\Plugin\InvokeStrategy\OnEventStrategy::class,
                ],
                'router' => [
                    // only one line, that's it
                    'async_switch' => Acme\SqsMessageProducer::class,
                    'routes' => [/**/],
                ],
            ],
        ],
    ],
];
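How the factories are registered depends on your container. As a rough sketch, assuming a container that reads zend-servicemanager style dependencies config, it could look like the following. EventBusFactory ships with prooph/service-bus, while Acme\Container\SqsMessageProducerFactory is a hypothetical factory that you would write yourself to create the SQS client and pass in the queue URL.

```php
<?php

declare(strict_types=1);

// container config file (the 'dependencies' => 'factories' layout is an assumption)
return [
    'dependencies' => [
        'factories' => [
            // factory from prooph/service-bus, it reads the 'prooph' config shown above
            \Prooph\ServiceBus\EventBus::class => \Prooph\ServiceBus\Container\EventBusFactory::class,
            // hypothetical factory for the producer that is configured as 'async_switch'
            \Acme\SqsMessageProducer::class => \Acme\Container\SqsMessageProducerFactory::class,
        ],
    ],
];
```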

Mark prooph message class as async

The following example illustrates how to define an asynchronous event. Implement the interface Prooph\ServiceBus\Async\AsyncMessage in your event class. That's easy, isn't it? There is nothing else to do. The prooph service bus handles all the stuff for you. If you are interested in some internals, read on or jump to the next headline. When a message is dispatched, the prooph AsyncSwitchMessageRouter checks the handled-async field in the message metadata. If this field is not true, the router sets it and the message is sent to the async message producer. If it is true, the message is passed to the decorated router and handled by the service bus as usual. Now let's go to the async message producer implementation.
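Before moving on, here is a minimal sketch of such an event class. The class name Acme\Event\UserRegistered and the user_id payload key are made up for illustration. The sketch assumes that prooph/common and prooph/service-bus are installed and uses the PayloadTrait from prooph/common to keep the class short.

```php
<?php

declare(strict_types=1);

namespace Acme\Event;

use Prooph\Common\Messaging\DomainEvent;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;
use Prooph\ServiceBus\Async\AsyncMessage;

// Hypothetical event: implementing AsyncMessage is the only async-specific part.
final class UserRegistered extends DomainEvent implements PayloadConstructable, AsyncMessage
{
    use PayloadTrait;

    public function userId(): string
    {
        return $this->payload['user_id'];
    }
}
```

The event is dispatched like any other event, for example with $eventBus->dispatch(new UserRegistered(['user_id' => '42'])). The marker interface alone decides whether the message takes the async route.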

Amazon AWS SQS async message producer

This is an example for the Amazon AWS Simple Queue Service (SQS). Make sure you have created an Amazon SQS queue and have the correct access rights. Once the producer is in place, you should see the messages in the queue. The following code contains no fancy stuff and you are free to adapt it to your needs, but it should help you get started. It uses the official AWS SDK for PHP (aws/aws-sdk-php), so make sure it is installed.

<?php

declare(strict_types=1);

namespace Acme;

use Aws\Sqs\SqsClient;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\ServiceBus\Async\MessageProducer;
use Prooph\ServiceBus\Exception\RuntimeException;
use React\Promise\Deferred;

class SqsMessageProducer implements MessageProducer
{
    /**
     * AWS SQS client
     *
     * @var SqsClient
     */
    private $sqsClient;

    /**
     * Queue URL
     *
     * @var string
     */
    private $queueUrl;

    /**
     * Message converter
     *
     * @var MessageConverter
     */
    private $messageConverter;

    public function __construct(MessageConverter $messageConverter, SqsClient $sqsClient, string $queueUrl)
    {
        $this->sqsClient = $sqsClient;
        $this->messageConverter = $messageConverter;
        $this->queueUrl = $queueUrl;
    }

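    public function __invoke(Message $message, Deferred $deferred = null): void
    {
        // A deferred is passed by the query bus; this producer has no way to resolve it.
        if (null !== $deferred) {
            throw new RuntimeException('The SqsMessageProducer can not handle deferred messages.');
        }

        // Note: json_encode() serializes a DateTimeImmutable created_at value as an object.
        // Format it as a string first if your consumer expects one.
        $promise = $this->sqsClient->sendMessageAsync([
            'QueueUrl'    => $this->queueUrl,
            'MessageBody' => json_encode($this->messageConverter->convertToArray($message)),
        ]);

        // Block until SQS has accepted the message; a failed request throws here.
        $promise->wait();
    }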
}

Now the event messages are sent to the Amazon SQS queue, but where is the message consumer? You can use Amazon AWS Lambda to read the messages from the queue and send them to a message box HTTP endpoint, which is responsible for handling the incoming messages. For the endpoint, you can use the prooph PSR-7 middleware library or write your own implementation. The AWS Lambda consumer function is triggered via an AWS::Events::Rule with a rate of one minute. It's not realtime, but it works like a charm.
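The Lambda function itself is not shown in this post. To illustrate the receive, forward and delete cycle that such a consumer performs, here is a minimal sketch in PHP instead. The function name forwardBatch and its three callables are hypothetical and not part of prooph or the AWS SDK. They are injected so that the logic can be tried without a real queue.

```php
<?php

declare(strict_types=1);

/**
 * Forwards one batch of queue messages to a message box endpoint.
 *
 * @param callable $receive returns a list of arrays with 'Body' and 'ReceiptHandle' keys
 * @param callable $post    takes the message body, returns the HTTP status code
 * @param callable $delete  takes the receipt handle of a forwarded message
 *
 * @return int number of messages that were forwarded and deleted
 */
function forwardBatch(callable $receive, callable $post, callable $delete): int
{
    $forwarded = 0;

    foreach ($receive() as $sqsMessage) {
        $status = $post($sqsMessage['Body']);

        // Delete only after the endpoint accepted the message. Otherwise the
        // message stays in the queue and is delivered again later.
        if ($status >= 200 && $status < 300) {
            $delete($sqsMessage['ReceiptHandle']);
            ++$forwarded;
        }
    }

    return $forwarded;
}

// Possible wiring with the AWS SDK for PHP (assumes $sqsClient and $queueUrl exist):
//
// $receive = function () use ($sqsClient, $queueUrl): array {
//     $result = $sqsClient->receiveMessage(['QueueUrl' => $queueUrl, 'MaxNumberOfMessages' => 10]);
//     return $result->get('Messages') ?? [];
// };
// $delete = function (string $handle) use ($sqsClient, $queueUrl): void {
//     $sqsClient->deleteMessage(['QueueUrl' => $queueUrl, 'ReceiptHandle' => $handle]);
// };
```

The $post callable would send the body to the message box endpoint with the HTTP client of your choice. Deleting a message only after a 2xx response means that a failing endpoint leads to a retry instead of a lost message.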

Conclusion

With a few lines of code, you can activate asynchronous prooph messages. This is really awesome. Don't forget to check out the prooph website to find out what else you can do with the prooph components.

Which asynchronous message producer do you use?
