//-< WWWAPI.CPP >----------------------------------------------------*--------* // GOODS Version 1.0 (c) 1997 GARRET * ? * // (Generic Object Oriented Database System) * /\| * // * / \ * // Created: 27-Mar-99 K.A. Knizhnik * / [] \ * // Last update: 1-Jul-99 K.A. Knizhnik * GARRET * //-------------------------------------------------------------------*--------* // Implementation of WWWapi class //-------------------------------------------------------------------*--------* #include "goods.h" #include "support.h" #include "wwwapi.h" #include BEGIN_GOODS_NAMESPACE const size_t init_reply_buffer_size = 4*1024; const size_t kMaxHostNameLen = 255; #define ERROR_TEXT(x) \ "HTTP/1.1 " x "\r\n\ Connection: close\r\n\r\n\ Invalid request to the database\r\n\ \n\r\

" x "

\n\r\ \r\n\r\n" WWWconnection::WWWconnection() { memset(hash_table, 0, sizeof hash_table); sock = NULL; reply_buf = new char[init_reply_buffer_size]; reply_buf_size = init_reply_buffer_size; free_pairs = NULL; peer = NULL; userData = NULL; } WWWconnection::~WWWconnection() { reset(); name_value_pair *nvp, *next; for (nvp = free_pairs; nvp != NULL; nvp = next) { next = nvp->next; delete nvp; } delete[] reply_buf; delete[] peer; } inline char* WWWconnection::extendBuffer(size_t inc) { if (reply_buf_used + inc + 1 >= reply_buf_size) { reply_buf_size = reply_buf_size*2 > reply_buf_used + inc + 1 ? reply_buf_size*2 : reply_buf_used + inc + 1; char* new_buf = new char[reply_buf_size]; memcpy(new_buf, reply_buf, reply_buf_used); delete[] reply_buf; reply_buf = new_buf; } reply_buf_used += inc; return reply_buf; } boolean WWWconnection::terminatedBy(char const* str) const { size_t len = strlen(str); if (len > reply_buf_used - 4) { return False; } return memcmp(reply_buf + reply_buf_used - len, str, len) == 0; } WWWconnection& WWWconnection::append(char const* str) { int pos = reply_buf_used; char* dst = extendBuffer(strlen(str)); unsigned char ch; switch (encoding) { case TAG: strcpy(dst + pos, str); encoding = HTML; break; case HTML: encoding = TAG; while (True) { switch(ch = *str++) { case '<': dst = extendBuffer(3); dst[pos++] = '&'; dst[pos++] = 'l'; dst[pos++] = 't'; dst[pos++] = ';'; break; case '>': dst = extendBuffer(3); dst[pos++] = '&'; dst[pos++] = 'g'; dst[pos++] = 't'; dst[pos++] = ';'; break; case '&': dst = extendBuffer(4); dst[pos++] = '&'; dst[pos++] = 'a'; dst[pos++] = 'm'; dst[pos++] = 'p'; dst[pos++] = ';'; break; case '"': dst = extendBuffer(5); dst[pos++] = '&'; dst[pos++] = 'q'; dst[pos++] = 'u'; dst[pos++] = 'o'; dst[pos++] = 't'; dst[pos++] = ';'; break; case '\0': dst[pos] = '\0'; return *this; default: dst[pos++] = ch; } } break; case URL: encoding = TAG; while (True) { ch = *str++; if (ch == '\0') { dst[pos] = '\0'; return *this; } else if (ch == ' ') { dst[pos++] = '+'; } else if (!isalnum(ch)) { dst = extendBuffer(2); dst[pos++] = '%'; dst[pos++] = (ch >> 4) >= 10 ? (ch >> 4) + 'A' - 10 : (ch >> 4) + '0'; dst[pos++] = (ch & 0xF) >= 10 ? (ch & 0xF) + 'A' - 10 : (ch & 0xF) + '0'; } else { dst[pos++] = ch; } } } return *this; } WWWconnection& WWWconnection::append(char const* buf, nat4 iLength) { int pos = reply_buf_used; char* dst = extendBuffer(iLength); memcpy(dst + pos, buf, iLength); return *this; } void WWWconnection::reset() { reply_buf_used = 0; encoding = TAG; for (int i = itemsof(hash_table); --i >= 0;) { name_value_pair *nvp, *next; for (nvp = hash_table[i]; nvp != NULL; nvp = next) { next = nvp->next; nvp->next = free_pairs; free_pairs = nvp; } hash_table[i] = NULL; } } void WWWconnection::addPair(char const* name, char const* value) { name_value_pair* nvp; if (free_pairs != NULL) { nvp = free_pairs; free_pairs = nvp->next; } else { nvp = new name_value_pair; } unsigned hash_code = string_hash_function(name); nvp->hash_code = hash_code; hash_code %= hash_table_size; nvp->next = hash_table[hash_code]; hash_table[hash_code] = nvp; nvp->value = value; nvp->name = name; } char* WWWconnection::unpack(char* body, size_t length) { char *src = body, *end = body + length; while (src < end) { char* name = src; char ch; char* dst = src; while (src < end && (ch = *src++) != '=') { if (ch == '+') { ch = ' '; } else if (ch == '%') { ch = ((src[0] >= 'A' ? src[0] - 'A'+ 10 : src[0] - '0') << 4) | (src[1] >= 'A' ? src[1] - 'A'+ 10 : src[1] - '0'); src += 2; } *dst++ = ch; } *dst = '\0'; char* value = dst = src; while (src < end && (ch = *src++) != '&') { if (ch == '+') { ch = ' '; } else if (ch == '%') { ch = ((src[0] >= 'A' ? src[0] - 'A'+ 10 : src[0] - '0') << 4) | (src[1] >= 'A' ? src[1] - 'A'+ 10 : src[1] - '0'); src += 2; } *dst++ = ch; } *dst = '\0'; addPair(name, value); } stub = get("stub"); return get("page"); } char* WWWconnection::get(char const* name, int n) { unsigned hash_code = string_hash_function(name); name_value_pair* nvp; for (nvp = hash_table[hash_code % hash_table_size]; nvp != NULL; nvp = nvp->next) { if (nvp->hash_code == hash_code && strcmp(nvp->name, name) == 0) { if (n == 0) { return (char*)nvp->value; } n -= 1; } } return NULL; } /*****************************************************************************\ readHTTPbody - see socket_t::readHTTPbody() \*****************************************************************************/ char* WWWconnection::readHTTPbody(char* ioBuf, int iBufSize, int iSize, char* iBody, int& oLength, int iRedirects, char** oHeader, boolean iAllowFreeForm) { if (!sock) { return NULL; } return sock->readHTTPbody(ioBuf, iBufSize, iSize, iBody, oLength, iRedirects, oHeader, iAllowFreeForm); } /*****************************************************************************\ readHTTPcontent - see socket_t::readHTTPcontent() \*****************************************************************************/ char* WWWconnection::readHTTPcontent(char* ioBuf, int iBufSize, int& oLength, int iRedirects, char** oHeader, boolean iAllowFreeForm) { if (oHeader) { *oHeader = NULL; } if (!sock) { return NULL; } reset(); return sock->readHTTPcontent(ioBuf, iBufSize, oLength, iRedirects, oHeader, iAllowFreeForm); } /*****************************************************************************\ readHTTPheader - read a pending HTTP header into the specified buffer ------------------------------------------------------------------------------- This function reads the HTTP header currently queued for reading on this connection. If an error occurs and this function closes this connection's socket, NULL is returned. If the HTTP header contains a bad request or the HTTP header read operation times out, the appropriate response is streamed across the HTTP connection and a pointer to the buffer is returned (no header is read). Otherwise if the read succeeds, a pointer to the first byte of HTTP content past the HTTP header is returned, ioSize is updated to the total number of bytes read, and ioBuffer will contain the NULL-terminated HTTP header. Note that more characters may be read into the specified buffer than there are in the HTTP header; the returned pointer points to the first character of HTTP content past the header, and ioSize contains the total number of bytes read. So the calculation (ioSize - - ioBuffer) will return the number of HTTP content bytes this function read. ioBuffer specifies a buffer into which this function reads the pending HTTP header iBufSize specifies the size of the specified buffer, in bytes ioSize when this function completes, this value will contain the total number of bytes copied into the specified buffer (including the HTTP header and any HTTP content) iTimeout specifies the socket's timeout value; if unspecified, this function will wait forever for the HTTP header to be read \*****************************************************************************/ char* WWWconnection::readHTTPheader(char* ioBuffer, size_t iBufSize, size_t &ioSize, time_t iTimeout) { char* p; reset(); p = sock->readHTTPheader(ioBuffer, iBufSize, ioSize, iTimeout); if (p == NULL) { delete sock; sock = NULL; } return p; } /*****************************************************************************\ reply - send the cached reply buffer ------------------------------------------------------------------------------- This function sends the cached reply buffer to this connection's peer. This function optionally inserts the length of the content in the reply buffer into the HTTP header in this object's cache. This function also optionally shuts down this connection's socket. This function returns `true' if this connection's socket has been shut down; otherwise `false' is returned. keepAlive specifies whether this connection's socket should be shut down after the buffer is sent insertLength specifies whether the content length should be inserted into the HTTP header that is at the beginning of this connection's reply buffer; this should only be `true' if this object's writeHTTPheader() was used \*****************************************************************************/ boolean WWWconnection::reply(boolean keepAlive, bool insertLength) { if (insertLength) { char* body; char buf[64]; char prev_ch = 0; reply_buf[reply_buf_used] = '\0'; body = strstr(reply_buf, "Content-Length: "); if (body != NULL) { body += 16; int length_pos = body - reply_buf; while ((*body != '\n' || prev_ch != '\n') && (*body != '\r' || prev_ch != '\n') && *body != '\0') { prev_ch = *body++; } if (*body == '\0') { reset(); append(ERROR_TEXT("HTTP/1.1 500 Internal server error")); if (sock) { sock->write(reply_buf, reply_buf_used); } return false; } body += *body == '\n' ? 1 : 2; sprintf(buf, "%u", reply_buf_used - (body - reply_buf)); memcpy(reply_buf + length_pos, buf, strlen(buf)); } } if (sock) { sock->write(reply_buf, reply_buf_used); } else { return true; } if (!keepAlive) { delete sock; sock = NULL; return true; } return false; } /*****************************************************************************\ writeHTTPheader - write the top part of an HTTP header, with space for length ------------------------------------------------------------------------------- This function writes the top portion of an HTTP header to the connection buffer, with a blank space inserted so that the content length can be filled in later by reply() (when we actually know how much content we'll be sending!) iKeepAlive specifies whether the HTTP header should indicate to the client to keep the connection alive or shut it down \*****************************************************************************/ void WWWconnection::writeHTTPheader(bool iKeepAlive) { append("HTTP/1.1 200 OK\r\nContent-Length: \r\n"); append(iKeepAlive ? "Connection: Keep-Alive\r\n" : "Connection: close\r\n"); } //-------------------------------------------------- WWWapi::WWWapi(int n_handlers, dispatcher* dispatch_table) { memset(hash_table, 0, sizeof hash_table); sock = NULL; address = NULL; dispatcher* disp = dispatch_table; while (--n_handlers >= 0) { unsigned hash_code = string_hash_function(disp->page); disp->hash_code = hash_code; hash_code %= hash_table_size; disp->collision_chain = hash_table[hash_code]; hash_table[hash_code] = disp; disp += 1; } } boolean WWWapi::open(char const* socket_address, socket_t::socket_domain domain, int listen_queue) { if (sock != NULL) { close(); } address = new char[strlen(socket_address) + 1]; strcpy(address, socket_address); sock = domain != socket_t::sock_global_domain ? socket_t::create_local(socket_address, listen_queue) : socket_t::create_global(socket_address, listen_queue); return sock != NULL; } boolean WWWapi::connect(WWWconnection& con) { assert(sock != NULL); con.reset(); delete con.sock; con.sock = sock->accept(); con.address = address; return con.sock != NULL; } void WWWapi::close() { delete sock; delete[] address; sock = NULL; } boolean WWWapi::dispatch(WWWconnection& con, char* page) { unsigned hash_code = string_hash_function(page); for (dispatcher* disp = hash_table[hash_code % hash_table_size]; disp != NULL; disp = disp->collision_chain) { if (disp->hash_code == hash_code && strcmp(disp->page, page) == 0) { return disp->func(con); } } return True; } boolean CGIapi::serve(WWWconnection& con) { nat4 length; con.reset(); if ((size_t)con.sock->read(&length, sizeof length, sizeof length) != sizeof(length)) { return True; } int size = length - sizeof length; char* buf = new char[size]; if (con.sock->read(buf, size, size) != size) { return True; } char* page = con.unpack(buf + buf[0], length - sizeof length - buf[0]); char* peer = con.get("peer"); con.peer = new char[strlen(peer)+1]; strcpy(con.peer, peer); boolean result = True; if (page != NULL) { con.extendBuffer(4); result = dispatch(con, page); *(int4*)con.reply_buf = con.reply_buf_used; con.sock->write(con.reply_buf, con.reply_buf_used); } delete con.sock; con.sock = NULL; // close connection return result; } void URL2ASCII(char* src) { char* dst = src; char ch; while ((ch = *src++) != '\0') { if (ch == '%') { *dst++ = ((src[0] - '0') << 8) | (src[1] - '0'); } else if (ch == '+') { *dst++ = ' '; } else { *dst++ = ch; } } *dst = '\0'; } boolean HTTPapi::serve(WWWconnection& con) { const size_t inputBufferSize = 16*1024; char buf[inputBufferSize]; boolean result = False; size_t size = 0; con.peer = con.sock->get_peer_name(); while (true) { char* p = con.readHTTPheader(buf, inputBufferSize, size, connectionHoldTimeout); if (!p) { return true; } else if (p == buf) { break; } int length = INT_MAX; char* lenptr = stristr(buf, "Content-Length: "); boolean persistentConnection = stristr(buf, "Connection: Keep-Alive") != NULL; if (lenptr != NULL) { sscanf(lenptr+15, "%d", &length); } if (strncmp(buf, "GET ", 4) == 0) { char hostBuf[kMaxHostNameLen]; char *host = hostBuf; GetHost(buf, host, kMaxHostNameLen); char* file, *uri = buf; file = strchr(uri, '/'); if (file == NULL) { con.append(ERROR_TEXT("400 Bad Request")); break; } if (*++file == '/') { if (*host != '\0') { host = file+1; } file = strchr(uri, '/'); if (file == NULL) { con.append(ERROR_TEXT("400 Bad Request")); break; } *file++ = '\0'; } char* file_end = strchr(file, ' '); char index_html[] = "index.html"; if (file_end == NULL) { con.append(ERROR_TEXT("400 Bad Request")); break; } if (file_end == file) { file = index_html; } else { *file_end = '\0'; } char* params = strchr(file, '?'); if (params != NULL) { if (*host == '\0') { host = "localhost"; } if (!handleRequest(con, file_end+1, file, params+1, file_end, host, result)) { delete con.sock; con.sock = NULL; return result; } } else { char* err = NULL; if (servePage(con, file_end+1, file, &err)) { return True; } if (err) { con.append(err); break; } } } else if (strncmp(buf, "POST ", 5) == 0) { char hostBuf[kMaxHostNameLen]; char *host = hostBuf; GetHost(buf, host, kMaxHostNameLen); char* body = p; ScanNextPart: int n = length < buf + size - p ? length : buf + size - p; while (--n >= 0 && *p != '\r' && *p != '\n') { p += 1; } if (n < 0 && p - body != length) { if (size >= sizeof(buf) - 1) { con.append(ERROR_TEXT("413 Request Entity Too Large")); break; } int rc = con.sock->read(p, 1, sizeof(buf) - size - 1, connectionHoldTimeout); if (rc < 0) { delete con.sock; con.sock = NULL; return True; } size += rc; goto ScanNextPart; } else { if (*host == '\0') { host = "localhost"; } if (!handleRequest(con, buf, NULL, body, p, host, result)) { delete con.sock; con.sock = NULL; return result; } while (p < buf + size && (*p == '\n' || *p == '\r')) { p += 1; n -= 1; } } } else if (strncmp(buf, "PUT ", 4) == 0) { char* begin; char* data = NULL; int dataLen = 0; char* end; char* header; char hostBuf[kMaxHostNameLen]; char *host = hostBuf; header = new char[strlen(buf) + 1]; strcpy(header, buf); GetHost(buf, host, kMaxHostNameLen); begin = strstr(header, "?"); if (begin != NULL) { end = ++begin; while (*end && (*end != ' ')) end++; con.unpack(begin, end - begin); } data = con.readHTTPbody(buf, inputBufferSize, size, p, dataLen, 0, NULL, true); if (!handlePut(con, buf, host, data, dataLen)) { delete header; delete con.sock; con.sock = NULL; return result; } delete header; } else { con.append(ERROR_TEXT("405 Method not allowed")); break; } if (!persistentConnection) { delete con.sock; con.sock = NULL; return True; } if (p - buf < (long)size) { size -= p - buf; memcpy(buf, p, size); } else { size = 0; } } con.append("Connection: close\r\n\r\n"); if (con.sock != NULL) { con.sock->write(con.reply_buf, con.reply_buf_used); delete con.sock; con.sock = NULL; } return True; } /*****************************************************************************\ GetHost - get the name of the host identified by the specified HTTP header ------------------------------------------------------------------------------- This function returns a pointer to a NULL-terminated string containing the name of the host against which the specified HTTP header will conduct its operation. Note that this function modifies the specified string (inserting a NULL character after the host name) and returns a pointer within the specified string. If no host attribute is found in the specified HTTP header, NULL is returned. ioHeader points to a NULL-terminated string containing the HTTP header whose host name is to be retrieved; this function modifies this string \*****************************************************************************/ char* HTTPapi::GetHost(char* ioHeader) { char* host = stristr(ioHeader, "Host: "); if (host != NULL) { char* q = host += 6; while (*q != '\n' && *q != '\r' && *q != '\0') q += 1; *q = '\0'; } return host; } /*****************************************************************************\ GetHost - get the name of the host identified by the specified HTTP header ------------------------------------------------------------------------------- This function copies a NULL-terminated string into the specified buffer, a string containing the name of the host against which the specified HTTP header will conduct its operation. Unlike the GetHost() implementation above, this function does NOT modify the header from which the host name is copied. The length of the host name is returned. Note that this number may be larger than the size of the buffer. iHeader points to a NULL-terminated string containing the HTTP header whose host name is to be retrieved oBuf points to a buffer into which the host name (or as much of it that will fit) is copied; may be NULL if iBufLen is zero; unless this number is zero the copied string will always be NULL-terminated (even if the whole host name will not fit) iBufLen specifies the size of the buffer (oBuf); if this number is smaller than the length of the host name, as much of the host name as will fit is copied into the buffer \*****************************************************************************/ nat4 HTTPapi::GetHost(char* iHeader, char* oBuf, nat4 iBufLen) { char* host = stristr(iHeader, "Host: "); nat4 hostLen = 0; if (host != NULL) { const char* q = host += 6; while (*q != '\n' && *q != '\r' && *q != '\0') { if (hostLen < iBufLen) oBuf[hostLen] = *q; hostLen += 1; q += 1; } hostLen = (nat4)q - (nat4)host; if (hostLen < iBufLen) oBuf[hostLen] = 0; else oBuf[iBufLen - 1] = 0; } else if (oBuf) { *oBuf = 0; } return hostLen; } /*****************************************************************************\ GetType - get the MIME type of the specified file \*****************************************************************************/ const char* HTTPapi::GetType(const char *iFile) { int fileLen = strlen(iFile); if((fileLen > 4) && (stricmp(&(iFile[fileLen - 4]), ".gif") == 0)) return "image/gif"; else if((fileLen > 4) && (stricmp(&(iFile[fileLen - 4]), ".jpg") == 0)) return "image/jpeg"; else if((fileLen > 4) && (stricmp(&(iFile[fileLen - 4]), ".txt") == 0)) return "text/plain"; else if((fileLen > 4) && (stricmp(&(iFile[fileLen - 4]), ".png") == 0)) return "image/png"; else if((fileLen > 4) && (stricmp(&(iFile[fileLen - 4]), ".exe") == 0)) return "application/octet-stream"; return "text/html"; } /*****************************************************************************\ servePage - serve the specified file across the specified connection ------------------------------------------------------------------------------- This function streams the specified file across the specified connection. Return Value: - If an error occurs, false is returned and `*err' will point to a string description of the error that occurred. - If the file is streamed successfully and the connection should not be kept alive, true is returned. - If the file is streamed successfully and the connection should be kept alive, false is returned and *err is NULL. Arguments: con specifies the connection across which the specified file is to be streamed header specifies the request's HTTP header file specifies the path to the file to be served across the connection err points to a character pointer which will be set to NULL if the file is streamed successfully or a string description of the error that occurred if the streaming fails \*****************************************************************************/ boolean HTTPapi::servePage(WWWconnection& con, char* header, char* file, char** err) { URL2ASCII(file); FILE* f = fopen(file, "rb"); if (f == NULL) { *err = ERROR_TEXT("404 Not found"); return False; } fseek(f, 0, SEEK_END); size_t file_size = ftell(f); fseek(f, 0, SEEK_SET); char reply[1024]; sprintf(reply, "HTTP/1.1 200 OK\r\nContent-Length: %u\r\n" "Content-Type: %s\r\nConnection: %s\r\n\r\n", file_size, GetType(file), keepConnectionAlive ? "Keep-Alive" : "close"); con.append(reply); size_t pos = con.reply_buf_used; char* dst = con.extendBuffer(file_size); if (dst == NULL) { con.reset(); *err = ERROR_TEXT("413 Request Entity Too Large"); return False; } if (fread(dst + pos, 1, file_size, f) != file_size) { con.reset(); *err = ERROR_TEXT("500 Internal server error"); return False; } fclose(f); if (!con.sock->write(dst, con.reply_buf_used) || !keepConnectionAlive) { delete con.sock; con.sock = NULL; return True; } return False; } /*****************************************************************************\ handlePut - handle a PUT method ------------------------------------------------------------------------------- This overridable function is called when an HTTP request arrives as a PUT method. Overrides of this function should stream whatever content they wish into `con', do whatever they wish with the content buffer passed into this function, then call `con.reply(true)', then `return keepConnectionAlive'. This function is responsible for seeing that the buffer `content' is freed. The default implementation indicates that this method is not allowed. con specifies the connection from which the HTTP request originated and to which any response should be streamed header points to a string containing the NULL-terminated header from the HTTP request host specifies the name of the server host to which the HTTP request was directed content points to an allocated buffer containing the body content of the HTTP PUT request; this function becomes the owner of this buffer (it is responsible for freeing the buffer or making sure someone frees it) contentLength specifies the size (in bytes) of the body content buffer \*****************************************************************************/ boolean HTTPapi::handlePut(WWWconnection& con, char* header, char* host, char* content, nat4 contentLength) { delete content; con.append("HTTP/1.1 405 Method not allowed\r\n"); return con.reply(keepConnectionAlive); } /*****************************************************************************\ handleRequest - handle a GET with parameters or a POST ------------------------------------------------------------------------------- This overridable function is called when an HTTP request arrives as a POST method or as a GET method with parameters. Overrides of this function should stream whatever content they wish into `con', then call `con.reply(true)', then `return keepConnectionAlive'. The default implementation dispatches requests to a function in this object's dispatcher table, based on the value of the `page' parameter. An error is reported across the connection if a corresponding page function is not found or if an internal server error occurs. con specifies the connection from which the HTTP request originated and to which any response should be streamed header points to a string containing the header from the HTTP request url points to the requested URL, or NULL if the request was made using the POST method begin points to the first character of the HTTP request parameter list end points to the last character of the HTTP request parameter list host specifies the name of the server host to which the HTTP request was directed result currently unused; overrides should set to true before exiting \*****************************************************************************/ boolean HTTPapi::handleRequest(WWWconnection& con, char*, char*, char* begin, char* end, char* host, boolean& result) { char buf[64]; char ch = *end; char* page = con.unpack(begin, end - begin); if (page != NULL) { con.writeHTTPheader(keepConnectionAlive); sprintf(buf, "http://%s/cgistub", host); con.stub = buf; result = dispatch(con, page); con.reply(true, true); *end = ch; return result && keepConnectionAlive; } else { con.append(ERROR_TEXT("406 Not acceptable")); con.sock->write(con.reply_buf, con.reply_buf_used); result = True; *end = ch; return False; } } //---------------------------------------------------- void task_proc QueueManager::handleThread(void* arg) { ((QueueManager*)arg)->handle(); } QueueManager::QueueManager(void) : connectionPool(NULL), start(cs), done(cs), threads(NULL) { } QueueManager::QueueManager(WWWapi& api, int nThreads, int connectionQueueLen, size_t threadStackSize) : start(cs), done(cs) { run(api, nThreads, connectionQueueLen, threadStackSize); } void QueueManager::run(WWWapi& api, int nThreads, int connectionQueueLen, size_t threadStackSize) { boolean connected = false; // Allocate the thread pool. Each thread will enter the handle() function // and wait their turn to handle a connection. assert(nThreads >= 1 && connectionQueueLen >= 1); this->nThreads = nThreads; threads = new task*[nThreads]; while (--nThreads >= 0) { threads[nThreads] = task::create(handleThread, this, task::pri_normal, threadStackSize); } // Allocate the connection pool, and place each connection into the free // queue. connectionPool = new WWWconnection[connectionQueueLen]; connectionPool[--connectionQueueLen].next = NULL; while (--connectionQueueLen >= 0) { connectionPool[connectionQueueLen].next = &connectionPool[connectionQueueLen+1]; } freeList = connectionPool; waitList = NULL; // This loop grabs the next free connection, and waits for an incoming // request. Once a request is received, the connection is placed in the // wait queue so a thread (waiting in handle()) can serve it. The critical // section is used to maintain the integrity of the wait and free queues. server = &api; cs.enter(); while (server != NULL) { // Grab a connection from the free queue. If there are no free // connections, wait until one of the handle() threads finishes serving // a connection's request. if (freeList == NULL) { done.reset(); done.wait(); if (server == NULL) { break; } assert(freeList != NULL); } WWWconnection* con = freeList; freeList = con->next; WWWapi* srv = server; cs.leave(); // Wait for an incoming request. If the server pointer is set to NULL // (presumably by another thread) then it's time to shut down the // QueueManager. connected = srv->connect(*con); if (server == NULL) { return; } // If the socket connected successfully, place the connection we // grabbed from the free queue into the wait queue and signal the next // thread waiting in handle() to serve the connection. cs.enter(); if (connected) { con->next = waitList; waitList = con; start.signal(); } } cs.leave(); } void QueueManager::handle() { // This critical section queues up the next thread to handle the next // connection inserted into the wait queue by the server thread running in // run(). One thread per connection. cs.enter(); while (True) { // Wait for the server thread (in run()) to queue a waiting connection. start.wait(); // If server is set to NULL, then it's time to shut down the // QueueManager. WWWapi* api = server; if (api == NULL) { break; } // Remove the next waiting connection from the wait queue. WWWconnection* con = waitList; assert(con != NULL); waitList = con->next; // Leaving this critical section allows the next thread to handle the // next connection in the wait queue. cs.leave(); // Handle the connection's request. api->serve(*con); // Return the connection to the free queue. The critical section is // used to maintain the integrity of the free queue. cs.enter(); if (freeList == NULL) { done.signal(); } con->next = freeList; freeList = con; } cs.leave(); } void QueueManager::stop() { cs.enter(); server = NULL; while (--nThreads >= 0) { start.signal(); } done.signal(); cs.leave(); } QueueManager::~QueueManager() { if(threads) { delete[] threads; } if(connectionPool) { delete[] connectionPool; } } END_GOODS_NAMESPACE