In my Symfony project, there is a queue message handler, and I have an error that randomly appears during the execution:
[2022-10-12T07:31:40.060119+00:00] console.CRITICAL: Error thrown while running command "messenger:consume async --limit=10". Message: "Library error: a socket error occurred" {"exception":"[object] (Symfony\\Component\\Messenger\\Exception\\TransportException(code: 0): Library error: a socket error occurred at /var/www/app/vendor/symfony/amqp-messenger/Transport/AmqpReceiver.php:62)
[previous exception] [object] (AMQPException(code: 0): Library error: a socket error occurred at /var/www/app/vendor/symfony/amqp-messenger/Transport/Connection.php:439)","command":"messenger:consume async --limit=10","message":"Library error: a socket error occurred"} []
The handler executes HTTP requests that can each take several seconds, and processing a single message can take more than one minute if the APIs are slow. The strange thing is that the problem disappears for hours and then reappears at random. The more messages are sent to the queue, the more often the exception appears.
config\packages\messenger.yaml
framework:
    messenger:
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: async_exchange
                    queues:
                        async: ~
                    heartbeat: 45
                    write_timeout: 90
                    read_timeout: 90
                retry_strategy:
                    max_retries: 0
        routing:
            # Route your messages to the transports
            'App\Message\MessageUpdateRequest': async
App\MessageHandler\MessageUpdateRequestHandler.php
<?php
declare(strict_types=1);

namespace App\MessageHandler;

use App\Message\MessageUpdateRequest;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class MessageUpdateRequestHandler implements MessageHandlerInterface
{
    public function __invoke(MessageUpdateRequest $message)
    {
        // Logic executing API requests...
        return 0;
    }
}
Environment
- Symfony Messenger: 5.4.17
- PHP: 8.1
- RabbitMQ: 3.11.5
Things that I tried
- upgrading Symfony Messenger to 5.4.17, using the fix available here;
- adding the following options: heartbeat, write_timeout and read_timeout in the messenger.yaml file.
Related issues/links
- https://github.com/php-amqp/php-amqp/issues/258
- https://github.com/symfony/symfony/issues/32357
- https://github.com/symfony/symfony/pull/47831
How can I fix this issue?
CodePudding user response:
For a socket error in Symfony Messenger, I always suggest a step-by-step approach to check whether anything is missing. It resolves this type of error in most cases. Please follow these guidelines:
- Verify that the RabbitMQ service is active and accessible.
- Verify that the hostname, port, username, and password in the transport DSN (referenced from the messenger.yaml file) are accurate.
- In the messenger.yaml file, increase the heartbeat, write timeout, and read timeout settings.
- Verify your use case to determine whether the max retries number in messenger.yaml is appropriate.
- Look for any network problems that could be causing the socket error.
- Make sure your PHP version is compatible with RabbitMQ and Symfony Messenger.
- Verify that the server's resources (CPU, Memory, and Disk) are not used up.
- Look for any relevant error messages in the PHP error log.
- Determine whether there is a problem with the MessageUpdateRequestHandler class's logic.
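The first two checks in the list can be scripted. The following is only a minimal sketch, not part of Symfony: the helper names parseAmqpDsn() and isBrokerReachable() are made up for illustration, the fallback DSN is just an example value, and the script only tests whether a TCP connection to the broker can be opened. It does not validate credentials or the vhost.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: extracts host and port from an AMQP DSN.
 * Falls back to the conventional AMQP ports when none is given.
 */
function parseAmqpDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException('Cannot parse DSN: ' . $dsn);
    }

    $defaultPort = (($parts['scheme'] ?? 'amqp') === 'amqps') ? 5671 : 5672;

    return [
        'host' => $parts['host'],
        'port' => $parts['port'] ?? $defaultPort,
    ];
}

/**
 * Hypothetical helper: returns true if a TCP connection can be opened.
 * This checks network reachability only, not the AMQP handshake or login.
 */
function isBrokerReachable(string $host, int $port, float $timeoutSeconds = 3.0): bool
{
    $socket = @fsockopen($host, $port, $errorCode, $errorMessage, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }
    fclose($socket);

    return true;
}

// Assumption: the DSN is exposed as an environment variable, as in the question.
$dsn = getenv('MESSENGER_TRANSPORT_DSN') ?: 'amqp://guest:guest@localhost:5672/%2f/messages';
$target = parseAmqpDsn($dsn);

printf(
    "%s:%d reachable: %s\n",
    $target['host'],
    $target['port'],
    isBrokerReachable($target['host'], $target['port']) ? 'yes' : 'no'
);
```

Running this from the same host or container as the consumer shows whether the broker address in the DSN is reachable from where the worker actually runs.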
Tip: Still stuck? Try to reproduce the error with a smaller message set and in a controlled environment to isolate the root cause that we may be missing.
Hope it helps. Happy debugging and good luck.
CodePudding user response:
(May not be an answer, but too long for a comment. I don't use Symfony so can't test, but do a lot with queues.)
In this code (https://github.com/surfingmig/symfony/blob/608f4dade49f2f0367380d37c946b4a6a3d3985c/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php) there is no verification that the connection is still alive before the ack(); everything is stored in locally held variables and they are only checked before get and send.
I suggest you copy
    $this->clearWhenDisconnected();
    if ($this->autoSetupExchange) {
        $this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
    }
from get() into the top of the ack() function.
If that works, also copy it to the nack(), purge() and countMessagesInQueues() functions, or make a common function "checkStillConnected()", and then turn it into a pull request.
Note: You will still get exceptions (there is always a gap between "am I connected", and "ack()", even if microseconds, where disconnect can happen). So a retry handler in your code as well would be appropriate. I try three times before giving up.
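A retry handler along those lines could look roughly like the sketch below. It is a generic helper, not something Symfony provides: the function name retry(), its parameters, and the simulated failure are assumptions for illustration. It catches every Exception for simplicity; in real code you would probably narrow that to the transport exception you actually see, and where to wrap the call depends on your setup.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: runs $operation and retries it when it throws.
 * The last exception is rethrown once $maxAttempts has been reached.
 *
 * @param callable $operation   receives the current attempt number (1-based)
 * @param int      $maxAttempts total number of tries, including the first
 * @param int      $delayMs     pause between tries, in milliseconds
 */
function retry(callable $operation, int $maxAttempts = 3, int $delayMs = 0): mixed
{
    if ($maxAttempts < 1) {
        throw new InvalidArgumentException('maxAttempts must be at least 1');
    }

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (Exception $e) {
            // Give up and surface the original error after the last attempt.
            if ($attempt >= $maxAttempts) {
                throw $e;
            }
            if ($delayMs > 0) {
                usleep($delayMs * 1000);
            }
        }
    }
}

// Usage: the operation fails twice with a simulated socket error, then succeeds.
$calls = 0;
$result = retry(function (int $attempt) use (&$calls): string {
    $calls++;
    if ($attempt < 3) {
        throw new RuntimeException('Library error: a socket error occurred');
    }

    return 'ok';
});

echo $result, ' after ', $calls, ' attempts', PHP_EOL; // prints "ok after 3 attempts"
```

Three attempts is the default here because that is the number mentioned above; a small delay between attempts gives a dropped connection time to be re-established.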
(Other note: one of those change requests linked is only a stop-gap solution that triggers under certain conditions, but will miss most issues.)