Non-Blocking Multiple Parallel Processing


Parallel processing at the user level became a critical part of my job in two situations. The first is when I work with thousands of similar servers that I need to query over SSH or some other method. The second is when I had to load test LAMP servers, which is done by taking an existing HTTP access log and re-requesting every URI in it.


The first case, querying servers, is much easier to deal with. There are many examples out there, mostly copied from each other, that share the same parallel processing model: for each server, fork a new child until a maximum number of children is reached. When one child finishes and exits, a new process is forked with the next server in the queue as its argument. Maximum performance is not a requirement here, as long as the thousand servers are queried within a reasonable amount of time, say five to twenty minutes.
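For comparison, here is a minimal sketch of that classic fork-per-item model, written for this post rather than taken from Pcmd.pm. query_server() is a hypothetical stand-in for the real SSH query; it just returns the host name's length as the child's exit status so the results are visible.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical stand-in for the real per-server work (e.g. an ssh command).
# It returns the host name's length so each child's exit status is visible.
sub query_server {
    my ($host) = @_;
    return length $host;
}

# Classic model: one fork per input, never more than $max children alive.
sub fork_per_item {
    my ($max, @hosts) = @_;
    my %running;    # pid  => host being queried
    my %status_of;  # host => exit status reported by its child
    while (@hosts or %running) {
        # Fork new children while we are under the limit.
        while (@hosts and keys(%running) < $max) {
            my $host = shift @hosts;
            my $pid  = fork() // die "fork: $!";
            if ($pid == 0) {
                # Child: do the work, then exit without running END blocks.
                POSIX::_exit(query_server($host));
            }
            $running{$pid} = $host;
        }
        # Block until any child exits; its slot is then free for the next host.
        my $pid = waitpid(-1, 0);
        last if $pid == -1;    # no children left to wait for
        my $host = delete $running{$pid};
        $status_of{$host} = $? >> 8 if defined $host;
    }
    return \%status_of;
}

my $status = fork_per_item(2, 'web1', 'db10', 'cache100');
printf "%s exited with %d\n", $_, $status->{$_} for sort keys %$status;
```

The fork() inside the loop is the cost paid once per input, which is fine for a few thousand servers but not for replaying an access log.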

The problem is the second task, application load testing. If you forked a separate child for every line of an Apache access log, your own load testing server would grind to a halt before a decently programmed mod_perl/PHP application running under Apache would. For querying servers I can fork 500 times and it might take my script half an hour to complete, and that's fine. But for load testing the purpose is to run hundreds of queries as fast as possible. Remember that fork() is one of the most expensive system calls a process can make, because the kernel has to create a whole new process. There's no way you can load test for performance and fork continuously at the same time. I realized that I needed to modify the parallel processing model so that no forking happens while the requests are going on. That is why I chose a cheetah as the image for this post: just as the cheetah is the fastest animal on land, Pcmd.pm is the fastest parallel processing implementation I could come up with.

Not only must I never fork, but I also need a way to collect data from each child for statistics. For example, I want to know how long the average HTTP GET request takes and which URI took the longest. That's how I came up with Pcmd, which stands for Parallel Command. It first spawns MaxChildren processes, and then for every available input (a server IP or an HTTP access log entry) it assigns one input to each child in the READY status. Once a child has been assigned an input, it is placed in the RUNN (running) status. While a child is RUNN, the parent attempts to read from it without blocking on the child's file handle. Once the child has returned a complete message, it is put in the DONE status. The output of a DONE child is processed and the child is put back in the READY status, where it can receive another input from the parent. Three things make this model work without any additional forking: bidirectional message passing over BSD's socketpair, precise tracking of each child's status, and a robust protocol for passing messages to and from each child. The same initially spawned children handle as many inputs as there are.
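The READY -> RUNN -> DONE -> READY cycle can be sketched in a self-contained way. This is not Pcmd.pm's code: to keep it short it swaps in IO::Select to wait for readable children instead of polling non-blocking handles, and it sends one newline-terminated line per message instead of Pcmd's length-prefixed protocol. work() is a hypothetical stand-in for the child callback.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Handle;
use IO::Select;
use POSIX ();

# Hypothetical stand-in for the child callback: the work done per input.
sub work { my ($input) = @_; return uc $input }

sub run_pool {
    my ($nchildren, @inputs) = @_;
    die "need at least one child\n" unless $nchildren > 0;
    my (%fh, %status, %pid_of, @answers); # pid => handle, pid => status, fileno => pid

    # Fork every child once, up front. Nothing is forked after this loop.
    for (1 .. $nchildren) {
        socketpair(my $to_child, my $to_parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
            or die "socketpair: $!";
        $to_child->autoflush(1);
        $to_parent->autoflush(1);
        my $pid = fork() // die "fork: $!";
        if ($pid == 0) {                   # child: serve inputs until EOF
            close $to_child;
            close $_ for values %fh;       # drop handles inherited for siblings
            while (defined(my $line = <$to_parent>)) {
                chomp $line;
                print {$to_parent} work($line), "\n";
            }
            POSIX::_exit(0);
        }
        close $to_parent;
        $fh{$pid}     = $to_child;
        $status{$pid} = 'READY';
        $pid_of{ fileno($to_child) } = $pid;
    }

    my $sel = IO::Select->new;
    while (@inputs or grep { $_ eq 'RUNN' } values %status) {
        # READY -> RUNN: hand the next input to every idle child.
        for my $pid (grep { $status{$_} eq 'READY' } keys %status) {
            last unless @inputs;
            print { $fh{$pid} } shift(@inputs), "\n";
            $status{$pid} = 'RUNN';
            $sel->add($fh{$pid});
        }
        # RUNN -> DONE -> READY: collect whichever answers have arrived.
        for my $h ($sel->can_read) {
            my $pid    = $pid_of{ fileno($h) };
            my $answer = <$h>;
            die "child $pid closed its pipe\n" unless defined $answer;
            chomp $answer;
            $status{$pid} = 'DONE';
            push @answers, $answer;        # process the DONE child's output
            $sel->remove($h);
            $status{$pid} = 'READY';
        }
    }

    close $_ for values %fh;               # EOF makes each child exit
    waitpid($_, 0) for keys %fh;
    return @answers;
}

my @results = run_pool(3, qw(a b c d e));
print join(' ', sort @results), "\n";
```

Three children handle five inputs here; each child is forked once and reused, which is the property Pcmd.pm is built around.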

Download Pcmd.pm here. Instead of pasting the entire source, I'll show parts of it and explain why each part is there and what it does. Specifically, I'll explain:

1- The hash structures used to keep track of each child's status and output.
2- The hash structures used to keep track of message passing.
3- How to fork and create non-blocking bidirectional socketpair pipes between the parent and the child process.
4- How the parent reads from multiple non-blocking handles.
5- How message passing to and from the child is implemented so that it is robust and unambiguous.
6- Two client examples using Pcmd.pm that put it all together.

1- The parent process loops $children times and calls the function make_child, which creates the communication pipes using socketpair, forks off a process, and hands the new process the $PARENT end of the socketpair stream while keeping the $CHILD end for itself. The parent has to keep track of each child's communication pipe, birth time, output (the answer sent back to the parent) and status, which can be READY, RUNN or DONE.

Available within the class:
$self->{children_s}->{$pid}; # status of $pid.
$self->{children_f}->{$pid}; # the CHILD end of the socketpair stream, which the parent uses to read from and write to $pid

Defined within run():
%children_t; # {$pid} = birth time in seconds.
%children_a; # {$pid} = array ref to a list of strings: the answers read back from $pid.

Exported:
$self->{timedout_answers} = []; # if a child is killed because it stayed RUNN for more than $self->{timeout} seconds, any incomplete message read from that child is stored here.
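The timeout rule in that last comment can be sketched as a small function over per-PID hashes. It assumes a hypothetical %started hash holding the time each child was given its current input; the excerpts here only show %children_t (a birth time) and don't say whether Pcmd.pm resets it on every assignment.

```perl
use strict;
use warnings;

# Return, in numeric order, the PIDs that have been RUNN for more than
# $timeout seconds. %$status mirrors $self->{children_s}; %$started is a
# hypothetical pid => epoch-seconds map of when the current input was assigned.
sub timed_out_children {
    my ($status, $started, $timeout, $now) = @_;
    return sort { $a <=> $b }
        grep { $status->{$_} eq 'RUNN' && $now - $started->{$_} > $timeout }
        keys %$status;
}

my %status  = (101 => 'RUNN', 102 => 'RUNN', 103 => 'READY');
my %started = (101 => 1000,   102 => 1095,   103 => 900);
my @stuck   = timed_out_children(\%status, \%started, 90, 1100);
print "would kill: @stuck\n";    # the parent would then kill('KILL', @stuck)
```

Only PID 101 qualifies: it has been RUNN for 100 seconds, while 102 started 5 seconds ago and 103 is idle.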

2- Message passing to and from the child can be tricky. When the parent is reading from the child's pipe, how does it know that the child is done sending, so that the child can be placed in the DONE status and no longer read from? To solve this I borrowed the idea from how an HTTP POST with chunked transfer encoding marks the end of its body, which I explained in my previous entry, HTTP Post stream upload a file chunked transfer. In the comments below, EOM stands for End Of Message.

my %children_Mstate; # {$pid} = message parsing state while looking for EOM '0\r\n\r\n'
my %children_Mstring;# {$pid} = message body accumulated so far
my %children_Msize;  # {$pid} = like a chunk size in HTTP chunked encoding: how many chars are still to be read
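To make the framing concrete, here is a minimal sketch of the sending side under the rules described in this post: each piece of output is sent as its length, a newline, then the bytes, and the message ends with 0\r\n\r\n. frame_message() is a hypothetical helper name, not necessarily what Pcmd.pm calls it.

```perl
use strict;
use warnings;

# Frame a child's answer: "<length>\n<bytes>" for each piece, then "0\r\n\r\n".
# Lengths count bytes, so encode any Unicode text (e.g. with Encode) first.
sub frame_message {
    # An empty piece would be sent as a leading "0", which the reader
    # would take as the start of EOM, so empty pieces are skipped.
    my @pieces = grep { length } @_;
    my $out = '';
    $out .= length($_) . "\n" . $_ for @pieces;
    return $out . "0\r\n\r\n";
}

my $wire = frame_message('hello', 'ab');
# $wire is "5\nhello2\nab0\r\n\r\n"
```

Because the reader always consumes exactly the announced number of bytes, a message may itself contain digits, newlines or even "0\r\n\r\n" without confusing the parser.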

There are two subroutines used to read messages. The parent always uses read_from_child() and the child always uses read_from_parent(). The only difference between the two is that the child keeps reading until it has a full message from the parent, whereas the parent reads from a non-blocking child pipe only until the read would block. As soon as the parent can't read any more, it returns and moves on to the next child. If a full message has been read, read_from_child() places the child in the DONE status.
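Here is a minimal, self-contained sketch of "read until the read would block" on a non-blocking socketpair end, reading one byte per sysread as the parent does. drain_nonblocking() is a hypothetical name, and the message parsing is left out so only the EAGAIN and EOF behaviour is shown.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Errno qw(EAGAIN EWOULDBLOCK);

# Read one byte at a time until the read would block or the peer closes.
# Returns the bytes read and a flag telling whether EOF was seen.
sub drain_nonblocking {
    my ($fh) = @_;
    my ($got, $eof) = ('', 0);
    while (1) {
        my $rv = sysread($fh, my $c, 1);
        if (!defined $rv) {
            last if $! == EAGAIN || $! == EWOULDBLOCK;  # nothing more for now
            die "sysread: $!";
        }
        if ($rv == 0) { $eof = 1; last }                # peer closed its end
        $got .= $c;
    }
    return ($got, $eof);
}

socketpair(my $reader, my $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
my $flags = fcntl($reader, F_GETFL, 0) or die "F_GETFL: $!";
fcntl($reader, F_SETFL, $flags | O_NONBLOCK) or die "F_SETFL: $!";

syswrite($writer, "5\nhel");                       # a partial message arrives
my ($first, $eof1) = drain_nonblocking($reader);   # returns at EAGAIN
syswrite($writer, "lo0\r\n\r\n");                  # the rest arrives later
close $writer;
my ($rest, $eof2) = drain_nonblocking($reader);    # returns at EOF
print "first=", length($first), " bytes, rest=", length($rest), " bytes, eof=$eof2\n";
```

The first call returns with half a message instead of hanging, which is what lets the parent move on to the next child and come back later.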

3- In my opinion, the easiest way to have a bidirectional conversation with a child is socketpair. Spawning a child and establishing bidirectional non-blocking streams between the child process and the parent is done in the subroutine make_child. I've commented the code, so no extra explanation should be needed:

use Socket;                               # AF_UNIX, SOCK_STREAM, PF_UNSPEC
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Symbol qw(gensym);
use IO::Handle;                           # autoflush()

my $CHILD  = gensym; # parent uses this handle to talk to the child
my $PARENT = gensym; # child uses this handle to talk to the parent
socketpair($CHILD, $PARENT, AF_UNIX, SOCK_STREAM, PF_UNSPEC) || die "socketpair: $!";

# Make both ends non-blocking. Once a filehandle is set for non-blocking I/O,
# sysread or syswrite calls that would block instead return undef and set $! to EAGAIN.
# fcntl(F_GETFL) returns the current flags; O_NONBLOCK is OR'ed into them
# rather than overwriting them.
for my $fh ($CHILD, $PARENT) {
    my $flags = fcntl($fh, F_GETFL, 0) or die "Couldn't get flags for HANDLE: $!\n";
    fcntl($fh, F_SETFL, $flags | O_NONBLOCK) or die "Couldn't set flags for HANDLE: $!\n";
}

# Disable output buffering on both ends (same effect as select($fh); $| = 1).
$CHILD->autoflush(1);
$PARENT->autoflush(1);

my $pid = fork();
die "cannot fork: $!" unless defined $pid;
if ($pid) {
    # parent records the child's handle and returns its PID
    close($PARENT); # parent doesn't need the PARENT handle.
    $self->{children_f}->{$pid} = $CHILD;
    return $pid;
} else {
    close($CHILD); # child doesn't need the CHILD handle. Uses PARENT to talk to parent.
    $SIG{'INT'}  = 'DEFAULT';
    $SIG{'TERM'} = 'DEFAULT';
    $self->child($PARENT);
    close($PARENT); # in case child() doesn't close the handle.
    exit(0);        # in case child() doesn't exit.
}



4 and 5- For each child created, the parent holds the non-blocking end of a socketpair pipe in $self->{children_f}, where each key is a PID and its value is the pipe to that PID. For each PID in the RUNN status, the parent attempts to read its output. This attempt must not block on a hung child, because that would starve all the other children. To read and parse the stream according to a set of rules, I don't see any option other than reading one character at a time from the file handle.

The parent expects the child to send its output in a format that mimics HTTP chunked transfer encoding. First comes a number terminated by a newline character. This number says how many characters the child is sending: whatever those characters are, the parent must read exactly that many. This sequence of number, newline, message, number, newline, message ... continues until the number is a zero followed by these four characters: \r\n\r\n. For example, a child answering "hello" and then "ab" sends 5\nhello2\nab0\r\n\r\n. To parse each message, the reader goes through six different states: 0, 1, 5, 6, 7 and 8. It starts at state 0, and when the final \n arrives in state 8, a full child message has been read. The child is then placed in the DONE status and the message is passed to the defined parent callback subroutine. These states are stored per PID in $children_Mstate{$pid}. The message parsing loop is this:

# Excerpt from read_from_child(): $children_Mstate, $children_Msize,
# $children_Mstring and $children_a are hash refs to the per-PID structures above.
use Errno qw(EAGAIN);

my $buf;
while ($self->{children_s}->{$pid} ne 'DONE') {

    # Read exactly one character so the state machine can inspect it.
    my $rv = sysread($self->{children_f}->{$pid}, $buf, 1);

    if (!defined $rv) {
        last if $! == EAGAIN;   # would block: resume this PID on a later pass
        die "read_from_child $pid: $!";
    }
    last if $rv == 0;           # EOF: the child closed its end

    # state 0: accumulate the message size until \n, then go to state 1
    if ($children_Mstate->{$pid} == 0 && $buf eq "\n") {
        $children_Mstate->{$pid} = 1;
    } elsif ($children_Mstate->{$pid} == 0 && $buf eq '0'
             && $children_Msize->{$pid} == 0) {
        # If Msize == 0, this 0 is in the first position. The length integer
        # never has leading zeros, which is how I tell the 0 in 40 apart
        # from the 0 in '0\r\n\r\n'.
        $children_Mstate->{$pid} = 5;
    } elsif ($children_Mstate->{$pid} == 0) {
        $children_Msize->{$pid} .= $buf;
    }

    # state 1: read the message body until Msize reaches 0, then back to state 0
    elsif ($children_Mstate->{$pid} == 1 && $children_Msize->{$pid} > 0) {
        $children_Mstring->{$pid} .= $buf;
        $children_Msize->{$pid} -= 1;
        # if this was the last char to read, go back to state 0
        if ($children_Msize->{$pid} == 0) {
            push @{$children_a->{$pid}}, $children_Mstring->{$pid};
            $children_Mstring->{$pid} = '';
            $children_Mstate->{$pid} = 0;
        }
    }

    # states 5 to 8: match the \r\n\r\n that follows the terminating 0
    elsif ($children_Mstate->{$pid} == 5 && $buf eq "\r") {
        $children_Mstate->{$pid} = 6;
    } elsif ($children_Mstate->{$pid} == 6 && $buf eq "\n") {
        $children_Mstate->{$pid} = 7;
    } elsif ($children_Mstate->{$pid} == 7 && $buf eq "\r") {
        $children_Mstate->{$pid} = 8;
    } elsif ($children_Mstate->{$pid} == 8 && $buf eq "\n") {
        # End of message reached. The child is DONE, so the parent
        # shouldn't attempt to continue reading.
        $self->{children_s}->{$pid} = 'DONE';
        $children_Mstate->{$pid} = 0;
    } else {
        my $state = $children_Mstate->{$pid};
        my $str   = $children_Mstring->{$pid};
        print "ERROR read_from_child $pid impossible state ";
        print "$state with buf \"$buf\" Mstring: \"$str\"\n";
        print "Please report this as a bug\n";
        last;
    }
}
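The same six-state idea can be shown as a small, self-contained decoder that accepts whatever bytes have arrived and resumes where it left off, which is what the parent needs when a read hits EAGAIN. This is a rewrite for illustration, not the code in Pcmd.pm: it keeps the state numbers but compares the body length against the size instead of counting Msize down, and new_decoder() and feed() are hypothetical names.

```perl
use strict;
use warnings;

# Decoder state: 0 = reading the length, 1 = reading the body,
# 5..8 = matching "\r\n\r\n" after the terminating 0 (same numbers as above).
sub new_decoder {
    return { state => 0, size => '', body => '', msgs => [], done => 0 };
}

# Feed any number of bytes; returns true once the full EOM has been seen.
sub feed {
    my ($d, $data) = @_;
    my %expect = (5 => "\r", 6 => "\n", 7 => "\r", 8 => "\n");
    for my $c (split //, $data) {
        die "data after EOM\n" if $d->{done};
        my $s = $d->{state};
        if ($s == 0) {
            if ($c eq "\n") {
                die "empty length\n" if $d->{size} eq '';
                $d->{state} = 1;
            } elsif ($c eq '0' && $d->{size} eq '') {
                $d->{state} = 5;                    # a leading 0 starts EOM
            } elsif ($c =~ /^[0-9]$/) {
                $d->{size} .= $c;                   # e.g. the 0 in "40"
            } else {
                die "bad length character\n";
            }
        } elsif ($s == 1) {
            $d->{body} .= $c;
            if (length($d->{body}) == $d->{size}) { # whole piece read
                push @{ $d->{msgs} }, $d->{body};
                @$d{qw(state size body)} = (0, '', '');
            }
        } elsif ($c eq $expect{$s}) {
            if ($s == 8) { $d->{done} = 1 } else { $d->{state} = $s + 1 }
        } else {
            die "unexpected byte in state $s\n";
        }
    }
    return $d->{done};
}

my $d = new_decoder();
my $finished_early = feed($d, "5\nhel");            # stops mid-body
my $finished       = feed($d, "lo2\nab0\r\n\r\n");  # resumes and completes
print "messages: @{ $d->{msgs} }\n" if $finished;
```

Keeping all progress in the per-stream hash is what makes it safe to stop at any byte and pick up again on the next pass.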



Reading from a child and reading from the parent follow the same message passing mechanism. The subroutines read_from_child and read_from_parent are identical, except that when the parent sees EAGAIN it stops, leaving that PID's state as is so that the read resumes on a later pass, and returns control to the main loop inside run(). The child, on the other hand, keeps reading until a full message has arrived, because it doesn't have anything else to do :-)

6- Let's look at a very simple example of using Pcmd.pm; the synopsis actually suffices. We define the parent and child subroutines that the Pcmd object uses as callbacks, and put a number of IP addresses in a text file called in.txt. Pcmd reads in.txt and assigns one IP to each child. You define what the child does with this IP. When the child returns, its answer is passed to the parent callback for some post-processing. The parent callback should do as little as possible, because it blocks the main loop inside Pcmd. It should mostly store the answers and process them after all the children are done.

Synopsis:

use strict; use warnings;
use Pcmd;

# Spawn 24 children; each child won't run a command longer than 90 seconds.
my $cmd = Pcmd->new( "children" => 24 );
$cmd->timeout(90);

# Each child will be given a line from the file in.txt (or from STDIN, i.e.: cat in.txt | cmd.pl)
$cmd->input_file("in.txt");
# OR
# $cmd->input_file('-'); # the default is STDIN, so you can pipe to your script

# input_delimiter sets $/ inside Pcmd.pm; it is used to read your input_file.
$cmd->input_delimiter("\n"); # the default.

# Be very careful not to exit from a child_callback. Run anything that you feel
# might cause an error inside an eval block. Make sure that you always return
# from child_callback. At the time of writing, Pcmd.pm won't detect that a child
# has suddenly and unexpectedly died. Just behave socially :-)
sub child_callback { # This is your child code, what runs in parallel
    my $m = shift;
    chomp $m;
    my $o = your_func($m); # your_func is your own code
    return $o;             # Returned value is passed to parent_callback
}

my @all_answers;
sub parent_callback {
    my $answer = shift;          # array ref of strings read back from one child
    push @all_answers, @$answer; # that's all I do, save the answer for later.
    # or do whatever else with @$answer here ...
    # but be careful: return quickly from parent_callback
    # so that Pcmd can go on working with its other children.
    # Pcmd.pm doesn't use parent_callback's return value.
}

$cmd->callback(\&child_callback, 'child');
$cmd->callback(\&parent_callback, 'parent');

$cmd->run();

foreach my $ans (@all_answers) {
    print "I am parent. Child said: $ans\n";
}

# Any output read from a child that timed out is stored here.
my $timedout_answers = $cmd->timedout_answers();
foreach my $answers (@$timedout_answers) { # each element is a reference to an array of answers
    foreach my $ans (@$answers) {
        print "timedout answers:\"" . $ans . "\"\n" if ($ans);
    }
}


Or put URIs one per line in uris.txt and load test a Drupal site:


#!/usr/bin/perl

use strict;
use warnings;
use Pcmd;
use HTTP::Tiny;
use Time::HiRes qw(gettimeofday tv_interval);

my $cmd = Pcmd->new( "children" => 24 );
$cmd->timeout(90);
$cmd->input_file("uris.txt");
$cmd->input_delimiter("\n"); # the default.



sub child_callback { # This is your child code, what runs in parallel
    my $uri = shift;
    chomp $uri;
    my $start_t = [gettimeofday];
    my $response = HTTP::Tiny->new->get("http://stage.farhadsaberi.com${uri}");
    my $elapsed_t = tv_interval($start_t, [gettimeofday]);
    $elapsed_t = 'E' unless $response->{success};

    my $cacheHit;
    if (exists $response->{'headers'}->{'x-drupal-cache'}) {
        $cacheHit = $response->{'headers'}->{'x-drupal-cache'} eq 'HIT' ? 'H' : 'M';
    } else {
        $cacheHit = 'N'; # means that the header is not even there
    }

    # The return format is "H 1.235224 /": the first letter is H for a cache HIT,
    # M for a MISS, or N when there is no X-Drupal-Cache header. We want to count the hits.
    return "$cacheHit $elapsed_t $uri"; # Returned value is passed to parent_callback
}


my @all_answers;
$| = 1;
sub parent_callback {
    my $answer = shift;

    # ## spinner code if needed
    # ## print substr( "-/|\\", $spin++ % 4, 1 ), "\b";
    # ##

    push @all_answers, @$answer; # that's all I do, save the answer for later.
    my $requests = scalar(@all_answers);

    # Print a progress counter, then back up over it with \b so the next call
    # overwrites it in place ("requests: " is 10 characters long).
    my $len = length($requests) + 10;
    print "requests: $requests";
    print "\b" x $len;
}


$cmd->callback(\&child_callback, 'child');
$cmd->callback(\&parent_callback, 'parent');

$cmd->run();

my $tot = 0;
my $tot_c_hits = 0;          # total cache hits
my $tot_c_none_existant = 0; # no cache info in header
my $tot_c_miss = 0;          # cache misses
my $tot_t = 0;               # total times

my %error_uris; # keys are the URIs and values the count
foreach my $ans (@all_answers) {
    # Possible values:
    # $c:   H = cache hit, M = cache miss, N = no cache header
    # $t:   E = error in HTTP response, x.abc = decimal number timing the request
    # $uri: the requested URI
    my ($c, $t, $uri) = $ans =~ /^([HMN])\s(\d+\.?\d*|E)\s(.*)$/;
    unless (defined $c) {
        print "ERROR Bad child response: \"$ans\"\n";
        next;
    }

    ++$tot_c_hits if ($c eq 'H');
    ++$tot_c_miss if ($c eq 'M');
    ++$tot_c_none_existant if ($c eq 'N');

    if ($t eq 'E') {
        $error_uris{$uri}++;
    } else {
        $tot_t += $t;
        $tot++;
    }
}

my $all_requests_count = scalar(@all_answers);
print "There were $tot successful requests out of $all_requests_count\n";

my $avg = $tot ? $tot_t / $tot : 0; # avoid dividing by zero if nothing succeeded
print "average response time: $avg\n";
print "Cache Hits: $tot_c_hits. Cache Misses: $tot_c_miss. Missing Cache header: $tot_c_none_existant.\n";

print "\nUnsuccessful URIs:\n";
foreach my $k (keys %error_uris) {
    print $error_uris{$k} . "  $k\n";
}

# Any output read from a child that timed out is stored here.
my $timedout_answers = $cmd->timedout_answers();
foreach my $answers (@$timedout_answers) { # each element is a reference to an array of answers
    foreach my $ans (@$answers) {
        print "timedout answers:\"" . $ans . "\"\n" if ($ans);
    }
}



I hope this is of some use to others. When I first started writing parallel processing code, it was difficult to grasp what was going on and why it would fail. But after many years of experience, and especially many failed attempts, it becomes clear. One note about signal handling: I did not explain the signal handling in Pcmd.pm, and not all signals are handled. That's perhaps the subject of another article, once I know exactly what I would write :-)

Never die or exit within your child callback. This model does not expect any child to ever exit on its own. Only the parent decides when to terminate a hung child and replace it with another one that takes the next item from its input. So be paranoid and wrap any code that you think might die in an eval block. Pcmd.pm calls the child callback in an eval block as well, and if a die or any other error occurs, the whole process group is terminated. If a child exits, the parent handles SIGCHLD by terminating all the other children and exiting itself. Pcmd.pm is deliberately programmed this way for maximum speed, so never exit from a child process.
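A minimal sketch of that advice, assuming your real work lives in a function of your own (risky_work() below is hypothetical): the eval turns a die() into an ordinary answer string that still reaches parent_callback. Note that eval catches die but not exit, so code that might call exit has to be changed rather than wrapped.

```perl
use strict;
use warnings;

# Hypothetical stand-in for your own code; it dies on empty input.
sub risky_work {
    my ($input) = @_;
    die "empty input\n" unless length $input;
    return "ok $input";
}

# Child callback that never lets an exception escape the child.
sub child_callback {
    my ($input) = @_;
    chomp $input;
    my $answer = eval { risky_work($input) };
    return $answer if defined $answer;
    my $err = $@ || "undefined result\n";
    chomp $err;
    return "ERROR($input): $err";   # reported as a normal answer
}

print child_callback("host1\n"), "\n";
print child_callback("\n"), "\n";
```

The failing input comes back as "ERROR(): empty input" instead of killing the child, so the parent can count it like any other answer.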



This page contains a single entry by Farhad Saberi published on October 11, 2012 1:58 PM.
