#!/usr/bin/env perl use strict; use Coro; use Coro::State; use Coro::AnyEvent; use Coro::Semaphore; use Coro::Handle; use Coro::Signal; use Coro::Timer; use Coro::RWLock; use AnyEvent::Socket; use Socket; use Getopt::Long; #use Coro::Debug; my ($LOCAL_PATH)=('/tmp/timeserver'); my $MAXCLIENTS=1000; use constant { R_STARTINGUP=>1, R_RUNNING=>2, R_STOPPING=>1, }; my $running=R_STARTINGUP; our $DEBUG=0; our $W=0; our %W=(0=>'W', 1=>'I', 2=>'D'); $Coro::State::WARNHOOK=sub { if($W<=$DEBUG) { my ($s)=@_; my $t=localtime(); my $c=exists $W{$W} ? $W{$W} : 'D'.($W-1); $s=~s/^/"[$t] $c timeserver: "/gem; warn $s; } }; $Coro::State::DIEHOOK=sub { die @_ if $^S; # do nothing if exception is being caught die @_ if ref($_[0]); # or is an object my ($s)=@_; my $t=localtime(); $s=~s/^/"[$t] F timeserver: "/gem; die $s; }; sub config { my $help; Getopt::Long::Configure(qw/no_ignore_case/); GetOptions('n=i'=>\$MAXCLIENTS, 'd=i'=>\$DEBUG, 'h'=>\$help) && !$help or do { local $SIG{__WARN__}; warn <<"USAGE"; timeserver [-n maxclients] [-d debuglevel] [socketpath] [-h] -n set max. number of parallel client connections, default $MAXCLIENTS -d set a debug level (0..2) -h print this help socketpath set the path to the listening AF_UNIX socket, default $LOCAL_PATH USAGE exit 1; }; if( @ARGV ) { $LOCAL_PATH=shift @ARGV; } local $W=1; warn "Listening on $LOCAL_PATH\n"; } $SIG{PIPE} = 'IGNORE'; my $sys64; BEGIN { use POSIX qw/uname/; my @uname=uname; if( $uname[$#uname]=~/^i\d86$/ ) { $sys64=0; } elsif( $uname[$#uname] eq 'x86_64' ) { $sys64=1; } else { die "Unsupported operating system: @uname\n"; } } use constant { RLIMIT_NOFILE=>7, # from /usr/include/bits/resource.h SYS_setrlimit=>$sys64 ? 160 : 75, # /usr/include/asm/unistd.h SYS_getrlimit=>$sys64 ? 97 : 76, # /usr/include/asm/unistd.h struct_rlimit=>$sys64 ? 'QQ':'II', # pack()ed struct rlimit }; ############################################################## # Helper class ############################################################## { package My::Handle; use strict; use Coro::Handle; use IO::Socket::UNIX; use IO::Handle::Record; our @ISA=('Coro::Handle'); sub check_socket { my ($I)=@_; my $fh=$I->fh; unless( $fh->isa('IO::Socket::UNIX') ) { bless $fh, 'IO::Socket::UNIX'; } return $fh; } sub read_record { my ($I)=@_; my $fh=$I->check_socket; while($I->readable) { # coro-wait until readable my @rec=$fh->read_record; return if $fh->end_of_input; return @rec unless defined $fh->read_buffer; } die "Unexpected read error"; } sub write_record { my ($I, @rec)=@_; my $fh=$I->check_socket; my $rc; $rc=$fh->write_record(@rec) if $I->writable; # push record return if $rc; while($I->writable) { # coro-wait until writable return if $fh->write_record; } die "Unexpected read error"; } sub is_eof { my ($I)=@_; my $fh=$I->check_socket; $fh->end_of_input; } sub peercred { my ($I)=@_; my $fh=$I->check_socket; $fh->peercred; } sub received_fds { my ($I)=@_; my $fh=$I->check_socket; return [map {$I->new_from_fh($_)} @{$fh->received_fds}]; } sub fds_to_send : lvalue { my ($I)=@_; my $fh=$I->check_socket; $fh->fds_to_send; } } ############################################################## sub chunked ($) { my ($data)=@_; sprintf "%x\r\n%s\r\n", length($data), $data; } sub multipart ($$) { my ($data, $boundary)=@_; sprintf "--%s\r\nContent-Type: text/html\r\n\r\n%s", $boundary, $data; } sub html ($) { '

'.$_[0].'

