Resque to the rescue!


Diego Kuperman

diegok | @freekey

Hello!

Good morning

5 years ago...

A small startup was just getting started...

soysuper logo

It needed a job queue to organize the chaos...

chaos

= the unknown

= a lot of failure

What did we use?

Gearman

... an efficient black box

Gearman

Single queue (3 priorities)

No opinion on error management

TheSchwartz

Runs on top of MySQL

Abilities instead of queues (circular pick)

Abilities must be added/loaded manually on each worker

Can auto-retry soft failures (work_safely)

We needed more than one queue

And split workers among those

We expected a lot of failure

And tools to inspect them

... and to retry easily

... also on hard failures

Resque

to the rescue!

https://github.com/blog/542-introducing-resque

Resque is a Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.

Redis backed

  • Fast
  • Atomic, O(1) list push and pop
  • Ability to paginate over lists without mutating them
  • Queryable keyspace, high visibility
  • Support for integer counters
  • Easy to install - no dependencies
  • Replicated and Highly Available
  • Persistent
  • ...
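Paginating a list without mutating it is what Redis `LRANGE` provides, and it is what lets tools page through queues (and failures) while workers keep running. A pure-Perl model of its documented semantics: inclusive indexes, negative indexes counting from the end, out-of-range values clamped. `lrange` here is a hypothetical helper operating on a Perl array, not a Redis client call.

```perl
use strict;
use warnings;

# Pure-Perl model of Redis LRANGE: inclusive start/stop, negative indexes
# count from the end, out-of-range values are clamped, and the list itself
# is never modified.
sub lrange {
    my ( $list, $start, $stop ) = @_;
    my $len = @$list;
    $start += $len if $start < 0;
    $stop  += $len if $stop < 0;
    $start = 0         if $start < 0;
    $stop  = $len - 1  if $stop >= $len;
    return () if $start > $stop || $start >= $len;
    return @{$list}[ $start .. $stop ];
}

# Page through a "queue" two items at a time without popping anything.
my @queue = map { "job$_" } 1 .. 5;
my $page  = 2;
for ( my $offset = 0; $offset < @queue; $offset += $page ) {
    print join( ',', lrange( \@queue, $offset, $offset + $page - 1 ) ), "\n";
}
# prints job1,job2 then job3,job4 then job5, one page per line
```

Because reading never pops, a monitoring tool can look at the same list a worker is consuming without stealing jobs from it.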

Resque workers can be distributed between multiple machines, support priorities, are resilient to memory bloat / "leaks," are optimized for REE (but work on MRI and JRuby), tell you what they're doing, and expect failure.

Resque queues are persistent; support constant time, atomic push and pop (thanks to Redis); provide visibility into their contents; and store jobs as simple JSON packages.

The Resque frontend tells you what workers are doing, what workers are not doing, what queues you're using, what's in those queues, provides general usage stats, and helps you track failures.

resque-web

ruby

but...

perl ♡ ruby

hack hack hack...

hack hack hack hack hack hack

hack hack hack hack hack hack hack hack hack hack hack hack

hack hack hack hack hack hack hack hack hack hack hack hack hack hack hack hack hack hack hack hack

resque-perl

0.01      2011-12-30 17:50:59 Europe/Madrid

    - Kind of working, need lots of tests and features
    

Perl port of the original Ruby library. It's intended to work using the same backend to share tasks and be able to manage the system using ruby's resque-server webapp.

...and it had docs besides a blog post!

Ok, let's use it



use Resque;

my $r = Resque->new( redis => '127.0.0.1:6379' );
    

Queues are lists

Jobs are JSON objects pushed to a list



use Resque;

my $r = Resque->new( redis => '127.0.0.1:6379' );

$r->push( some_queue => {
    class => 'My::Task',
    args  => [ 'Hello world!' ]
});
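Under the hood, a push boils down to two Redis operations: register the queue name in a set and append the JSON-encoded job to the queue's list; a worker later pops the oldest entry and decodes it. The sketch models this with plain Perl hashes instead of Redis. The key names `resque:queues` and `resque:queue:NAME` assume the default `resque` namespace used by the Ruby library (worth checking against your own Redis), and `push_job`/`pop_job` are hypothetical helpers, not part of the Resque API.

```perl
use strict;
use warnings;
use JSON::PP;

# Canonical (sorted-key) encoding so the output is deterministic.
my $json = JSON::PP->new->canonical;

# In-memory stand-ins for Redis: lists and sets keyed by name.
my ( %lists, %sets );

# Hypothetical helper: register the queue name in a set (SADD) and append
# the JSON-encoded job to the queue's list (RPUSH).
sub push_job {
    my ( $queue, $job ) = @_;
    $sets{'resque:queues'}{$queue} = 1;
    push @{ $lists{"resque:queue:$queue"} }, $json->encode($job);
    return scalar @{ $lists{"resque:queue:$queue"} };   # RPUSH replies with the new length
}

# Hypothetical helper: take the oldest entry (LPOP) and decode it back
# into a Perl structure; undef when the queue is empty.
sub pop_job {
    my ($queue) = @_;
    my $raw = shift @{ $lists{"resque:queue:$queue"} || [] };
    return defined $raw ? $json->decode($raw) : undef;
}

push_job( some_queue => { class => 'My::Task', args => ['Hello world!'] } );
print $lists{'resque:queue:some_queue'}[0], "\n";
# prints {"args":["Hello world!"],"class":"My::Task"}
```

Since the stored value is just JSON with `class` and `args`, any client that agrees on the key names and payload shape can share the same queues.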
    

Job classes must know how to perform()



package My::Task;
use strict; use v5.10;

sub perform {
    my $job = shift;
    say $job->args->[0];
}

1;
    

Workers do everything else!

Workers pull jobs and load job classes

Workers fork and manage the running job object

Unless dont_fork()

Workers babysit other workers

prune_dead_workers()
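The division of labour described above can be sketched in core Perl, with an in-memory hash standing in for Redis. The sketch pops from queues in the order given (which is how earlier queues get priority), loads the job class if it is not already available, and calls its `perform()` as a plain function with the job as first argument, optionally in a forked child. `Sketch::Job`, `push_job`, `reserve` and `perform_job` are hypothetical names; the real worker also handles registration, polling intervals and failure bookkeeping, all left out here.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);

$| = 1;   # unbuffered STDOUT, so a forked child never re-flushes the parent's output

# Hypothetical job object: wraps the queue name and the decoded JSON payload.
package Sketch::Job;
sub new   { my ( $class, %a ) = @_; return bless {%a}, $class }
sub class { $_[0]{payload}{class} }
sub args  { $_[0]{payload}{args} }

# Example job class in the spirit of My::Task; records what it received.
package My::Task;
our @seen;
sub perform { my $job = shift; push @seen, $job->args->[0] }

package main;

my %queues;   # in-memory stand-in for the Redis lists

sub push_job {
    my ( $queue, $payload ) = @_;
    push @{ $queues{$queue} }, encode_json($payload);
}

# Pop the oldest job from the first non-empty queue, in the order given.
sub reserve {
    for my $q (@_) {
        my $raw = shift @{ $queues{$q} || [] };
        return Sketch::Job->new( queue => $q, payload => decode_json($raw) ) if defined $raw;
    }
    return;
}

# Load the job class if needed, then call its perform() with the job.
sub perform_job {
    my ( $job, $dont_fork ) = @_;
    my $class = $job->class;
    unless ( $class->can('perform') ) {
        ( my $file = "$class.pm" ) =~ s{::}{/}g;
        require $file;
    }
    my $perform = $class->can('perform') or die "$class doesn't know how to perform\n";
    return $perform->($job) if $dont_fork;

    defined( my $pid = fork ) or die "fork failed: $!";
    if ( $pid == 0 ) {                        # child: run the job, exit with its status
        my $ok = eval { $perform->($job); 1 };
        exit( $ok ? 0 : 1 );
    }
    waitpid( $pid, 0 );                       # parent: wait for the child to finish
    return $? >> 8;                           # child's exit code (0 = success)
}
```

Running each job in a child means a leaking or crashing job only takes down that child, while the parent keeps a clean address space for the next job.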


    use Resque;

    my $w = Resque->new( redis => '127.0.0.1:6379' )->worker;
    $w->add_queue(qw/ some_queue some_less_important_queue /);
    $w->work;

$ resque-worker -q some_queue -q some_less_important_queue

$ resque-worker --help
resque-worker [-fiMqrv] [long options...]
    -r STR... --redis STR...     Redis server (default: 127.0.0.1:6379)
    -q STR... --queue STR...     Queue name/s (required)

    -i NUM --interval NUM        Polling interval in floating seconds for
                                 this resque worker (Default 5 seconds)

    -M STR... --module STR...    Modules to load before forking occurs
    -f --unforked                Don't let resque fork

    -v --verbose                 set resque worker to be verbose
    --help                       print usage message and exit

How we use it

How we write background job classes


$resque->push( low_priority => {
    class => 'Task::Sendmail',
    args  => [{
        to      => $user->email,
        subject => 'This is just a test email from my app.',
        body    => 'Welcome to my test',
    }]
});
    

package Task::Sendmail;

sub perform {
    my $job = shift;
    Task::Sendmail::Run->new( job => $job, %{$job->args->[0]} )->run;
}

package Task::Sendmail::Run;
use Moose;
with 'Task::Role::ResqueJob';
with 'Task::Role::MailClient';

has to      => ( is => 'ro', isa => 'Str', required => 1 );
has subject => ( is => 'ro', isa => 'Str', required => 1 );
has body    => ( is => 'ro', isa => 'Str', default => sub{''} );
has from    => ( is => 'ro', isa => 'Str', default => sub{'Our App <hello@ourapp.com>'} );

sub run {
    my $self = shift;

    $self->sendmail( map { $_ => $self->$_ } qw/ from to subject body / );
}

1;
    

package Task::Role::ResqueJob;
use Moose::Role;

has job => (
    is       => 'ro',
    isa      => 'Resque::Job',
    required => 1,
    handles  => [qw/ resque redis /]
);

sub run {}

1;
    

Remove the boilerplate


package Task::Sendmail;
use Moose;
with 'Task::Role::ResqueJob';
with 'Task::Role::MailClient';

has to      => ( is => 'ro', isa => 'Str', required => 1 );
has subject => ( is => 'ro', isa => 'Str', required => 1 );
has body    => ( is => 'ro', isa => 'Str', default => sub{''} );
has from    => ( is => 'ro', isa => 'Str', default => sub{'Our App <hello@ourapp.com>'} );

sub perform {
    my $job = shift;
    __PACKAGE__->new( job => $job, %{$job->args->[0]} )->run;
}

sub run {
    my $self = shift;

    $self->sendmail( map { $_ => $self->$_ } qw/ from to subject body / );
}

1;
    

Introduce Resque::WorkerClass


package Task::Sendmail;
use Resque::WorkerClass;
with 'Task::Role::MailClient';

has to      => ( is => 'ro', isa => 'Str', required => 1 );
has subject => ( is => 'ro', isa => 'Str', required => 1 );
has body    => ( is => 'ro', isa => 'Str', default => sub{''} );
has from    => ( is => 'ro', isa => 'Str', default => sub{'Our App <hello@ourapp.com>'} );

sub run {
    my $self = shift;
    $self->sendmail( map { $_ => $self->$_ } qw/ from to subject body / );
}

1;
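Removing the boilerplate amounts to generating the same `perform()` for every job class. One way such sugar can work, sketched in core Perl: a hypothetical `My::WorkerSugar` whose `import` installs a `perform` into the calling package. This is not the actual implementation of `Resque::WorkerClass` (which, judging by the slide, also provides the Moose sugar such as `has` and `with`); `Task::Echo` and `Fake::Job` are made-up stand-ins so the sketch runs without Redis.

```perl
use strict;
use warnings;

# Hypothetical sugar module (not the real Resque::WorkerClass): its import()
# installs a perform() into the calling package. That perform() builds an
# object from the job's first argument (a hashref) and calls run() on it.
package My::WorkerSugar;
sub import {
    my $target = caller;
    no strict 'refs';
    *{"${target}::perform"} = sub {
        my $job = shift;
        return $target->new( job => $job, %{ $job->args->[0] } )->run;
    };
}

# A plain-Perl job class standing in for the Moose one in the slide.
package Task::Echo;
BEGIN { My::WorkerSugar->import }    # what "use My::WorkerSugar;" would do
sub new { my ( $class, %args ) = @_; return bless {%args}, $class }
sub run { my $self = shift; return "to=$self->{to} subject=$self->{subject}" }

# Minimal fake job so the sketch runs without Redis.
package Fake::Job;
sub new  { my ( $class, @args ) = @_; return bless { args => [@args] }, $class }
sub args { $_[0]{args} }

package main;
my $job = Fake::Job->new( { to => 'a@example.com', subject => 'Hi' } );

# Called the way the Perl worker calls it: as a function, job first.
print Task::Echo::perform($job), "\n";   # prints to=a@example.com subject=Hi
```

Installing `perform` into each package (rather than inheriting it) matters if the worker calls it as a plain function, since function calls do not go through method resolution.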
    

¯\_(ツ)_/¯

Hooks? Plugins?


package Task::Role::Retry;
use Moose::Role;
use Try::Tiny;

has max_retries => ( is => 'ro', default => sub{1} );
has retries     => ( is => 'ro', default => sub{0} );
has last_error  => ( is => 'ro' );

around 'run' => sub {
    my ( $orig, $self ) = @_;

    try { $self->$orig() }
    catch {
        # Soft failure: re-enqueue the same job class with one more retry
        # and the error message. Out of retries: re-throw (hard failure).
        if ( $self->retries < $self->max_retries ) {
            $self->resque->push( $self->job->queue => {
                class => $self->job->class,
                args  => [{ %{$self->job->args->[0]},
                    retries    => $self->retries + 1,
                    last_error => "$_"
                }]
            });
        }
        else { die $_ }
    };
};

1;
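The retry flow of this role can be checked in isolation. A core-Perl sketch, assuming an in-memory array in place of the Redis queue and `eval` in place of Try::Tiny; `run_with_retry` is a hypothetical helper, not part of Resque. Like the role, it reads `retries` and `max_retries` from the job's argument hash, so a caller can raise the limit per job.

```perl
use strict;
use warnings;

my @queue;   # in-memory stand-in for the queue the role pushes back onto

# Hypothetical helper mirroring Task::Role::Retry: run $code with the job's
# argument hash. On failure, re-enqueue the same class with retries + 1 and
# the error message; once max_retries is reached, re-throw instead.
sub run_with_retry {
    my ( $class, $args, $code ) = @_;
    my $retries     = $args->{retries}     // 0;
    my $max_retries = $args->{max_retries} // 1;

    return 1 if eval { $code->($args); 1 };
    my $err = $@ || "unknown error\n";

    die $err if $retries >= $max_retries;    # hard failure: let the worker record it

    push @queue, {
        class => $class,                       # the job class, not the queue name
        args  => [ { %$args, retries => $retries + 1, last_error => "$err" } ],
    };
    return 0;                                  # soft failure: will be retried
}
```

The re-thrown error is what ends up in the failed list, where the review tool can retry or delete it.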
    

Error handling

Retry / Delete


#!/usr/bin/env perl
use v5.10; use strict;
use Resque;
use Getopt::Long::Descriptive;

my $opt    = getopt();
my $resque = Resque->new( redis => $opt->redis );
my $total  = $resque->failures->count;

say "Reviewing $total failures" if $opt->verbose;
for my $idx (reverse 0 .. $total-1 ) {
    my @fail = $resque->failures->all($idx,1);
    my $item = $fail[0];

    if ( my $re = $opt->queue ){ $item->{queue} =~ /$re/ || next }
    if ( my $re = $opt->class ){ $item->{payload}{class} =~ /$re/ || next }
    if ( my $re = $opt->error ){ $item->{error} =~ /$re/ || next }
    if ( my $re = $opt->args ) { $resque->encoder->encode($item->{payload}{args}) =~ /$re/ || next }

    say $item->{payload}{class} if $opt->verbose;
    if ( $opt->retry ) {
        $resque->failures->requeue($idx);
        say "-> Retried!" if $opt->verbose;
    }
    if ( $opt->delete ) {
        $resque->failures->remove($idx);
        say "-> Deleted!" if $opt->verbose;
    }
}

    

sub getopt {
    my ($opt, $usage) = describe_options(
        'peek_failed %o',
        [ 'redis|r=s', "Redis instance", { default => '127.0.0.1:6379' } ],
        [],
        [ 'queue|q=s', "Origin queue name for filtering failed tasks (all by default)" ],
        [ 'class|c=s', "Task class name for filtering failed tasks (all by default)" ],
        [ 'error|e=s', "Error string for filtering failed tasks (all by default)" ],
        [ 'args|a=s',  "Argument string for filtering failed tasks (all by default)" ],
        [],
        [ 'retry',     "Retry matching tasks" ],
        [ 'delete',    "Delete matching tasks" ],
        [ 'verbose|v', "Show what's happening" ],
        [],
        [ 'help|h',    "print usage message and exit" ],
    );

    print($usage->text), exit if $opt->help;

    return $opt;
}
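The filter-and-act loop of this script can be exercised without Redis. This sketch assumes failures are hashrefs with the `queue`, `error` and `payload` fields the script reads, models the failed list as a Perl array, and uses `splice` as a stand-in for `$resque->failures->remove`; `matching` is a hypothetical helper and the failures are made-up data.

```perl
use strict;
use warnings;
use JSON::PP;

# Made-up failures, shaped like the hashrefs the script reads:
# queue, error, and the original payload (class + args).
my @failures = (
    { queue => 'low_priority', error => 'SMTP timeout',
      payload => { class => 'Task::Sendmail', args => [ { to => 'a@example.com' } ] } },
    { queue => 'default', error => 'Disk full',
      payload => { class => 'Task::Resize', args => [] } },
    { queue => 'low_priority', error => 'SMTP timeout',
      payload => { class => 'Task::Sendmail', args => [ { to => 'b@example.com' } ] } },
);

my $json = JSON::PP->new->canonical;

# Hypothetical helper: indexes of failures matching every given regex,
# using the same four filters as the script (queue, class, error, args).
sub matching {
    my (%f) = @_;
    my @idx;
    for my $i ( 0 .. $#failures ) {
        my $item = $failures[$i];
        next if $f{queue} && $item->{queue} !~ $f{queue};
        next if $f{class} && $item->{payload}{class} !~ $f{class};
        next if $f{error} && $item->{error} !~ $f{error};
        next if $f{args}  && $json->encode( $item->{payload}{args} ) !~ $f{args};
        push @idx, $i;
    }
    return @idx;
}

# Delete from the highest index down so lower indexes stay valid, which is
# presumably why the script walks "reverse 0 .. $total-1".
splice @failures, $_, 1 for reverse matching( error => qr/SMTP/ );
print scalar(@failures), " failure(s) left\n";   # prints 1 failure(s) left
```

Walking forward instead would shift every later entry down by one after each deletion, so the next index checked would skip a failure.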
    

Resque::Failures::mass_remove()

Stalled workers


#!/usr/bin/env perl
use Supers::Command;
use v5.10; use strict;
use DateTime;
use Time::Concise;

my $c = Supers::Command->new( no_schema=>1, options => [
    [ 'host|e=s',  "Hostname of the worker (all by default)" ],
    [ 'pid|a=s',   "Process ID of the worker (all by default)" ],
    [ 'queue|q=s', "Queue of the job being processed (all by default)" ],
    [ 'class|c=s', "Class of the job being processed (all by default)" ],
    [ 'max-age=s', "Workers working for more than the provided duration, examples: 1d, 2h, 10m" ],
    [],
    [ 'list',   "Display matching workers" ],
    [ 'delete', "Delete matching workers" ],
]);
    

my $limit = $c->opt->max_age 
          ? DateTime->now->subtract( seconds => from_concise($c->opt->max_age) )
          : 0;

for my $w ( $c->resque->worker->all ) {
    my ($host, $pid, $queues) = split( ':', $w->id );
    next if $c->opt->host && $host ne $c->opt->host;
    next if $c->opt->pid  && $pid ne $c->opt->pid;

    if ( $w->is_working ) {
        my $started = $w->processing_started;
        next if $limit && $started > $limit;

        my $job_data  = $w->processing;
        my $job_queue = $job_data->{queue} || next; # not doing anything
        my $job_class = $job_data->{payload}{class};

        next if $c->opt->queue && $job_queue ne $c->opt->queue;
        next if $c->opt->class && $job_class ne $c->opt->class;

        say "[$host:$pid] $started - $job_class on queue $job_queue" if $c->opt->list;
    }
    elsif ( !$limit && !$c->opt->queue && !$c->opt->class ) {
        say "[$host:$pid] waiting for a job on $queues..." if $c->opt->list;
    }
    else { next }

    $w->unregister_worker if $c->opt->delete;
}
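The stalled-worker check reduces to splitting the worker id and comparing start times against a cutoff. A core-Perl sketch using epoch seconds instead of DateTime; `concise_to_seconds` is a simplified, hypothetical stand-in for Time::Concise's `from_concise`, `stalled` is a hypothetical helper, and the worker list is made-up data.

```perl
use strict;
use warnings;

# Hypothetical, simplified stand-in for Time::Concise's from_concise():
# turns "1d", "2h", "10m", "30s" or combinations like "1h30m" into seconds.
sub concise_to_seconds {
    my ($str) = @_;
    my %unit  = ( d => 86_400, h => 3_600, m => 60, s => 1 );
    my $total = 0;
    $total += $1 * $unit{$2} while $str =~ /(\d+)([dhms])/g;
    return $total;
}

# Made-up workers: [ id, epoch when the current job started, or undef if idle ].
# Ids follow the host:pid:queues layout the script splits on.
my $now     = 1_000_000;
my @workers = (
    [ 'web1:123:low_priority,default', $now - 3 * 3_600 ],
    [ 'web1:124:default',              $now - 60 ],
    [ 'web2:200:default',              undef ],
);

# Return "host:pid" for every worker busy for longer than $max_age.
sub stalled {
    my ($max_age) = @_;
    my $limit = $now - concise_to_seconds($max_age);
    my @hits;
    for my $w (@workers) {
        my ( $id, $started ) = @$w;
        my ( $host, $pid ) = split /:/, $id, 3;
        next unless defined $started;    # idle: waiting for a job
        next if $started > $limit;       # busy, but not for long enough
        push @hits, "$host:$pid";
    }
    return @hits;
}

print join( "\n", stalled('2h') ), "\n";   # prints web1:123
```

As in the script, an idle worker never counts as stalled when an age limit is given; only workers whose current job started before the cutoff match.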
    

...

Thank you!

Any question?