-
Notifications
You must be signed in to change notification settings - Fork 432
Open
Description
- Laravel/Lumen version: 12.15.0
- RabbitMQ version: 4.1.0
- Package version: 14.2.0
Can someone point me in the right direction on how to correctly process messages from other applications with Horizon?
I have tried to build a correct payload, and I have gotten as far as the jobs actually being processed, but they never appear in Horizon under Completed Jobs (or anywhere else).
This is my wrapper job:
<?php
namespace App\Jobs;
use Carbon\Carbon;
use DateTimeInterface;
use Illuminate\Support\Collection;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
/**
 * Wraps a raw RabbitMQ message published by another application in a
 * Laravel-style queued-job payload so that it is executed as a MyJob.
 */
class ProcessRabbitMQ extends RabbitMQJob
{
    /**
     * Payload built for this message, cached after the first payload() call.
     *
     * @var array<string, mixed>|null
     */
    private ?array $wrappedPayload = null;

    /**
     * Get the Laravel job payload for the wrapped message.
     *
     * The payload is built once per instance and then reused. Without the
     * cache every call would produce a new random UUID, new timestamps and a
     * freshly serialized command, so two callers asking for the payload of
     * the same message would see two different job identities.
     *
     * NOTE(review): tools that inspect getRawBody() instead of payload() will
     * still see the foreign message body, not this array — confirm how the
     * monitoring side reads the job.
     *
     * @return array<string, mixed>
     */
    public function payload(): array
    {
        return $this->wrappedPayload ??= $this->buildPayload();
    }

    /**
     * Build the payload array that wraps the raw message body in a MyJob.
     *
     * @return array<string, mixed>
     */
    private function buildPayload(): array
    {
        // json_decode() yields null for a body that is not valid JSON; the
        // value is handed to MyJob unchanged.
        $job = new MyJob(json_decode($this->getRawBody(), true));
        $job->onQueue('my-queue')->onConnection('rabbitmq');

        // One timestamp for both fields so they cannot straddle a second.
        $now = Carbon::now()->getTimestamp();

        return [
            'uuid' => (string) \Illuminate\Support\Str::uuid(),
            'displayName' => get_class($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $this->getJobTries($job),
            'maxExceptions' => $job->maxExceptions ?? null,
            'failOnTimeout' => $job->failOnTimeout ?? false,
            'backoff' => $this->getJobBackoff($job),
            'timeout' => $job->timeout ?? null,
            'retryUntil' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize($job),
            ],
            'createdAt' => $now,
            'type' => 'job',
            'tags' => [],
            'pushedAt' => $now,
        ];
    }

    /**
     * Get the maximum number of attempts for an object-based queue handler.
     *
     * Reads the job's "tries" property, falling back to its tries() method.
     *
     * @param  mixed  $job
     * @return mixed  null when the job declares neither, or declares null
     */
    public function getJobTries($job)
    {
        if (! method_exists($job, 'tries') && ! isset($job->tries)) {
            return;
        }

        if (is_null($tries = $job->tries ?? $job->tries())) {
            return;
        }

        return $tries;
    }

    /**
     * Get the backoff for an object-based queue handler.
     *
     * Reads the job's "backoff" property, falling back to its backoff()
     * method. Multiple values are joined into a comma-separated string.
     *
     * @param  mixed  $job
     * @return mixed  null when the job declares neither, or declares null
     */
    public function getJobBackoff($job)
    {
        if (! method_exists($job, 'backoff') && ! isset($job->backoff)) {
            return;
        }

        if (is_null($backoff = $job->backoff ?? $job->backoff())) {
            return;
        }

        // secondsUntil() is not defined in this class; presumably inherited
        // from the parent job class — confirm.
        return Collection::wrap($backoff)
            ->map(fn ($backoff) => $backoff instanceof DateTimeInterface ? $this->secondsUntil($backoff) : $backoff)
            ->implode(',');
    }

    /**
     * Get the expiration timestamp for an object-based queue handler.
     *
     * Reads the job's "retryUntil" property, falling back to its retryUntil()
     * method. Date objects are converted to a Unix timestamp; any other value
     * is returned unchanged.
     *
     * @param  mixed  $job
     * @return mixed
     */
    public function getJobExpiration($job)
    {
        if (! method_exists($job, 'retryUntil') && ! isset($job->retryUntil)) {
            return;
        }

        $expiration = $job->retryUntil ?? $job->retryUntil();

        return $expiration instanceof DateTimeInterface
            ? $expiration->getTimestamp()
            : $expiration;
    }
}
And the actual job:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
/**
 * Queued job that logs the data it was constructed with.
 */
class MyJob implements ShouldQueue
{
    use Queueable;

    /**
     * Data handed to the job at construction time.
     *
     * @var mixed
     */
    private $data = null;

    /**
     * @param  mixed  $data  Data to log when the job runs; may be null or a
     *                       scalar when it comes from json_decode().
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Write the job's data to the application log.
     */
    public function handle(): void
    {
        // The logger's context argument must be an array. Wrap null and
        // scalar values instead of letting the call fail with a TypeError.
        $context = is_array($this->data) ? $this->data : ['data' => $this->data];

        \Log::info('Data recieved', $context);
    }
}
Metadata
Metadata
Assignees
Labels
No labels