Growing our workforce






Diego Kuperman | @freekey

https://diegok.github.io/growing-our-workforce/

Hello!

Event based system

Resque

1 task

= 1 job

= 1 class

Resque::WorkerClass



 package Supers::Task::Sendmail;
 use Resque::WorkerClass;

 has from    => is => 'ro', isa => 'Str', required => 1;
 has to      => is => 'ro', isa => 'Str', required => 1;
 has subject => is => 'ro', isa => 'Str', required => 1;
 has body    => is => 'ro', isa => 'Str', required => 1;

 # The boring part of building and sending it
 sub run($self) { ... }

 1;

    

Workers listen to queues

with priorities

Job is pushed to a queue

A worker instantiates the job class and calls run() on it

Probably forking
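The reserve-and-run cycle above can be sketched in plain Perl. This is a minimal in-memory illustration, not Resque's actual implementation: queues are Perl arrays instead of Redis lists, the queue names and the `Demo::Task::Echo` class are hypothetical, and forking is skipped.

```perl
use strict;
use warnings;

# Hypothetical job class: built from the job's args hashref, then run()
package Demo::Task::Echo {
    sub new { my ( $class, %args ) = @_; return bless {%args}, $class }
    sub run { my $self = shift; return "echo: $self->{msg}" }
}

package main;

# In-memory stand-in for Redis lists, one array per queue
my %queues = (
    high => [],
    low  => [ { class => 'Demo::Task::Echo', args => [ { msg => 'later' } ] } ],
);

# Check queues in priority order and return the first job found
sub reserve {
    my ( $queues, @priority ) = @_;
    for my $name (@priority) {
        my $job = shift @{ $queues->{$name} };
        return $job if $job;
    }
    return;
}

# Instantiate the job class with its args and call run()
sub perform {
    my $job = shift;
    my $obj = $job->{class}->new( %{ $job->{args}[0] } );
    return $obj->run;
}

push @{ $queues{high} }, { class => 'Demo::Task::Echo', args => [ { msg => 'first' } ] };

my @results;
while ( my $job = reserve( \%queues, qw(high low) ) ) {
    push @results, perform($job);
}
print "$_\n" for @results;    # prints "echo: first", then "echo: later"
```

In this sketch, checking `high` before `low` on every reserve is what gives the worker its queue priorities; a forking worker would call `perform` in a child process instead.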

Simple example

Send e-mail



 $resque->push( some_queue => {
   class => 'Supers::Task::Sendmail',
   args  => [{
     from    => 'diego@soysuper.com',
     to      => 'friends@barcelona.pm',
     subject => 'A simple job example',
     body    => 'Something boring',
   }]
 });
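Under the hood, a Resque job is just a JSON document in a Redis list. The sketch below shows what the push above would serialise to, assuming Resque's default `resque` namespace and a `queue:<name>` key layout; it uses core JSON::PP and does not talk to Redis.

```perl
use strict;
use warnings;
use JSON::PP;

my $job = {
    class => 'Supers::Task::Sendmail',
    args  => [ {
        from    => 'diego@soysuper.com',
        to      => 'friends@barcelona.pm',
        subject => 'A simple job example',
        body    => 'Something boring',
    } ],
};

# canonical() sorts the keys, so the encoded payload is stable
my $payload = JSON::PP->new->canonical->encode($job);

# Assumed key layout: namespace "resque", then "queue:<queue name>"
my $key = join ':', 'resque', 'queue', 'some_queue';

print "RPUSH $key $payload\n";
```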

    

Around 50 Million

single tasks

Every single

DAY

315 workers

140 classes

258 queues

DRY

Composing is easy

Most common workflows involve several jobs/classes

Never mix read with write

(fork vs no-fork)

Workflow example

Crawl e-commerce (foo.com)



 $resque->push( foo => {
   class => 'Supers::Task::Crawl',
   args  => [{
     supermarket => 'foo',
     store_queue => 'store',
     run_id      => 's0m31d',
   }]
 });
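A workflow like this is a chain of jobs in which each step pushes the next one. Below is a hypothetical sketch of a crawl step fanning out one store job per product onto `store_queue`, carrying `run_id` along. The `Supers::Task::Store` class name and the product fields are invented for illustration, and the queue is an in-memory array rather than Resque.

```perl
use strict;
use warnings;

# In-memory stand-in for $resque->push( $queue => $job )
my %queues;
sub push_job { my ( $queue, $job ) = @_; push @{ $queues{$queue} }, $job }

# Hypothetical crawl step: for every product found, enqueue a store job
sub crawl {
    my ( $args, @products ) = @_;
    for my $product (@products) {
        push_job( $args->{store_queue} => {
            class => 'Supers::Task::Store',    # hypothetical class name
            args  => [ {
                supermarket => $args->{supermarket},
                run_id      => $args->{run_id},    # ties every event to one run
                product     => $product,
            } ],
        } );
    }
    return scalar @products;
}

my $n = crawl(
    { supermarket => 'foo', store_queue => 'store', run_id => 's0m31d' },
    { sku => 1, name => 'milk' },
    { sku => 2, name => 'bread' },
);
print "$n store jobs queued\n";
```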

    

5M to 500M events in this kind of workflow

Different priorities

vs

Different queues

Each crawler needs to parallelize over different IPs

At least X workers per site/queue

No more than X parallel requests per site
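A minimal sketch of a per-site cap on parallel requests, assuming a simple in-process counter keyed by site; the `%max_parallel` values are made up. In a real deployment this state would have to be shared across workers (for example in Redis), which this sketch ignores.

```perl
use strict;
use warnings;

# Hypothetical per-site caps on parallel requests
my %max_parallel = ( 'foo.com' => 2, 'bar.com' => 1 );
my %in_flight;

# Take a slot for $site, or return 0 if the site is already at its cap
sub acquire {
    my $site = shift;
    my $max  = $max_parallel{$site} // 1;
    return 0 if ( $in_flight{$site} // 0 ) >= $max;
    $in_flight{$site}++;
    return 1;
}

# Give the slot back when the request finishes
sub release {
    my $site = shift;
    $in_flight{$site}-- if $in_flight{$site};
}

my @got = map { acquire('foo.com') } 1 .. 3;    # (1, 1, 0)
release('foo.com');
my $again = acquire('foo.com');                 # 1: a slot was freed
```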

IPs get banned

Static worker distribution

ansible + ubic

Reconfigure or add queues

=

Update ansible & deploy

It works great!

It worked great!

...until it didn't!

Crawling workflows have been evolving

for the last 2 years

Crawling keeps growing

Standardization

of new workflows

happened along...

Which allowed us

to grow faster and safer

We keep adding workforce boxes

... and queues

... and editing f* ansible files

... and deploying

Do you feel the PAIN?

...

Remove ansible

(for reconfiguring queues)

Resque::Worker::autoconfig()



 sub work($self) {
    $self->startup;

    while ( ! $self->shutdown ) {
        if ( !$self->paused && ( my $job = $self->reserve ) ) {
            $self->log("Got job $job");
            $self->work_tick($job);
        }
        elsif( $self->interval ) {
            my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
            $self->procline( $status );
            $self->log( $status );
            sleep( $self->interval );
        }
    }

    $self->unregister_worker;
 }

  


 sub work($self) {
    $self->startup;

    while ( ! $self->shutdown ) {
        $self->autoconfig->($self) if $self->has_autoconfig;
        if ( !$self->paused && ( my $job = $self->reserve ) ) {
            $self->log("Got job $job");
            $self->work_tick($job);
        }
        elsif( $self->interval ) {
            my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
            $self->procline( $status );
            $self->log( $status );
            sleep( $self->interval );
        }
    }

    $self->unregister_worker;
 }

  


 if ( $opt->autoconfig ) {
     require Mojo::UserAgent;
     require Sys::Hostname;

     my $ua = Mojo::UserAgent->new;
     my $ip = $ua->get('http://ifconfig.co/json')->res->json || {};

     my $last_update = 0;
     $w->autoconfig(sub{
         my $w    = shift;
         my $time = time;
         # Only ask the config server once every $opt->autoconfig_interval seconds
         return if $time - $last_update < $opt->autoconfig_interval;
         $last_update = $time;

         my $tx = $ua->post( $opt->autoconfig_url, json => {
             worker_name => $opt->worker_name,
             hostname => Sys::Hostname::hostname(),
             map { $_ => $ip->{$_} } qw/ ip city country country_iso /
         });

         unless ( $tx->error ) {
             my $conf = $tx->res->json;
             for my $method ( keys %$conf ) {
                 next unless $w->can($method);
                 $w->$method( $conf->{$method} );
             }
         }
     });
 }
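The interval check inside the callback is a throttle wrapped around an expensive call. Here is a self-contained sketch of the same pattern, with an injectable clock so it can be exercised without sleeping; `make_throttled` is a hypothetical helper, not part of Resque.

```perl
use strict;
use warnings;

# Wrap $code so it runs at most once every $interval seconds.
# $clock defaults to the built-in time() but can be replaced in tests.
sub make_throttled {
    my ( $interval, $code, $clock ) = @_;
    $clock //= sub { time };
    my $last_update = 0;
    return sub {
        my $now = $clock->();
        return if $now - $last_update < $interval;
        $last_update = $now;
        return $code->(@_);
    };
}

my $now   = 1000;
my $calls = 0;
my $tick  = make_throttled( 60, sub { $calls++ }, sub { $now } );

$tick->();              # runs: first call
$now += 30; $tick->();  # skipped: only 30s elapsed
$now += 30; $tick->();  # runs: 60s since the last update
```

Because the worker loop calls the hook on every iteration, a throttle like this is what keeps each worker from hitting the config server on every poll.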

    

 > ss-worker --help

   ss-worker [-adfiMnquv] [long options...] <some-arg>
        -q STR... --queue STR...          queue name
        -i NUM --interval NUM             Polling interval in floating
                                          seconds for this resque worker
                                          (Default 5 seconds)
        -f --cant-fork                    Don't let resque fork
        -M STR... --module STR...         Modules to load before forking
                                          occurs

        -a --autoconfig                   Activate worker autoconfig
        -n STR --worker-name STR          Worker name to be reported to the
                                          config server (defaults to pid)
        -u STR --autoconfig-url STR       Autoconfig URL to request
                                          configuration (defaults to
                                          http://config.ss/worker)
        -d INT --autoconfig-interval INT  Autoconfig interval in seconds to
                                          request configuration (defaults to
                                          60)
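These options could be parsed with core Getopt::Long, as sketched below. This is an illustration only: the help layout suggests the real script may use a different module, the defaults are copied from the help text, and the `-v` flag listed in the usage line is left out because its meaning isn't shown.

```perl
use strict;
use warnings;
use Getopt::Long qw(GetOptionsFromArray);

# Defaults taken from the help text above
sub parse_worker_options {
    my @argv = @_;
    my %opt  = (
        interval            => 5,
        worker_name         => $$,    # defaults to the pid
        autoconfig_url      => 'http://config.ss/worker',
        autoconfig_interval => 60,
        queue               => [],
        module              => [],
    );
    GetOptionsFromArray(
        \@argv,
        'queue|q=s'               => $opt{queue},     # repeatable: pushed onto the array
        'interval|i=f'            => \$opt{interval},
        'cant-fork|f'             => \$opt{cant_fork},
        'module|M=s'              => $opt{module},    # repeatable
        'autoconfig|a'            => \$opt{autoconfig},
        'worker-name|n=s'         => \$opt{worker_name},
        'autoconfig-url|u=s'      => \$opt{autoconfig_url},
        'autoconfig-interval|d=i' => \$opt{autoconfig_interval},
    ) or die "bad options\n";
    return \%opt;
}

my $opt = parse_worker_options(qw(-q foo -q store -a -d 30 -n crawler1));
```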

      

Worker brain

New configuration service

Rule based system

Work units

name + queues + props

Min/Max workers

Max workers per IP

IP banning

Prevent IP sharing

Among some units
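Here is a minimal sketch of how these rules might be checked for one worker (identified by its public IP) against one work unit. Everything in it is hypothetical: the field names (`max`, `max_per_ip`, `banned_ips`, `no_share_with`) and the in-memory `%assigned` state are invented to illustrate the rules, not taken from the real service.

```perl
use strict;
use warnings;

# Hypothetical work units: name + queues + props
my %units = (
    crawl_foo => { queues => ['foo'], max => 4, max_per_ip => 2,
                   banned_ips => { '10.0.0.9' => 1 }, no_share_with => ['crawl_bar'] },
    crawl_bar => { queues => ['bar'], max => 2, max_per_ip => 1,
                   banned_ips => {}, no_share_with => ['crawl_foo'] },
);

# Current assignments: unit name => list of worker IPs
my %assigned = ( crawl_foo => [], crawl_bar => ['10.0.0.2'] );

# Return 1 if a worker from $ip may join $unit_name under the rules above
sub can_join {
    my ( $unit_name, $ip ) = @_;
    my $unit    = $units{$unit_name};
    my @workers = @{ $assigned{$unit_name} };

    return 0 if @workers >= $unit->{max};                                  # max workers
    return 0 if ( grep { $_ eq $ip } @workers ) >= $unit->{max_per_ip};    # max per IP
    return 0 if $unit->{banned_ips}{$ip};                                  # IP banned
    for my $other ( @{ $unit->{no_share_with} } ) {                        # no IP sharing
        return 0 if grep { $_ eq $ip } @{ $assigned{$other} };
    }
    return 1;
}
```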

Geo preference

Worker sharing

Auto balancing

On worker registration
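On registration the service has to pick one unit for the new worker. Below is a hypothetical sketch of one possible balancing policy: units furthest below their minimum come first, then units whose preferred country matches the worker's, then the least-filled unit. The field names (`min`, `max`, `count`, `country`) are invented, and the candidates are assumed to have already passed the eligibility rules listed above.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Pick a unit for a worker registering from $country
sub pick_unit {
    my ( $country, @units ) = @_;
    my @ranked = sort {
        deficit($b) <=> deficit($a)                      # furthest below min first
          || geo( $b, $country ) <=> geo( $a, $country ) # then geo preference
          || fill($a) <=> fill($b)                       # then least filled
          || $a->{name} cmp $b->{name}                   # deterministic tie-break
    } grep { $_->{count} < $_->{max} } @units;           # skip full units
    return $ranked[0];
}

sub deficit { my $u = shift; return max( 0, $u->{min} - $u->{count} ) }
sub geo     { my ( $u, $c ) = @_; return ( $u->{country} // '' ) eq $c ? 1 : 0 }
sub fill    { my $u = shift; return $u->{count} / $u->{max} }
```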

SSOT

Next?

Improve auto-balance

Simplify state sharing

Clustering? Replication?

Time based IP bans

Move all kinds of workers

Update based on workload

...
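Time-based IP bans could be as simple as storing an expiry timestamp instead of a flag. Here is a hypothetical sketch with an optional `$now` argument so it can be tested; the unit names and the one-hour TTL are made up.

```perl
use strict;
use warnings;

my %ban_until;    # "unit|ip" => epoch second when the ban expires

# Ban $ip for $unit during $ttl seconds
sub ban_ip {
    my ( $unit, $ip, $ttl, $now ) = @_;
    $ban_until{"$unit|$ip"} = ( $now // time ) + $ttl;
}

# True while the ban is active; expired bans are forgotten on lookup
sub is_banned {
    my ( $unit, $ip, $now ) = @_;
    my $until = $ban_until{"$unit|$ip"} or return 0;
    return 1 if ( $now // time ) < $until;
    delete $ban_until{"$unit|$ip"};
    return 0;
}

ban_ip( 'crawl_foo', '10.0.0.9', 3600, 1_000 );
```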

resque-workforce

My new pet project

Not yet in production

Inspired by Minion

Will replace Ubic

Worker supervisor

Resque focused

(Soysuper focused)

Supervisor + API

Client

(React)

...

Thanks!

Questions?



 package Supers::Task::Test::Passing;
 use Resque::WorkerClass;

 has queues  => is => 'ro', required => 1;
 has pending => is => 'ro', default => sub{100};
 has delay   => is => 'ro', default => sub{10};

 sub run($self) {
     sleep $self->delay;

     my $next_queue = shift $self->queues->@*;

     $self->resque->push( $next_queue => {
         class => __PACKAGE__,
         args  => [{
             queues  => [ $self->queues->@*, $next_queue ],
             pending => $self->pending - 1,
             delay   => $self->delay
         }]
     }) if $self->pending > 1;    # also stops if started with pending <= 1
 }

 1;
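The test class above hops a single job around its queue list until `pending` runs out. The sketch below replays the same rotation in memory, without Resque, to show which queue each follow-up job is pushed to; the queue names are hypothetical.

```perl
use strict;
use warnings;

# Replay the push sequence of Supers::Task::Test::Passing in memory
sub passing_route {
    my ( $pending, @queues ) = @_;
    my @route;
    while ( $pending > 1 ) {
        my $next_queue = shift @queues;
        push @queues, $next_queue;    # rotate: this queue goes to the back
        push @route,  $next_queue;    # the follow-up job lands here
        $pending--;
    }
    return @route;
}

my @route = passing_route( 5, qw(a b c) );
print "@route\n";    # a b c a
```

With the default `pending` of 100 and `delay` of 10, one job makes 99 hops, which makes it handy for watching workers pick up work across queues.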