Börsensysteme wie Xetra, Eurex, SWX oder virtX verwenden im typischen Setup Multicasting für verschiedene Zwecke. Zum Beispiel wird in den Schweizer Börsensystemen ein Quorum-System realisiert in dem in einem 4er-Verbund bei Ausfall eines Servers Aufgaben auf andere Quorum-Member delegiert werden.
Da solche Verbünde oft über mehrere Lokationen hinweg verteilt sind, ist eine besondere Anforderung an die Netzwerk-IT (Infrastruktur) einerseits die Rechner in den verschiedenen privaten Netzen über Switches korrekt miteinander zu Verbinden als auch die Firewalls korrekt zu konfigurieren, damit die verschiedenen Server untereinander erfolgreich Multicast-Pakete austauschen können.
Im Allgemeinen ist es für Entwickler ziemlich schwierig solche Konfigurationen zu prüfen und bei der Fehlersuche klar zu entscheiden ober die Ursache im eigenen Code oder in der Infrastruktur zu suchen ist. Ich habe mir deshalb ein kleines Tool geschrieben, das mir bei früheren Kunden viel nützliche Dienste erwiesen hat. Per Kommandozeilen-Optionen können die Multicast-Gruppe und das Interface ausgewählt werden. Der Port ist frei wählbar und die Anzahl der Hops kann begrenzt werden um die Netzwerktopologie zu prüfen.
Der Zweck des Codes ist die Zweckmäßigkeit und nicht Schönheitswettbewerbe zu bestreiten. In wenigen Handgriffen kann Performance-Testing eingebaut werden – aktuell sind 500ms Sleep zwischen den einzelnen Sendungen hart kodiert.
#include <unistd.h>
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#define MCASTADDR "224.0.0.1"
#define MCASTPORT 5000
int count = 500; // number of messages to send/receive
void usage(char* progname)
{
printf("usage: %s -s -m:MC -p:int -i:str -l -n:int\n", progname);
printf(" -s sct as server\n");
printf(" -m:MC mc adress [%s]\n", MCASTADDR);
printf(" -p:int port number [%d]\n", MCASTPORT);
printf(" -i:str local interface [INADDR_ANY]\n");
printf(" -l disable loopback\n");
printf(" -n:int number of messages [%d]\n", count);
exit(0);
}
int main(int argc, char **argv)
{
int client = 1; // client (receiver) or server (sender)
int loopback = 0; // disable loopback
int dwInterface = INADDR_ANY; // interface to bind to
int mcgroup = inet_addr(MCASTADDR); // mc group
int port = MCASTPORT;
for(int i = 1; i < argc; i++)
{
if(argv[i][0] == '-')
{
switch (tolower(argv[i][1]))
{
case 's': // Sender
client = 0; break;
case 'm': // Multicast group to join
if (strlen(argv[i]) > 3)
mcgroup = inet_addr(&argv[i][3]);
break;
case 'i': // Local interface to use
if (strlen(argv[i]) > 3)
dwInterface = inet_addr(&argv[i][3]);
break;
case 'p': // Port number to use
if (strlen(argv[i]) > 3)
port = atoi(&argv[i][3]);
break;
case 'l': // Disable loopback?
loopback = 1;
break;
case 'n': // Number of messages to send/recv
count = atoi(&argv[i][3]);
break;
default:
usage(argv[0]);
break;
}
}
}
//--------------------------------------------
int s = socket(AF_INET, SOCK_DGRAM, 0);
int reuse = 1;
if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(reuse)) < 0)
perror("reuse:"), exit(0);
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = dwInterface;
if(bind(s, (struct sockaddr *)&local, sizeof(local)) < 0)
perror("bind:"), exit(0);
struct ip_mreq mcast;
mcast.imr_multiaddr.s_addr = mcgroup;
mcast.imr_interface.s_addr = dwInterface;
if(setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&mcast, sizeof(mcast)) < 0)
perror("membership:"), exit(0);
// Set the TTL to something else. Default TTL is 1.
int optval = 8;
if(setsockopt(s, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&optval, sizeof(int)) < 0)
perror("ttl:"), exit(0);
// Disable the loopback if selected.
if (loopback)
{
optval = 0;
if(setsockopt(s, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&optval, sizeof(optval))<0)
perror("loopback:"), exit(0);
}
int received = 0;
char buffer[1024];
if (client) // Client
{
fprintf(stdout, "Acting as client. Reading %d messages from mc group\n", count);
struct sockaddr_in from;
socklen_t addrlen = sizeof(struct sockaddr_in);
for(int i = 0; i < count; i++)
{
if ((received = recvfrom(s, buffer, sizeof(buffer), 0, (struct sockaddr *)&from, &addrlen)) < 0)
perror("recv"), exit(0);
buffer[received] = 0;
printf("RECV: '%s' from <%s>\n", buffer, inet_ntoa(from.sin_addr));
}
}
else // Server
{
fprintf(stdout, "Acting as server. Sending %d messages to mc group\n", count);
struct timespec wait = {0};
wait.tv_sec = 0;
wait.tv_nsec = 500 * 1000000L;
struct sockaddr_in remote;
remote.sin_family = AF_INET;
remote.sin_port = htons(port);
remote.sin_addr.s_addr = mcgroup;
// Send some data
for(int i = 0; i < count; i++)
{
sprintf(buffer, "server 1: This is the test message number %d", i);
if (sendto(s, (char *)buffer, strlen(buffer), 0, (struct sockaddr *)&remote, sizeof(remote)) < 0)
close(s), exit(0);
nanosleep(&wait, nullptr);
}
}
// Drop group membership
if (setsockopt(s, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char *)&mcast, sizeof(mcast)) < 0)
perror("drop:");
close(s);
return 0;
}