#!/usr/bin/env perl
use strict;
use Coro;
use Coro::State;
use Coro::AnyEvent;
use Coro::Semaphore;
use Coro::Handle;
use Coro::Signal;
use Coro::Timer;
use Coro::RWLock;
use AnyEvent::Socket;
use Socket;
use Getopt::Long;
#use Coro::Debug;
# Path of the listening AF_UNIX socket; config() replaces it when a
# socketpath argument is given on the command line.
my ($LOCAL_PATH)=('/tmp/timeserver');
# Upper bound for the number of clients served in parallel (option -n).
my $MAXCLIENTS=1000;
# Life-cycle states kept in $running. Every state needs its own value,
# otherwise "stopping" cannot be told apart from "starting up".
use constant {
    R_STARTINGUP => 1,
    R_RUNNING    => 2,
    R_STOPPING   => 3,
};
my $running=R_STARTINGUP;
# $DEBUG is the verbosity selected with -d. $W is the level of the message
# that is about to be emitted; callers localize it around a warn. %W maps
# a level to the tag shown in the log prefix.
our $DEBUG=0;
our $W=0;
our %W=(0=>'W', 1=>'I', 2=>'D');
# Warning hook handed to Coro. A message is emitted only when its level
# ($W) does not exceed the configured verbosity ($DEBUG); every line of it
# is prefixed with a timestamp, a severity tag and the program name.
$Coro::State::WARNHOOK = sub {
    return unless $W <= $DEBUG;

    my ($message) = @_;
    my $stamp = localtime();
    # Levels without an entry in %W are tagged D1, D2, ... (level minus one).
    my $tag = exists $W{$W} ? $W{$W} : 'D' . ($W - 1);
    my $prefix = "[$stamp] $tag timeserver: ";
    $message =~ s/^/$prefix/gm;
    warn $message;
};
# Die hook handed to Coro. Exceptions that are being caught by an eval and
# exceptions that are references/objects are re-thrown untouched; any other
# message gets a timestamp and the "F" (fatal) tag in front of every line.
$Coro::State::DIEHOOK = sub {
    die @_ if $^S || ref $_[0];

    my ($message) = @_;
    my $stamp = localtime();
    my $prefix = "[$stamp] F timeserver: ";
    $message =~ s/^/$prefix/gm;
    die $message;
};
# Parse the command line.
#
# Options -n and -d are stored in $MAXCLIENTS and $DEBUG; an optional
# first remaining argument replaces $LOCAL_PATH. On -h, on an option
# error or on a client limit below 1 the usage text is printed to STDERR
# and the process exits with status 1.
sub config {
    my $help;
    # Remember the built-in defaults so the usage text shows them even
    # when -n was given together with -h or with an invalid value.
    my ($default_clients, $default_path) = ($MAXCLIENTS, $LOCAL_PATH);

    Getopt::Long::Configure(qw/no_ignore_case/);
    my $parsed = GetOptions(
        'n=i' => \$MAXCLIENTS,
        'd=i' => \$DEBUG,
        'h'   => \$help,
    );

    # The limit sizes the semaphore that guards the accept loop; with
    # fewer than one slot no connection could ever be accepted.
    my $limit_ok = $MAXCLIENTS >= 1;

    unless ($parsed && !$help && $limit_ok) {
        # Bypass the logging hook: usage output is printed verbatim.
        local $SIG{__WARN__};
        warn "timeserver: -n must be at least 1\n" unless $limit_ok;
        warn <<"USAGE";
timeserver [-n maxclients] [-d debuglevel] [socketpath] [-h]
-n set max. number of parallel client connections, default $default_clients
-d set a debug level (0..2)
-h print this help
socketpath set the path to the listening AF_UNIX socket,
default $default_path
USAGE
        exit 1;
    }

    $LOCAL_PATH = shift @ARGV if @ARGV;

    local $W = 1;
    warn "Listening on $LOCAL_PATH\n";
}
# SIGPIPE is ignored for the whole process.
$SIG{PIPE} = 'IGNORE';

# True on x86_64, false on 32-bit x86. It is assigned at compile time
# because the constants below are built at compile time as well.
my $sys64;
BEGIN {
    use POSIX qw/uname/;
    my @uname   = uname;
    my $machine = $uname[-1];          # last uname field: hardware name
    if ( $machine eq 'x86_64' ) {
        $sys64 = 1;
    }
    elsif ( $machine =~ /^i\d86$/ ) {
        $sys64 = 0;
    }
    else {
        die "Unsupported operating system: @uname\n";
    }
}

# Resource number, from /usr/include/bits/resource.h
use constant RLIMIT_NOFILE => 7;
# System call numbers, from /usr/include/asm/unistd.h
use constant SYS_setrlimit => $sys64 ? 160 : 75;
use constant SYS_getrlimit => $sys64 ? 97 : 76;
# pack() template of struct rlimit (soft limit, hard limit)
use constant struct_rlimit => $sys64 ? 'QQ' : 'II';
##############################################################
# Helper class
##############################################################
{
    # Coro::Handle subclass that makes the record and file-descriptor
    # passing methods of the underlying socket usable from a coro: each
    # method waits on the Coro::Handle for readiness and then delegates
    # to the wrapped file handle.
    package My::Handle;
    use strict;
    use Coro::Handle;
    use IO::Socket::UNIX;
    use IO::Handle::Record;
    our @ISA=('Coro::Handle');

    # Return the wrapped file handle, reblessed into IO::Socket::UNIX
    # if it is not one already, so the socket methods can be called on it.
    sub check_socket {
        my ($self)=@_;
        my $fh=$self->fh;
        bless $fh, 'IO::Socket::UNIX' unless $fh->isa('IO::Socket::UNIX');
        return $fh;
    }

    # Read one record. Returns the record as a list, or an empty list at
    # end of input. Dies when the handle stops being readable.
    sub read_record {
        my ($self)=@_;
        my $fh=$self->check_socket;
        while( $self->readable ) {      # coro-wait until readable
            my @rec=$fh->read_record;
            return if $fh->end_of_input;
            # A defined read buffer presumably means the record is still
            # incomplete, so wait for more data -- TODO confirm.
            return @rec unless defined $fh->read_buffer;
        }
        die "Unexpected read error";
    }

    # Write one record. Returns an empty list once the underlying
    # write_record reports a true value. Dies when the handle stops
    # being writable before that.
    sub write_record {
        my ($self, @rec)=@_;
        my $fh=$self->check_socket;
        my $done;
        $done=$fh->write_record(@rec) if $self->writable;   # push record
        return if $done;
        while( $self->writable ) {      # coro-wait until writable
            # Called without arguments; presumably this continues the
            # record pushed above -- TODO confirm.
            return if $fh->write_record;
        }
        die "Unexpected write error";
    }

    # True once the underlying handle reports end of input.
    sub is_eof {
        my ($self)=@_;
        return $self->check_socket->end_of_input;
    }

    # Credentials of the peer as reported by the underlying socket.
    sub peercred {
        my ($self)=@_;
        return $self->check_socket->peercred;
    }

    # Array reference with the received file handles, each wrapped by
    # new_from_fh. An empty array is returned when nothing was received.
    sub received_fds {
        my ($self)=@_;
        my $received=$self->check_socket->received_fds || [];
        return [map { $self->new_from_fh($_) } @$received];
    }

    # Lvalue accessor for the file descriptors to pass along with the
    # next record. The final expression must stay the lvalue call itself.
    sub fds_to_send : lvalue {
        my ($self)=@_;
        my $fh=$self->check_socket;
        $fh->fds_to_send;
    }
}
##############################################################
# Frame $data as one chunk: the length in hexadecimal, CRLF, the data
# itself and a closing CRLF.
sub chunked ($) {
    my ($payload) = @_;
    my $size = sprintf '%x', length $payload;
    return join "\r\n", $size, $payload, '';
}
# Build one multipart section: the boundary line, a text/html
# Content-Type header, an empty line and then $data.
sub multipart ($$) {
    my ($body, $boundary) = @_;
    return "--" . $boundary . "\r\n"
         . "Content-Type: text/html\r\n"
         . "\r\n"
         . $body;
}
# Put the argument on a line of its own: a newline before and after it.
sub html ($) {
    my ($text) = @_;
    return "\n" . $text . "\n";
}
# Write the current local time once per second to the first of the given
# handles, each time as one chunk holding one multipart section.
#
#   $_[0]  array reference of handles; only the first one is written to
#   $_[1]  multipart boundary string
#
# Returns (an empty list) as soon as a print fails, or immediately when
# no handle was passed at all.
sub clock {
    my ($handles, $boundary) = @_;
    my @fds = @{ $handles || [] };
    # Without a handle there is nothing to write to; calling print on an
    # undefined value would die.
    return unless @fds && defined $fds[0];
    while (1) {
        my $section = multipart(html(scalar(localtime)), $boundary);
        return unless $fds[0]->print(chunked($section));
        Coro::Timer::sleep(1);
    }
}
# Serve one client connection until it sends 'q' or reaches end of input.
#
# $fh is the handle of the accepted connection. Records handled:
#   ('q')                 reply 'OK' and end the session
#   ('clock', $boundary)  start a coro that runs clock() on the file
#                         handles received with the record, reply 'OK'
# Any other record is skipped without a reply.
sub handle_client {
    my ($fh) = @_;
    my ($pid, $uid, $gid) = $fh->peercred;
    {
        local $W = 1;
        warn "Accepted new connection from PID $pid (UID=$uid, GID=$gid)\n";
    }

    REQUEST: for (;;) {
        my ($command, $boundary) = $fh->read_record;
        last REQUEST if $fh->is_eof;

        if ($command eq 'q') {
            $fh->write_record('OK');
            last REQUEST;
        }
        next REQUEST if $command ne 'clock';

        # The second element of a clock record is the multipart boundary.
        async {
            clock(@_);
            { local $W = 1; warn "clock thread finished\n"; }
        } $fh->received_fds, $boundary;
        $fh->write_record('OK');
    }

    $fh->close;
    { local $W = 1; warn "Connection from PID $pid (UID=$uid, GID=$gid) closed\n"; }
}
# Make sure the process may open enough file descriptors for $MAXCLIENTS
# parallel clients plus 10 spare ones.
#
# If the soft RLIMIT_NOFILE is too low, try to raise soft and hard limit.
# When that fails, $MAXCLIENTS is reduced to fit the current soft limit
# (never below 1) and a warning is issued. Dies if the current limit
# cannot be read at all.
sub adjust_nofile {
    my $current = pack struct_rlimit, 0, 0;
    die "Cannot getrlimit(NOFILE): $!"
        if syscall(SYS_getrlimit, RLIMIT_NOFILE, $current);
    my ($soft) = unpack struct_rlimit, $current;

    my $wanted = $MAXCLIENTS + 10;
    return if $wanted <= $soft;             # OK, we have enough files

    my $request = pack struct_rlimit, $wanted, $wanted;
    if ( syscall(SYS_setrlimit, RLIMIT_NOFILE, $request) ) {
        my $error = $!;
        $MAXCLIENTS = $soft - 20;
        # A limit below 1 would size the client semaphore so that no
        # connection could ever be accepted.
        $MAXCLIENTS = 1 if $MAXCLIENTS < 1;
        warn "Cannot setrlimit(NOFILE): $error -- adjusting MAXCLIENTS to $MAXCLIENTS\n";
        return;
    }

    # Read the limit back to report the value that is really in effect.
    my $check = pack struct_rlimit, 0, 0;
    if ( syscall(SYS_getrlimit, RLIMIT_NOFILE, $check) ) {
        warn "Cannot getrlimit(NOFILE) after raising it: $!\n";
        return;
    }
    my ($new_soft) = unpack struct_rlimit, $check;
    { local $W = 1; warn "RLIMIT_NOFILE raised to $new_soft\n"; }
}
# Backlog value for listen().
#
# Returns the system-wide limit read from /proc/sys/net/core/somaxconn,
# the value that caps every listen() backlog. Falls back to 128 when the
# file cannot be opened or does not hold a positive number.
sub somaxconn () {
    local $/ = "\n";
    if ( open my $fh, '<', '/proc/sys/net/core/somaxconn' ) {
        my $line = <$fh>;
        close $fh;
        return $line + 0
            if defined $line && $line =~ /^\s*\d+/ && $line > 0;
    }
    return 128;
}
# Create the listening AF_UNIX stream socket at $path and return it
# wrapped in a My::Handle. Whatever exists at $path is unlinked first.
# Dies when the socket cannot be created, bound or put into listening mode.
sub open_listener {
    my ($path) = @_;

    my $sock;
    socket($sock, AF_UNIX, SOCK_STREAM, 0)
        or die "Cannot create listener socket on $path: $!\n";

    unlink($path);
    my $address = Socket::pack_sockaddr_un($path);
    bind($sock, $address)
        or die "Cannot bind listener to $path: $!\n";

    listen($sock, somaxconn())
        or die "Cannot listen on $path: $!\n";

    return My::Handle->new_from_fh($sock);
}
# Entry point of the server: read the configuration, make sure enough
# file descriptors are available, open the listening socket and run the
# accept loop until a TERM or INT signal arrives. Afterwards wait for the
# clients that are still being served and return 0, which is used as the
# exit status of the process.
sub main {
config;
#my $debugger=Coro::Debug->new_unix_server( "/tmp/coro:$LOCAL_HOSTPORT" );
adjust_nofile;
# $listener is a Coro::Handle
my $listener=open_listener $LOCAL_PATH;
# $run wakes up the accept loop below. It is sent by the signal handler
# and by the I/O watcher on the listening socket.
my $run=Coro::Signal->new;
my $sighandler=sub {$running=R_STOPPING; $run->send};
# The watcher objects are kept in @signal_watcher for the rest of main.
my @signal_watcher=map {AE::signal($_=>$sighandler)} qw/TERM INT/;
my $accept_watcher;
# The I/O watcher exists only while new connections may be accepted.
# The second argument 0 asks AE::io to watch for readability.
my $activate_listener=sub {
$accept_watcher=AE::io($listener->fh, 0, sub{$run->send});
};
my $stop_listener=sub { undef $accept_watcher };
# One semaphore unit per client that may be served at the same time.
my $maxclients=Coro::Semaphore->new($MAXCLIENTS);
$running=R_RUNNING;
$activate_listener->();
while( $running==R_RUNNING ) {
# Sleep until either a connection is pending or a signal arrived.
$run->wait;
last unless( $running==R_RUNNING );
# Reserve a client slot. If none is free, stop watching the listener
# and block until a running client gives its slot back.
unless( $maxclients->try ) {
warn "Max # of clients reached\n";
{local $W=2; warn "Stopping listener\n";}
$stop_listener->();
$maxclients->down;
{local $W=2; warn "Activate listener\n";}
$activate_listener->();
}
my $client;
if( $client=$listener->accept ) {
# Serve the client in a pooled coro. Exceptions are logged and the
# slot is released in any case.
async_pool {
eval {
handle_client @_;
};
warn "$@" if $@;
$maxclients->up;
} $client;
} else {
# this should never happen
# it can happen in principle on ENFILE conditions. EMFILE
# should be handled by the semaphore.
# Give the reserved slot back and pause accepting for a moment.
$maxclients->up;
warn "Accept failed: $!\n";
{local $W=2; warn "Stopping listener for 0.5 sec\n";}
$stop_listener->();
Coro::Timer::sleep 0.5;
{local $W=2; warn "Activate listener\n";}
$activate_listener->();
}
}
# Shutdown: stop watching and close the listening socket.
undef $accept_watcher;
close $listener;
undef $listener;
# reuse $maxclients to see when all children have finished
# this adjustment leaves one "down" operation if there are no children
$maxclients->adjust(-$MAXCLIENTS+1);
# and that is consumed now
# Thus we wait for all current clients to finish.
$maxclients->down;
warn "exiting\n";
return 0;
}
# Run the server; the return value of main is the exit status.
exit main;
__END__
=encoding utf8
=head1 NAME
timeserver - a server-push clock
=head1 SYNOPSIS
timeserver [-n maxclients] [-d level] [socketpath] [-h]
=head1 DESCRIPTION
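C<timeserver> listens on a UNIX-domain socket and, on request, writes the
current time once per second to a file descriptor passed in by the client.
See L</Protocol> for details.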
=head2 Command Line
C<timeserver> accepts these options:
=over 4
=item -n count
sets the number of connections allowed to be handled in parallel,
default 1000. Depends on the current C<RLIMIT_NOFILE> setting.
=item -d level
verbosity level
=back
The remaining C<socketpath> parameter specifies the path to a
UNIX-domain socket. The server will be listening for connections
on this socket.
Default is C</tmp/timeserver>.
=head2 Protocol
After a connection is established the pair C<< $sock->write_record >>,
C<< $conn->read_record >> is used to communicate. Here is a client side
example:
use IO::Socket::UNIX;
use IO::Handle::Record;
my $s=IO::Socket::UNIX->new('/tmp/timeserver');
$s->fds_to_send=[1]; # pass STDOUT (fd=1) to the server
$s->write_record(clock=>'boundary'); # send the req and pass the fd.
my @reply=$s->read_record; # read the reply
print "reply: @reply\n" # done
call this script as
perl script.pl | cat
You'll see infinite output similar to this:
62
--boundary
Content-Type: text/html
Mon Apr 19 16:19:07 2010
reply: OK
62
--boundary
Content-Type: text/html
Mon Apr 19 16:19:08 2010
...
The C<reply: OK> line comes from the client program. All the other output
is written by the server. But it is written directly to the client's
C<STDOUT>. Check in another window to see that the perl client process is
gone but the output continues. Then kill the C<cat> process to stop it.
=head1 SIGNALS
Upon receipt of a C<SIGTERM> or C<SIGINT> the server stops accepting
client connections. When all remaining client connections are done the
server exits. Only connections that implement the
C<< $sock->write_record / $conn->read_record >> protocol count as client
connections.
=head1 AUTHOR
Torsten Förtsch
=head1 SEE ALSO
L<Coro>,
L<AnyEvent>,
L<IO::Handle::Record>
=cut