blob: 12bc871b4184dade0b63f03139cfba4587df8a98 [file] [log] [blame]
#!/usr/bin/perl
#
# conmux -- the main console multiplexor daemon
#
# Main console multiplexor daemon. There is one of these daemons for
# each open console supported in the system. Clients are directed to
# this daemon via the conmux-registry daemon.
#
# (C) Copyright IBM Corp. 2004, 2005, 2006
# Author: Andy Whitcroft <andyw@uk.ibm.com>
#
# The Console Multiplexor is released under the GNU Public License V2
#
use strict;
use FindBin;
use Symbol qw(gensym);
use IO::Socket;
use IO::Multiplex;
use IPC::Open3;
use URI::Escape;
use Net::Domain;
# Find our internal libraries.
use lib $FindBin::Bin;
use lib "$FindBin::Bin/../lib/";
use lib "$FindBin::Bin/lib/";
use Conmux;
# Program name, used as the prefix on usage and error messages.
our $P = 'conmux';
# When non-zero, the classes below trace their method calls on stdout.
our $debug = 0;
# Ignore SIGCHLD; presumably so children started via open3 need not be
# waited for -- confirm against the platform's SIGCHLD semantics.
$SIG{'CHLD'} = "IGNORE";
# Turn on autoflush for the currently selected output handle.
$| = 1;
#
# CALLBACK: this class is used to provide a timed callback. The multiplexor
# library allows us to set a timeout on any open file we have registered.
# So, we open a new file descriptor to /dev/null and set a timeout on that.
#
package Callback;
#
# Callback->new($mux, $who, $time): arrange for $who->callback_timeout()
# to be called once the multiplexor reports a timeout of $time on a
# placeholder file handle.  Returns the new Callback object; dies if
# /dev/null cannot be opened.
#
sub new {
    my ($class, $mux, $who, $time) = @_;
    my $self = bless { 'who' => $who }, $class;

    print "Callback::new [$self] mux<$mux> who<$who> time<$time>\n"
        if ($main::debug);

    # Open a file handle to nothing, we need this to hang the timeout
    # on in the multiplexor.  It will fail with a mux_eof, which we ignore.
    # Three-argument open with a lexical handle; the program name is
    # referenced explicitly in main, as $main::debug is above.
    open(my $fh, '<', '/dev/null')
        or die "$main::P: /dev/null: open failed - $!\n";

    $mux->add($fh);
    $mux->set_callback_object($self, $fh);
    $mux->set_timeout($fh, $time);

    $self;
}
#
# mux_timeout($mux, $fh): multiplexor callback for an expired timeout.
# Notifies the owner, then closes the placeholder handle.
#
sub mux_timeout {
    my $self = shift;
    my ($mux, $fh) = @_;

    if ($main::debug) {
        print "Callback::mux_timeout [$self] mux<$mux> fh<$fh>\n";
    }

    # Owner first, then retire the handle the timeout was hung on.
    my $owner = $self->{'who'};
    $owner->callback_timeout();
    $mux->close($fh);
}
# Destructor: only traces the teardown when debugging is enabled.
sub DESTROY {
    my $self = shift;

    if ($main::debug) {
        print "Callback::DESTROY [$self]\n";
    }
}
#
# LISTENER SOCKET: creates an internet listener for new clients and
# connects them to the junction provided.
#
package ListenerSocket;
#
# ListenerSocket->new($mux, $port): build a listener bound to $mux and
# open its socket on $port via initialise().  Returns the object.
#
sub new {
    my $class = shift;
    my ($mux, $port) = @_;

    my $self = bless({ 'mux' => $mux }, $class);

    if ($main::debug) {
        print "ListenerSocket::new [$self] mux<$mux> port<$port>\n";
    }

    $self->initialise($port);

    return $self;
}
#
# initialise($port): create the TCP listening socket on $port, register
# it with the multiplexor with ourselves as its callback object, and
# remember it as our listener.  Dies if the socket cannot be created.
#
sub initialise {
    my ($self, $port) = @_;

    print "ListenerSocket::initialise [$self] port<$port> "
        if ($main::debug);

    # Create a listening socket and add it to the multiplexor.
    # ($sock is declared exactly once; the original declared it twice.)
    my $sock = IO::Socket::INET->new(
        Proto     => 'tcp',
        LocalPort => $port,
        Listen    => 4,
        ReuseAddr => 1,
    ) or die "socket: $@";

    print " adding $self $sock\n" if ($main::debug);

    $self->mux->listen($sock);
    $self->mux->set_callback_object($self, $sock);
    $self->listener($sock);
}
# DATA accessors.

# mux([$new]): get (and optionally set) the multiplexor.
sub mux {
    my ($self, @new) = @_;
    $self->{'mux'} = $new[0] if (@new);
    return $self->{'mux'};
}

# listener([$new]): get (and optionally set) the listening socket.
sub listener {
    my ($self, @new) = @_;
    $self->{'listener'} = $new[0] if (@new);
    return $self->{'listener'};
}

# address(): "<fully qualified host name>:<port>" of the listening socket.
sub address {
    my $self = shift;
    return join(':', Net::Domain::hostfqdn(),
        $self->{'listener'}->sockport());
}
# JUNCTION: callbacks.
##sub junctionInput {
##}
##sub junctionEOF {
## my ($self) = @_;
##
## $self->{'junction'}->junctionRemove($self, 'console-client');
## $self->{'mux'}->close($self->{'listener'});
##}
# Multiplexor callback for an accepted connection: wrap the new file
# handle in a ClientCmd object.
sub mux_connection {
    my $self = shift;
    my ($mux, $fh) = @_;

    if ($main::debug) {
        print "ListenerSocket::mux_connection [$self] mux<$mux> fh<$fh>\n";
    }

    # Make a new client connection.
    my $client = ClientCmd->new($mux, $fh);

    print " new connection $self $client\n" if ($main::debug);
}
# Destructor: trace if debugging, then close the listening socket.
sub DESTROY {
    my $self = shift;

    if ($main::debug) {
        print "ListenerSocket::DESTROY [$self]\n";
    }

    close($self->listener);
}
#
# JUNCTION: generic junction box object, connects named groups of objects
# to other named groups.
#
# Expects the following callbacks to be defined on each object registered:
# junctionInput($from, $data)
# junctionEOF($from, $to)
#
package Junction;

