Daily Archives: 14 April, 2014

Network Programming – 3

Okay, last time I added infrastructure for handling config files and command lines to the project.  I also added options, accessible either way, that let you specify which port to listen on and how deep to make the listening queue.

The more stuff there is in the whole project, the more low-level actions are out of place in the main() procedure.  I’m going to lift a couple more things out of it. Aaand, I guess I should explain why.

This is something so obvious to programmers that it’s hard for us to explain it to anyone else, but it’s essentially the distinction between map and territory.  The bigger your project gets, the more you want to use main() as a map and the less you want to use it as the territory.   Above some fairly small number of lines of code, when you look at main() you’re usually trying to find out where in the code to find something else.  Once main() is useful to you as a programmer mainly as a map, you want to see absolutely nothing in it except well-commented calls to a few routines that do conceptually separate things. Everything else is clutter that just makes your map harder to read.  When main() is a good code-map you can easily figure out which of those few calls is the control branch that the thing you want to work on will be found in, and follow it.

Other than that, let’s get back to the objective of giving the program a way to figure out where to make outgoing connections to.  This information needs to persist between runs of the program, so that a new run can use connection information learned during previous runs to help find other instances to connect to.  Because it should outlive a particular run of the program, it needs to live in a file.   Because it needs to be a file that the program can both read and write, it can’t be the same configuration file we’re already reading.

Given that the program is going to have a data file, the configuration/command line infrastructure we’ve just built needs a way to tell it where to find the data file.  And there needs to be a default location for that data file that gets used when it’s not specified on the command line.
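As a rough sketch of that "use what the user gave us, otherwise fall back to a default" rule: the helper below is not part of server.c, and both `DEFAULT_CONNECTFILE` and `choose_connectfile` are names made up for illustration. The post does not say what the real default location is, so the one here is only a placeholder.

```c
#include <stdio.h>

/* Hypothetical default; the real default location is not named in the post. */
#define DEFAULT_CONNECTFILE "connections.dat"

/* Copy the connection-file path to use into dest (at most destsize bytes,
   always NUL-terminated when destsize > 0).  'requested' is whatever the user
   gave on the command line or in a config file, or NULL if nothing was given.
   Returns 0 on success, 1 if the chosen path does not fit in dest. */
int choose_connectfile(const char *requested, char *dest, size_t destsize){
  const char *source = DEFAULT_CONNECTFILE;
  int needed;

  if (requested != NULL){
    /* skip leading blanks, since an option handler may be handed them */
    while (*requested == ' ' || *requested == '\t') requested++;
    /* an empty request counts as "not specified" */
    if (*requested != '\0') source = requested;
  }
  needed = snprintf(dest, destsize, "%s", source);
  return (needed < 0 || (size_t)needed >= destsize) ? 1 : 0;
}
```

Called with `NULL` this yields the placeholder default; called with `"  peers.txt"` it yields `peers.txt`. Returning an error instead of silently truncating means a too-long path never turns into a different file name.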

So: to summarize goals for this session:

  • Pull more stuff out of main() and into its own routines.
  • Add a config option to specify a connection file.
  • Add a routine to read a connection file.
  • Add a data structure that gets filled in by reading the connection file.

Ready?

/*
** server.c -- a stream socket server demo (restructured, command line & config file processing added)
*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
#include <fcntl.h>

#define PORT "3490"  /* the default port users will be connecting to */

#define BACKLOG 10   /* how many pending connections queue will hold by default */

/* how many connections can I make at once? */
#define MAXCONNECTIONS 128

/* how many addresses will I attempt to keep for node-discovery etc? */
#define MAXCONNINFO 4096


struct connaddress{
  int whichconnection; /* -1 or if current connection exists, index into the vector of connections. */
  char port[8];        /* Is a 16-bit number, but we're keeping it in string form to pass to getaddrinfo.  */
  int addrfamily;      /* AF_INET or AF_INET6 */
  char ipaddress[INET6_ADDRSTRLEN];  /* an IPv6 address as text can need up to 45 characters plus the terminator. */
  time_t lastspoke;    /* last time we were in contact with this node */
  time_t lastseen;     /* last time we heard that this node is currently online. */

  double longuptime;   /* running average of how reliably this node has responded, updated at most once per hour. */
  double useful;       /* While observed to follow protocol, rises asymptotically to 0.5. Rises higher only when being 
              extra-useful, such as on confirmation of something it was first to notify us of. 
              Set to zero for an observed protocol breach. */

};

struct connectionrecord{
  /* index into the vector of addresses for the address info */
  int whichaddr;

  /* process ID of the thread currently tasked to deal with this connection */
  pid_t socketthread;
  
  /* Okay, I hate having threaded programs lose time with threads waiting for each other, so I'm implementing
     contention-free message queues between parent and socketthread.

     Here is how it works.  rdq is two message queues for passing messages from the thread to the parent.  To pass
     messages to the parent process, socketthread first checks rdOwner for the rdq's and, for the rdq's that it owns,
     whether or not they are empty.  If it owns both queues and finds both empty, it writes its message to one of them
     and then sets the corresponding rdOwner to the parent thread's PID.  If it owns only one queue, then whether or not
     that queue is empty it adds its message to that queue and doesn't change anything about the rdOwner.  If it owns
     both queues and finds at least one nonempty, it writes the parent's PID to the rdOwner corresponding to the
     nonempty queue containing the oldest message and adds its current message to the other queue.  The key here is that
     it never gives up ownership of a message queue unless it can see that it already owns the other.  Hence it never
     has to wait for the opportunity to write a message for the parent.
     
     The parent thread, meanwhile, when it wants to read messages, checks to see if it owns an rdq.  If it does, then it
     reads everything from that rdq (emptying it) and writes the socketthread's PID back to the corresponding rdOwner,
     ceding ownership back to the socketthread.  If it doesn't own an rdq, then there are no messages for it to read.

     wrq and wrOwner work the same way, except that in that case it's the parent thread that's guaranteed to always own
     at least one of the queues and never need to wait to write, and the socketthread is the one doing the reading and
     emptying.

     The nice part of this is that nobody ever has to wait to write a message.  The less nice part is that there's no
     real guarantee about when the other thread gets the written message.  We don't lose time waiting, but we're at
     least potentially passing messages in batches with no guarantees of when (relative to our own run) the other thread
     will have read and responded to them.  

     This will affect protocol design.  Tightly context-dependent synchronous protocols interactively requiring
     information from the parent thread at each step cannot proceed without both threads working and turning ownership
     around.  If no other communication is allowed until a particular such protocol completes, then we might as well be
     using a locking communication model and forcing our threads to wait on each other.  Therefore protocol design for
     this system should emphasize asynchronous protocols that can run insofar as possible using information available to
     socketthreads and which can be arbitrarily interleaved with one another.
     
  */
  pid_t rdOwner[2];
  pid_t wrOwner[2];
  
  struct message *rdq[2];
  struct message *wrq[2];
};





/* configoptions is file scope. */
struct {
  char listenport[9];
  int backlog;

  /* how many addresses do we now have? */
  int addresscount;
  /* how many actual connections are there?  */
  int connectioncount;

  /* This is for node discovery.  The program both reads and writes it. */
  struct connaddress addresses[MAXCONNINFO];  
  /* ephemeral information about each active connection */
  struct connectionrecord connections[MAXCONNECTIONS];
  

} configoptions;



void initconfig(){
  /* the port is kept as a string because that is the form getaddrinfo() wants.
     PORT must fit in listenport, terminator included. */
  strcpy( configoptions.listenport, PORT);
  configoptions.backlog = BACKLOG;
  configoptions.connectioncount = 0;
  configoptions.addresscount = 0;
}


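void sigchld_handler(int s){
  /* this reaps every child that has already exited.  WNOHANG keeps waitpid() from blocking: it returns 0
     when children exist but none has exited yet, and -1 when there are no children left. */
  int saved_errno = errno;  /* waitpid() may overwrite errno underneath the code we interrupted */
  (void)s;
  while(waitpid(-1, NULL, WNOHANG) > 0);
  errno = saved_errno;
}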

/* get sockaddr, IPv4 or IPv6: */
void *get_in_addr(struct sockaddr *sa){
  if (sa->sa_family == AF_INET) {
    return &(((struct sockaddr_in*)sa)->sin_addr);
  }
  
  return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int setup (int * sockfd){
  struct addrinfo hints, *servinfo, *paddr; 
  int yes=1;
  int addrerror;  
  int returncode = 0;
  
  memset(&hints, 0, sizeof hints);
  hints.ai_family = AF_UNSPEC;         /* AF_UNSPEC means we don't care whether IP4 or IP6 */
  hints.ai_socktype = SOCK_STREAM;     /* we are using stream sockets (TCP), not datagrams. */
  hints.ai_flags = AI_PASSIVE;         /* use my own IP */

  /* getaddrinfo sets servinfo to point to linked list of bindable sockets and 
     returns a nonzero error code if not successful. */  
  if ((addrerror = getaddrinfo(NULL, configoptions.listenport, &hints, &servinfo)) != 0) { 
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(addrerror));
    /* return 1 meaning unable to get any addresses to bind */
    returncode = 1;
  }
  
  /* loop through all the results and bind to the first we can */
  if (returncode != 1){
    for(paddr = servinfo; paddr != NULL; paddr = paddr->ai_next) {
      if ((*sockfd = socket(paddr->ai_family,     /* for our purposes anything other than AF_INET or AF_INET6 is useless */
                            paddr->ai_socktype,   /* for our purposes anything other than SOCK_STREAM is useless  */
                            paddr->ai_protocol)   /* whatever getaddrinfo chose for this entry; normally TCP for a stream socket */
           ) == -1) {
        /* we found a socket we can't use. */
        perror("server: socket");
        /* continue meaning try the next thing in the list */
        continue;
      }

      /* SOL_SOCKET means manipulating options at the level of the sockets API (rather than the protocol level)
         SO_REUSEADDR lets us bind this address again right away, even if connections from a previous run
         are still lingering in TIME_WAIT.
         &yes (actually a pointer at any nonzero) means enable that option. (void* because various structs for protocols)
         the last arg gives the size of the thing that the &yes argument points at, in our case an int.   */
      if (setsockopt(*sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&yes, sizeof(int)) == -1) {
        /* we found a socket but we can't set the options the way we want. */
        perror("server: setsockopt");
        close(*sockfd);
        continue;
      }

      /* okay, if we got to here we have a socket descriptor and we've set the options necessary to use it. 
         Now we attempt to bind it.  The arguments here are the socket descriptor, the address, and its length.
         The return value is 0 for success and -1 (with an errno set) for failure. One errno that we're going to 
         be interested in for a P2P app is EADDRINUSE - meaning another instance is already running. */
      if (bind(*sockfd, paddr->ai_addr, paddr->ai_addrlen) == -1) {
        /* we found a socket and set the options but we couldn't bind it (maybe somebody else bound it before 
           we got to it, maybe something else changed).  Report before close() can disturb errno. */
        perror("server: bind");
        close(*sockfd);
        continue;
      }
      /* okay, if we got here then we bound a port, so we stop looking.  Every failure above used continue,
         so the loop only runs off the end of the list (leaving paddr NULL) when nothing could be bound. */
      break;
    }  /* ends for loop*/
  
    /*this is a check to see why the for loop stopped.  If paddr is null it means we ran out of servinfo entries.*/
    if (paddr == NULL)  {
      fprintf(stderr, "server: failed to bind\n");
      /* return 2 for failure to bind a port. */
      returncode = 2;
    }
  }  
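  /* now that we've bound something or used up all our entries we don't need the list any more. 
     having called getaddrinfo() successfully, we must not return or exit without calling freeaddrinfo().
     If getaddrinfo() failed (returncode 1) there is no list, and servinfo must not be touched.
   */
  if (returncode != 1) freeaddrinfo(servinfo); 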
  
  /* we've bound a port, now we try to listen to it.  listen() returns 0 on success, -1 with an errno on failure.
     again, we will be interested in EADDRINUSE for a P2P app. */
  
  if (returncode == 0){
    if (listen(*sockfd, configoptions.backlog) == -1) {
      /* error, listen failed.  kill the thread. */
      perror("server:listen");
      returncode = 3;
    }
  }
  /* successfully bound and listening!  Yay!*/
  if (returncode == 0) printf("server: waiting for connections...\n");
  return returncode;
}




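/* read up to MAXCONNINFO struct connaddresses from the connectfile.
   Each entry holds three comma-separated fields: port, AF_INET or AF_INET6, address. */
int connectfilehandler(char *filename){
  FILE *connections;
  int failed = 0;
  int addresscounter = 0;
  int fields;
  char portspec[8];
  char inetspec[10];
  char addrspec[INET6_ADDRSTRLEN];
  struct connaddress *entry;

  /* option handlers get everything after the option name, so skip the blanks before the filename. */
  while (*filename == ' ' || *filename == '\t') filename++;

  connections = fopen(filename, "r");
  if ( connections == NULL ) {
    /* if we can't open the file, fail. */
    fprintf( stderr, "Could not open connection file %s\n", filename );
    return 1;
  }

  /* FIXME, this is still fragile.  It's supposed to be a file read and written by machine, but 
     we still can't leave anything around that will crash if it gets unexpected format; this needs a 
     full getc-based parser with error messages etc.  For now every field has a width limit so a bad
     file cannot overrun a buffer, and we stop at the first entry that doesn't have three fields. */
  while (addresscounter < MAXCONNINFO &&
         (fields = fscanf(connections, " %7[^, \t\n] , %9[^, \t\n] , %45s",
                          portspec, inetspec, addrspec)) != EOF){
    if (fields != 3){
      failed++;
      fprintf(stderr, "in file %s found a malformed entry: expected 'port, AF_INET or AF_INET6, address'.\n",
              filename);
      break;
    }
    entry = &configoptions.addresses[addresscounter];
    if (strcmp("AF_INET",inetspec)==0) entry->addrfamily = AF_INET;
    else if (strcmp("AF_INET6",inetspec)==0) entry->addrfamily = AF_INET6;
    else {
      failed++;
      fprintf(stderr,"in file %s read '%s' for inet specification: this program understands only 'AF_INET' or 'AF_INET6'.\n", 
              filename, inetspec);
      continue;  /* skip this entry rather than keep a half-filled record */
    }
    entry->whichconnection = -1;  /* -1 means there is no current connection to this address */
    snprintf(entry->port, sizeof entry->port, "%s", portspec);
    snprintf(entry->ipaddress, sizeof entry->ipaddress, "%s", addrspec);
    addresscounter++;
  }
  fclose(connections);
  /* remember how many addresses we actually stored. */
  configoptions.addresscount = addresscounter;
  /* if failed is still zero, we succeeded.  If it is nonzero, we failed. */
  return failed;
}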


/* handlecommand() is defined further down, but config files need to call it. */
int handlecommand(char *commandstring);

int configfilehandler(char *filename){
  FILE *config;
  char cmdbuffer[180];
  int bufferloc = 0;
  int rchar;
  int failed = 0;
  static int filecount = 0;

  /* option handlers get everything after the option name, so skip the blanks before the filename. */
  while (*filename == ' ' || *filename == '\t') filename++;

  /* yes, 180 == exactly fifteen dozen.  That's my canonical definition of 'too many config files'.  */
  if (filecount++ > 180){
    fprintf(stderr, "%s%s%s", 
        "Fifteen dozen configuration files is too many. Giving up while trying to process ", filename, 
        ".\nThere is probably an 'include loop' of --configfile directives in your config files.\n" );
    /* We can return (fail) directly because we haven't opened a file we need to close yet. */
    return 1;
  }
  
  config = fopen( filename, "r" );
  if ( config == NULL ) {
    /* if we can't open the file, set failure. */
    fprintf( stderr, "Could not open configfile %s\n", filename );
    return 1;
  }
  
  else {
    /* read one character at a time from file. */
    while  ((rchar = fgetc( config )) != EOF && !failed ){
      /* read to the end of a line. */
      if (rchar != '\n'){
    /* if there's a line longer than 180 characters 
       in the config file we ignore the rest of it. */
    if (bufferloc+1 < 180) cmdbuffer[bufferloc++] = rchar;
      }
      else{ /* we got a newline, a command is complete */
        /* add a null terminator. */
    cmdbuffer[bufferloc] = 0;
    /* reset bufferloc to read next command line */
    bufferloc = 0;
    /* process this line or set failed. */
    if (handlecommand(cmdbuffer) != EXIT_SUCCESS ){
      fprintf(stderr, "error in config file: this line is unrecognized. \n%s\n", cmdbuffer);
      failed++;
    }
      }
    }
    /* okay, this is weird but occasionally we get a config file that
       ends with something besides a newline. So there might be a
       valid unhandled command in the cmdbuffer after the loop exits.
       If that happens bufferloc will be nonzero. So handle that. */
    if (bufferloc != 0 && !failed){
      cmdbuffer[bufferloc] = 0;
      if (handlecommand(cmdbuffer) != EXIT_SUCCESS ){
    /* bad command, report failure and return failure */
    fprintf(stderr, "error in config file: this line is unrecognized. \n%s\n", cmdbuffer);
    failed++;
      }
    }
    fclose( config );
  }
  /* returns 0 for success or >0 for failure. */
  return failed;
}

int listenporthandler(char *portstring){
  /* portstring is a number, possibly preceded by whitespace */
  int argpoint = 0;
  long int newval = 0;
  errno = 0;
  /* first make sure it's valid. */
  newval = strtol(portstring, NULL, 0);
  /* You can in practice send and receive on port zero, but it is defined by IANA to be an invalid port number. */
  if (errno == ERANGE || errno == EINVAL || newval <= 0 || newval >= 65536){
    fprintf(stderr, "--listenport %s found but %s is not a valid port number.\n", portstring, portstring);
    return(1); /* failure */
  }
  /* IANA doesn't state this explicitly but it's a convention some systems (solaris, etc) follow. */
  if (newval > 49151){
    fprintf(stderr, "%s %s %s %s", 
        "warning: preparing to listen on port ", portstring, ". On some systems ports higher than 49151 ",
        "are reserved for ephemeral connections.\n");
  }
  if (newval < 1024){
    fprintf(stderr, "warning: preparing to listen on port %s. Ports below 1024 are reserved for standard protocols.\n", portstring);
  }

  /* Anyway, if we got here we have a valid port.  Save it as a string in a known format, no matter how we got it. */
  snprintf(configoptions.listenport, sizeof configoptions.listenport, "%ld", newval);
  return(0); /* success */
}

int backloghandler(char *backstring){
  /* backstring is a number, possibly preceded by whitespace */
  int argpoint = 0;
  long int newval = 0;
  errno = 0;
  newval = strtol(backstring, NULL, 0);
  if (errno == ERANGE || errno == EINVAL || newval < 0){
    fprintf(stderr, "--backlog %s found but %s is not valid (must be a number >= 0).\n", backstring, backstring);
    return(1); /* failure */
  }
  if (newval >= 100){
    /* 100 is way too many for a protocol with even moderately persistent connections.  */
    fprintf(stderr, "--backlog %s found but %s is too many waiting connections.  For best results try 1-15.\n", backstring, backstring);
    return(1);
  }
  else if (newval > 20){
    fprintf(stderr, "warning, --backlog %s found; a range of 1-15 maximum waiting connections is recommended for best results.\n ", 
        backstring);
  }
  configoptions.backlog = newval;
  return(0); /* success */
}


/* check a string against a particular command; pass remainder of string to appropriate handler if it matches.
   The third argument is a pointer to a procedure which takes a string and returns an int.  */
int checkcommand(char *command, char *commandstring,  int(*handler)(char *harg), int *rcode) {
  int argloc = strlen(command);
  if (strncmp(command, commandstring, argloc) == 0){
    *rcode = handler(&(commandstring[argloc]));
    /* 1 means this command matched. success/failure is in rcode. */
    return(1);
  }
  /* 0 means no match.  rcode is irrelevant. */
  return (0);
}


/* handle some command.  We don't know what it is, so we check it against all possibilities */
int handlecommand(char *commandstring){
  /* add more lines and handler routines as needed. */
  int rcode = 0;
  if (checkcommand("--backlog",        commandstring,  backloghandler,     &rcode) ||
      checkcommand("--configfile",     commandstring,  configfilehandler,  &rcode) ||
      checkcommand("--connectionfile", commandstring,  connectfilehandler, &rcode) ||      
      checkcommand("--listenport",     commandstring,  listenporthandler,  &rcode))

    /* if checkcommand was true for anything, then rcode will be set according to the 
       success of the handler for that option.  If the handler succeeded, we return 
       rcode to signal success, otherwise we return rcode to signal failure. We don't 
       report to stderr because it's not us that hit the error, it's the handler. */
    return rcode; 

  /* otherwise none of the checkcommand lines matched so we report to stderr and return 1 for failure. */
  fprintf(stderr, "unrecognized command: %s\n", commandstring);
  return(1);
}


int handlearguments(int argc, char *argv[]){
  char cmdbuffer[180];
  int argcount;
  int needed;
  /* first make sure argc is odd, 'cause all our arguments are spec,value pairs */
  if ((argc % 2) != 1){
    /* from the user's point of view the number of args must be even, because he's not counting argv[0].*/
    fprintf(stderr, "wrong number of arguments (must be even).\n");
    return 1;
  }
  for (argcount = 1; argcount+1 < argc; argcount += 2){
    /* join "--option" and "value" into one command, refusing anything too long for the buffer. */
    needed = snprintf(cmdbuffer, sizeof cmdbuffer, "%s %s", argv[argcount], argv[argcount+1]);
    if (needed < 0 || (size_t)needed >= sizeof cmdbuffer){
      fprintf(stderr, "argument too long: %s %s\n", argv[argcount], argv[argcount+1]);
      return 1;
    }
    if (handlecommand(cmdbuffer)){
      /* handling this pair of arguments failed. */
      fprintf(stderr, "invalid command: %s\n", cmdbuffer );
      return 1;
    } /* end of checking one argpair */
  } /* end for checking all argpairs. */
  return 0;
}


int startreaper(){
  struct sigaction sa;
  
  /* reap all dead processes */
  sa.sa_handler = sigchld_handler; 
  /* sigemptyset clears sa.sa_mask, so no extra signals are blocked while the handler runs. */
  sigemptyset(&sa.sa_mask);
  /* some of the socket calls block.  SA_RESTART says that blocked calls, 
     if interrupted by a sigaction, should be restarted. */
  sa.sa_flags = SA_RESTART;
  /* try to set sig handling for children.  If that causes an error, fail. */
  if (sigaction(SIGCHLD, &sa, NULL) == -1) {
    perror("server:sigaction");
    return 1;
  }
  return 0;
}


/* makes new outgoing connections and listens for new incoming connections. 
   Splits off child processes to deal with each connection (both ways). */

int handle_connections(int sockfd){
  int listenerpid;
  int new_fd;  /* listener on sock_fd, new connection on new_fd */
  socklen_t sin_size;
  char csixaddr[INET6_ADDRSTRLEN];  
  int err1, err2;

  struct sockaddr_storage their_addr; /* connector's address information */
  
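  /* make the listening socket non-blocking: read the current flags, then add O_NONBLOCK to them. */
  err2 = fcntl(sockfd, F_GETFL);
  err1 = (err2 == -1) ? -1 : fcntl(sockfd, F_SETFL, err2 | O_NONBLOCK);
  if (err1 == -1) {
    perror("server: fcntl");
    return -1;
  }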
  

  while(1) {  /* main accept() loop */
    sin_size = sizeof their_addr;
    new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
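    if (new_fd == -1) {
      /* the listener is non-blocking, so "nobody is waiting" is not an error. */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* until this loop has outgoing work to do, nap for 10ms instead of spinning. */
        struct timespec nap = {0, 10000000L};
        nanosleep(&nap, NULL);
        continue;
      }
      perror("server: accept");
      continue;
    }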
      
    inet_ntop(their_addr.ss_family,
          get_in_addr((struct sockaddr *)&their_addr),
          csixaddr, sizeof csixaddr);
    printf("server: got connection from %s\n", csixaddr);
    
    if (!fork()) { /* Fork!  If this is the child process then */
      close(sockfd); /* we don't need the listener */
      if (send(new_fd, "Hello, world!", 13, 0) == -1)
    perror("child: send");
      close(new_fd);
      exit(0);
    }
    else /* otherwise this is the listener process and the */
      close(new_fd);  /* listener doesn't need the talking port. */
  }
}    

void handle_outgoing(){

}

int main(int argc, char * argv[]){
  int listenerpid;
  int sockfd;

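  /* load the built-in defaults first, so that arguments and config files can override them. */
  initconfig();

  /* process arguments.  If that causes an error, fail. */
  if (handlearguments(argc, argv)) return EXIT_FAILURE;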

  /* Try to bind a listening port.  If we fail, we return. */
  if (setup(&sockfd)!= 0) return EXIT_FAILURE;

  /* set up to reap all dead children.  If we fail, we return. */
  if (startreaper() != 0) return EXIT_FAILURE;
  
  /* start the connection handling thread. Returns the pid of the listener, or 
     -1 if no listener was created.  Now others may contact us. */
  if ((listenerpid = handle_connections(sockfd)) == -1) return EXIT_FAILURE;
  
  /* And now that we're listening, we start trying to contact others */
  //  handle_outgoing(listenerpid);
  
  return EXIT_SUCCESS;
}


Okay, there are things in here that I don’t like, things in here that I do like, and things whose design I’m not sure of yet.

I have fields in the connection file for finding nodes. Right now these are the network protocol (IPv4 or IPv6), the network address, and the port number. There are additional fields for keeping track of information about why that node is (or isn’t) useful. The code above doesn’t deal with reading values for those yet. In fact the code above for reading the connection file is pretty pathetic, because it copes badly when the file doesn’t contain the format it expects. The file needs a proper parser, possibly for an XML or SEXP markup. And it needs to be future-proofed, meaning I need to be able to change the file format in future versions of the program without older files messing things up.
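To give an idea of what a stricter reader might look like, here is a minimal sketch that validates one entry at a time. It is not the parser this project will end up with: `struct peeraddr` and `parse_connline` are invented for the example, the line format (port, family, address, separated by commas) is assumed from the fscanf call in the listing, and nothing here deals with file versions or the extra usefulness fields.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/* Hypothetical, cut-down stand-in for struct connaddress. */
struct peeraddr {
  char port[8];
  int  addrfamily;                    /* AF_INET or AF_INET6 */
  char ipaddress[INET6_ADDRSTRLEN];
};

/* Parse one line of the assumed form "port, AF_INET, address".
   Returns 0 on success, otherwise a code saying what was wrong:
   1 = wrong shape, 2 = bad port, 3 = unknown family, 4 = bad address.
   *out is only written on success. */
int parse_connline(const char *line, struct peeraddr *out){
  char port[8], family[10], address[INET6_ADDRSTRLEN];
  struct in6_addr scratch;            /* large enough for either family */
  char *end;
  long portnum;
  int fam, used = 0;

  /* every conversion has a width limit, so no field can overrun its buffer;
     %n records how much was consumed so trailing junk can be rejected. */
  if (sscanf(line, " %7[0-9] , %9[A-Z0-9_] , %45[0-9A-Fa-f:.] %n",
             port, family, address, &used) != 3 || line[used] != '\0')
    return 1;

  portnum = strtol(port, &end, 10);
  if (*end != '\0' || portnum <= 0 || portnum > 65535) return 2;

  if (strcmp(family, "AF_INET") == 0) fam = AF_INET;
  else if (strcmp(family, "AF_INET6") == 0) fam = AF_INET6;
  else return 3;

  /* inet_pton returns 1 only when the text is a valid address of that family */
  if (inet_pton(fam, address, &scratch) != 1) return 4;

  out->addrfamily = fam;
  strcpy(out->port, port);            /* safe: same sizes as the scan buffers */
  strcpy(out->ipaddress, address);
  return 0;
}
```

A caller would read the file a line at a time (with fgets, say), pass each line to `parse_connline`, and report the line number along with the error code. Rejecting a line outright is a design choice; a more forgiving reader could skip bad entries and keep going.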

I’m going to use a lock-free algorithm for passing messages to and from the connected threads, because I hate locks. But right now, aside from several paragraphs of comments about how it will work, that’s not implemented – mostly because I haven’t defined protocol behavior yet in this framework.
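For what it's worth, here is one way the two-queue ownership scheme from the big comment in struct connectionrecord could be sketched for a single direction. This is a sketch under loud assumptions, not the design the project will use: it takes C11 `<stdatomic.h>` for granted, replaces the PID-valued owner fields with a two-value flag, uses small fixed-size queues of ints instead of `struct message` lists, and assumes both sides share one address space (forked processes, as in the listing, would need shared memory for this to work at all). Every name in it is made up.

```c
#include <stdatomic.h>

#define QCAP 16                       /* hypothetical per-queue capacity */
enum { WRITER = 0, READER = 1 };      /* stand-ins for the two PIDs */

struct msgqueue {
  atomic_int owner;                   /* WRITER or READER */
  int count;                          /* touched only by the current owner */
  int items[QCAP];                    /* messages are plain ints here */
};

struct channel { struct msgqueue q[2]; };

void channel_init(struct channel *ch){
  int i;
  for (i = 0; i < 2; i++){
    atomic_init(&ch->q[i].owner, WRITER);   /* the writer starts out owning both */
    ch->q[i].count = 0;
  }
}

/* Writer side.  Never waits; returns 0 on success, -1 if the queue is full. */
int channel_send(struct channel *ch, int msg){
  int own0 = atomic_load_explicit(&ch->q[0].owner, memory_order_acquire) == WRITER;
  int own1 = atomic_load_explicit(&ch->q[1].owner, memory_order_acquire) == WRITER;
  int target, handoff = -1;

  if (own0 && own1){
    if (ch->q[0].count == 0 && ch->q[1].count == 0){
      target = 0; handoff = 0;        /* write, then pass that same queue on */
    } else {
      /* queues come back empty, so at most one we own can hold a backlog */
      handoff = (ch->q[0].count > 0) ? 0 : 1;
      target = 1 - handoff;           /* write into the other one and keep it */
    }
  } else if (own0 || own1){
    target = own0 ? 0 : 1;            /* we own exactly one: append, keep it */
  } else {
    return -1;                        /* should not happen under these rules */
  }
  if (ch->q[target].count >= QCAP) return -1;
  ch->q[target].items[ch->q[target].count++] = msg;
  if (handoff >= 0)                   /* release: publish the queue contents */
    atomic_store_explicit(&ch->q[handoff].owner, READER, memory_order_release);
  return 0;
}

/* Reader side.  Copies out everything in any queue it owns, empties it and
   gives it back.  Returns the number of messages copied; anything beyond
   outcap is dropped in this sketch, so pass room for at least QCAP. */
int channel_receive(struct channel *ch, int *out, int outcap){
  int i, k, n = 0;
  for (i = 0; i < 2; i++){
    struct msgqueue *q = &ch->q[i];
    if (atomic_load_explicit(&q->owner, memory_order_acquire) != READER) continue;
    for (k = 0; k < q->count && n < outcap; k++) out[n++] = q->items[k];
    q->count = 0;
    atomic_store_explicit(&q->owner, WRITER, memory_order_release);
  }
  return n;
}
```

Sending 1, 2, 3 and then receiving delivers only message 1: the first send handed its queue over straight away, while 2 and 3 wait in the queue the writer kept until a later send passes that queue on. That is the batching behavior the comment warns about, and it is also why the "never waits" property only holds while the queues have room. A real version would need unbounded queues or a policy for the full case.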

Anyway, more bugs introduced this time along with a new feature, more fixes next time as we make the new feature a bit more bulletproof.