Resque is a Redis-backed library for creating background jobs, placing them on multiple queues, and processing them later.
Resque workers can be distributed between multiple machines, support priorities, are resilient to memory bloat / "leaks," are optimized for REE (but work on MRI and JRuby), tell you what they're doing, and expect failure.
Resque queues are persistent; support constant time, atomic push and pop (thanks to Redis); provide visibility into their contents; and store jobs as simple JSON packages.
The Resque frontend tells you what workers are doing, what workers are not doing, what queues you're using, what's in those queues, provides general usage stats, and helps you track failures.
Resque workers can be distributed between multiple machines, support priorities, are resilient to memory bloat / "leaks", are optimized for REE (but work on MRI and JRuby), tell you what they're doing, and expect failure.
Resque queues are persistent; support constant time, atomic push and pop (thanks to Redis); provide visibility into their contents; and store jobs as simple JSON packages.
The Resque frontend tells you what workers are doing, what workers are not doing, what queues you're using, what's in those queues, provides general usage stats, and helps you track failures.
0.01 2011-12-30 17:50:59 Europe/Madrid
- Kind of working, need lots of tests and features
Perl port of the original Ruby library. It's intended to work using the same backend to share tasks and be able to manage the system using ruby's resque-server webapp.
use Resque;
my $r = Resque->new( redis => '127.0.0.1:6379' );
use Resque;
my $r = Resque->new( redis => '127.0.0.1:6379' );
$r->push( some_queue => {
class => 'My::Task',
args => [ 'Hello world!' ]
});
package My::Task;
use strict; use v5.10;
sub perform {
my $job = shift;
say $job->args->[0];
}
1;
use Resque;
my $w = Resque->new( redis => '127.0.0.1:6379' )->worker;
$w->add_queue(qw/ some_queue some_less_important_queue /);
$w->work;
$ resque-worker -q some_queue -q some_less_important_queue
$ resque-worker --help
resque-worker [-fiMqrv] [long options...]
-r STR... --redis STR... Redis server (default: 127.0.0.1:6379)
-q STR... --queue STR... Queue name/s (required)
-i NUM --interval NUM Polling interval in floating seconds for
this resque worker (Default 5 seconds)
-M STR... --module STR... Modules to load before forking occurs
-f --unforked Don't let resque fork
-v --verbose set resque worker to be verbose
--help print usage message and exit
$resque->push( low_priority => {
class => 'Task::Sendmail'
args => [{
to => $user->email,
subject => 'This is just a test email from my app.',
body => 'Welcome to my test',
}]
});
package Task::Sendmail;
sub perform {
my $job = shift;
Task::Sendmail::Run->new( job => $job, %{$job->args->[0]} )->run;
}
package Task::Sendmail::Run;
use Moose;
with 'Task::Role::ResqueJob';
with 'Task::Role::MailClient';
has to => ( is => 'ro', isa => 'Str', required => 1 );
has subject => ( is => 'ro', isa => 'Str', required => 1 );
has body => ( is => 'ro', isa => 'Str', default => sub{''} );
has from => ( is => 'ro', isa => 'Str', default => sub{'Our App <hello@ourapp.com>'} );
sub run {
my $self = shift;
$self->sendmail( map { $_ => $self->$_ } qw/ from to subject body / );
}
1;
package Task::Role::ResqueJob;
use Moose::Role;
has job => (
is => 'ro',
isa => 'Resque::Job',
required => 1,
handles => [qw/ resque redis /]
);
sub run {}
1;
package Task::Sendmail;
use Moose;
with 'Task::Role::ResqueJob';
with 'Task::Role::MailClient';
has to => ( is => 'ro', isa => 'Str', required => 1 );
has subject => ( is => 'ro', isa => 'Str', required => 1 );
has body => ( is => 'ro', isa => 'Str', default => sub{''} );
has from => ( is => 'ro', isa => 'Str', default => sub{'Our App <hello@ourapp.com>'} );
sub perform {
my $job = shift;
__PACKAGE__->new( job => $job, %{$job->args->[0]} )->run;
}
sub run {
my $self = shift;
$self->sendmail( map { $_ => $self->$_ } qw/ from to subject body / );
}
1;
package Task::Sendmail;
use Resque::WorkerClass;
with 'Task::Role::MailClient';
has to => ( is => 'ro', isa => 'Str', required => 1 );
has subject => ( is => 'ro', isa => 'Str', required => 1 );
has body => ( is => 'ro', isa => 'Str', default => sub{''} );
has from => ( is => 'ro', isa => 'Str', default => sub{'Our App <hello@ourapp.com>'} );
sub run {
my $self = shift;
$self->sendmail( map { $_ => $self->$_ } qw/ from to subject body / );
}
1;
package Task::Role::Retry;
use Moose::Role;
use Try::Tiny;
has max_retries => ( is => 'ro', default => sub{1} );
has retries => ( is => 'ro', default => sub{0} );
has last_error => ( is => 'ro' );
around 'run' => sub {
my ( $orig, $self ) = @_;
try { $self->$orig() }
catch {
if ( $self->retries < $self->max_retries ) {
$self->resque->push( $self->job->queue => {
class => $self->job->queue,
args => [{ %{$self->job->args->[0]},
retries => $self->retries+1,
last_error => "$_"
}]
});
}
else { die $_ }
};
};
1;
#!/usr/bin/env perl
use v5.10; use strict;
use Resque;
use Getopt::Long::Descriptive;
my $opt = getopt();
my $resque = Resque->new( redis => $opt->redis );
my $total = $resque->failures->count;
say "Reviewing $total failures" if $opt->verbose;
for my $idx (reverse 0 .. $total-1 ) {
my @fail = $resque->failures->all($idx,1);
my $item = $fail[0];
if ( my $re = $opt->queue ){ $item->{queue} =~ /$re/ || next }
if ( my $re = $opt->class ){ $item->{payload}{class} =~ /$re/ || next }
if ( my $re = $opt->error ){ $item->{error} =~ /$re/ || next }
if ( my $re = $opt->args ) { $resque->encoder->encode($item->{payload}{args}) =~ /$re/ || next }
say $item->{payload}{class} if $opt->verbose;
if ( $opt->retry ) {
$resque->failures->requeue($idx);
say "-> Retried!" if $opt->verbose;
}
if ( $opt->delete ) {
$resque->failures->remove($idx);
say "-> Deleted!" if $opt->verbose;
}
}
sub getopt {
my ($opt, $usage) = describe_options(
'peek_failed %o',
[ 'redis|r=s', "Redis instance", { default => '127.0.0.1:6379' } ],
[],
[ 'queue|q=s', "Origin queue name for filtering failed tasks (all by default)" ],
[ 'class|c=s', "Task class name for filtering failed tasks (all by default)" ],
[ 'error|e=s', "Error string for filtering failed tasks (all by default)" ],
[ 'args|a=s', "Argument string for filtering failed tasks (all by default)" ],
[],
[ 'retry', "Retry matching tasks" ],
[ 'delete', "Delete matching tasks" ],
[ 'verbose|v', "Show what's happening" ],
[],
[ 'help|h', "print usage message and exit" ],
);
print($usage->text), exit if $opt->help;
return $opt;
}
#!/usr/bin/env perl
use Supers::Command;
use v5.10; use strict;
use Time::Concise;
my $c = Supers::Command->new( no_schema=>1, options => [
[ 'host|e=s', "Error string for filtering failed tasks (required)" ],
[ 'pid|a=s', "Proccess ID of the worker (all by default)" ],
[ 'queue|q=s', "Origin queue name for filtering failed tasks (all by default)" ],
[ 'class|c=s', "Task class name for filtering failed tasks (all by default)" ],
[ 'max-age=s', "Workers working for more than the provided duration, examples: 1d, 2h, 10m" ],
[],
[ 'list', "Display matching workers" ],
[ 'delete', "Delete matching workers" ],
]);
my $limit = $c->opt->max_age
? DateTime->now->subtract( seconds => from_concise($c->opt->max_age) )
: 0;
for my $w ( $c->resque->worker->all ) {
my ($host, $pid, $queues) = split( ':', $w->id );
next if $c->opt->host && $host ne $c->opt->host;
next if $c->opt->pid && $pid ne $c->opt->pid;
if ( $w->is_working ) {
my $started = $w->processing_started;
next if $limit && $started > $limit;
my $job_data = $w->processing;
my $job_queue = $job_data->{queue} || next; # not doing nothing
my $job_class = $job_data->{payload}{class};
next if $c->opt->queue && $job_queue ne $c->opt->queue;
next if $c->opt->class && $job_class ne $c->opt->class;
say "[$host:$pid] $started - $job_class on queue $job_queue" if $c->opt->list;
}
elsif ( !$limit && !$c->opt->queue && !$c->opt->class ) {
say "[$host:$pid] waiting for a job on $queues..." if $c->opt->list;
}
else { next }
$w->unregister_worker if $c->opt->delete;
}