#
# Junction->new(): create an empty junction.  The object's own hash is the
# member table, keyed by each member's stringified reference.
#
sub new {
    my ($class) = @_;
    my $self = bless { }, $class;

    print "Junction::new [$self]\n" if ($main::debug);

    $self;
}

# junctionAdd($client): register $client to receive input and EOF.
sub junctionAdd {
    my ($self, $client) = @_;

    print "Junction::junctionAdd [$self] client<$client>\n"
        if ($main::debug);

    # Add ourselves to the list of recipients.
    $self->{$client} = $client;
}

#
# junctionInput($client, $data): pass $data, attributed to $client, to every
# member via its junctionInput($from, $data) callback.
#
sub junctionInput {
    my ($self, $client, $data) = @_;

    print "Junction::junctionInput [$self] client<$client> " .
        "data<$data>\n" if ($main::debug);

    # Send this data on to the clients listed in the output list.
    # Callbacks may add or remove members, so walk a private copy of the
    # list rather than the live hash values, and skip anyone removed
    # before their turn.
    my @members = values %{$self};
    for my $c (@members) {
        next if (!exists $self->{$c});
        print " sending to $c\n" if ($main::debug);
        $c->junctionInput($client, $data);
    }
}

#
# junctionEOF($client): tell every member that $client has reached EOF via
# its junctionEOF($from) callback.
#
sub junctionEOF {
    my ($self, $client) = @_;

    print "Junction::junctionEOF [$self] client<$client>\n"
        if ($main::debug);

    # Send this eof on to the clients listed in the output list.
    # Same snapshot-and-skip approach as junctionInput: members may drop
    # out of the junction while being told about the EOF.
    my @members = values %{$self};
    for my $c (@members) {
        next if (!exists $self->{$c});
        print " sending to $c\n" if ($main::debug);
        $c->junctionEOF($client);
    }
}

# junctionRemove($client): stop delivering to $client.
sub junctionRemove {
    my ($self, $client) = @_;

    print "Junction::junctionRemove [$self] client<$client>\n"
        if ($main::debug);

    # Drop this client from our lists.
    delete $self->{$client};
}
#
# PAYLOAD: generic payload object, connects itself to the requisite junction.
#
package Payload;

# Registry of live payloads by name, plus a count of registered names.
# When the count drops to zero the daemon exits.
my %payloads = ();
my $payloads = 0;

# Payload->lookup($name): the payload registered as $name, or undef.
sub lookup {
    my ($class, $name) = @_;

    $payloads{$name};
}

# Payload->found($name, $self): register $self under $name.
sub found {
    my ($class, $name, $self) = @_;

    print "Payloads::found name<$name> self<$self>\n" if ($main::debug);

    # Count each name once: re-registering an existing name must not
    # inflate the count, or the daemon would never see it reach zero.
    $payloads++ if (!defined $payloads{$name});
    $payloads{$name} = $self;
}

#
# Payload->lost($name, $self): drop $name from the registry and exit the
# process once no payloads remain.
#
sub lost {
    my ($class, $name, $self) = @_;

    print "Payloads::lost name<$name> self<$self>\n" if ($main::debug);

    # Losing a name we never registered must not disturb the count.
    return if (!defined $payloads{$name});

    # Remove the entry outright instead of leaving an undef placeholder.
    delete $payloads{$name};
    if (--$payloads == 0) {
        exit(0);
    }
}
#
# Payload->new($name, $title, $mux, @a): build a payload, register it under
# $name, give it fresh input and output junctions, then hand any remaining
# arguments to initialise().  Returns the new object.
#
sub new {
    my $class = shift;
    my ($name, $title, $mux, @a) = @_;

    my $self = bless({ }, $class);

    if ($main::debug) {
        print "Payload::new [$self] name<$name> title<$title> mux<$mux>\n";
    }

    # Make ourselves findable by name before anything else happens.
    Payload->found($name, $self);

    $self->name($name);
    $self->title($title);
    $self->mux($mux);
    $self->enabled(1);

    # cin carries client input to us, cout carries our output to clients.
    $self->cin(Junction->new);
    $self->cout(Junction->new);

    $self->initialise(@a);

    return $self;
}
# Data accessors.  Each returns the current value; when called with an
# argument it stores that argument first.

sub name {
    my ($self, @new) = @_;
    $self->{'name'} = $new[0] if (@new);
    return $self->{'name'};
}

sub title {
    my ($self, @new) = @_;
    $self->{'title'} = $new[0] if (@new);
    return $self->{'title'};
}

sub mux {
    my ($self, @new) = @_;
    $self->{'mux'} = $new[0] if (@new);
    return $self->{'mux'};
}

sub cin {
    my ($self, @new) = @_;
    $self->{'cin'} = $new[0] if (@new);
    return $self->{'cin'};
}

sub cout {
    my ($self, @new) = @_;
    $self->{'cout'} = $new[0] if (@new);
    return $self->{'cout'};
}

sub enabled {
    my ($self, @new) = @_;
    $self->{'enabled'} = $new[0] if (@new);
    return $self->{'enabled'};
}
#
# connected([$when]): get the connection timestamp; with an argument, set
# it (callers pass the connect time, or 0 when disconnected) and record a
# state transition.
#
sub connected {
    my $self = shift;

    if (@_) {
        $self->{'connected'} = shift;

        # Only an actual change of state is a transition.  Recording one
        # on every read refreshed 'trans_minor' whenever the state was
        # merely queried, which distorted the retry back-off.
        $self->transition();
    }
    return $self->{'connected'};
}

