Commit 8a308127 authored by Renaud Pacalet's avatar Renaud Pacalet
Browse files

Add pipe mode

parent fd99554b
......@@ -47,6 +47,17 @@ sub msg2nstr {
return pack('nC/l>*', $self->{"id"}, @{$self->{"parameters"}});
}
# Message to string
sub msg2str {
my $usage = "Usage: MSG->msg2str()\n msg->msg2str()\n";
my $self = shift or croak("$usage");
my $str = sprintf("%u:%u", $self->{"id"}, scalar(@{$self->{"parameters"}}));
for my $v ( @{$self->{"parameters"}} ) {
$str .= sprintf(":%d", $v);
}
return $str;
}
# Network string to message
sub nstr2msg {
my $usage = "Usage: MSG->nstr2msg(str)\n";
......@@ -58,4 +69,16 @@ sub nstr2msg {
return $msg;
}
# String to message
sub str2msg {
my $usage = "Usage: MSG->str2msg(str)\n";
my $class = shift or croak("$usage");
croak("$usage") if(ref($class));
my $str = shift or croak("$usage");
my ( $id, $n, @params ) = split(/:/, $str);
croak("Invalid string message\n") if($n != @params);
my $msg = MSG->new($id, @params);
return $msg;
}
1;
......@@ -16,15 +16,21 @@ some important differences.
----
MSG.pm is a perl module that handles messages. The messages comprise a 16-bits
identifier ID, an 8-bits number of values N and N 32-bits integer values. The
module uses pack to convert messages between perl hashes to network strings.
identifier ID, and an array of 1 to 255 32-bits integer values. The
module uses pack to convert messages between perl hashes to network strings. The
network strings are the concatenation of:
- ID (2-bytes unsigned integer), the identifier
- N (1-byte unsigned integer), the number of 32-bits integer values
- N 4-bytes integer values
The ID and the integer values are in network (big) endianness.
----
server.pl plays the role of the acquisition device. It can be used to generate
several streams of data samples (sines of various frequencies) and send them
through a UDP socket. Each UDP packet contains a 16-bits stream index and a
single 32-bits sample value.
through a UDP socket. Each sent UDP packet is in the MSG.pm network string
format and carries a 16-bits unsigned ID and a single 32-bits integer value. The
ID is used to differentiate several streams.
Usage: ./server.pl [OPTION]...
Send sine samples to a UDP socket. Samples are in the MSG form where the id
......@@ -38,6 +44,7 @@ specifies the X increment between two consecutive sine samples.
--duration=DURATION total duration in seconds, 0 for never ending (default 1)
--index=INDEX index of stream to send (default 0)
--offset=OFFSET X increment between two consecutive sine(X) (default 0.1)
--pipe send the samples to standard input instead of network
--end send a sample with id 0xffff at the end (if duration != 0)
--help print this help
......@@ -76,6 +83,7 @@ integer value to plot.
--ymax=YMAX gnuplot maximum of vertical axis (default 2147483647)
--threshold=THRESHOLD vertical position of threshold line (default 0)
--geometry=GEOMETRY position on gnuplot window (default 640x480+0+0)
--pipe receive the samples from standard input instead of network
--persist lets plot window survive after main gnuplot program exits
--end interprets any sample with id=0xffff as a request to stop and exit
--help print this help
......@@ -100,10 +108,10 @@ Exit status:
----
test.sh simply launches the client and the server for testing. Of course, for
testing, the client and the server both run on the same computer but it is very
easy to deploy this example on two different nodes by changing the --address
option of the server.
test.sh simply launches the client and the server for testing, first using UDP
packets and then using a pipe. Of course, for testing, the client and the server
both run on the same computer but it is very easy to deploy this example on two
different nodes by changing the --address option of the server.
To run the test:
......
......@@ -20,7 +20,7 @@ my %DEFAULTS = (
);
my %options = ();
GetOptions(\%options, "port=i", "index=s@", "samples=s@", "title=s@", "ymin=s@", "ymax=s@", "threshold=s@", "geometry=s@", "persist", "end", "help");
GetOptions(\%options, "port=i", "index=s@", "samples=s@", "title=s@", "ymin=s@", "ymax=s@", "threshold=s@", "geometry=s@", "pipe", "persist", "end", "help");
my $usage = <<EOS;
Usage: $0 [OPTION]...
Listen to a UDP socket for data samples and plot them. Samples are in the MSG
......@@ -35,6 +35,7 @@ integer value to plot.
--ymax=YMAX gnuplot maximum of vertical axis (default $DEFAULTS{"ymax"})
--threshold=THRESHOLD vertical position of threshold line (default $DEFAULTS{"threshold"})
--geometry=GEOMETRY position on gnuplot window (default $DEFAULTS{"geometry"})
--pipe receive the samples from standard input instead of network
--persist lets plot window survive after main gnuplot program exits
--end interprets any sample with id=0xffff as a request to stop and exit
--help print this help
......@@ -68,6 +69,7 @@ if(defined($options{"help"})) {
my $port = defined($options{"port"}) ? $options{"port"} : $DEFAULTS{"port"};
my $index = defined($options{"index"}) ? $options{"index"} : [ $DEFAULTS{"index"} ];
my $persist = defined($options{"persist"}) ? " --persist " : "";
my $pipe = defined($options{"pipe"}) ? 1 : 0;
my $end = defined($options{"end"}) ? 1 : 0;
if($port < 0 || $port > 65535) {
print "Invalid port specification: $port\n";
......@@ -166,19 +168,28 @@ for my $i (@{$index}) {
$plots[$i]->{"buffer"} = \@buffer;
}
my $sock = IO::Socket::INET->new(
LocalPort => $port,
Proto => 'udp',
Type => SOCK_DGRAM,
);
if(!$sock) {
print "Could not create socket: $!\n";
exit 2;
my $sock;
if(!$pipe) {
$sock = IO::Socket::INET->new(
LocalPort => $port,
Proto => 'udp',
Type => SOCK_DGRAM,
);
if(!$sock) {
print "Could not create socket: $!\n";
exit 2;
}
}
while(1) {
$sock->recv(my $data, 1024);
my $msg = MSG->nstr2msg($data);
my ( $data, $msg );
if($pipe) {
$data = <>;
$msg = MSG->str2msg($data);
} else {
$sock->recv($data, 1024);
$msg = MSG->nstr2msg($data);
}
last if($end && $msg->{"id"} == 0xffff);
my $idx = $msg->{"id"};
next if(!$plots[$idx]);
......@@ -212,4 +223,6 @@ for my $i (@{$index}) {
print $PIPE "exit;\n";
close $PIPE;
}
$sock->close();
if(!$pipe) {
$sock->close();
}
......@@ -16,11 +16,10 @@ my %DEFAULTS = (
"duration" => 1,
"index" => [ 0 ],
"offset" => [ 0.1 ],
"end" => 0,
);
my %options = ();
GetOptions(\%options, "address=s", "port=i", "rate=i", "duration=f", "index=s@", "offset=s@", "end", "help");
GetOptions(\%options, "address=s", "port=i", "rate=i", "duration=f", "index=s@", "offset=s@", "pipe", "end", "help");
my $usage = <<EOS;
Usage: $0 [OPTION]...
Send sine samples to a UDP socket. Samples are in the MSG form where the id
......@@ -34,6 +33,7 @@ specifies the X increment between two consecutive sine samples.
--duration=DURATION total duration in seconds, 0 for never ending (default $DEFAULTS{"duration"})
--index=INDEX index of stream to send (default @{$DEFAULTS{"index"}})
--offset=OFFSET X increment between two consecutive sine(X) (default @{$DEFAULTS{"offset"}})
--pipe send the samples to standard input instead of network
--end send a sample with id 0xffff at the end (if duration != 0)
--help print this help
......@@ -65,7 +65,8 @@ my $rate = defined($options{"rate"}) ? $options{"rate"} : $DEFAULTS{"rate"};
my $duration = defined($options{"duration"}) ? $options{"duration"} : $DEFAULTS{"duration"};
my $index = defined($options{"index"}) ? $options{"index"} : $DEFAULTS{"index"};
my $offset = defined($options{"offset"}) ? $options{"offset"} : $DEFAULTS{"offset"};
my $end = defined($options{"end"}) ? $options{"end"} : $DEFAULTS{"end"};
my $pipe = defined($options{"pipe"}) ? 1 : 0;
my $end = defined($options{"end"}) ? 1 : 0;
if($port < 0 || $port > 65535) {
print "Invalid port specification: $port\n";
print $usage;
......@@ -112,26 +113,37 @@ for my $i (@{$index}) {
$x[$i] = 0;
}
my $sock = IO::Socket::INET->new(
PeerAddr => "$address",
PeerPort => $port,
Proto => 'udp',
Type => SOCK_DGRAM,
);
if(!$sock) {
print "Could not create socket: $!\n";
exit 2;
my $sock;
if(!$pipe) {
$sock = IO::Socket::INET->new(
PeerAddr => "$address",
PeerPort => $port,
Proto => 'udp',
Type => SOCK_DGRAM,
);
if(!$sock) {
print "Could not create socket: $!\n";
exit 2;
}
}
for(my $i = 0; $i < $duration * $rate; $i++) {
for my $i (@{$index}) {
my $val = int(sin($x[$i]) * 2**30);
my $msg = MSG->new($i, $val);
print $sock $msg->msg2nstr;
if($pipe) {
print $msg->msg2str . "\n";
} else {
print $sock $msg->msg2nstr;
}
$x[$i] += $offset[$i];
}
sleep($period);
}
my $msg = MSG->new(0xffff);
print $sock $msg->msg2nstr if($end);
$sock->close();
if($pipe) {
print $msg->msg2str . "\n" if($end);
} else {
print $sock $msg->msg2nstr if($end);
$sock->close();
}
#!/usr/bin/env bash
echo "Testing UDP packets..."
./client.pl --port=56780 \
--index=0,1,2 \
--samples=250 \
......@@ -17,3 +18,22 @@ sleep 1
--index=0,1,2 \
--offset=0.05,0.1,0.2 \
--end
echo "...done"
sleep 1
echo "Testing pipe..."
./server.pl --pipe \
--rate=50 \
--duration=10 \
--index=0,1,2 \
--offset=0.05,0.1,0.2 \
--end | \
./client.pl --pipe \
--index=0,1,2 \
--samples=250 \
--ymin=-2000000000 \
--ymax=2000000000 \
--threshold=0 \
--title="Stream #1","Stream #2","Stream #3" \
--geometry=640x480+0+0,640x480+640+0,640x480+1280+0 \
--end
echo "...done"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment