Skip to content

Commit ce1fa39

Browse files
committed
fix: handle undefined message in Retryable catch blocks
Add defensive checks for message existence before attempting retry logic to prevent undefined variable errors if Retryable is thrown before Message object creation.
1 parent 37da7c4 commit ce1fa39

File tree

2 files changed

+37
-30
lines changed

2 files changed

+37
-30
lines changed

src/Queue/Broker/AMQP.php

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -97,22 +97,27 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
9797
/**
9898
* Re-queue the job if max retries not exceeded.
9999
*/
100-
$shouldRetry = $this->maxRetries === null || $message->getRetries() < $this->maxRetries;
101-
102-
if ($shouldRetry) {
103-
// Increment retry count and re-publish
104-
$messageData = $message->asArray();
105-
$messageData['retries'] = $message->getRetries() + 1;
106-
107-
$newMessage = new AMQPMessage(
108-
json_encode($messageData),
109-
['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
110-
);
111-
112-
$amqpMessage->getChannel()->basic_publish($newMessage, $queue->namespace, routing_key: $queue->name);
113-
$amqpMessage->ack();
100+
if (isset($message)) {
101+
$shouldRetry = $this->maxRetries === null || $message->getRetries() < $this->maxRetries;
102+
103+
if ($shouldRetry) {
104+
// Increment retry count and re-publish
105+
$messageData = $message->asArray();
106+
$messageData['retries'] = $message->getRetries() + 1;
107+
108+
$newMessage = new AMQPMessage(
109+
json_encode($messageData),
110+
['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
111+
);
112+
113+
$amqpMessage->getChannel()->basic_publish($newMessage, $queue->namespace, routing_key: $queue->name);
114+
$amqpMessage->ack();
115+
} else {
116+
// Max retries exceeded, send to dead-letter-exchange (failed queue)
117+
$amqpMessage->nack();
118+
}
114119
} else {
115-
// Max retries exceeded, send to dead-letter-exchange (failed queue)
120+
// Message wasn't created yet, treat as non-retryable
116121
$amqpMessage->nack();
117122
}
118123

src/Queue/Broker/Redis.php

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,25 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
7070
/**
7171
* Re-queue the job if max retries not exceeded.
7272
*/
73-
$shouldRetry = $this->maxRetries === null || $message->getRetries() < $this->maxRetries;
74-
75-
if ($shouldRetry) {
76-
// Increment retry count and re-queue
77-
$messageData = $message->asArray();
78-
$messageData['retries'] = $message->getRetries() + 1;
79-
$this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $messageData);
80-
} else {
81-
// Max retries exceeded, move to failed queue
82-
$this->connection->leftPush("{$queue->namespace}.failed.{$queue->name}", $message->getPid());
83-
$this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed");
73+
if (isset($message)) {
74+
$shouldRetry = $this->maxRetries === null || $message->getRetries() < $this->maxRetries;
75+
76+
if ($shouldRetry) {
77+
// Increment retry count and re-queue
78+
$messageData = $message->asArray();
79+
$messageData['retries'] = $message->getRetries() + 1;
80+
$this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $messageData);
81+
} else {
82+
// Max retries exceeded, move to failed queue
83+
$this->connection->leftPush("{$queue->namespace}.failed.{$queue->name}", $message->getPid());
84+
$this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed");
85+
}
86+
87+
// Remove job from jobs list in either case
88+
$this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}");
8489
}
8590

86-
// Remove job from jobs list in either case
87-
$this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}");
88-
89-
$errorCallback($message, $e);
91+
$errorCallback($message ?? null, $e);
9092
} catch (\Throwable $th) {
9193
/**
9294
* Move failed Job to Failed list.

0 commit comments

Comments
 (0)