#
# transition(): note that the connection state changed now.  'trans_minor'
# is the time of the latest change; 'trans_major' is only moved when more
# than 30 seconds have passed since the previous change.
#
sub transition {
    my $self = shift;
    my $time = time;

    if (($time - $self->{'trans_minor'}) > 30) {
        $self->{'trans_major'} = $time;
    }
    $self->{'trans_minor'} = $time;
}
#
# retry_timeout(): seconds to wait before the next reconnect attempt,
# growing with the time elapsed since 'trans_major'.
#
sub retry_timeout {
    my ($self) = @_;
    my $elapsed = time - $self->{'trans_major'};

    return 1  if ($elapsed < 60);
    return 10 if ($elapsed < 120);
    return 30;
}
#
# state(): "connected" once connected for over 30 seconds, "transition"
# while 'trans_major' is under 60 seconds old, otherwise "disconnected".
#
sub state {
    my ($self) = @_;
    my $connected_at = $self->{'connected'};
    my $changed_at = $self->{'trans_major'};
    my $now = time;

    return "connected"
        if ($connected_at && ($now - $connected_at) > 30);
    return "transition"
        if ($changed_at && ($now - $changed_at) < 60);
    return "disconnected";
}
#
# initialise(): common payload start-up.  Subscribes the payload to its
# own input junction and stamps it as connected now.
#
sub initialise {
    my $self = shift;

    if ($main::debug) {
        print "Payload::initialise [$self]\n";
    }

    # Ensure we receive client input.
    my $inbound = $self->cin;
    $inbound->junctionAdd($self);

    $self->connected(time);
}
# Telnet constants.
my $TN_IAC = chr(255);
my $TN_DONT = chr(254);
my $TN_DO = chr(253);
my $TN_WONT = chr(252);
my $TN_WILL = chr(251);
my $TN_SB = chr(250);
my $TN_SE = chr(240);
my $TN_BREAK = chr(243);

my $TNOPT_ECHO = chr(1);
my $TNOPT_SGA = chr(3);

#
# mux_input($mux, $fh, $input): multiplexor callback for data arriving from
# the payload.  $input is a reference to the pending buffer; everything
# that can be handled is consumed from it.  Ordinary text goes to the
# clients on cout, telnet negotiation is answered back to the payload.
#
# Telnet negotiation protocol - RFC#854:
#
#  DO		We are being asked to DO an option
#  DONT		We are being asked to NOT DO an option
#  WILL		We are being told they will DO an option
#  WONT		We are being told they will NOT DO an option
#
# DO/DONT requests indicate we should {en,dis}able a mode.
# We are expected to respond with WILL or WONT.  To prevent
# loops, we should not respond if the request matches our
# current mode.
#
# WILL/WONT requests indicate the other end would like to
# {en,dis}able a mode.  We are expected to respond with
# DO/DONT.
#
# If we want a particular mode {en,dis}abled then we may start
# negotiation of that mode with a WILL/WONT.
#
# We want the other end to perform echo (and suppress go-ahead) so we
# will DO a WILL for ECHO or SGA and DONT all other offers.
#
sub mux_input {
    my ($self, $mux, $fh, $input) = @_;

    print "Payload::mux_input [$self] mux<$mux> fh<$fh> input<$$input>\n"
        if ($main::debug);

    while ($$input ne "") {
        # Ordinary text.
        if ($$input =~ s/^([^$TN_IAC]+)//) {
            # Data coming in from the payload, this needs to go to
            # all of the clients.
            $self->cout->junctionInput($self, $1);
            next;
        }

        # IAC,IAC: an escaped data byte of value 255, pass it on as data.
        if ($$input =~ s/^$TN_IAC$TN_IAC//) {
            $self->cout->junctionInput($self, $TN_IAC);
            next;
        }

        # IAC,SB,...,SE (an empty body is consumed too, otherwise it
        # would match nothing below and stall the stream for good).
        if ($$input =~ s/^$TN_IAC$TN_SB([^$TN_SE]*)$TN_SE//) {
            print "SB\n" if ($main::debug);
            next;
        }

        # IAC,[DO|DONT|WILL|WONT],<what>
        # The option byte may be any value including newline, hence /s.
        if ($$input =~ s/^$TN_IAC$TN_DO(.)//s) {
            my $opt = $1;
            my $c = unpack("C", $opt);
            print "DO<$c:$opt>\n" if ($main::debug);
            # We are DONT on all options so WONT all requests.
            $self->junctionInput($self, "$TN_IAC$TN_WONT$opt");
            next;
        }
        if ($$input =~ s/^$TN_IAC$TN_DONT(.)//s) {
            my $opt = $1;
            my $c = unpack("C", $opt);
            print "DONT<$c:$opt>\n" if ($main::debug);
            # We are already DONT on all options, no reply.
            next;
        }
        if ($$input =~ s/^$TN_IAC$TN_WILL(.)//s) {
            my $opt = $1;
            my $c = unpack("C", $opt);
            print "WILL<$c:$opt>\n" if ($main::debug);

            # Option codes are single-byte strings: compare them with
            # 'eq'.  A numeric '==' sees every one of them as 0 and so
            # accepted all options.
            my $reply = $TN_DONT;
            if ($opt eq $TNOPT_ECHO || $opt eq $TNOPT_SGA) {
                $reply = $TN_DO;
            }
            $self->junctionInput($self, "$TN_IAC$reply$opt");
            next;
        }
        if ($$input =~ s/^$TN_IAC$TN_WONT(.)//s) {
            my $opt = $1;
            my $c = unpack("C", $opt);
            print "WONT<$c:$opt>\n" if ($main::debug);
            $self->junctionInput($self, "$TN_IAC$TN_DONT$opt");
            next;
        }

        # IAC,<option>
        if ($$input =~ s/^$TN_IAC([^$TN_SB$TN_DO$TN_DONT$TN_WILL$TN_WONT])//) {
            print "OPTION<$1>\n" if ($main::debug);
            next;
        }

        # Incomplete ... leave it in the buffer until more arrives.
        if ($$input =~ /^$TN_IAC/) {
            return;
        }
    }
}
#
# junctionInput($from, $data): junction callback; write $data through to
# the payload.  When the payload is not connected the sender is told so
# instead.
#
sub junctionInput {
    my ($self, $from, $data) = @_;

    print "Payload::junctionInput [$self] from<$from> data<$data>\n"
        if ($main::debug);

    ##$self->{'mux'}->write($self->{'wfh'}, $data);

    # If we are connected ...
    my $fh = $self->{'wfh'};
    if ($fh) {
        print {$fh} $data;

    # ... otherwise bounce a notice to the sender.  We send to ourselves
    # as well (telnet replies, break); bouncing those would call straight
    # back in here and recurse without end, so they are just dropped.
    } elsif ($from != $self) {
        $from->junctionInput($self, "<<<NOT CONNECTED>>>\n");
    }
}
#
# mux_eof($mux, $fh): multiplexor callback for EOF on the payload.  A
# payload class that provides restart() schedules a reconnect; any other
# tells its clients, leaves its input junction and deregisters.  Either
# way the read handle is then closed in the multiplexor.
#
sub mux_eof {
    my $self = shift;
    my ($mux, $fh) = @_;

    if ($main::debug) {
        print "Payload::mux_eof [$self] mux<$mux> fh<$fh>\n";
    }

    if (!$self->can("restart")) {
        # Not restartable: this payload is finished for good.
        $self->cout->junctionEOF($self);
        $self->cin->junctionRemove($self);
        Payload->lost($self->name, $self);
    } else {
        # Restartable: tell the clients and schedule a timeout to
        # trigger a reconnect.
        my ($delay) = $self->retry_timeout();
        my $notice = "<<<PAYLOAD LOST ... retrying in $delay secs>>>\n";

        $self->cout->junctionInput($self, $notice);
        Callback->new($mux, $self, $delay);
    }

    # Close down the payload ...
    $mux->close($self->{'rfh'});
    ##$mux->remove($self->{'wfh'});
}
#
# mux_close($mux, $fh): multiplexor callback once the payload handle is
# closed.  Marks us disconnected, closes the write side, forgets both
# handles and, if a child pid is recorded, signals its process group.
#
sub mux_close {
    my $self = shift;
    my ($mux, $fh) = @_;

    $self->connected(0);

    #close($self->{'rfh'});
    close($self->{'wfh'});
    $self->{'rfh'} = undef;
    $self->{'wfh'} = undef;

    my $pid = $self->{'pid'};
    if ($pid) {
        # Kill the process group for this pid.
        kill 1, 0 - $pid;
        $self->{'pid'} = undef;
    }
}
#
# callback_timeout(): invoked by a Callback when the retry delay expires.
# Reopens the payload if it is still enabled, otherwise just tells the
# clients it is disabled.
#
sub callback_timeout {
    my $self = shift;

    if ($main::debug) {
        print "Payload::callback_timeout [$self]\n";
    }

    if (!$self->enabled) {
        return $self->cout->junctionInput($self,
            "<<<PAYLOAD DISABLED>>>\n");
    }

    $self->cout->junctionInput($self, "<<<PAYLOAD RESTART>>>\n");
    $self->openPayload();
}
#
# closePayload(): close the payload if it is connected, and disable it.
# Returns 1 if the payload was enabled until now, 0 if it was already
# disabled.
#
sub closePayload {
    my $self = shift;

    if ($self->connected) {
        $self->cout->junctionInput($self, "<<<PAYLOAD CLOSED>>>\n");

        # Close down the payload ...
        $self->mux->close($self->{'rfh'});
    }

    return 0 if (!$self->enabled);

    $self->enabled(0);
    return 1;
}
#
# openPayload(): enable the payload and, if it is not connected and its
# class provides restart(), restart it.  Returns 1 when a restart was
# issued, 0 otherwise.
#
sub openPayload {
    my $self = shift;

    $self->enabled(1);

    return 0 if ($self->connected);
    return 0 if (!$self->can("restart"));

    $self->restart();
    return 1;
}
# helpAdd($cmd, $msg): append a [ command, description ] help entry.
sub helpAdd {
    my ($self, $cmd, $msg) = @_;

    my $entry = [ $cmd, $msg ];
    push @{ $self->{'help'} }, $entry;
}

