Skip to content

Commit a72f890

Browse files
committed
add retry.
1 parent da51a99 commit a72f890

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

src/Queue/CMQQueue.php

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public function size($queue = null)
8585
*
8686
* @return mixed
8787
* @throws \ReflectionException
88+
* @throws \Exception
8889
*/
8990
public function push($job, $data = '', $queue = null)
9091
{
@@ -112,6 +113,7 @@ public function push($job, $data = '', $queue = null)
112113
* @return \Freyo\LaravelQueueCMQ\Queue\Driver\Message|array
113114
* @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerNetworkException
114115
* @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException
116+
* @throws \Exception
115117
*/
116118
public function pushRaw($payload, $queue = null, array $options = [])
117119
{
@@ -122,17 +124,23 @@ public function pushRaw($payload, $queue = null, array $options = [])
122124
if ($driver instanceof Topic) {
123125
switch ($this->topicOptions['filter']) {
124126
case self::CMQ_TOPIC_TAG_FILTER_NAME:
125-
return $driver->publish_message($message->msgBody, explode(',', $queue), null);
127+
return retry(3, function () use ($driver, $message, $queue) {
128+
return $driver->publish_message($message->msgBody, explode(',', $queue), null);
129+
});
126130
case self::CMQ_TOPIC_ROUTING_FILTER_NAME:
127-
return $driver->publish_message($message->msgBody, [], $queue);
131+
return retry(3, function () use ($driver, $message, $queue) {
132+
$driver->publish_message($message->msgBody, [], $queue);
133+
});
128134
default:
129135
throw new \InvalidArgumentException(
130136
'Invalid CMQ topic filter: ' . $this->topicOptions['filter']
131137
);
132138
}
133139
}
134140

135-
return $driver->send_message($message, Arr::get($options, 'delay', 0));
141+
return retry(3, function () use ($driver, $message, $options) {
142+
return $driver->send_message($message, Arr::get($options, 'delay', 0));
143+
});
136144
}
137145

138146
/**
@@ -145,6 +153,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
145153
*
146154
* @return mixed
147155
* @throws \ReflectionException
156+
* @throws \Exception
148157
*/
149158
public function later($delay, $job, $data = '', $queue = null)
150159
{
@@ -179,7 +188,7 @@ public function pop($queue = null)
179188
$queue = $this->getQueue($queue);
180189
$message = $queue->receive_message($this->queueOptions['polling_wait_seconds']);
181190
} catch (CMQServerException $e) {
182-
if ((int)$e->getCode() === self::CMQ_QUEUE_NO_MESSAGE_CODE) { //ignore no message
191+
if (self::CMQ_QUEUE_NO_MESSAGE_CODE === (int)$e->getCode()) { //ignore no message
183192
return null;
184193
}
185194
throw $e;

0 commit comments

Comments
 (0)