Home > Software engineering >  test for available data in filehandle
test for available data in filehandle

Time:09-28

For some reason I am implementing some specific network protocol similar to STOMP in plain pure Perl.

The connection can be either a direct network socket, or an SSL tunnel provided by openssl s_client created by a call to open3 (no IO::Socket::SSL available on the host).

Depending on the dialog a request to the server may or may not have a response, or may have multiple responses. How can I test the file descriptors for the existence of data? Currently when no data is available, it waits until the defined timeout.

EDIT: I have probably a vocabulary issue between file handle vs. file descriptor to perform my research. I just found that eof() may help but cannot use it correctly yet.

While it is a bit complicated to provide an SCCCE, here is the interesting parts of the code:

# creation of a direct socket connection 
sub connect_direct_socket {
    my ($host, $port) = @_;
    my $sock = new IO::Socket::INET(PeerAddr => $host,
                                    PeerPort => $port,
                                    Proto    => 'tcp') or die "Can't connect to $host:$port\n";
    $sock->autoflush(1);
    say STDERR "* connected to $host port $port" if $args{verbose} || $args{debug};
    
    return $sock, $sock, undef;
}

# for HTTPS, we are "cheating" by creating a tunnel with OpenSSL in s_client mode
my $tunnel_pid;
sub connect_ssl_tunnel {
    my ($dest) = @_;
    my ($host, $port);
    $host = $dest->{host};
    $port = $dest->{port};
    
    my $cmd = "openssl s_client -connect ${host}:${port} -servername ${host} -quiet";# -quiet -verify_quiet -partial_chain';
    $tunnel_pid = open3(*CMD_IN, *CMD_OUT, *CMD_ERR, $cmd);
    say STDERR "* connected via OpenSSL to $host:$port" if $args{verbose} || $args{debug};
    say STDERR "* command = $cmd" if $args{debug};

    $SIG{CHLD} = sub {
        print STDERR "* REAPER: status $? on ${tunnel_pid}\n" if waitpid($tunnel_pid, 0) > 0 && $args{debug};
    };
    return *CMD_IN, *CMD_OUT, *CMD_ERR;
}

# later
($OUT, $IN, $ERR) = connect_direct_socket($url->{host}, $url->{port});
# or
($OUT, $IN, $ERR) = connect_ssl_tunnel($url);

# then I am sending with a
print $OUT $request;
# and read the response with
my $selector = IO::Select->new();
$selector->add($IN);

FRAME:
while (my @ready = $selector->can_read($args{'max-wait'} || $def_max_wait)) {
    last unless @ready;
    foreach my $fh (@ready) {
        if (fileno($fh) == fileno($IN)) {
            my $buf_size = 1024 * 1024;
            my $block = $fh->sysread(my $buf, $buf_size);
            if($block){
                if ($buf =~ s/^\n*([^\n].*?)\n\n//s){
                    # process data here
                }
                if ($buf =~ s/^(.*?)\000\n*//s ){
                    goto EOR;
                    # next FRAME;
                }                }
            $selector->remove($fh) if eof($fh);
        }
    }
}
EOR:    

EDIT 2 and epilogue

As a summary, depending in the protocol dialog

  • a request can have an expected response (for instance a CONNECT must return a CONNECTED)
  • a request to get the pending messages can return a single response, multiple responses at once (without intermediate request), or no response (and in this case the can_read() with no parameter of Ikegami is blocking, what I want to avoid).

Thanks to Ikegami I have changed my code as the following:

  • the timeout argument to can_read() is passed as an argument to the sub that is processing the responses
  • for initial connections I am passing a timeout of several seconds
  • when I expect instant responses I am passing a timeout of 1 second
  • in the process loop, after any correct response I replace the initial timeout by a 0.1 to not block if no more data is waiting in the filehandle

Here is my updated code:

sub process_stomp_response {
    my $IN = shift;
    my $timeout = shift;

    my $resp = [];
    my $buf;                    # allocate the buffer once and not in loop - thanks Ikegami!
    my $buf_size = 1024 * 1024;

    my $selector = IO::Select->new();
    $selector->add($IN);

  FRAME:
    while (1){
        my @ready = $selector->can_read($timeout);
        last FRAME unless @ready;     # empty array = timed-out
        foreach my $fh (@ready) {
            if (fileno($fh) == fileno($IN)) {
                my $bytes = $fh->sysread($buf, $buf_size);
                # if bytes undef -> error, if 0 -> eof, else number of read bytes
                my %frame;
                if (defined $bytes){
                    if($bytes){
                        if ($buf =~ s/^\n*([^\n].*?)\n\n//s){
                            # process frame headers here
                            # [...]
                        }
                        if ($buf =~ s/^(.*?)\000\n*//s ){
                            # process frame body here
                            # [...]
                            push @$resp, \%frame;
                            $timeout = 0.1; # for next read short timeout
                            next FRAME;
                        }
                    } else {
                        # EOF
                        $selector->remove($fh); 
                        last FRAME;
                    }
                } else {
                    # something is wrong
                    say STDERR "Error reading STOMP response: $!";
                }
            } else {
                # what? not the given fh
            }
        }
    }
    return $resp;
}

CodePudding user response:

Do not use eof in conjunction with select (which can_read wraps). It performs a buffered read, which breaks select.

select will mark a handle as ready for reading when it reaches EOF, and sysread returns zero on EOF. So all you need to do to detect EOF is to check for sysread returning zero.

Note that using a new buffer for every pass was a mistake sysread can easily return only part of a message. The following fixes this, and shows how to handle errors and EOF from sysread.

Globals:

my %clients_by_fd;

When you get a new connection:

$selector->add( $fh );
$clients_by_fd{ fileno( $fh ) } = {
   buf => "",
   # Any other info you want here.
};

Event loop:

while ( 1 ) {
   my @ready = $selector->can_read();
   for my $fh ( @ready ) {
      my $client = $clients_by_fd{ fileno( $fh ) };

      my $buf_ref = \$client->{ buf };

      my $rv = sysread( $fh, $$buf_ref, 1024*1024, length( $$buf_ref ) );
      if ( !$rv ) {
         if ( defined( $rv ) ) {
            # EOF
            if ( length( $$buf_ref ) ) {
               warn( "Error reading: Incomplete message\n" );
            }
         } else {
            # Error
            warn( "Error reading: $!\n" );
         }

         delete $clients_by_fd{ fileno( $fh ) };
         $select->remove( $fh );
      }

      while ( $$buf_ref =~ s/^.*?\n\n//s ) {
         process_message( $client, $& );
      }
   }
}
  • Related