#
# commandHelp(): list of [ command, description ] pairs; the built-in
# 'break' entry first, then anything added with helpAdd().
#
sub commandHelp {
    my ($self) = @_;
    my @builtin = (
        [ 'break', 'send a break sequence' ]
    );

    return @builtin if (!defined $self->{'help'});
    return ( @builtin, @{$self->{'help'}} );
}

# commandAdd($cmd, @a): remember the definition @a for command $cmd.
sub commandAdd {
    my ($self, $cmd, @a) = @_;

    $self->{'cmd'}{$cmd} = [ @a ];
}
#
# commandExec($client, $cmd, $rest): run console command $cmd for $client,
# with $rest being whatever followed the command name.
#
# 'break', 'close' and 'open' are built in.  Anything else must have been
# registered with commandAdd() as ($msg, $run): $msg (when not empty) is
# announced to all clients, then "$run $rest" is started as a program.
# Its stdout goes to all clients, its stderr only to the requester.
# Problems are reported to $client; nothing useful is returned.
#
sub commandExec {
    my ($self, $client, $cmd, $rest) = @_;

    print "Payload::commandExec [$self] client<$client> cmd<$cmd> a<$rest>\n"
        if ($main::debug);

    if ($cmd eq "break") {
        # Send a telnet break ...  The client is named as the sender so
        # that it is told when the payload is not connected.  Naming
        # ourselves made junctionInput bounce the notice back to us and
        # recurse without end.
        $self->junctionInput($client, "$TN_IAC$TN_BREAK");
        return;

    } elsif ($cmd eq "close") {
        if (!$self->enabled && !$self->connected) {
            $client->junctionInput($self,
                "console already closed\n");

        } elsif ($self->closePayload()) {
            $self->cout->junctionInput($self, "(" . $client->id .
                ") triggered a console close\n");

        } else {
            $client->junctionInput($self, "ERROR: close failed\n");
        }
        return;

    } elsif ($cmd eq "open") {
        if ($self->connected) {
            $client->junctionInput($self, "console already open\n");

        } elsif ($self->openPayload()) {
            $self->cout->junctionInput($self, "(" . $client->id .
                ") triggered a console open\n");

        } else {
            $client->junctionInput($self, "open failed\n");
        }
        return;
    }

    # Ensure we error if we have no command.
    my $exe = $self->{'cmd'}->{$cmd};
    if (!$exe) {
        $client->junctionInput($self, "Command not recognised\n");
        return;
    }

    my ($msg, $run) = @{$exe};
    if ($msg ne '') {
        $self->cout->junctionInput($self, "(" . $client->id .
            ") $msg\n");
    }

    # Split into program and arguments with the quote-aware parser used
    # for the configuration file.  The previous split() on a capturing
    # pattern glued arguments together and threw away a literal "0".
    $rest = '' if (!defined $rest);
    my ($prog, @opts) = grep { defined($_) && $_ ne '' }
        main::parse("$run $rest");
    if (!defined $prog) {
        $client->junctionInput($self,
            "ERROR: $cmd: no program configured\n");
        return;
    }

    # Start the program.  A failure to start it is reported to the
    # requesting client rather than being allowed to kill the daemon.
    local(*IN, *OUT, *ERR);
    my $parent = $$;
    my $pid = eval { IPC::Open3::open3(*IN, *OUT, *ERR, $prog, @opts) };
    if ($$ != $parent) {
        # NOTE(review): some old IPC::Open3 versions reportedly raise the
        # exec failure inside the forked child; never let such a child
        # carry on as a second daemon.
        require POSIX;
        POSIX::_exit(255);
    }
    if (!$pid) {
        my $why = $@;
        $why .= "\n" if ($why !~ /\n\z/);
        $client->junctionInput($self, "ERROR: $cmd: $why");
        return;
    }
    close(*IN{IO});

    # XXX: this should not be happening here.
    $self->mux->add(*OUT{IO});
    my $stdout = ClientData->new($self->mux, *OUT{IO});
    $stdout->{'id'} = "cmd:$prog stdout";
    $stdout->payload($self);
    $stdout->cout($self->cout);

    # XXX: this should not be happening here.
    $self->mux->add(*ERR{IO});
    my $stderr = ClientData->new($self->mux, *ERR{IO});
    $stderr->{'id'} = "cmd:$prog stderr";
    $stderr->payload($self);
    $stderr->cout($client);
}
# Destructor: only traces the teardown when debugging is enabled.
sub DESTROY {
    my $self = shift;

    if ($main::debug) {
        print "Payload::DESTROY [$self]\n";
    }
}
#
# PAYLOAD APPLICATION: handles forking off a command as a payload.
#
package PayloadApplication;
use base 'Payload';
#
# initialise($cmd): start "setsid $cmd" as the payload.  The command is
# remembered for restart(), the child's pid and handles are recorded and
# its output handle is registered with the multiplexor.  Returns $self.
#
sub initialise {
    my ($self, $cmd) = @_;

    if ($main::debug) {
        print "PayloadApplication::initialise [$self] cmd<$cmd>";
    }

    $self->SUPER::initialise();

    # XXX: we cannot use the write buffering offered by the mux package
    # without suffering a read error from the PWR file handle, there
    # is not a way to add a write-only channel.

    $self->{'args'} = $cmd;

    # Start the payload ...
    my ($to_child, $from_child);
    my $child = IPC::Open3::open3($to_child, $from_child, 0,
        "setsid " . $cmd);

    $self->{'rfh'} = $from_child;
    $self->{'wfh'} = $to_child;
    $self->{'pid'} = $child;

    # Only the read side is handed to the multiplexor.
    my $mux = $self->mux;
    $mux->add($from_child);
    ##$mux->add($wfh);
    $mux->set_callback_object($self, $from_child);
    ##$mux->set_callback_object($self, $wfh);

    if ($main::debug) {
        print "SHARE PAYLOAD: $self $to_child/$from_child (to $cmd) [fd=" .
            fileno($to_child) . "/" . fileno($from_child) . "]\n";
    }
    print "payload '$cmd' on fd=" . fileno($to_child) . "/" .
        fileno($from_child) . "\n";

    return $self;
}
# restart(): start the payload again with the command last used.
sub restart {
    my $self = shift;

    return $self->initialise($self->{'args'});
}
#
# PAYLOAD SOCKET: handles a network socket as payload.
#
package PayloadSocket;
use base 'Payload';
#
# initialise($addr): connect to the console at $addr ("host:port") and
# make that socket the payload.  The address is remembered for restart().
# If the connection fails the payload is marked disconnected and a retry
# is scheduled with Callback.  Returns $self.
#
sub initialise {
    my ($self, $addr) = @_;

    print "PayloadSocket::initialise [$self] addr<$addr>\n"
        if ($main::debug);

    $self->SUPER::initialise();

    $self->{'args'} = $addr;

    # Connect out to the remote console.  ($payload is declared exactly
    # once; the original declared it twice.)
    my $payload = IO::Socket::INET->new(PeerAddr => $addr);
    if (!$payload) {
        # Take the reason now, before any further call can disturb $!.
        my $error = "$!";

        $self->connected(0);
        if ($self->can("restart")) {
            my ($timeout) = $self->retry_timeout();
            $self->cout->junctionInput($self,
                "<<<PAYLOAD ERROR ($error) ... retrying in $timeout secs>>>\n");

            # Schedule a timeout to trigger a reconnect.
            Callback->new($self->mux, $self, $timeout);

        } else {
            $self->cout->junctionEOF($self);
            $self->cin->junctionRemove($self);

            Payload->lost($self->name, $self);
        }

    } else {
        # One socket serves as both the read and the write side.
        $self->{'rfh'} = $payload;
        $self->{'wfh'} = $payload;

        print "SHARE PAYLOAD: $self $payload (to $addr) [fd=" .
            fileno($payload) . "]\n" if ($main::debug);
        print "payload '$addr' on fd=" . fileno($payload) . "\n";

        $self->mux->add($payload);
        $self->mux->set_callback_object($self, $payload);
    }

    print "SHARE PAYLOAD: $self $payload ... done\n" if ($main::debug);

    $self;
}
# restart(): connect again to the address last used.
sub restart {
    my $self = shift;

    return $self->initialise($self->{'args'});
}
#
# CLIENT: general client object, represents a remote client channel
#
package Client;
#
# Client->new($mux, $fh): wrap file handle $fh, already known to
# multiplexor $mux, in a client object and initialise it.
#
sub new {
    my $class = shift;
    my ($mux, $fh) = @_;

    my %fields = (
        'mux' => $mux,
        'fh'  => $fh,
    );
    my $self = bless({ %fields }, $class);

    if ($main::debug) {
        print "Client::new [$self] mux<$mux> fh<$fh>\n";
    }

    $self->initialise();

    return $self;
}
#
# Client->clone($from): make a new client of class $class holding a
# shallow copy of $from's fields, and initialise it.
#
sub clone {
    my $class = shift;
    my ($from) = @_;

    my %copy = %{$from};
    my $self = bless({ %copy }, $class);

    if ($main::debug) {
        print "Client::clone [$self] from<$from>\n";
    }

    $self->initialise();

    return $self;
}
# Data accessors.  Each returns the current value; when called with an
# argument it stores that argument first.

