Symfony – RabbitMq

Over the past few days I did some work on the software I use for my research. Analysing the evolution of a software project can take a long time if the code base is big and there are many releases. Until now I was using JMSJobQueueBundle to run long jobs in the background. It is an excellent bundle and easy to use. The drawback is that it cannot be scaled. The next stage for my software is to make it possible to distribute the analysis of each Tag (software release) to multiple machines and run them in parallel, in order to reduce the overall analysis time. RabbitMq can do the job. Actually, part of the job. The requirement is to run several analysis tasks on the code of a release in a particular order and on the same machine. The last constraint came up to avoid sharing files (the source code to analyse) between the machines.

job-levels

I had to build a mechanism on top of RabbitMq to meet those requirements. To get an idea of what had to be done, have a look at the sequence diagram showing the normal execution of an analysis.

process-run-normal

RabbitMq made things easy. I used the excellent library videlalvaro/php-amqplib, with no bundle on top of it. Messaging is so straightforward that adding a bundle seems unnecessary. I also followed an idea I read in an excellent blog entry: using Symfony events in AMQP consumers. Actually the post is on Improving Symfony workers, but it points to a presentation by Jonathan Wage on “Building OpenSky”, where he says that they use that technique too. It is interesting because you code a single consumer command that fires Symfony events, and you can hook listeners to each event. So the code that deals with RabbitMq lives in a single place and does something simple. The following code fragment shows the command’s execute method and the event dispatching method.

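    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->output = $output;
        $this->em = $this->getContainer()->get('doctrine')->getEntityManager('default');
        $this->mq = $this->getContainer()->get('slx_rabbit_mq.rabbitmq');
        // Register onMessage() as the callback for messages arriving on the queue.
        $this->mq->setConsumer(RabbitMqConstants::FAST_TRACK_QUEUE, array($this, 'onMessage'));
        // Keep waiting for messages as long as a consumer callback is registered.
        while (count($this->mq->getChannel()->callbacks)) {
            $this->output->writeln('waiting...');
            $this->mq->getChannel()->wait();
        }
        $this->mq->close();
    }

    public function onMessage($msg)
    {
        // The message body has the form "<part1>.<part2>[.<data>]".
        $data = $msg->body;
        $toks = explode('.', $data);
        // The first two parts become the event name, e.g. "rabbitmq.Part1Part2".
        $eventName = sprintf("rabbitmq.%s%s", ucfirst($toks[0]), ucfirst($toks[1]));
        // The optional third part is the payload handed to the listeners.
        $data = isset($toks[2]) ? $toks[2] : '';
        $this->output->writeln(sprintf("Event: %s data: %s", $eventName, $data));

        // Translate the message into a Symfony event; listeners do the real work.
        $event = new MqJobEvent($data);
        $eventDispatcher = $this->getContainer()->get('event_dispatcher');
        $eventDispatcher->dispatch($eventName, $event);

        // Acknowledge only after the listeners have run.
        $this->mq->messageAck($msg);
    }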
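
To make the message convention used by onMessage easier to see, here is a minimal standalone sketch of both sides: building the body on the producer and splitting it on the consumer. The helper names buildMessageBody and parseMessageBody and the sample values "tag" and "analyse" are made up for illustration; they are not part of my actual code.

```php
<?php
// Hypothetical helpers illustrating the "<part1>.<part2>[.<data>]" convention.

/** Build a message body on the producer side. */
function buildMessageBody($subject, $action, $data = '')
{
    $body = $subject . '.' . $action;
    return $data === '' ? $body : $body . '.' . $data;
}

/** Split a message body into event name and payload, as the consumer does. */
function parseMessageBody($body)
{
    $toks = explode('.', $body);
    if (count($toks) < 2) {
        // The consumer above does not check this; the guard is an addition.
        throw new InvalidArgumentException(sprintf('Malformed message body "%s"', $body));
    }
    return array(
        'event' => sprintf('rabbitmq.%s%s', ucfirst($toks[0]), ucfirst($toks[1])),
        'data'  => isset($toks[2]) ? $toks[2] : '',
    );
}

$parsed = parseMessageBody(buildMessageBody('tag', 'analyse', '42'));
echo $parsed['event'], ' ', $parsed['data'], PHP_EOL; // prints "rabbitmq.TagAnalyse 42"
```

A listener would then be registered for the event name, here rabbitmq.TagAnalyse, and receive the payload through the event object. Note that with this convention the payload itself must not contain dots.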

Then, using supervisor, you can run as many instances of the command as you can afford. Supervisor has been in my toolbox since the JMSJobQueueBundle stage; it is an excellent and widely used tool.
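
As a rough idea of what that looks like, a supervisor program section along these lines starts several copies of the consumer and restarts them if they exit. The program name, the console path and the command name app:mq:consume are placeholders, not the ones from my project, and the number of processes is arbitrary.

```ini
; Hypothetical supervisor program section for the consumer command.
[program:mq_consumer]
; Placeholder path and command name; replace with the real console command.
command=php /path/to/project/app/console app:mq:consume
; Give each instance a distinct name (required when numprocs > 1).
process_name=%(program_name)s_%(process_num)02d
; How many consumer instances to run on this machine.
numprocs=4
autostart=true
autorestart=true
```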

There is another interesting post on RicBra’s blog about the issues he faced with long-running PHP workers, which finally led him to build a consumer in Go. The Go consumer runs forever and, when it receives a message, runs a Symfony command to do the requested job. So PHP runs only for as long as the job takes. Then it stops and its memory is released, avoiding any memory issues that PHP might have. So far I don't have such issues; the truth is, I solved them. My PHP scripts run for 10-12 days analysing source code without a crash. After the last tuning I managed to keep memory usage at 3-5% (I always have the top utility running in a window), and it stayed there until the analysis ended (10 days later). It can be done if you manually free huge arrays, especially arrays holding objects, and call the garbage collector from time to time (lessons learned from Java 13 years ago).
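
The kind of cleanup I mean can be sketched as follows. This is not the code of my analysis; the function analyseInBatches and the stand-in objects are invented for the example, and it only shows the pattern of dropping big arrays explicitly and then calling gc_collect_cycles().

```php
<?php
// Hypothetical long-running loop: work in batches and release memory in between.

function analyseInBatches(array $items, $batchSize)
{
    $processed = 0;
    foreach (array_chunk($items, $batchSize) as $batch) {
        // Stand-in for real work that builds a large array of objects.
        $objects = array();
        foreach ($batch as $item) {
            $node = new stdClass();
            $node->value = $item;
            $node->self = $node; // circular reference: freed only by the cycle collector
            $objects[] = $node;
        }
        $processed += count($objects);

        // Drop the references explicitly, then ask PHP to collect the cycles.
        unset($objects, $node);
        gc_collect_cycles();
    }
    return $processed;
}

$count = analyseInBatches(range(1, 10000), 1000);
printf("processed %d items, memory in use: %d bytes\n", $count, memory_get_usage());
```

Printing memory_get_usage() after each batch is a cheap way to check that usage stays flat instead of growing with every batch.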

The conclusion is that RabbitMq works smoothly with Symfony 🙂