/* * Copyright (c) 2000 Konstantinos K. Konstantinidis. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define DEBUG YES #define MAXCLIENTS 64 /* maximum number of clients that can connect */ #define WIN_SIZE 48 /* number of packets to send before waiting for acks */ #define SEG_SIZE 1408 /* max bytes of data per udp packet */ #define BFTP_LSN 4949 /* port deamon listens for client connections */ #define BFTP_RCV 4950 /* destination port for udp broadcasts */ #define BFTP_SND 4951 /* source port for udp broadcasts */ #define CMD_OPEN 1 #define CMD_OACK 2 #define CMD_ACK 3 #define CMD_NACK 4 #define CMD_DATA 5 #define PANIC { perror("PANIC"); exit(1); } /* shit happens */ struct { short command; char magic[32]; } cpkt; struct { short command; union { long no_procno; long no_winno; } ap_no; } apkt; #define procno ap_no.no_procno #define windno ap_no.no_winno struct data_packet { long window; long sequence; size_t size; char data[SEG_SIZE]; }; struct data_window { int size; struct data_packet dpkt[WIN_SIZE]; } window; FILE *data; int tcpsocket = 0; int udpsocket = 0; long wincounter = 0; long pktcounter = 0; size_t size = 2111864832; size_t transfered = 0; float throughput = 0; /* * * Server part * */ struct sockaddr_in broadcast; char *clientname[MAXCLIENTS]; int sockets[MAXCLIENTS]; char nacks[MAXCLIENTS]; char acks[MAXCLIENTS]; int clients = 0; jmp_buf ackbuf; void timer(int x) { longjmp(ackbuf, 1); } void waitclients(int nclients) { struct linger linger = {0}; struct sockaddr_in servername; struct sockaddr_in client; int clientlength = sizeof(client); int optval = 1; int i; linger.l_onoff = 1; linger.l_linger = 30; memset(&servername, 0, sizeof(servername)); servername.sin_family = AF_INET; servername.sin_addr.s_addr = htonl(INADDR_ANY); servername.sin_port = htons(BFTP_LSN); if ((tcpsocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) PANIC; optval = 1; setsockopt(tcpsocket, SOL_SOCKET, SO_REUSEADDR, (const void *) &optval, sizeof(optval)); setsockopt(tcpsocket, SOL_SOCKET, SO_LINGER, (const void *) &linger, sizeof(linger)); if (bind(tcpsocket, (struct sockaddr *) &servername, sizeof(servername)) == -1) PANIC; if (listen(tcpsocket, MAXCLIENTS) < 0) PANIC; fprintf(stderr, "Waiting for %d clients...\n", nclients); do { sockets[clients] = accept(tcpsocket, (struct sockaddr *) &client, &clientlength); clientname[clients] = strdup(inet_ntoa(client.sin_addr)); printf("[%d] %s : ", clients, clientname[clients]); client.sin_addr.s_addr = htonl(INADDR_BROADCAST); read(sockets[clients], &cpkt, sizeof(cpkt)); if (cpkt.command != htons(CMD_OPEN)) { printf("REJECTED\n"); close(sockets[clients]); } else { apkt.command = htons(CMD_OACK); apkt.procno = htonl(clients); write(sockets[clients], &apkt, sizeof(apkt)); printf("ACCEPTED\n"); clients++; } } while (clients < nclients); fprintf(stderr, "%d clients connected.\n", clients); sleep(2); for (i = 0; i < clients; i++) { apkt.command = htons(CMD_OACK); apkt.procno = htonl(i); write(sockets[i], &apkt, sizeof(apkt)); } /* no need to listen for client connections anymore */ close(tcpsocket); } void opensndsocket(void) { int optval = 1; struct sockaddr_in servername; broadcast.sin_family = AF_INET; broadcast.sin_addr.s_addr = htonl(INADDR_BROADCAST); broadcast.sin_port = htons(BFTP_RCV); if ((udpsocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) PANIC; memset(&servername, 0, sizeof(servername)); servername.sin_family = AF_INET; servername.sin_addr.s_addr = htonl(INADDR_ANY); servername.sin_port = htons(BFTP_SND); setsockopt(udpsocket, SOL_SOCKET, SO_BROADCAST, (const void *) &optval, sizeof(optval)); if (bind(udpsocket, (struct sockaddr *) &servername, sizeof(servername)) == -1) PANIC; } void closesndsocket(void) { close(udpsocket); } void readwindow(void) { int i; #ifdef DEBUG fprintf(stderr, "READ %ld\n", wincounter); #endif memset(&window, 0, sizeof(window)); for (i = 0; i < WIN_SIZE; i++) { window.dpkt[i].size = htonl(fread(window.dpkt[i].data, 1, sizeof(window.dpkt[i].data), data)); window.dpkt[i].sequence = htonl((pktcounter++ % WIN_SIZE) + 1); window.dpkt[i].window = htonl(wincounter); } window.size = i; } void sendwindow(void) { int i; #ifdef DEBUG fprintf(stderr, "SEND %ld\n", wincounter); #endif for (i = 0; i < WIN_SIZE; i++) { sendto(udpsocket, &window.dpkt[i], sizeof(window.dpkt[i]), 0, (struct sockaddr *) &broadcast, sizeof(broadcast)); } } void waitforacks(void) { int i, max = 0; short n_nacks, n_acks, ncl; struct timeval tm; fd_set readfds, exceptfds; char rcvdpak[MAXCLIENTS]; memset(rcvdpak, 0, sizeof(rcvdpak)); memset(nacks, 0, sizeof(nacks)); memset(acks, 0, sizeof(acks)); memset(&apkt, -1, sizeof(apkt)); alarm(0); signal(SIGALRM, timer); if (setjmp(ackbuf) == 1) { fprintf(stderr, "Timeout\n"); goto stop; } alarm(5); do { max = 0; FD_ZERO(&readfds); FD_ZERO(&exceptfds); for (i = 0; i < clients; i++) { if (sockets[i] > max) max = sockets[i]; if (sockets[i]) { FD_SET(sockets[i], &readfds); FD_SET(sockets[i], &exceptfds); } } tm.tv_sec = 30; tm.tv_usec = 0; if (select(max + 1, &readfds, NULL, &exceptfds, &tm)) { for (i = 0; i < clients; i++) { if (FD_ISSET(sockets[i], &exceptfds)) { fprintf(stderr, "Lost client %d, continuing...\n", i); close(sockets[i]); sockets[i] = 0; } else if (FD_ISSET(sockets[i], &readfds)) { short command; memset(&apkt, -1, sizeof(apkt)); if (read(sockets[i], &apkt, sizeof(apkt)) != 0) { command = ntohs(apkt.command); if ((command == CMD_ACK) && (wincounter == ntohl(apkt.windno))) acks[i]++; if ((command == CMD_NACK) && (wincounter == ntohl(apkt.windno))) nacks[i]++; if ((apkt.windno == -1) || (wincounter != ntohl(apkt.windno))) { fprintf(stderr, "Lost client %d, continuing*...\n", i); close(sockets[i]); sockets[i] = 0; } #ifdef DEBUG if (command == CMD_ACK) fprintf(stderr, "PACK %d (%d)\n", ntohl(apkt.windno), i); if (command == CMD_NACK) fprintf(stderr, "NACK %d (%d)\n", ntohl(apkt.windno), i); #endif } } } } n_acks = 0; n_nacks = 0; ncl = 0; for (i = 0; i < clients; i++) { if (sockets[i]) { ncl++; if (acks[i]) n_acks++; if (nacks[i]) n_nacks++; } } } while ((n_acks + n_nacks) != ncl); if (!ncl) { fprintf(stderr, "Lost all clients, aborting...\n"); exit(1); } stop:alarm(0); signal(SIGALRM, SIG_IGN); } void sendacks(void) { int i; usleep(10000); memset(&apkt, 0, sizeof(apkt)); apkt.command = htons(CMD_ACK); apkt.windno = htonl(wincounter); #ifdef DEBUG fprintf(stderr, "RACK %ld\n", wincounter); #endif for (i = 0; i < clients; i++) if (sockets[i]) write(sockets[i], &apkt, sizeof(apkt)); } void closeclients(void) { int i; for (i = 0; i < clients; i++); if (sockets[i]) close(sockets[i]); } int lostpackets(void) { short i, nacks, ncl; ncl = 0; nacks = 0; for (i = 0; i < clients; i++) { if (sockets[i]) { ncl++; if (acks[i]) nacks++; } } return (nacks < ncl); } void server(int num_clients, char *file) { time_t start, timer; memset(&window, 0, sizeof(window)); data = fopen(file, "r"); waitclients(num_clients); opensndsocket(); fprintf(stderr, "\nBroadcasting file...\n"); start = time(NULL); while (!feof(data)) { readwindow(); do { sendwindow(); sendacks(); waitforacks(); if (lostpackets()) fprintf(stderr, "Retransmitting window %ld...\n", wincounter); } while (lostpackets()); wincounter++; } timer = time(NULL) - start; /* fprintf(stderr, "Broadcasted %.3fMB at %.2fMbps in %02d:%02d\n", (float) size / (1024 * 1024), (float) 9.2, (int) timer / 60, (int) timer % 60); */ closesndsocket(); closeclients(); fclose(data); exit(0); } /* * * Client part * */ struct sockaddr_in broadcaster; void opensession(char *host) { struct hostent *he; struct sockaddr_in their_addr; if ((he = gethostbyname(host)) == NULL) { herror("gethostbyname"); exit(1); } do { their_addr.sin_family = AF_INET; their_addr.sin_port = htons(BFTP_LSN); their_addr.sin_addr = *((struct in_addr *) he->h_addr); bzero(&(their_addr.sin_zero), 8); if (tcpsocket) { close(tcpsocket); sleep(1); } else { fprintf(stderr, "Trying %s...", inet_ntoa(their_addr.sin_addr)); fflush(stderr); } if ((tcpsocket = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); exit(1); } } while (connect(tcpsocket, (struct sockaddr *) &their_addr, sizeof(struct sockaddr)) == -1); fprintf(stderr, "\nConnected to %s.\n", host); memset(&cpkt, 0, sizeof(cpkt)); cpkt.command = htons(CMD_OPEN); send(tcpsocket, &cpkt, sizeof(cpkt), 0); memset(&apkt, 0, sizeof(apkt)); if ((recv(tcpsocket, &apkt, sizeof(apkt), 0) == -1) || (apkt.command != htons(CMD_OACK))) { fprintf(stderr, "Connection closed by foreign host.\n"); exit(1); } while (recv(tcpsocket, &apkt, sizeof(apkt), 0) == -1) memset(&apkt, 0, sizeof(apkt)); if (apkt.command != htons(CMD_OACK)) { fprintf(stderr, "Connection closed by foreign host.\n"); exit(1); } } void openrcvsocket(void) { struct sockaddr_in myaddr; memset(&myaddr, 0, sizeof(myaddr)); myaddr.sin_family = AF_INET; myaddr.sin_port = htons(BFTP_RCV); myaddr.sin_addr.s_addr = INADDR_ANY; if ((udpsocket = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { perror("socket"); exit(1); } if (bind(udpsocket, (struct sockaddr *) &myaddr, sizeof(struct sockaddr)) == -1) { perror("bind"); exit(1); } } void closercvsocket(void) { close(udpsocket); } void client(char *host, char *file) { int addrlen = sizeof(struct sockaddr); fd_set readfds, exceptfds; time_t start, timer, prev, eta = 0; struct timeval tm; char ack = 1, finished = 0; int i, max; struct data_packet dpkt; memset(&window, 0, sizeof(window)); data = fopen(file, "w"); openrcvsocket(); opensession(host); fprintf(stderr, "Receiving file...\n"); start = time(NULL); do { max = 0; if (udpsocket > max) max = udpsocket; if (tcpsocket > max) max = tcpsocket; FD_ZERO(&readfds); FD_ZERO(&exceptfds); if (udpsocket) { FD_SET(udpsocket, &readfds); FD_SET(udpsocket, &exceptfds); } if (tcpsocket) { FD_SET(tcpsocket, &readfds); FD_SET(tcpsocket, &exceptfds); } tm.tv_sec = 10; tm.tv_usec = 0; if (!select(max + 1, &readfds, NULL, &exceptfds, &tm)) { fprintf(stderr, "Connection timed out.\n"); exit(1); } if (FD_ISSET(udpsocket, &readfds)) { memset(&dpkt, 0, sizeof(dpkt)); if (recvfrom(udpsocket, &dpkt, sizeof(dpkt), 0, (struct sockaddr *) &broadcaster, &addrlen) == -1) { perror("recvfrom"); exit(1); } if (wincounter == ntohl(dpkt.window)) { if (ntohl(dpkt.size) > SEG_SIZE) PANIC; window.dpkt[ntohl(dpkt.sequence) - 1].sequence = ntohl(dpkt.sequence); window.dpkt[ntohl(dpkt.sequence) - 1].size = ntohl(dpkt.size); memcpy(&window.dpkt[ntohl(dpkt.sequence) - 1].data, &dpkt.data, sizeof(dpkt.data)); #ifdef DEBUG printf("GOTS %02d %02d (%d bytes)\n", ntohl(dpkt.window), ntohl(dpkt.sequence), ntohl(dpkt.size)); #endif } else { #ifdef DEBUG printf("GOTI %02d %02d (%d bytes) - expected %ld\n", ntohl(dpkt.window), ntohl(dpkt.sequence), ntohl(dpkt.size), wincounter); #endif } } if (FD_ISSET(tcpsocket, &readfds)) { memset(&apkt, 0, sizeof(apkt)); recv(tcpsocket, &apkt, sizeof(apkt), 0); if (apkt.command == htons(CMD_ACK)) { #ifdef DEBUG printf("RACK %d\n", ntohl(apkt.windno)); #endif if (ntohl(apkt.windno) == wincounter - 1) { apkt.command = htons(CMD_ACK); #ifdef DEBUG fprintf(stderr, "PACK %d\n", ntohl(apkt.windno)); #endif send(tcpsocket, &apkt, sizeof(apkt), 0); } else if (ntohl(apkt.windno) == wincounter) { ack = 1; for (i = 0; i < WIN_SIZE; i++) { if (window.dpkt[i].sequence == 0) ack = 0; else if (window.dpkt[i].size == 0) finished = 1; } if (ack) { apkt.command = htons(CMD_ACK); #ifdef DEBUG printf("PACK %d\n", ntohl(apkt.windno)); #endif send(tcpsocket, &apkt, sizeof(apkt), 0); for (i = 0; i < WIN_SIZE; i++) { if (window.dpkt[i].size == 0) break; fwrite(&window.dpkt[i].data, 1, window.dpkt[i].size, data); pktcounter++; transfered += window.dpkt[ntohl(dpkt.sequence) - 1].size; } memset(&window, 0, sizeof(window)); wincounter++; prev = timer; timer = time(NULL) - start; throughput = ((float) transfered * 8) / ((float) timer * 1024 * 1024 + 1); eta = (int) ((float) (size - transfered) / ((float) transfered / (float) (timer + 1))); fprintf(stderr, "%02d:%02d/%02d:%02d (%7.3fMB at %3.1fMbps) \r", (int) (timer / 60), (int) (timer % 60), (int) (eta / 60), (int) (eta % 60), ((float) transfered) / (1024 * 1024), throughput); fflush(stdout); } else { apkt.command = htons(CMD_NACK); #ifdef DEBUG printf("NACK %d\n", ntohl(apkt.windno)); #endif send(tcpsocket, &apkt, sizeof(apkt), 0); finished = 0; } } } } } while (!finished); timer = time(NULL) - start; fprintf(stderr, "Received %.3fMB at %.2fMbps in %02d:%02d\n", (float) transfered / (1024 * 1024), (float) (throughput), (int) timer / 60, (int) timer % 60); closercvsocket(); fclose(data); exit(0); } /* * * Main * */ void usage(void) { printf("usage:\n\n"); printf("server: bftp put clients file\n"); printf("client: bftp get server file\n\n"); printf("clients - specifies the number of clients (0-%d)\n", MAXCLIENTS); printf("server - specifies the ip or hostname of the server\n"); printf("file - specifies the file that the server will read from and the client write to.\n"); exit(1); } int main(int argc, char *argv[]) { int i = atoi(argv[2]); if (strcmp(argv[1], "put") == 0) server(i, argv[3]); if (strcmp(argv[1], "get") == 0) client(argv[2], argv[3]); usage(); return -1; }