Multicast

Teil 5 von 7 der Serie Sockets

Börsensysteme wie Xetra, Eurex, SWX oder virtX verwenden im typischen Setup Multicasting für verschiedene Zwecke. Zum Beispiel wird in den Schweizer Börsensystemen ein Quorum-System realisiert in dem in einem 4er-Verbund bei Ausfall eines Servers Aufgaben auf andere Quorum-Member delegiert werden.

Da solche Verbünde oft über mehrere Lokationen hinweg verteilt sind, ist eine besondere Anforderung an die Netzwerk-IT (Infrastruktur) einerseits die Rechner in den verschiedenen privaten Netzen über Switches korrekt miteinander zu Verbinden als auch die Firewalls korrekt zu konfigurieren, damit die verschiedenen Server untereinander erfolgreich Multicast-Pakete austauschen können.

Im Allgemeinen ist es für Entwickler ziemlich schwierig solche Konfigurationen zu prüfen und bei der Fehlersuche klar zu entscheiden ober die Ursache im eigenen Code oder in der Infrastruktur zu suchen ist. Ich habe mir deshalb ein kleines Tool geschrieben, das mir bei früheren Kunden viel nützliche Dienste erwiesen hat. Per Kommandozeilen-Optionen können die Multicast-Gruppe und das Interface ausgewählt werden. Der Port ist frei wählbar und die Anzahl der Hops kann begrenzt werden um die Netzwerktopologie zu prüfen.

Der Zweck des Codes ist die Zweckmäßigkeit und nicht Schönheitswettbewerbe zu bestreiten. In wenigen Handgriffen kann Performance-Testing eingebaut werden – aktuell sind 500ms Sleep zwischen den einzelnen Sendungen hart kodiert.

#include <unistd.h>
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>

#define MCASTADDR "224.0.0.1"
#define MCASTPORT 5000
int count = 500;    // number of messages to send/receive

void usage(char* progname)
{
    printf("usage: %s -s -m:MC -p:int -i:str -l -n:int\n", progname);
    printf(" -s sct as server\n");
    printf(" -m:MC mc adress [%s]\n", MCASTADDR);
    printf(" -p:int port number [%d]\n", MCASTPORT);
    printf(" -i:str local interface [INADDR_ANY]\n");
    printf(" -l disable loopback\n");
    printf(" -n:int number of messages [%d]\n", count);
    exit(0);
}

int main(int argc, char **argv)
{
    int client = 1; // client (receiver) or server (sender)
    int loopback = 0; // disable loopback
    int dwInterface = INADDR_ANY; // interface to bind to
    int mcgroup = inet_addr(MCASTADDR); // mc group
    int port = MCASTPORT;

    for(int i = 1; i < argc; i++)
    {
        if(argv[i][0] == '-')
        {
            switch (tolower(argv[i][1]))
            {
            case 's': // Sender
                client = 0; break;
            case 'm': // Multicast group to join
                if (strlen(argv[i]) > 3)
                    mcgroup = inet_addr(&argv[i][3]);
                break;
            case 'i': // Local interface to use
                if (strlen(argv[i]) > 3)
                    dwInterface = inet_addr(&argv[i][3]);
                break;
            case 'p': // Port number to use
                if (strlen(argv[i]) > 3)
                    port = atoi(&argv[i][3]);
                break;
            case 'l': // Disable loopback?
                loopback = 1;
                break;
            case 'n': // Number of messages to send/recv
                count = atoi(&argv[i][3]);
                break;
            default:
                usage(argv[0]);
                break;
            }
        }
    }

    //--------------------------------------------
    int s = socket(AF_INET, SOCK_DGRAM, 0);

    int reuse = 1;
    if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(reuse)) < 0)
        perror("reuse:"), exit(0);

    struct sockaddr_in local;
    local.sin_family = AF_INET;
    local.sin_port = htons(port);
    local.sin_addr.s_addr = dwInterface;

    if(bind(s, (struct sockaddr *)&local, sizeof(local)) < 0)
        perror("bind:"), exit(0);

    struct ip_mreq mcast;
    mcast.imr_multiaddr.s_addr = mcgroup;
    mcast.imr_interface.s_addr = dwInterface;

    if(setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&mcast, sizeof(mcast)) < 0)
        perror("membership:"), exit(0);

    // Set the TTL to something else. Default TTL is 1.
    int optval = 8;
    if(setsockopt(s, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&optval, sizeof(int)) < 0)
        perror("ttl:"), exit(0);

    // Disable the loopback if selected.
    if (loopback)
    {
        optval = 0;
        if(setsockopt(s, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&optval, sizeof(optval))<0)
            perror("loopback:"), exit(0);
    }

    int received = 0;
    char buffer[1024];
    if (client) // Client
    {
        fprintf(stdout, "Acting as client. Reading %d messages from mc group\n", count);
        struct sockaddr_in from;
        socklen_t addrlen = sizeof(struct sockaddr_in);

        for(int i = 0; i < count; i++)
        {
            if ((received = recvfrom(s, buffer, sizeof(buffer), 0, (struct sockaddr *)&from, &addrlen)) < 0)
                perror("recv"), exit(0);
            buffer[received] = 0;
            printf("RECV: '%s' from <%s>\n", buffer, inet_ntoa(from.sin_addr));
        }
    }
    else // Server
    {
        fprintf(stdout, "Acting as server. Sending %d messages to mc group\n", count);

        struct timespec wait = {0};
        wait.tv_sec = 0;
        wait.tv_nsec = 500 * 1000000L;

        struct sockaddr_in remote;
        remote.sin_family = AF_INET;
        remote.sin_port = htons(port);
        remote.sin_addr.s_addr = mcgroup;

        // Send some data
        for(int i = 0; i < count; i++)
        {
            sprintf(buffer, "server 1: This is the test message number %d", i);
            if (sendto(s, (char *)buffer, strlen(buffer), 0, (struct sockaddr *)&remote, sizeof(remote)) < 0)
                close(s), exit(0);
            nanosleep(&wait, nullptr);
        }
    }

    // Drop group membership
    if (setsockopt(s, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char *)&mcast, sizeof(mcast)) < 0)
         perror("drop:");
    close(s);
    return 0;
}

Series Navigation<< Selenium WebDriver ließt WebSocketLIPC, Unix Domain Sockets >>