sub mux {
    my ($self, @new) = @_;
    $self->{'mux'} = $new[0] if (@new);
    return $self->{'mux'};
}

sub payload {
    my ($self, @new) = @_;
    $self->{'payload'} = $new[0] if (@new);
    return $self->{'payload'};
}

sub fh {
    my ($self, @new) = @_;
    $self->{'fh'} = $new[0] if (@new);
    return $self->{'fh'};
}

sub id {
    my ($self, @new) = @_;
    $self->{'id'} = $new[0] if (@new);
    return $self->{'id'};
}

sub announce {
    my ($self, @new) = @_;
    $self->{'announce'} = $new[0] if (@new);
    return $self->{'announce'};
}

sub cout {
    my ($self, @new) = @_;
    $self->{'cout'} = $new[0] if (@new);
    return $self->{'cout'};
}
#
# cin([$junction]): get the junction we receive from.  With an argument,
# first leave the current junction (if any), then store the new one and
# join it; passing undef just leaves.
#
sub cin {
    my $self = shift;

    if (@_) {
        $self->{'cin'}->junctionRemove($self) if ($self->{'cin'});
        $self->{'cin'} = shift;
        # Plain truth test, as for the removal above, rather than a
        # numeric comparison against undef.
        $self->{'cin'}->junctionAdd($self) if ($self->{'cin'});
    }
    return $self->{'cin'};
}
# initialise(): make this object the multiplexor's callback target for
# our file handle.
sub initialise {
    my $self = shift;

    if ($main::debug) {
        print "Client::initialise [$self]\n";
    }

    my $mux = $self->mux;
    $mux->set_callback_object($self, $self->fh);
}
# junctionInput($from, $data): junction callback; queue $data for
# writing to our file handle through the multiplexor.
sub junctionInput {
    my $self = shift;
    my ($from, $data) = @_;

    if ($main::debug) {
        print "Client::junctionInput [$self] data<$data>\n";
    }

    my $mux = $self->mux;
    $mux->write($self->fh, $data);
}
# junctionEOF($from, $data): junction callback for EOF on the other
# side; shut this client down.
sub junctionEOF {
    my $self = shift;
    my ($from, $data) = @_;

    if ($main::debug) {
        print "Client::junctionEOF [$self] data<$data>\n";
    }

    $self->shutdown();
}
#
# mux_eof($mux, $fh, $input): multiplexor callback for EOF from the
# client.  Processes whatever is still buffered, detaches from both
# junctions and asks the multiplexor to shut the handle down.
#
sub mux_eof {
    my $self = shift;
    my ($mux, $fh, $input) = @_;

    if ($main::debug) {
        print "Client::mux_eof [$self] mux<$mux> fh<$fh> input<$input>\n";
    }

    # Handle any pending input, then remove myself from the clients list.
    $self->mux_input($mux, $fh, $input);
    for my $junction (qw(cin cout)) {
        $self->$junction(undef);
    }

    # Tell the multiplexor we no longer are using this channel.
    $mux->shutdown($fh, 1);
}
#
# mux_close($mux, $fn): multiplexor callback once our handle is closed.
# Announces the disconnect on the announce junction, if one is set, and
# logs it on stdout.
#
sub mux_close {
    my $self = shift;
    my ($mux, $fn) = @_;

    if ($main::debug) {
        print "Client::close [$self]\n";
    }

    my $audience = $self->announce;
    if ($audience) {
        $audience->junctionInput($self, "(" . $self->id .
            ") disconnected\n");
    }

    print "$self->{'id'} disconnected\n";
}
# shutdown(): ask the multiplexor to shut our handle down (mode 2).
sub shutdown {
    my $self = shift;

    if ($main::debug) {
        print "Client::shutdown [$self]\n";
    }

    # Close myself down and tell the payload.
    my $mux = $self->mux;
    $mux->shutdown($self->fh, 2);
}
# Destructor: only traces the teardown when debugging is enabled.
sub DESTROY {
    my $self = shift;

    if ($main::debug) {
        print "Client::DESTROY [$self]\n";
    }
}
#
# CLIENT CMD: represents a client whilst in command mode, when we have committed
# to connecting this will pass the client connection off to a ClientData
# object.
#
package ClientCmd;
use base 'Client';
#
# mux_input($mux, $fh, $input): multiplexor callback for a client still in
# command mode.  Each complete line in the buffer referenced by $input is
# "<command> <encoded args>"; every line gets one encoded reply.  After a
# successful CONNECT the connection belongs to a ClientData object, so the
# rest of the buffer is left untouched for it.
#
sub mux_input {
    my ($self, $mux, $fh, $input) = @_;

    print "ClientCmd::mux_input [$self] mux<$mux> fh<$fh> input<$$input>\n"
        if ($main::debug);

    while ($$input =~ s/^(.*?)\n//) {
        my ($cmd, $args) = split(' ', $1, 2);
        my (%args) = Conmux::decodeArgs($args);

        my $reply = {
            'status' => 'ENOSYS unknown command',
        };
        my $handed_off = 0;

        # XXX: check authentication if required and reject the
        # command out of hand - leak _nothing_.
        if (!defined $args{'id'}) {
            $reply->{'status'} = 'EACCES identifier required';

        } else {
            # They are who they say they are, record who that is.
            $self->{'id'} = $args{'id'};

            if (defined $cmd && $cmd eq "CONNECT") {
                $handed_off = $self->_connect($reply, \%args);
            }
        }

        # We're done, send back our response to this.
        $self->junctionInput($self, Conmux::encodeArgs($reply) . "\n");

        # Don't handle any more input - its not going to be for us.
        last if ($handed_off);
    }
}

