00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "cslib/CSConfig.h"
00031 #include <inttypes.h>
00032
00033 #include "defs_ms.h"
00034
00035 #include "cslib/CSGlobal.h"
00036 #include "cslib/CSSocket.h"
00037 #include "cslib/CSStrUtil.h"
00038 #include "cslib/CSHTTPStream.h"
00039
00040 #include "connection_handler_ms.h"
00041 #include "network_ms.h"
00042 #include "open_table_ms.h"
00043 #include "engine_ms.h"
00044 #include "version_ms.h"
00045
00046
00047
00048 u_long MSConnectionHandler::gMaxKeepAlive;
00049
00050 MSConnectionHandler::MSConnectionHandler(CSThreadList *list):
00051 CSDaemon(list),
00052 amWaitingToListen(false),
00053 shuttingDown(false),
00054 lastUse(0),
00055 replyPending(false),
00056 iInputStream(NULL),
00057 iOutputStream(NULL),
00058 iTableURI(NULL)
00059 {
00060 }
00061
00062 void MSConnectionHandler::close()
00063 {
00064 closeStream();
00065 freeRequestURI();
00066 }
00067
00068 MSConnectionHandler *MSConnectionHandler::newHandler(CSThreadList *list)
00069 {
00070 return new MSConnectionHandler(list);
00071 }
00072
00073
00074 bool MSConnectionHandler::openStream()
00075 {
00076 CSSocket *sock;
00077 CSInputStream *in;
00078 CSOutputStream *out;
00079
00080 enter_();
00081 if (!(sock = MSNetwork::openConnection(this)))
00082 return_(false);
00083 push_(sock);
00084 in = sock->getInputStream();
00085 in = CSBufferedInputStream::newStream(in);
00086 iInputStream = CSHTTPInputStream::newStream(in);
00087
00088 out = sock->getOutputStream();
00089 out = CSBufferedOutputStream::newStream(out);
00090 iOutputStream = CSHTTPOutputStream::newStream(out);
00091 release_(sock);
00092 return_(true);
00093 }
00094
00095 int MSConnectionHandler::getHTTPStatus(int err)
00096 {
00097 int code;
00098
00099 switch (err) {
00100 case MS_OK: code = 200; break;
00101 case MS_ERR_ENGINE: code = 500; break;
00102 case MS_ERR_UNKNOWN_TABLE: code = 404; break;
00103 case MS_ERR_UNKNOWN_DB: code = 404; break;
00104 case MS_ERR_DATABASE_DELETED: code = 404; break;
00105 case MS_ERR_NOT_FOUND: code = 404; break;
00106 case MS_ERR_REMOVING_REPO: code = 404; break;
00107 case MS_ERR_TABLE_LOCKED: code = 412; break;
00108 case MS_ERR_INCORRECT_URL: code = 404; break;
00109 case MS_ERR_AUTH_FAILED: code = 403; break;
00110 default: code = 500; break;
00111 }
00112 return code;
00113 }
00114
00115 void MSConnectionHandler::writeException(const char *qualifier)
00116 {
00117 int code;
00118
00119 enter_();
00120 iOutputStream->clearHeaders();
00121 iOutputStream->clearBody();
00122 code = getHTTPStatus(myException.getErrorCode());
00123 iOutputStream->setStatus(code);
00124 iOutputStream->appendBody("<HTML><HEAD><TITLE>HTTP Error ");
00125 iOutputStream->appendBody(code);
00126 iOutputStream->appendBody(": ");
00127 iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
00128 iOutputStream->appendBody("</TITLE></HEAD>");
00129 iOutputStream->appendBody("<BODY><H2>HTTP Error ");
00130 iOutputStream->appendBody(code);
00131 iOutputStream->appendBody(": ");
00132 iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
00133 iOutputStream->appendBody("</H2>");
00134 if (qualifier)
00135 iOutputStream->appendBody(qualifier);
00136 iOutputStream->appendBody(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG);
00137 iOutputStream->appendBody(myException.getMessage());
00138 iOutputStream->appendBody(EXCEPTION_REPLY_MESSAGE_SUFFIX_TAG);
00139 iOutputStream->appendBody(myException.getStackTrace());
00140 iOutputStream->appendBody(EXCEPTION_REPLY_STACK_TRACE_SUFFIX_TAG);
00141 iOutputStream->appendBody("MySQL ");
00142 iOutputStream->appendBody(PBMSVersion::getCString());
00143 iOutputStream->appendBody(", PBMS ");
00144 iOutputStream->appendBody(PBMSVersion::getCString());
00145 iOutputStream->appendBody("<br>Copyright © 2009, PrimeBase Technologies GmbH</font></P></BODY></HTML>");
00146
00147 replyPending = false;
00148 iOutputStream->writeHead();
00149 iOutputStream->writeBody();
00150 iOutputStream->flush();
00151 exit_();
00152 }
00153
00154 void MSConnectionHandler::writeException()
00155 {
00156 writeException(NULL);
00157 }
00158
00159 void MSConnectionHandler::closeStream()
00160 {
00161 enter_();
00162 if (iOutputStream) {
00163 if (replyPending) {
00164 try_(a) {
00165 writeException();
00166 }
00167 catch_(a) {
00168 }
00169 cont_(a);
00170 }
00171 iOutputStream->release();
00172 iOutputStream = NULL;
00173 }
00174 if (iInputStream) {
00175 iInputStream->release();
00176 iInputStream = NULL;
00177 }
00178 exit_();
00179 }
00180
00181 void MSConnectionHandler::parseRequestURI()
00182 {
00183 CSString *uri = iInputStream->getRequestURI();
00184 uint32_t pos = 0, end;
00185 enter_();
00186
00187 freeRequestURI();
00188 pos = uri->locate(0, "://");
00189 if (pos < uri->length())
00190 pos += 3;
00191 else
00192 pos = uri->skip(0, '/');
00193
00194
00195 end = uri->locate(pos, '/');
00196
00197
00198 iTableURI = uri->substr(pos);
00199
00200 exit_();
00201 }
00202
00203 void MSConnectionHandler::freeRequestURI()
00204 {
00205 if (iTableURI)
00206 iTableURI->release();
00207 iTableURI = NULL;
00208 }
00209
00210 void MSConnectionHandler::writeFile(CSString *file_path)
00211 {
00212 CSPath *path;
00213 CSFile *file;
00214
00215 enter_();
00216 push_(file_path);
00217
00218 path = CSPath::newPath(RETAIN(file_path));
00219 pop_(file_path);
00220 push_(path);
00221 if (path->exists()) {
00222 file = path->openFile(CSFile::READONLY);
00223 push_(file);
00224
00225 iOutputStream->setContentLength((uint64_t) path->getSize());
00226 replyPending = false;
00227 iOutputStream->writeHead();
00228
00229 CSStream::pipe(RETAIN(iOutputStream), file->getInputStream());
00230
00231 release_(file);
00232 }
00233 else {
00234 myException.initFileError(CS_CONTEXT, path->getCString(), ENOENT);
00235 writeException();
00236 }
00237 release_(path);
00238
00239 exit_();
00240 }
00241
00242
00243
00244
00245
00246
00247 void MSConnectionHandler::handleGet(bool info_only)
00248 {
00249 const char *bad_url_comment = "Incorrect URL: ";
00250 MSOpenTable *otab;
00251 CSString *info_request;
00252 CSString *ping_request;
00253
00254 enter_();
00255 self->myException.setErrorCode(0);
00256
00257 iOutputStream->clearHeaders();
00258 iOutputStream->clearBody();
00259
00260
00261 parseRequestURI();
00262
00263 ping_request = iInputStream->getHeaderValue(MS_PING_REQUEST);
00264 if (ping_request) {
00265 MSDatabase *db;
00266
00267 db = MSDatabase::getDatabase(ping_request, false);
00268 if (db) {
00269 push_(db);
00270 if (db->myBlobCloud->cl_getDefaultCloudRef()) {
00271 MSCloudInfo *info = db->myBlobCloud->cl_getCloudInfo();
00272 push_(info);
00273 iOutputStream->addHeader(MS_CLOUD_SERVER, info->getServer());
00274 release_(info);
00275 }
00276 release_(db);
00277 }
00278
00279 iOutputStream->setStatus(200);
00280 iOutputStream->writeHead();
00281 iOutputStream->flush();
00282 exit_();
00283
00284 }
00285
00286 info_request = iInputStream->getHeaderValue(MS_BLOB_INFO_REQUEST);
00287 if (info_request) {
00288 info_only = (info_request->compare("yes") == 0);
00289 info_request->release();
00290 }
00291
00292
00293
00294 if (iTableURI->length() == 0)
00295 goto bad_url;
00296
00297
00298 MSBlobURLRec blob;
00299
00300 if (iTableURI->equals("favicon.ico")) {
00301 iOutputStream->setStatus(200);
00302 writeFile(iTableURI);
00303 } else if (PBMSBlobURLTools::couldBeURL(iTableURI->getCString(), &blob)) {
00304 uint64_t size, offset;
00305
00306 if ((! info_only) && iInputStream->getRange(&size, &offset)) {
00307 if (offset >= blob.bu_blob_size) {
00308 iOutputStream->setStatus(416);
00309 iOutputStream->writeHead();
00310 iOutputStream->flush();
00311 exit_();
00312 }
00313
00314 if (size > (blob.bu_blob_size - offset))
00315 size = blob.bu_blob_size - offset;
00316
00317 iOutputStream->setRange(size, offset, blob.bu_blob_size);
00318 } else {
00319 size = blob.bu_blob_size;
00320 offset = 0;
00321 }
00322
00323 if (blob.bu_type == MS_URL_TYPE_BLOB) {
00324 otab = MSTableList::getOpenTableByID(blob.bu_db_id, blob.bu_tab_id);
00325 frompool_(otab);
00326 otab->sendRepoBlob(blob.bu_blob_id, offset, size, blob.bu_auth_code, info_only, iOutputStream);
00327 backtopool_(otab);
00328 } else {
00329 MSRepoFile *repo_file;
00330
00331 if (!(otab = MSTableList::getOpenTableForDB(blob.bu_db_id))) {
00332 char buffer[CS_EXC_MESSAGE_SIZE];
00333 char id_str[12];
00334
00335 snprintf(id_str, 12, "%"PRIu32"", blob.bu_db_id);
00336
00337 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database ID # ");
00338 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
00339 CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
00340 }
00341 frompool_(otab);
00342 repo_file = otab->getDB()->getRepoFileFromPool(blob.bu_tab_id, false);
00343 frompool_(repo_file);
00344 repo_file->sendBlob(otab, blob.bu_blob_id, offset, size, blob.bu_auth_code, true, info_only, iOutputStream);
00345 backtopool_(repo_file);
00346 backtopool_(otab);
00347 }
00348 }
00349 else {
00350 #ifdef HAVE_ALIAS_SUPPORT
00351
00352 CSString *db_name;
00353 CSString *alias;
00354 MSDatabase * db;
00355 uint32_t repo_id;
00356 uint64_t blob_id;
00357
00358 db_name = iTableURI->left("/");
00359 push_(db_name);
00360 alias = iTableURI->right("/");
00361 push_(alias);
00362
00363 if (db_name->length() == 0 || alias->length() == 0 || alias->length() > BLOB_ALIAS_LENGTH)
00364 goto bad_url;
00365
00366 if (!(otab = MSTableList::getOpenTableForDB(MSDatabase::getDatabaseID(db_name->getCString(), true)))) {
00367 char buffer[CS_EXC_MESSAGE_SIZE];
00368
00369 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database: ");
00370 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
00371 CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
00372 }
00373 frompool_(otab);
00374
00375 db = otab->getDB();
00376
00377
00378 if (!db->findBlobWithAlias(alias->getCString(), &repo_id, &blob_id)) {
00379 char buffer[CS_EXC_MESSAGE_SIZE];
00380
00381 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown alias: ");
00382 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, alias->getCString());
00383 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
00384 }
00385
00386 MSRepoFile *repo_file = db->getRepoFileFromPool(repo_id, false);
00387
00388 frompool_(repo_file);
00389 repo_file->sendBlob(otab, blob_id, 0, false, info_only, iOutputStream);
00390 backtopool_(repo_file);
00391
00392 backtopool_(otab);
00393
00394 release_(alias);
00395 release_(db_name);
00396
00397 #else
00398 char buffer[CS_EXC_MESSAGE_SIZE];
00399
00400 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Bad PBMS BLOB URL: ");
00401 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iTableURI->getCString());
00402 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
00403 #endif
00404 }
00405
00406
00407 exit_();
00408
00409 bad_url:
00410 char buffer[CS_EXC_MESSAGE_SIZE];
00411
00412 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, bad_url_comment);
00413 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
00414 CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
00415 exit_();
00416 }
00417
00418 void MSConnectionHandler::handlePut()
00419 {
00420 MSOpenTable *otab = NULL;
00421 uint32_t db_id = 0, tab_id;
00422
00423 enter_();
00424 self->myException.setErrorCode(0);
00425
00426 iOutputStream->clearHeaders();
00427 iOutputStream->clearBody();
00428 iOutputStream->setStatus(200);
00429
00430 parseRequestURI();
00431 if (iTableURI->length() != 0)
00432 MSDatabase::convertTablePathToIDs(iTableURI->getCString(), &db_id, &tab_id, true);
00433
00434
00435 if ((!db_id) || !(otab = MSTableList::getOpenTableByID(db_id, tab_id))) {
00436 char buffer[CS_EXC_MESSAGE_SIZE];
00437
00438 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
00439 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
00440 CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
00441 }
00442 frompool_(otab);
00443
00444 uint64_t blob_len, cloud_blob_len = 0;
00445 PBMSBlobURLRec bh;
00446 size_t handle_len;
00447 uint16_t metadata_size = 0;
00448 CSStringBuffer *metadata;
00449
00450 new_(metadata, CSStringBuffer(80));
00451 push_(metadata);
00452
00453 if (! iInputStream->getContentLength(&blob_len)) {
00454 CSException::throwException(CS_CONTEXT, CS_ERR_MISSING_HTTP_HEADER, "Missing content length header");
00455 }
00456
00457
00458
00459 for (uint32_t i = 0; i < iInputStream->numHeaders(); i++) {
00460 CSHeader *header = iInputStream->getHeader(i);
00461 const char *name = header->getNameCString();
00462
00463 push_(header);
00464
00465 if (!strcmp(name, MS_BLOB_SIZE)) {
00466 sscanf(header->getValueCString(), "%"PRIu64"", &cloud_blob_len);
00467 }
00468
00469 if (name && otab->getDB()->isValidHeaderField(name)) {
00470 uint16_t rec_size, name_size, value_size;
00471 const char *value = header->getValueCString();
00472 char *buf;
00473 if (!value)
00474 value = "";
00475
00476 name_size = strlen(name);
00477 value_size = strlen(value);
00478
00479 rec_size = name_size + value_size + 2;
00480 metadata->setLength(metadata_size + rec_size);
00481
00482 buf = metadata->getBuffer(metadata_size);
00483 metadata_size += rec_size;
00484
00485 memcpy(buf, name, name_size);
00486 buf += name_size;
00487 *buf = 0; buf++;
00488
00489 memcpy(buf, value, value_size);
00490 buf += value_size;
00491 *buf = 0;
00492 }
00493
00494 release_(header);
00495 }
00496
00497 if (blob_len) {
00498 char hex_checksum[33];
00499 Md5Digest checksum;
00500
00501 otab->createBlob(&bh, blob_len, metadata->getBuffer(0), metadata_size, RETAIN(iInputStream), NULL, &checksum);
00502
00503 cs_bin_to_hex(33, hex_checksum, 16, checksum.val);
00504 iOutputStream->addHeader(MS_CHECKSUM_TAG, hex_checksum);
00505 } else {
00506 if (!cloud_blob_len)
00507 CSException::throwException(CS_CONTEXT, CS_ERR_MISSING_HTTP_HEADER, "Missing BLOB length header for cloud BLOB.");
00508 if (otab->getDB()->myBlobType == MS_CLOUD_STORAGE) {
00509 CloudKeyRec cloud_key;
00510 uint32_t signature_time;
00511 char time_str[20];
00512 CloudDB *cloud = otab->getDB()->myBlobCloud;
00513 MSCloudInfo *info;
00514
00515
00516 cloud->cl_getNewKey(&cloud_key);
00517 otab->createBlob(&bh, cloud_blob_len, metadata->getBuffer(0), metadata_size, NULL, &cloud_key);
00518
00519 CSString *signature;
00520 signature = cloud->cl_getSignature(&cloud_key, iInputStream->getHeaderValue("Content-Type"), &signature_time);
00521 push_(signature);
00522
00523 info = cloud->cl_getCloudInfo(cloud_key.cloud_ref);
00524 push_(info);
00525 iOutputStream->addHeader(MS_CLOUD_SERVER, info->getServer());
00526 iOutputStream->addHeader(MS_CLOUD_BUCKET, info->getBucket());
00527 iOutputStream->addHeader(MS_CLOUD_KEY, info->getPublicKey());
00528 iOutputStream->addHeader(MS_CLOUD_OBJECT_KEY, cloud->cl_getObjectKey(&cloud_key));
00529 iOutputStream->addHeader(MS_BLOB_SIGNATURE, signature->getCString());
00530 release_(info);
00531
00532 release_(signature);
00533 snprintf(time_str, 20, "%"PRIu32"", signature_time);
00534 iOutputStream->addHeader(MS_BLOB_DATE, time_str);
00535
00536 } else {
00537
00538
00539
00540 bh.bu_data[0] = 0;
00541 }
00542 }
00543 handle_len = strlen(bh.bu_data);
00544 iOutputStream->setContentLength(handle_len);
00545
00546 replyPending = false;
00547 iOutputStream->writeHead();
00548 iOutputStream->write(bh.bu_data, handle_len);
00549 iOutputStream->flush();
00550
00551 release_(metadata);
00552
00553 backtopool_(otab);
00554
00555 exit_();
00556 }
00557
00558 void MSConnectionHandler::serviceConnection()
00559 {
00560 const char *method;
00561 bool threadStarted = false;
00562
00563 for (;;) {
00564 iInputStream->readHead();
00565 if (iInputStream->expect100Continue()) {
00566 iOutputStream->clearHeaders();
00567 iOutputStream->clearBody();
00568 iOutputStream->setStatus(100);
00569 iOutputStream->setContentLength(0);
00570 iOutputStream->writeHead();
00571 iOutputStream->flush();
00572 }
00573
00574 if (!(method = iInputStream->getMethod()))
00575 break;
00576 if (!threadStarted ) {
00577
00578
00579
00580 threadStarted = true;
00581 if (!MSNetwork::gWaitingToListen)
00582 MSNetwork::startConnectionHandler();
00583 }
00584 replyPending = true;
00585 if (strcmp(method, "GET") == 0)
00586 handleGet(false);
00587 else if (strcmp(method, "PUT") == 0 ||
00588 strcmp(method, "POST") == 0)
00589 handlePut();
00590 else if (strcmp(method, "HEAD"))
00591 handleGet(true);
00592 else
00593 CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_METHOD, method);
00594 }
00595 }
00596
00597 bool MSConnectionHandler::initializeWork()
00598 {
00599 return true;
00600 }
00601
00602
00603
00604
00605 bool MSConnectionHandler::doWork()
00606 {
00607 enter_();
00608
00609
00610 if (!openStream()) {
00611 myMustQuit = true;
00612 return_(false);
00613 }
00614
00615
00616 serviceConnection();
00617
00618
00619 close();
00620
00621 return_(false);
00622 }
00623
00624 void *MSConnectionHandler::completeWork()
00625 {
00626 shuttingDown = true;
00627
00628 close();
00629
00630 return NULL;
00631 }
00632
00633 bool MSConnectionHandler::handleException()
00634 {
00635 if (!myMustQuit) {
00636
00637 if (!MSNetwork::gWaitingToListen)
00638 MSNetwork::startConnectionHandler();
00639 }
00640 close();
00641 if (!shuttingDown)
00642 CSDaemon::handleException();
00643 return false;
00644 }
00645
00646