Drizzled Public API Documentation

CSSocket.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh (H&G2JCtL)
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-05-24
00023  *
00024  * CORE SYSTEM:
00025  * Basic socket I/O.
00026  *
00027  */
00028 
00029 #include "CSConfig.h"
00030 
00031 #include <stdio.h>
00032 #include <sys/types.h>
00033 
00034 #ifdef OS_WINDOWS
00035 #include <winsock.h>
00036 typedef int socklen_t;
00037 #define SHUT_RDWR 2
00038 #define CLOSE_SOCKET(s) closesocket(s)
00039 #define IOCTL_SOCKET  ioctlsocket
00040 #define SOCKET_ERRORNO  WSAGetLastError()
00041 #define EWOULDBLOCK   WSAEWOULDBLOCK
00042 
00043 #else
00044 #include <sys/ioctl.h>
00045 #include <sys/socket.h>
00046 #include <netdb.h>
00047 #include <netinet/in.h>
00048 #include <arpa/inet.h>
00049 #include <netinet/in.h>
00050 #include <netinet/tcp.h>
00051 #include <sys/select.h>
00052 #include <fcntl.h>
00053 
00054 extern void unix_close(int h);
00055 #define CLOSE_SOCKET(s) unix_close(s)
00056 #define IOCTL_SOCKET  ioctl
00057 #define SOCKET_ERRORNO  errno
00058 
00059 #endif
00060 
00061 #include <ctype.h>
00062 #include <string.h>
00063 #include <stdlib.h>
00064 
00065 #include "CSSocket.h"
00066 #include "CSStream.h"
00067 #include "CSGlobal.h"
00068 #include "CSStrUtil.h"
00069 #include "CSFile.h"
00070 
00071 void CSSocket::initSockets()
00072 {
00073 #ifdef OS_WINDOWS
00074   int   err;
00075   WSADATA data;
00076   WORD  version = MAKEWORD (1,1);
00077   static int inited = 0;
00078 
00079   if (!inited) {
00080     err = WSAStartup(version, &data);
00081 
00082     if (err != 0) {
00083       CSException::throwException(CS_CONTEXT, err, "WSAStartup error");
00084     }
00085     
00086     inited = 1;
00087   }
00088   
00089 #endif
00090 }
00091 
00092 /*
00093  * ---------------------------------------------------------------
00094  * CORE SYSTEM SOCKET FACTORY
00095  */
00096 
00097 CSSocket *CSSocket::newSocket()
00098 {
00099   CSSocket *s;
00100   
00101   new_(s, CSSocket());
00102   return s;
00103 }
00104 
00105 /*
00106  * ---------------------------------------------------------------
00107  * INTERNAL UTILITIES
00108  */
00109 
00110 void CSSocket::formatAddress(size_t size, char *buffer)
00111 {
00112   if (iHost) {
00113     cs_strcpy(size, buffer, iHost);
00114     if (iService)
00115       cs_strcat(size, buffer, ":");
00116   }
00117   else
00118     *buffer = 0;
00119   if (iService)
00120     cs_strcat(size, buffer, iService);
00121 }
00122 
00123 void CSSocket::throwError(const char *func, const char *file, int line, char *address, int err)
00124 {
00125   if (err)
00126     CSException::throwFileError(func, file, line, address, err);
00127   else
00128     CSException::throwEOFError(func, file, line, address);
00129 }
00130 
00131 void CSSocket::throwError(const char *func, const char *file, int line, int err)
00132 {
00133   char address[CS_SOCKET_ADDRESS_SIZE];
00134 
00135   formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
00136   throwError(func, file, line, address, err);
00137 }
00138 
00139 void CSSocket::setNoDelay()
00140 {
00141   int flag = 1;
00142 
00143   if (setsockopt(iHandle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) == -1)
00144     CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
00145 }
00146 
00147 void CSSocket::setNonBlocking()
00148 {
00149   if (iTimeout) {
00150     unsigned long block = 1;
00151 
00152     if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
00153       throwError(CS_CONTEXT, SOCKET_ERRORNO);
00154   }
00155 }
00156 
00157 void CSSocket::setBlocking()
00158 {
00159   /* No timeout, set blocking: */
00160   if (!iTimeout) {
00161     unsigned long block = 0;
00162 
00163     if (IOCTL_SOCKET(iHandle, FIONBIO, &block) != 0)
00164       throwError(CS_CONTEXT, SOCKET_ERRORNO);
00165   }
00166 }
00167 
00168 void CSSocket::openInternal()
00169 {
00170   iHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
00171   if (iHandle == -1)
00172     CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
00173   setNoDelay();
00174   setNonBlocking();
00175 }
00176 
00177 void CSSocket::writeBlock(const void *data, size_t len)
00178 {
00179   ssize_t out;
00180 
00181   enter_();
00182   while (len > 0) {
00183     out = send(iHandle, (const char *) data, len, 0);
00184     self->interrupted();
00185     if (out == -1) {
00186       int err = SOCKET_ERRORNO;
00187 
00188       if (err == EWOULDBLOCK || err == EINTR)
00189         continue;
00190       throwError(CS_CONTEXT, err);
00191     }
00192     if ((size_t) out > len)
00193       break;
00194     len -= (size_t) out;
00195     data = ((char *) data) + (size_t) out;
00196   }
00197   exit_();
00198 }
00199 
00200 int CSSocket::timeoutRead(CSThread *self, void *buffer, size_t length)
00201 {      
00202   int     in;
00203   uint64_t  start_time;
00204   uint64_t  timeout = iTimeout * 1000;
00205   
00206   start_time = CSTime::getTimeCurrentTicks();
00207 
00208   retry:
00209   in = recv(iHandle, (char *) buffer, length, 0);
00210   if (in == -1) {
00211     if (SOCKET_ERRORNO == EWOULDBLOCK) {
00212       fd_set      readfds;
00213       uint64_t    time_diff;
00214       struct timeval  tv_timeout;
00215 
00216       FD_ZERO(&readfds);
00217       self->interrupted();
00218 
00219       time_diff = CSTime::getTimeCurrentTicks() - start_time;
00220       if (time_diff >= timeout) {
00221         char address[CS_SOCKET_ADDRESS_SIZE];
00222 
00223         formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
00224         CSException::throwExceptionf(CS_CONTEXT, CS_ERR_RECEIVE_TIMEOUT, "Receive timeout: %lu ms, on: %s", iTimeout, address);
00225       }
00226 
00227       /* Calculate how much time we can wait: */
00228       time_diff = timeout - time_diff;
00229       tv_timeout.tv_sec = (long)time_diff / 1000000;
00230       tv_timeout.tv_usec = (long)time_diff % 1000000;
00231 
00232       FD_SET(iHandle, &readfds);
00233       in = select(iHandle+1, &readfds, NULL, NULL, &tv_timeout);
00234       if (in != -1)
00235         goto retry;
00236     }
00237   }
00238   return in;
00239 }
00240 
00241 /*
00242  * ---------------------------------------------------------------
00243  * SOCKET BASED ON THE STANDARD C SOCKET
00244  */
00245 
00246 void CSSocket::setTimeout(uint32_t milli_sec)
00247 {
00248   if (iTimeout != milli_sec) {
00249     if ((iTimeout = milli_sec))
00250       setNonBlocking();
00251     else
00252       setBlocking();
00253   }
00254 }
00255 
00256 CSOutputStream *CSSocket::getOutputStream()
00257 {
00258   return CSSocketOutputStream::newStream(RETAIN(this));
00259 }
00260 
00261 CSInputStream *CSSocket::getInputStream()
00262 {
00263   return CSSocketInputStream::newStream(RETAIN(this));
00264 }
00265 
00266 void CSSocket::publish(char *service, int default_port)
00267 {
00268   enter_();
00269   close();
00270   try_(a) {
00271     struct servent    *servp;
00272     struct sockaddr_in  server;
00273     struct servent    s;
00274     int         flag = 1;
00275 
00276     openInternal();
00277     if (service) {
00278       if (isdigit(service[0])) {
00279         int i =  atoi(service);
00280 
00281         if (!i)
00282           CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, service);
00283         servp = &s;
00284         s.s_port = htons((uint16_t) i);
00285         iService = cs_strdup(service);
00286       }
00287       else if ((servp = getservbyname(service, "tcp")) == NULL) {
00288         if (!default_port)
00289           CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, service);
00290         servp = &s;
00291         s.s_port = htons((uint16_t) default_port);
00292         iService = cs_strdup(default_port);
00293       }
00294       else
00295         iService = cs_strdup(service);
00296     }
00297     else {
00298       if (!default_port)
00299         CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, "");
00300       servp = &s;
00301       s.s_port = htons((uint16_t) default_port);
00302       iService = cs_strdup(default_port);
00303     }
00304       
00305     iPort = ntohs(servp->s_port);
00306 
00307     memset(&server, 0, sizeof(server));
00308     server.sin_family = AF_INET;
00309     server.sin_addr.s_addr = INADDR_ANY;
00310     server.sin_port = (uint16_t) servp->s_port;
00311 
00312     if (setsockopt(iHandle, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(int)) == -1)
00313       CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
00314 
00315     if (bind(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
00316       CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
00317 
00318     if (listen(iHandle, SOMAXCONN) == -1)
00319       CSException::throwOSError(CS_CONTEXT, SOCKET_ERRORNO);
00320   }
00321   catch_(a) {
00322     close();
00323     throw_();
00324   }
00325   cont_(a);
00326   exit_();
00327 }
00328 
00329 void CSSocket::open(CSSocket *listener)
00330 {
00331   enter_();
00332 
00333   close();
00334   try_(a) {
00335     int listener_handle;
00336     char address[CS_SOCKET_ADDRESS_SIZE];
00337     struct sockaddr_in  remote;
00338     socklen_t     addrlen = sizeof(remote);
00339 
00340     /* First get all the information we need from the listener: */
00341     listener_handle = ((CSSocket *) listener)->iHandle;
00342     listener->formatAddress(CS_SOCKET_ADDRESS_SIZE, address);
00343 
00344     /* I want to make sure no error occurs after the connect!
00345      * So I allocate a buffer for the host name up front.
00346      * This means it may be to small, but this is not a problem
00347      * because the host name stored here is is only used for display
00348      * of error message etc.
00349      */
00350     iHost = (char *) cs_malloc(100);
00351     iHandle = accept(listener_handle, (struct sockaddr *) &remote, &addrlen);
00352     if (iHandle == -1)
00353       throwError(CS_CONTEXT, address, SOCKET_ERRORNO);
00354 
00355     cs_strcpy(100, iHost, inet_ntoa(remote.sin_addr));
00356     iPort = ntohs(remote.sin_port);
00357 
00358     setNoDelay();
00359     setNonBlocking();
00360   }
00361   catch_(a) {
00362     close();
00363     throw_();
00364   }
00365   cont_(a);
00366   exit_();
00367 }
00368 
00369 void CSSocket::open(char *address, int default_port)
00370 {
00371   enter_();
00372   close();
00373   try_(a) {
00374     char        *portp = strchr(address, ':');
00375     struct servent    s;
00376     struct servent    *servp;
00377     struct hostent    *hostp;
00378     struct sockaddr_in  server;
00379 
00380     openInternal();
00381     if (!portp) {
00382       iHost = cs_strdup(address);
00383       if (!default_port)
00384         CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
00385       iService = cs_strdup(default_port);
00386     }
00387     else {
00388       iHost = cs_strdup(address, (size_t) (portp - address));
00389       iService = cs_strdup(portp+1);
00390     }
00391   
00392     if (isdigit(iService[0])) {
00393       int i =  atoi(iService);
00394 
00395       if (!i)
00396         CSException::throwCoreError(CS_CONTEXT, CS_ERR_BAD_ADDRESS, address);
00397       servp = &s;
00398       s.s_port = htons((uint16_t) i);
00399     }
00400     else if ((servp = getservbyname(iService, "tcp")) == NULL)
00401       CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_SERVICE, iService);
00402     iPort = (int) ntohs(servp->s_port);
00403 
00404     if ((hostp = gethostbyname(iHost)) == 0)
00405       CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_HOST, iHost);
00406 
00407     memset(&server, 0, sizeof(server));
00408     server.sin_family = AF_INET;
00409     memcpy(&server.sin_addr, hostp->h_addr, (size_t) hostp->h_length);
00410     server.sin_port = (uint16_t) servp->s_port;
00411     if (connect(iHandle, (struct sockaddr *) &server, sizeof(server)) == -1)
00412       throwError(CS_CONTEXT, SOCKET_ERRORNO);
00413   }
00414   catch_(a) {
00415     close();
00416     throw_();
00417   }
00418   cont_(a);
00419   exit_();
00420 }
00421 
00422 void CSSocket::close()
00423 {
00424   flush();
00425   if (iHandle != -1) {
00426     shutdown(iHandle, SHUT_RDWR);
00427     /* shutdown does not close the socket!!? */
00428     CLOSE_SOCKET(iHandle);
00429     iHandle = -1;
00430   }
00431   if (iHost) {
00432     cs_free(iHost);
00433     iHost = NULL;
00434   }
00435   if (iService) {
00436     cs_free(iService);
00437     iService = NULL;
00438   }
00439   if (iIdentity) {
00440     cs_free(iIdentity);
00441     iIdentity = NULL;
00442   }
00443   iPort = 0;
00444 }
00445 
00446 size_t CSSocket::read(void *data, size_t len)
00447 {
00448   ssize_t in;
00449 
00450   enter_();
00451   /* recv, by default will block until at lease one byte is
00452    * returned.
00453    * So a return of zero means EOF!
00454    */
00455   retry:
00456   if (iTimeout)
00457     in = timeoutRead(self, data, len);
00458   else
00459     in = recv(iHandle, (char *) data, len, 0);
00460   self->interrupted();
00461   if (in == -1) {
00462     /* Note, we actually ignore all errors on the socket.
00463      * If no data was returned by the read so far, then
00464      * the error will be considered EOF.
00465      */
00466     int err = SOCKET_ERRORNO;
00467 
00468     if (err == EWOULDBLOCK || err == EINTR)
00469       goto retry;
00470     throwError(CS_CONTEXT, err);
00471     in = 0;
00472   }
00473   return_((size_t) in);
00474 }
00475 
00476 int CSSocket::read()
00477 {
00478   int   ch;
00479   u_char  buffer[1];
00480 
00481   enter_();
00482   if (read(buffer, 1) == 1)
00483     ch = buffer[0];
00484   else
00485     ch = -1;
00486   return_(ch);
00487 }
00488 
00489 int CSSocket::peek()
00490 {
00491   return -1;
00492 }
00493 
00494 void CSSocket::write(const void *data, size_t len)
00495 {
00496 #ifdef CS_USE_OUTPUT_BUFFER
00497   if (len <= CS_MIN_WRITE_SIZE) {
00498     if (iDataLen + len > CS_OUTPUT_BUFFER_SIZE) {
00499       /* This is the amount of data that will still fit
00500        * intp the buffer:
00501        */
00502       size_t tfer = CS_OUTPUT_BUFFER_SIZE - iDataLen;
00503 
00504       memcpy(iOutputBuffer + iDataLen, data, tfer);
00505       flush();
00506       len -= tfer;
00507       memcpy(iOutputBuffer, ((char *) data) + tfer, len);
00508       iDataLen = len;
00509     }
00510     else {
00511       memcpy(iOutputBuffer + iDataLen, data, len);
00512       iDataLen += len;
00513     }
00514   }
00515   else {
00516     /* If the block give is large enough, the
00517      * writing directly from the block saves copying the
00518      * data to the local output buffer buffer:
00519      */
00520     flush();
00521     writeBlock(data, len);
00522   }
00523 #else
00524   writeBlock(data, len);
00525 #endif
00526 }
00527 
00528 void CSSocket::write(char ch)
00529 {
00530   enter_();
00531   writeBlock(&ch, 1);
00532   exit_();
00533 }
00534 
00535 void CSSocket::flush()
00536 {
00537 #ifdef CS_USE_OUTPUT_BUFFER
00538   uint32_t len;
00539 
00540   if ((len = iDataLen)) {
00541     iDataLen = 0;
00542     /* Note: we discard the data to be written if an
00543      * exception occurs.
00544      */
00545     writeBlock(iOutputBuffer, len);
00546   }
00547 #endif
00548 }
00549 
00550 const char *CSSocket::identify()
00551 {
00552   enter_();
00553   if (!iIdentity) {
00554     char buffer[200];
00555 
00556     formatAddress(200, buffer);
00557     iIdentity = cs_strdup(buffer);
00558   }
00559   return_(iIdentity);
00560 }
00561 
00562 
00563