Growing our workforce

Diego Kuperman | @freekey


Event based system


1 task = 1 job = 1 class


 package Supers::Task::Sendmail;
 use Resque::WorkerClass;

 has from    => is => 'ro', isa => 'Str', required => 1;
 has to      => is => 'ro', isa => 'Str', required => 1;
 has subject => is => 'ro', isa => 'Str', required => 1;
 has body    => is => 'ro', isa => 'Str', required => 1;

 # The boring part of building and sending it
 sub run($self) { ... }



Workers listen to queues

with priorities

A job is pushed to a queue

A worker instantiates the job class and calls run() on it

Probably in a forked process
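The push / reserve / run cycle above can be sketched in plain Perl. This is a minimal in-memory illustration, not the Resque implementation: a hash of arrays stands in for the Redis-backed queues, `Demo::Task::Echo` is a hypothetical job class, and forking is left out.

```perl
use strict;
use warnings;

# Hypothetical job class, standing in for e.g. Supers::Task::Sendmail
package Demo::Task::Echo;

sub new {
    my ( $class, %args ) = @_;
    my $self = {%args};
    return bless $self, $class;
}

sub run {
    my $self = shift;
    return "echo: $self->{msg}";
}

package main;

# In-memory stand-in for the Redis-backed queues: queue name => list of jobs
my %queues = (
    high => [],
    low  => [ { class => 'Demo::Task::Echo', args => [ { msg => 'from low' } ] } ],
);

# Append a job to the named queue
sub push_job {
    my ( $queue, $job ) = @_;
    push @{ $queues{$queue} }, $job;
}

# Take the next job, checking queues in the order given (highest priority first)
sub reserve_job {
    my @priority = @_;
    for my $q (@priority) {
        my $job = shift @{ $queues{$q} || [] };
        return $job if $job;
    }
    return;
}

# Instantiate the job class with its args and call run() (no fork in this sketch)
sub perform {
    my ($job) = @_;
    my $obj = $job->{class}->new( %{ $job->{args}[0] } );
    return $obj->run;
}

# A high-priority job pushed after the low one is still taken first
push_job( high => { class => 'Demo::Task::Echo', args => [ { msg => 'from high' } ] } );

my @results;
while ( my $job = reserve_job(qw/ high low /) ) {
    push @results, perform($job);
}
print "$_\n" for @results;
```

Listing `high` before `low` in the reserve call is what makes it a priority: a job waiting in `high` is always taken before anything in `low`.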

Simple example

Send e-mail

 # $resque is a Resque client instance
 $resque->push( some_queue => {
   class => 'Supers::Task::Sendmail',
   args  => [{
     from    => '',
     to      => '',
     subject => 'A simple job example',
     body    => 'Something boring',
   }],
 });


Around 50 Million

single tasks

Every single


315 workers

140 classes

258 queues


Composing is easy

Most common workflows involve several jobs/classes

Never mix read with write

(fork vs no-fork)

Workflow example

Crawl e-commerce (

 $resque->push( foo => {
   class => 'Supers::Task::Crawl',
   args  => [{
     supermarket => 'foo',
     store_queue => 'store',
     run_id      => 's0m31d',
   }],
 });


5m to 500m events in this kind of workflow

Different priorities


Different queues

Each crawler needs to parallelize over different IPs

At least X workers per site/queue

No more than X parallel requests per site

IPs get banned

Static worker distribution

ansible + ubic

Reconfigure or add queues


Update ansible & deploy

It works great!

It worked great!

...until it didn't!

Crawling workflows are evolving

for the last 2 years

Crawling keeps growing


of new workflows

happened along...

Which allowed us

to grow faster and safer

We keep adding workforce boxes

... and queues

... and editing f* ansible files

... and deploying

Do you feel the PAIN?


Remove ansible

(for reconfiguring queues)


 sub work($self) {

    while ( ! $self->shutdown ) {
        if ( !$self->paused && ( my $job = $self->reserve ) ) {
            $self->log("Got job $job");
            # ... process the job (elided on the slide) ...
        }
        elsif( $self->interval ) {
            my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
            $self->procline( $status );
            $self->log( $status );
            sleep( $self->interval );
        }
    }
 }



 sub work($self) {

    while ( ! $self->shutdown ) {
        # New: let the autoconfig callback reconfigure this worker on every iteration
        $self->autoconfig->($self) if $self->has_autoconfig;
        if ( !$self->paused && ( my $job = $self->reserve ) ) {
            $self->log("Got job $job");
            # ... process the job (elided on the slide) ...
        }
        elsif( $self->interval ) {
            my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
            $self->procline( $status );
            $self->log( $status );
            sleep( $self->interval );
        }
    }
 }



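 if ( $opt->autoconfig ) {
     require Mojo::UserAgent;
     require Sys::Hostname;

     my $ua = Mojo::UserAgent->new;
     # Geo-IP lookup for this box (service URL elided on the slide)
     my $ip = $ua->get('')->res->json || {};

     my $last_update = 0;
     # Callback run by the worker loop; $autoconfig is an illustrative name,
     # the slide elides how the coderef is attached to the worker
     my $autoconfig = sub {
         my $w    = shift;
         my $time = time;
         # Autoconfig every $opt->autoconfig_interval
         return if $time - $last_update < $opt->autoconfig_interval;
         $last_update = $time;

         my $tx = $ua->post( $opt->autoconfig_url, json => {
             worker_name => $opt->worker_name,
             hostname => Sys::Hostname::hostname(),
             map { $_ => $ip->{$_} } qw/ ip city country country_iso /
         });

         # Apply every returned setting the worker has a method for
         unless ( $tx->error ) {
             my $conf = $tx->res->json || {};
             for my $method ( keys %$conf ) {
                 next unless $w->can($method);
                 $w->$method( $conf->{$method} );
             }
         }
     };
     # ... hand $autoconfig to the worker (elided on the slide) ...
 }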

 > ss-worker --help

   ss-worker [-adfiMnquv] [long options...] <some-arg>
        -q STR... --queue STR...          queue name
        -i NUM --interval NUM             Polling interval in floating
                                          seconds for this resque worker
                                          (Default 5 seconds)
        -f --cant-fork                    Don't let resque fork
        -M STR... --module STR...         Modules to load before forking

        -a --autoconfig                   Activate worker autoconfig
        -n STR --worker-name STR          Worker name to be reported to the
                                          config server (defaults to pid)
        -u STR --autoconfig-url STR       Autoconfig URL to request
                                          configuration (defaults to
        -d INT --autoconfig-interval INT  Autoconfig interval in seconds to
                                          request configuration (defaults to


Worker brain

New configuration service

Rule-based system

Work units

name + queues + props

Min/Max workers

Max workers per IP

IP banning

Prevent IP sharing

Among some units

Geo preference

Worker sharing

Auto balancing

On worker registration
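The registration-time balancing can be illustrated with a small rule evaluator. This is a sketch under assumptions, not the service's actual algorithm: the unit fields `min`, `max`, `max_per_ip` and `banned` are hypothetical names for the props listed above; units below their minimum are filled first (largest deficit first), then the least-loaded unit below its maximum. Geo preference and worker sharing are left out.

```perl
use strict;
use warnings;

# Hypothetical work units: name + queues + props
my @units = (
    { name => 'crawl_foo', queues => ['foo'],   min => 3, max => 4, max_per_ip => 1,
      banned => { '10.0.0.3' => 1 } },
    { name => 'store',     queues => ['store'], min => 1, max => 2, max_per_ip => 2,
      banned => {} },
);

# Current assignments: unit name => { ip => worker count }
my %assigned;

# Total workers currently assigned to a unit
sub count_for {
    my ($unit) = @_;
    my $n = 0;
    $n += $_ for values %{ $assigned{ $unit->{name} } || {} };
    return $n;
}

# Hard rules: IP not banned, per-IP cap not reached, unit not full
sub allowed {
    my ( $unit, $ip ) = @_;
    return 0 if $unit->{banned}{$ip};
    return 0 if ( $assigned{ $unit->{name} }{$ip} // 0 ) >= $unit->{max_per_ip};
    return 0 if count_for($unit) >= $unit->{max};
    return 1;
}

# Pick a unit for a newly registered worker, or return undef if none fits
sub assign {
    my ($ip) = @_;
    my @candidates = grep { allowed( $_, $ip ) } @units;
    my @under_min  = sort { ( $b->{min} - count_for($b) ) <=> ( $a->{min} - count_for($a) ) }
                     grep { count_for($_) < $_->{min} } @candidates;
    my @rest       = sort { count_for($a) / $a->{max} <=> count_for($b) / $b->{max} } @candidates;
    my ($unit) = ( @under_min, @rest );
    return unless $unit;
    $assigned{ $unit->{name} }{$ip}++;
    return $unit->{name};
}

my @registrations = qw/ 10.0.0.1 10.0.0.2 10.0.0.3 10.0.0.1 10.0.0.4 10.0.0.5 10.0.0.6 /;
my @got = map { assign($_) // 'none' } @registrations;
print "@got\n";
```

Note how `10.0.0.3` lands on `store` because it is banned for `crawl_foo`, and the second registration from `10.0.0.1` also goes to `store` because `crawl_foo` allows only one worker per IP.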



Improve auto-balance

Simplify state sharing

Clustering? Replication?

Time based IP bans

Move all kinds of workers

Update based on workload



My new pet project

Not yet in production

Inspired by Minion

Will replace Ubic

Worker supervisor

Resque focused

(Soysuper focused)

Supervisor + API






 package Supers::Task::Test::Passing;
 use Resque::WorkerClass;

 has queues  => is => 'ro', required => 1;
 has pending => is => 'ro', default => sub{100};
 has delay   => is => 'ro', default => sub{10};

 sub run($self) {
     sleep $self->delay;

     my $next_queue = shift $self->queues->@*;

     # Re-queue itself on the next queue (rotating the list) while runs remain
     $self->resque->push( $next_queue => {
         class => __PACKAGE__,
         args  => [{
             queues  => [ $self->queues->@*, $next_queue ],
             pending => $self->pending - 1,
             delay   => $self->delay
         }],
     }) if $self->pending > 1;
 }
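Reading the test job above: each run shifts the next queue off the front of `queues`, re-pushes the job there with that queue rotated to the back, and stops once `pending` reaches 1, so a job started with `pending => N` runs N times in total (assuming `pending` is a positive integer). A Redis-free simulation of the resulting queue sequence; `simulate_hops` is a hypothetical helper written only to illustrate the rotation.

```perl
use strict;
use warnings;

# Return the queues the test job is re-pushed to, in order,
# mirroring the job's logic: shift, rotate to the back, decrement pending.
sub simulate_hops {
    my ( $queues, $pending ) = @_;
    my @queues = @$queues;
    my @hops;
    while ( $pending > 1 ) {
        my $next_queue = shift @queues;
        push @queues, $next_queue;    # rotated list travels with the next job
        push @hops,   $next_queue;
        $pending--;
    }
    return @hops;
}

my @hops = simulate_hops( [qw/ a b c /], 5 );
print "@hops\n";
```

With three queues and `pending => 5`, the job visits every queue in turn, which makes it a cheap way to check that workers are actually listening on each one.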