#
# _connect($reply, $args): carry out a CONNECT.  Fills in $reply and
# returns 1 once the connection has been handed to a ClientData object,
# 0 (with an error status in $reply) when the request is rejected.
#
# The request is validated in full before the ClientData object is made:
# cloning re-points the multiplexor's callbacks for this socket, so a
# clone made for a request that is then rejected would be left receiving
# the client's input without a payload.
#
sub _connect {
    my ($self, $reply, $args) = @_;

    my $to = $args->{'to'};
    if (!$to) {
        $reply->{'status'} = "EINVAL CONNECT " .
            " requires 'to' specifier";
        return 0;
    }

    my $payload = Payload->lookup($to);
    if (!defined $payload) {
        $reply->{'status'} = "EINVAL '$to' not a " .
            " valid destination specifier";
        return 0;
    }

    my $type = $args->{'type'} || 'client';
    if ($type ne 'status' && $type ne 'client') {
        $reply->{'status'} = "EINVAL '$type' " .
            "not a valid destination type";
        return 0;
    }

    $reply->{'status'} = 'OK';

    # Get the payload title and pass that back.
    $reply->{'title'} = $payload->title . ' [channel ' .
        $payload->state() . ']';
    $reply->{'state'} = $payload->state();

    # Get connected clients and pass back as the motd
    for my $cl (keys(%{$payload->cout})) {
        $reply->{'motd'} .= '(' . $payload->cout->{$cl}->id;
        $reply->{'motd'} .= ") is already connected\n";
    }

    # Switch over to data mode, hand this connection off
    # to a data client instance, I am done.
    my $data = ClientData->clone($self);
    $data->payload($payload);

    if ($type eq 'status') {
        $data->cout($payload->cout);

    } else {
        if (!$args->{'hide'}) {
            $data->announce($payload->cout);
            $payload->cout->junctionInput(
                $self, "(" . $self->id .
                ") connected\n");
        }
        $data->cin($payload->cout);
        $data->cout($payload->cin);
    }

    print "$self->{'id'} connected to $to/$type\n";

    return 1;
}
#
# CLIENT DATA: handles a client connection when in data mode, attaches
# the client connection to the relevant junction.
#
package ClientData;
use base 'Client';
# Help entries for the commands handled by the client object itself.
my @help = (
    [ 'msg',	'send a message to all connected clients' ],
    [ 'quit',	'disconnect from the console' ],
);

