Skip to content

Process messages from another application with Horizon #631

@mihob

Description

@mihob
  • Laravel/Lumen version: 12.15.0
  • RabbitMQ version: 4.1.0
  • Package version: 14.2.0

Can someone point me in the right direction on how to correctly process messages from other applications with Horizon?

I have tried to build a correct payload and have got so far that the jobs are actually being processed, but nothing else appears in Horizon under Completed Jobs (or anywhere else).

This is my wrapper job:

<?php

namespace App\Jobs;

use Carbon\Carbon;
use DateTimeInterface;
use Illuminate\Support\Collection;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;

class ProcessRabbitMQ extends RabbitMQJob
{
    public function payload(): array
    {
        $job = new MyJob(json_decode($this->getRawBody(), true));
        $job->onQueue('my-queue')->onConnection('rabbitmq');

        return [
            'uuid' => (string) \Illuminate\Support\Str::uuid(),
            'displayName' => get_class($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $this->getJobTries($job),
            'maxExceptions' => $job->maxExceptions ?? null,
            'failOnTimeout' => $job->failOnTimeout ?? false,
            'backoff' => $this->getJobBackoff($job),
            'timeout' => $job->timeout ?? null,
            'retryUntil' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize($job),
            ],
            'createdAt' => Carbon::now()->getTimestamp(),
            'type' => 'job',
            'tags' => [],
            'pushedAt' => Carbon::now()->getTimestamp(),
        ];
    }

    public function getJobTries($job)
    {
        if (! method_exists($job, 'tries') && ! isset($job->tries)) {
            return;
        }

        if (is_null($tries = $job->tries ?? $job->tries())) {
            return;
        }

        return $tries;
    }

    /**
     * Get the backoff for an object-based queue handler.
     *
     * @param  mixed  $job
     * @return mixed
     */
    public function getJobBackoff($job)
    {
        if (! method_exists($job, 'backoff') && ! isset($job->backoff)) {
            return;
        }

        if (is_null($backoff = $job->backoff ?? $job->backoff())) {
            return;
        }

        return Collection::wrap($backoff)
            ->map(fn ($backoff) => $backoff instanceof DateTimeInterface ? $this->secondsUntil($backoff) : $backoff)
            ->implode(',');
    }

    /**
     * Get the expiration timestamp for an object-based queue handler.
     *
     * @param  mixed  $job
     * @return mixed
     */
    public function getJobExpiration($job)
    {
        if (! method_exists($job, 'retryUntil') && ! isset($job->retryUntil)) {
            return;
        }

        $expiration = $job->retryUntil ?? $job->retryUntil();

        return $expiration instanceof DateTimeInterface
            ? $expiration->getTimestamp()
            : $expiration;
    }
}

And the actual job:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class MyJob implements ShouldQueue
{
    use Queueable;

    private $data = null;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function handle(): void
    {
        \Log::info('Data recieved', $this->data);
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions