package Supers::Task::Sendmail;
use Resque::WorkerClass;

# Job arguments: each part of the message is a mandatory, read-only string.
has from => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has to => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has subject => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has body => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# Entry point of the job. Building and sending the message is the boring
# part and is left out here: the yada-yada body dies if it is ever called.
sub run($self) { ... }

1;
Workers listen to queues
with priorities
A worker instantiates the job class and calls run() on it
Probably forking
Simple example
Send e-mail
# Attributes handed to the job class; the keys match the attributes
# declared by Supers::Task::Sendmail.
my %mail = (
    from    => 'diego@soysuper.com',
    to      => 'friends@barcelona.pm',
    subject => 'A simple job example',
    body    => 'Something boring',
);

# Enqueue one Sendmail job on "some_queue".
$resque->push(
    some_queue => {
        class => 'Supers::Task::Sendmail',
        args  => [ \%mail ],
    }
);
Around 50 Million
single tasks
Most common workflows involve several jobs/classes
Never mix read with write
(fork vs no-fork)
Workflow example
Crawl e-commerce (foo.com)
# Arguments for the crawl job: which supermarket to crawl, the name of the
# queue given as store_queue, and an identifier for this run.
my %crawl = (
    supermarket => 'foo',
    store_queue => 'store',
    run_id      => 's0m31d',
);

# Enqueue one Crawl job on the "foo" queue.
$resque->push(
    foo => {
        class => 'Supers::Task::Crawl',
        args  => [ \%crawl ],
    }
);
5m to 500m events in this kind of workflow
Different priorities
vs
Different queues
Each crawler needs to parallelize over different IPs
At least X workers per site/queue
No more than X parallel requests per site
Static worker distribution
ansible + ubic
Reconfigure or add queues
=
Update ansible & deploy
Crawling workflows are evolving
since 2y ago
Standardization
of new workflows
happened along...
Which allowed us
to grow faster and safer
We keep adding workforce boxes
... and editing f* ansible files
Remove ansible
(for reconfiguring queues)
Resque::Worker::autoconfig()
# Main worker loop: run startup, keep polling until shutdown is flagged,
# then unregister the worker.
sub work($self) {
    $self->startup;

    until ( $self->shutdown ) {
        # A paused worker never reserves a job.
        my $job = $self->paused ? undef : $self->reserve;

        if ($job) {
            $self->log("Got job $job");
            $self->work_tick($job);
            next;
        }

        # Nothing to run: with a false interval, poll again straight away.
        next unless $self->interval;

        # Report why we are idle, then wait one polling interval.
        my $status;
        if ( $self->paused ) {
            $status = "Paused";
        }
        else {
            $status = 'Waiting for ' . join( ', ', @{ $self->queues } );
        }
        $self->procline($status);
        $self->log($status);
        sleep( $self->interval );
    }

    $self->unregister_worker;
}
# Main worker loop with the autoconfig hook: run startup, keep polling
# until shutdown is flagged, then unregister the worker. When an autoconfig
# callback is set, it is invoked with the worker at the top of every pass.
sub work($self) {
    $self->startup;

    until ( $self->shutdown ) {
        if ( $self->has_autoconfig ) {
            my $hook = $self->autoconfig;
            $hook->($self);
        }

        # A paused worker never reserves a job.
        my $job = $self->paused ? undef : $self->reserve;

        if ($job) {
            $self->log("Got job $job");
            $self->work_tick($job);
            next;
        }

        # Nothing to run: with a false interval, poll again straight away.
        next unless $self->interval;

        # Report why we are idle, then wait one polling interval.
        my $status;
        if ( $self->paused ) {
            $status = "Paused";
        }
        else {
            $status = 'Waiting for ' . join( ', ', @{ $self->queues } );
        }
        $self->procline($status);
        $self->log($status);
        sleep( $self->interval );
    }

    $self->unregister_worker;
}
# Optional self-configuration: when the autoconfig option is active, install
# a callback on the worker that periodically reports this worker to the
# configuration URL and applies whatever settings come back.
if ( $opt->autoconfig ) {
    require Mojo::UserAgent;
    require Sys::Hostname;

    my $ua = Mojo::UserAgent->new;

    # Looked up once, outside the callback. Anything other than a JSON object
    # (failed request, non-JSON body, array, ...) is treated as "no data" so
    # that the hash lookups below cannot die.
    my $ip = $ua->get('http://ifconfig.co/json')->res->json;
    $ip = {} unless ref($ip) eq 'HASH';

    my $last_update = 0;

    $w->autoconfig(sub{
        my $w = shift;

        # Autoconfig every $opt->autoconfig_interval
        my $now = time;
        return if $now - $last_update < $opt->autoconfig_interval;
        $last_update = $now;

        my $tx = $ua->post( $opt->autoconfig_url, json => {
            worker_name => $opt->worker_name,
            hostname    => Sys::Hostname::hostname(),
            map { $_ => $ip->{$_} } qw/ ip city country country_iso /
        });
        return if $tx->error;

        # Only a JSON object can be applied as method => value pairs. A reply
        # that decodes to something else is ignored instead of killing the
        # worker loop with a "Not a HASH reference" error.
        my $conf = $tx->res->json;
        return unless ref($conf) eq 'HASH';

        # NOTE(review): every key the worker ->can() is invoked, so the
        # config server can call any worker method; consider restricting
        # this to an explicit list of settable attributes.
        for my $method ( keys %$conf ) {
            next unless $w->can($method);
            $w->$method( $conf->{$method} );
        }
        return;
    });
}
> ss-worker --help
ss-worker [-adfiMnquv] [long options...] <some-arg>
-q STR... --queue STR... queue name
-i NUM --interval NUM Polling interval in floating
seconds for this resque worker
(Default 5 seconds)
-f --cant-fork Don't let resque fork
-M STR... --module STR... Modules to load before forking
occurs
-a --autoconfig Activate worker autoconfig
-n STR --worker-name STR Worker name to be reported to the
config server (default to pid)
-u STR --autoconfig-url STR Autoconfig URL to request
configuration (defaults to
http://config.ss/worker)
-d INT --autoconfig-interval INT Autoconfig interval in seconds to
request configuration (defaults to
60)
Worker brain
New configuration service
Work units
name + queues + props
Prevent IP sharing
Among some units
Auto balancing
On worker registration
Simplify state sharing
Clustering?, replication?
Update based on work-load
My new pet-project
Not yet in production
Will replace Ubic
Worker supervisor
Resque focused
(Soysuper focused)
package Supers::Task::Test::Passing;
use Resque::WorkerClass;

# Test job that hands itself along a rotating list of queues: each run
# waits `delay` seconds and then, while hops remain, enqueues a copy of
# itself on the first queue of the list with that queue moved to the end.
has queues  => is => 'ro', required => 1;       # array ref of queue names
has pending => is => 'ro', default => sub{100}; # hops left, this one included
has delay   => is => 'ro', default => sub{10};  # seconds to sleep per hop

sub run($self) {
    sleep $self->delay;

    # Stop once no hops remain. Comparing against zero (instead of testing
    # plain truthiness) also ends the chain when pending starts at 0, below
    # zero or at a fractional value, rather than re-enqueueing forever.
    my $remaining = $self->pending - 1;
    return if $remaining <= 0;

    # Rotate a copy of the list so the read-only attribute is not modified.
    my ( $next_queue, @others ) = $self->queues->@*;

    # Fail loudly rather than enqueue onto an undefined queue name.
    die "queues must contain at least one queue name\n"
        unless defined $next_queue;

    $self->resque->push( $next_queue => {
        class => __PACKAGE__,
        args  => [{
            queues  => [ @others, $next_queue ],
            pending => $remaining,
            delay   => $self->delay,
        }]
    });
    return;
}
1;