#
# mux_input($mux, $fh, $input): multiplexor callback for a client in data
# mode; $input references the pending buffer.
#
# Normally everything typed is passed on via cout.  The sequence "~$"
# switches to command mode: the client is prompted, the characters it
# types are echoed back and gathered in $self->{'cmd'} (which starts with
# a '>' marker) until a newline, then the command is run and data mode
# resumes.  'help', 'quit' and 'msg' are handled here, anything else is
# given to the payload's commandExec().
#
sub mux_input {
    my ($self, $mux, $fh, $input) = @_;

    print "ClientData::mux_input [$self] mux<$mux> fh<$fh> input<$$input>\n"
        if ($main::debug);

    while ($$input ne "") {
        if (!defined $self->{'cmd'} || $self->{'cmd'} eq '') {
            # Check for an incomplete escape ... wait for more.
            # (\z, not $: "~\n" is not the start of an escape.)
            if ($$input =~ /^~\z/s) {
                return;
            }
            if ($$input =~ s/^~\$//s) {
                $self->{'cmd'} = '>';
                my $title = $self->payload->title;
                $self->junctionInput($self, "\r\nCommand($title)> ");
                next;
            }
            # Its not an escape ... pass it on.
            # Ship anything before that cannot be the escape.
            if ($$input =~ s/^(.[^~]*)(~|$)/$2/s) {
                # Data coming in from the client, send it to
                # the payload.
                $self->cout->junctionInput($self, $1);
            }
        } else {
            # Consume characters upto a newline, echo them back
            # to the client as we go.
            while ($$input =~ s/^([^\r\n])//) {
                my $c = $1;
                if ($c eq "\b" || $c eq "\x7f") {
                    # Erase, but never the leading '>' marker.
                    if (length($self->{'cmd'}) > 1) {
                        $c = "\b \b";
                        substr($self->{'cmd'},
                            -1, 1, '');
                    } else {
                        $c = '';
                    }
                } else {
                    $self->{'cmd'} .= $c;
                }
                $self->junctionInput($self, $c);
            }
            # If we arn't at a newline, then wait for more input.
            if ($$input !~ s/^[\r\n]+//) {
                return;
            }
            $self->junctionInput($self, "\n");

            # Drop the '>' marker, split off the command name.
            my ($cmd, $rest) = split(' ', substr($self->{'cmd'},
                1), 2);
            $self->{'cmd'} = '';

            if (!defined $cmd || $cmd eq '') {
                # Empty command line, nothing to do.

            } elsif ($cmd eq 'help') {
                my @cmds = $self->payload->commandHelp();

                my $help = "Conmux commands:\n";
                for my $ent (@cmds, @help) {
                    $help .= sprintf("  %-20s %s\n",
                        $ent->[0], $ent->[1]);
                }
                $self->junctionInput($self, $help);

            } elsif ($cmd eq 'quit') {
                $self->shutdown();

            } elsif ($cmd eq 'msg') {
                # Only clients attached to an input junction have
                # anyone to talk to; without one, say so rather
                # than die calling a method on undef.
                if ($self->cin) {
                    $self->cin->junctionInput($self,
                        "($self->{'id'}) $rest\n");
                } else {
                    $self->junctionInput($self,
                        "msg: not available on this connection\n");
                }

            # Not a client command ... pass it to the payload.
            } else {
                $self->payload->commandExec($self, $cmd, $rest);
            }
        }
    }
}
#
# LIBRARY: split a string honouring quoting.
#
package main;
#
# parse($str): split $str into a list of arguments on runs of spaces.
#
#  "..."  groups text; a backslash inside still escapes the next character
#  '...'  groups text; everything inside is literal, backslashes included
#  \c     outside single quotes takes c literally (quote, space, ...)
#
# Leading spaces are ignored.  An empty quoted string ('' or "") yields an
# empty argument.
#
sub parse($) {
    my ($str) = @_;
    my @args = ();
    my $argc = 0;		# index of the argument being built
    my $quote = 0;		# 0: unquoted, 1: inside "...", 2: inside '...'
    my $pos = 0;
    my $len = length($str);

    while ($pos < $len && substr($str, $pos, 1) eq " ") {
        $pos++;
    }

    for (; $pos < $len; $pos++) {
        my $c = substr($str, $pos, 1);
        my $real = 0;		# set when $c was backslash escaped

        if ($quote != 2 && $c eq '\\') {
            $real = 1;
            $pos++;
            $c = substr($str, $pos, 1);
        }

        if ($quote != 2 && $c eq '"' && !$real) {
            $quote ^= 1;
            # A quote always starts (or continues) an argument,
            # so that "" is an empty argument and not a hole.
            $args[$argc] = '' if (!defined $args[$argc]);

        } elsif ($quote != 1 && $c eq "'" && !$real) {
            $quote ^= 2;
            $args[$argc] = '' if (!defined $args[$argc]);

        } elsif ($c eq " " && $quote == 0 && !$real) {
            # Swallow the whole run of spaces, then step back one
            # to compensate for the loop increment.
            while ($pos < $len && substr($str, $pos, 1) eq " ") {
                $pos++;
            }
            $pos--;
            $argc++;

        } else {
            $args[$argc] .= $c;
        }
    }

    @args;
}
#
# MAIN: makes the IO multiplexor, junction, listener and payload and stitches
# them all together.
#
package main;
# Usage checks.
if ($#ARGV != 0 && $#ARGV != 3) {
    print STDERR "Usage: $P <config file>\n";
    print STDERR " $P <local port> <title> socket <host>:<port>\n";
    print STDERR " $P <local port> <title> cmd <cmd>\n";
    exit 1;
}