'; } sub clock { my @fds=@{$_[0]}; my $boundary=$_[1]; while(1) { return unless $fds[0]->print(chunked multipart html +localtime, $boundary); Coro::Timer::sleep 1; } } sub handle_client { my ($fh)=@_; my ($pid, $uid, $gid)=$fh->peercred; {local $W=1; warn "Accepted new connection from PID $pid (UID=$uid, GID=$gid)\n"}; my @rec; while(1) { @rec=$fh->read_record; last if $fh->is_eof; if($rec[0] eq 'q') { $fh->write_record('OK'); last; } elsif($rec[0] eq 'clock') { # $rec[1] is the multipart boundary async { clock @_; {local $W=1; warn "clock thread finished\n";} } $fh->received_fds, $rec[1]; $fh->write_record('OK'); } } $fh->close; {local $W=1; warn "Connection from PID $pid (UID=$uid, GID=$gid) closed\n";} } sub adjust_nofile { my $old=pack struct_rlimit, 0, 0; die "Cannot getrlimit(NOFILE): $!" if( syscall SYS_getrlimit, RLIMIT_NOFILE, $old ); my ($old_s, $old_h)=unpack struct_rlimit, $old; my $new=$MAXCLIENTS+10; return if $new<=$old_s; # OK, we have enough files $new=pack struct_rlimit, $new, $new; if( syscall SYS_setrlimit, RLIMIT_NOFILE, $new ) { $MAXCLIENTS=$old_s-20; warn "Cannot setrlimit(NOFILE): $! -- adjusting MAXCLIENTS to $MAXCLIENTS\n"; } else { $new=pack struct_rlimit, 0, 0; syscall SYS_getrlimit, RLIMIT_NOFILE, $new; my ($new_s, $new_h)=unpack struct_rlimit, $new; {local $W=1; warn "RLIMIT_NOFILE raised to $new_s\n";} } } sub somaxconn () { local $/="\n"; my $fh; open $fh, '<', '/proc/sys/net/ipv4/tcp_max_syn_backlog' and do { my $x=<$fh>; return ($x>0 ? $x+0 : 128); }; return 128; } sub open_listener { my ($path)=@_; socket my $fh, AF_UNIX, SOCK_STREAM, 0 or die "Cannot create listener socket on $path: $!\n"; unlink $path; bind $fh, Socket::pack_sockaddr_un $path or die "Cannot bind listener to $path: $!\n"; listen $fh, somaxconn or die "Cannot listen on $path: $!\n"; return My::Handle->new_from_fh($fh); } sub main { config; #my $debugger=Coro::Debug->new_unix_server( "/tmp/coro:$LOCAL_HOSTPORT" ); adjust_nofile; # $listener is a Coro::Handle my $listener=open_listener $LOCAL_PATH; my $run=Coro::Signal->new; my $sighandler=sub {$running=R_STOPPING; $run->send}; my @signal_watcher=map {AE::signal($_=>$sighandler)} qw/TERM INT/; my $accept_watcher; my $activate_listener=sub { $accept_watcher=AE::io($listener->fh, 0, sub{$run->send}); }; my $stop_listener=sub { undef $accept_watcher }; my $maxclients=Coro::Semaphore->new($MAXCLIENTS); $running=R_RUNNING; $activate_listener->(); while( $running==R_RUNNING ) { $run->wait; last unless( $running==R_RUNNING ); unless( $maxclients->try ) { warn "Max # of clients reached\n"; {local $W=2; warn "Stopping listener\n";} $stop_listener->(); $maxclients->down; {local $W=2; warn "Activate listener\n";} $activate_listener->(); } my $client; if( $client=$listener->accept ) { async_pool { eval { handle_client @_; }; warn "$@" if $@; $maxclients->up; } $client; } else { # this should never happen # it can happen in principle on ENFILE conditions. EMFILE # should be handled by the semaphore. $maxclients->up; warn "Accept failed: $!\n"; {local $W=2; warn "Stopping listener for 0.5 sec\n";} $stop_listener->(); Coro::Timer::sleep 0.5; {local $W=2; warn "Activate listener\n";} $activate_listener->(); } } undef $accept_watcher; close $listener; undef $listener; # reuse $maxclients to see when all children have finished # this adjustment leaves one "down" operation if there are no children $maxclients->adjust(-$MAXCLIENTS+1); # and that is consumed now # Thus we wait for all current clients to finish. $maxclients->down; warn "exiting\n"; return 0; } exit main; __END__ =encoding utf8 =head1 NAME timeserver - a server-push clock =head1 SYNOPSIS timeserver [-n maxclients] [-d level] [socketpath] [-h] =head1 DESCRIPTION see L =head2 Command Line C accepts these options: =over 4 =item -n count sets the number of connections allowed to be handled in parallel, default 10000. Depends on the current C setting. =item -d level verbosity level =back The remaining C parameter specifies the path to a UNIX-domain socket. The server will be listening for connections on this socket. Default is, C. =head2 Protocol After a connection is established the pair C<< $sock->write_record >>, C<< $conn->read_record >> is used to communicate. Here is a client side example: use IO::Socket::UNIX; use IO::Handle::Record; my $s=IO::Socket::UNIX->new('/tmp/timeserver'); $s->fds_to_send=[1]; # pass STDOUT (fd=1) to the server $s->write_record(clock=>'boundary'); # send the req and pass the fd. my @reply=$s->read_record; # read the reply print "reply: @reply\n" # done call this script as perl script.pl | cat You'll see infinite output similar to this: 62 --boundary Content-Type: text/html

Mon Apr 19 16:19:07 2010

reply: OK 62 --boundary Content-Type: text/html

Mon Apr 19 16:19:08 2010

... The C line comes from the client program. All the other output is written by the server. But it is written directly to the client's C. Check in another window to see that the perl client process is gone but the output continues. Then kill the C process to stop it. =head1 SIGNALS Upon receipt of a C or C the server stops accepting client connections. When all other client connections are done the server exits. As client connection only the connections that implement the C<< $sock->write_record / $conn->read_record >> protocol are taken into account. =head1 AUTHOR Torsten Förtsch =head1 SEE ALSO L, L, L =cut