# Build the list of configuration lines, either synthesised from the
# command line or read from the configuration file.
my @conf;
if ($#ARGV == 3) {
    my ($lport, $title, $what, $arg) = @ARGV;

    # The usage text offers 'cmd' but the configuration keyword for a
    # command payload is 'application'; accept both.
    $what = 'application' if ($what eq 'cmd');

    @conf = (
        "listener '$lport'",
        "'$what' console '$title' '$arg'"
    );

} else {
    my ($cf) = @ARGV;
    open(my $conf_fh, '<', $cf) or die "$P: $cf: open failed - $!\n";
    @conf = <$conf_fh>;
    close($conf_fh);
}

# Make a new multiplexer.
my $mux = IO::Multiplex->new;

# $listener and $payload hold the most recently created listener and
# payload; 'command' and 'help' lines apply to the latest payload.
my ($line, $seg, $listener, $payload);
$line = '';
for $seg (@conf) {
    # Handle comments, blank lines and line continuation.
    chomp($seg); $seg =~ s/^\s+//;
    next if ($seg =~ /^#/);
    $line .= $seg;
    if ($line =~ m/\\$/) {
        chop($line);
        next;
    }

    # Skip blank lines.  (This used to test $_, which is not the line.)
    if ($line =~ /^\s*$/) {
        $line = '';
        next;
    }

    # Keep the text of the line for diagnostics, the accumulator is
    # reset for the next one.
    my $text = $line;
    $line = '';

    my ($cmd, @a) = parse($text);

    if ($cmd eq "listener") {
        if ($#a != 0) {
            warn "$P: Usage: listener <port>\n" .
                 "$P: $text\n";
            next;
        }

        my ($lport) = @a;
        my ($rhost, $rname);

        # port
        if ($lport =~ m@^\d+$@) {
            # Already in the right format.

        # registry/service
        } elsif ($lport =~ m@(.*)/(.*)@) {
            ($rhost, $rname, $lport) = ($1, $2, 0);

        # service
        } else {
            ($rhost, $rname, $lport) = ('-', $lport, 0);
        }

        # Create the client listener socket.
        $listener = ListenerSocket->new($mux, $lport);

        # Register us with the registry.
        if ($rhost) {
            Conmux::Registry::add($rhost, $rname, $listener->address);
        }

    } elsif ($cmd eq 'socket') {
        if ($#a != 2) {
            warn "$P: Usage: socket <name> <title> <host:port>\n" .
                 "$P: $text\n";
            next;
        }
        my ($name, $title, $sock) = @a;

        # Create the payload.
        $payload = PayloadSocket->new($name, $title, $mux, $sock);

    } elsif ($cmd eq 'application') {
        if ($#a != 2) {
            warn "$P: Usage: application <name> <title> <cmd>\n" .
                 "$P: $text\n";
            next;
        }
        my ($name, $title, $app) = @a;

        $payload = PayloadApplication->new($name, $title, $mux, $app);

    } elsif ($cmd eq 'command') {
        if ($#a != 2) {
            warn "$P: Usage: command <name> <msg> <cmd>\n" .
                 "$P: $text\n";
            next;
        }
        if (!$payload) {
            warn "$P: $cmd: no console defined before this line\n" .
                 "$P: $text\n";
            next;
        }
        my ($name, $msg, $run) = @a;

        $payload->commandAdd($name, $msg, $run);

    } elsif ($cmd eq 'help') {
        if ($#a != 1) {
            warn "$P: Usage: $cmd <name> <msg>\n" .
                 "$P: $text\n";
            next;
        }
        if (!$payload) {
            warn "$P: $cmd: no console defined before this line\n" .
                 "$P: $text\n";
            next;
        }
        my ($name, $msg) = @a;

        $payload->helpAdd($name, $msg);

    } else {
        warn "$P: $cmd: unknown configuration command\n";
    }
}

# Hand over to the multiplexor.  An exception starting with "Use of freed
# value in iteration" is reported and the loop restarted; anything else
# that ends the loop is fatal.
do {
    eval { $mux->loop; };
    warn "$@" if ($@);
} while ($@ =~ /^Use of freed value in iteration/);
die "ERROR: $@\n";