Drizzled Public API Documentation

discover_ms.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  * Derived from code Copyright (C) 2000-2004 MySQL AB
00003  *
00004  * PrimeBase MS
00005  *
00006  * This program is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU General Public License as published by
00008  * the Free Software Foundation; either version 2 of the License, or
00009  * (at your option) any later version.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License
00017  * along with this program; if not, write to the Free Software
00018  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00019  *
00020  *  Created by Barry Leslie on 8/27/08.
00021  *
00022  */
00023 
00024 #ifndef DRIZZLED
00025 #include "discover_ms.h"
00026 
00027 #include "mysql_priv.h"
00028 #include "item_create.h"
00029 #include <m_ctype.h>
00030 #include "cslib/CSConfig.h"
00031 #include "cslib/CSGlobal.h"
00032 #include "cslib/CSThread.h"
00033 #include "ha_pbms.h"
00034 #undef new
00035 
/*
 * DOT_STR(x) expands to plain `x` for servers up to version 60005 and to
 * `x.str` for anything newer.
 *
 * The argument is intentionally NOT wrapped in parentheses: the macro is
 * applied after a member-access operator (key->DOT_STR(name)), where
 * `key->(name).str` would not parse.
 */
#if MYSQL_VERSION_ID <= 60005
#define DOT_STR(x)      x
#else
#define DOT_STR(x)      x.str
#endif
00041 
00042 #define LOCK_OPEN_HACK_REQUIRED
00043 
00044 #ifdef LOCK_OPEN_HACK_REQUIRED
00045 
00046 /*
00047  * Unfortunately I cannot use the standard mysql_create_table_no_lock() because it will lock "LOCK_open"
00048  * which has already been locked while the server is performing table discovery. So I have added this hack 
00049  * in here to create my own version. The following macros will make the changes I need to get it to work.
00050  * The actual function code has been copied here without changes.
00051  *
00052  * It's almost enough to make you want to cry. :(
00053 */
00054 //-----------------------------
00055 
/*
 * Drop any macro definitions of the pthread mutex calls that earlier
 * headers may have installed.  #undef on a name that is not currently a
 * macro is a no-op, so no #ifdef guard is needed.
 */
#undef pthread_mutex_lock
#undef pthread_mutex_unlock

/* The copy of the function compiled below gets its own name ... */
#define mysql_create_table_no_lock hacked_mysql_create_table_no_lock

/* ... and every mutex lock/unlock call in the copied code expands to nothing. */
#define pthread_mutex_lock(mutex)
#define pthread_mutex_unlock(mutex)

/* check_engine() always evaluates to 0; set_table_default_charset() vanishes. */
#define check_engine(thd, name, info) (0)
#define set_table_default_charset(thd, info, db)
00070 
00071 void calculate_interval_lengths(CHARSET_INFO *cs, TYPELIB *interval,
00072                                 uint32 *max_length, uint32 *tot_length);
00073 
00074 uint build_tmptable_filename(THD* thd, char *buff, size_t bufflen);
00075 uint build_table_filename(char *buff, size_t bufflen, const char *db,
00076                           const char *table_name, const char *ext, uint flags);
00077 
00081 
00082 // sort_keys() cut and pasted directly from sql_table.cc. 
00083 static int sort_keys(KEY *a, KEY *b)
00084 {
00085   ulong a_flags= a->flags, b_flags= b->flags;
00086   
00087   if (a_flags & HA_NOSAME)
00088   {
00089     if (!(b_flags & HA_NOSAME))
00090       return -1;
00091     if ((a_flags ^ b_flags) & (HA_NULL_PART_KEY | HA_END_SPACE_KEY))
00092     {
00093       /* Sort NOT NULL keys before other keys */
00094       return (a_flags & (HA_NULL_PART_KEY | HA_END_SPACE_KEY)) ? 1 : -1;
00095     }
00096     if (a->name == primary_key_name)
00097       return -1;
00098     if (b->name == primary_key_name)
00099       return 1;
00100     /* Sort keys don't containing partial segments before others */
00101     if ((a_flags ^ b_flags) & HA_KEY_HAS_PART_KEY_SEG)
00102       return (a_flags & HA_KEY_HAS_PART_KEY_SEG) ? 1 : -1;
00103   }
00104   else if (b_flags & HA_NOSAME)
00105     return 1;         // Prefer b
00106 
00107   if ((a_flags ^ b_flags) & HA_FULLTEXT)
00108   {
00109     return (a_flags & HA_FULLTEXT) ? 1 : -1;
00110   }
00111   /*
00112     Prefer original key order.  usable_key_parts contains here
00113     the original key position.
00114   */
00115   return ((a->usable_key_parts < b->usable_key_parts) ? -1 :
00116     (a->usable_key_parts > b->usable_key_parts) ? 1 :
00117     0);
00118 }
00119 
00120 // check_if_keyname_exists() cut and pasted directly from sql_table.cc. 
00121 static bool
00122 check_if_keyname_exists(const char *name, KEY *start, KEY *end)
00123 {
00124   for (KEY *key=start ; key != end ; key++)
00125     if (!my_strcasecmp(system_charset_info,name,key->name))
00126       return 1;
00127   return 0;
00128 }
00129 
00130 // make_unique_key_name() cut and pasted directly from sql_table.cc. 
00131 static char *
00132 make_unique_key_name(const char *field_name,KEY *start,KEY *end)
00133 {
00134   char buff[MAX_FIELD_NAME],*buff_end;
00135 
00136   if (!check_if_keyname_exists(field_name,start,end) &&
00137       my_strcasecmp(system_charset_info,field_name,primary_key_name))
00138     return (char*) field_name;      // Use fieldname
00139   buff_end=strmake(buff,field_name, sizeof(buff)-4);
00140 
00141   /*
00142     Only 3 chars + '\0' left, so need to limit to 2 digit
00143     This is ok as we can't have more than 100 keys anyway
00144   */
00145   for (uint i=2 ; i< 100; i++)
00146   {
00147     *buff_end= '_';
00148     int10_to_str(i, buff_end+1, 10);
00149     if (!check_if_keyname_exists(buff,start,end))
00150       return sql_strdup(buff);
00151   }
00152   return (char*) "not_specified";   // Should never happen
00153 }
00154 
00155 
00156 // prepare_blob_field() cut and pasted directly from sql_table.cc. 
00157 static bool prepare_blob_field(THD *thd, Create_field *sql_field)
00158 {
00159   DBUG_ENTER("prepare_blob_field");
00160 
00161   if (sql_field->length > MAX_FIELD_VARCHARLENGTH &&
00162       !(sql_field->flags & BLOB_FLAG))
00163   {
00164     /* Convert long VARCHAR columns to TEXT or BLOB */
00165     char warn_buff[MYSQL_ERRMSG_SIZE];
00166 
00167     if (sql_field->def || (thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES |
00168                                                       MODE_STRICT_ALL_TABLES)))
00169     {
00170       my_error(ER_TOO_BIG_FIELDLENGTH, MYF(0), sql_field->field_name,
00171                MAX_FIELD_VARCHARLENGTH / sql_field->charset->mbmaxlen);
00172       DBUG_RETURN(1);
00173     }
00174     sql_field->sql_type= MYSQL_TYPE_BLOB;
00175     sql_field->flags|= BLOB_FLAG;
00176     snprintf(warn_buff, MYSQL_ERRMSG_SIZE, ER(ER_AUTO_CONVERT), sql_field->field_name,
00177             (sql_field->charset == &my_charset_bin) ? "VARBINARY" : "VARCHAR",
00178             (sql_field->charset == &my_charset_bin) ? "BLOB" : "TEXT");
00179     push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_AUTO_CONVERT,
00180                  warn_buff);
00181   }
00182     
00183   if ((sql_field->flags & BLOB_FLAG) && sql_field->length)
00184   {
00185     if (sql_field->sql_type == MYSQL_TYPE_BLOB)
00186     {
00187       /* The user has given a length to the blob column */
00188       sql_field->sql_type= get_blob_type_from_length(sql_field->length);
00189       sql_field->pack_length= calc_pack_length(sql_field->sql_type, 0);
00190     }
00191     sql_field->length= 0;
00192   }
00193   DBUG_RETURN(0);
00194 }
00195 
00197 // mysql_prepare_create_table() cut and pasted directly from sql_table.cc.
00198 static int
00199 mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
00200                            Alter_info *alter_info,
00201                            bool tmp_table,
00202                            uint *db_options,
00203                            handler *file, KEY **key_info_buffer,
00204                            uint *key_count, int select_field_count)
00205 {
00206   const char  *key_name;
00207   Create_field  *sql_field,*dup_field;
00208   uint    field,null_fields,blob_columns,max_key_length;
00209   ulong   record_offset= 0;
00210   KEY   *key_info;
00211   KEY_PART_INFO *key_part_info;
00212   int   timestamps= 0, timestamps_with_niladic= 0;
00213   int   field_no,dup_no;
00214   int   select_field_pos,auto_increment=0;
00215   List<Create_field>::iterator it(alter_info->create_list);
00216   List<Create_field>::iterator it2(alter_info->create_list);
00217   uint total_uneven_bit_length= 0;
00218   DBUG_ENTER("mysql_prepare_create_table");
00219 
00220   select_field_pos= alter_info->create_list.elements - select_field_count;
00221   null_fields=blob_columns=0;
00222   create_info->varchar= 0;
00223   max_key_length= file->max_key_length();
00224 
00225   for (field_no=0; (sql_field=it++) ; field_no++)
00226   {
00227     CHARSET_INFO *save_cs;
00228 
00229     /*
00230       Initialize length from its original value (number of characters),
00231       which was set in the parser. This is necessary if we're
00232       executing a prepared statement for the second time.
00233     */
00234     sql_field->length= sql_field->char_length;
00235     if (!sql_field->charset)
00236       sql_field->charset= create_info->default_table_charset;
00237     /*
00238       table_charset is set in ALTER TABLE if we want change character set
00239       for all varchar/char columns.
00240       But the table charset must not affect the BLOB fields, so don't
00241       allow to change my_charset_bin to something else.
00242     */
00243     if (create_info->table_charset && sql_field->charset != &my_charset_bin)
00244       sql_field->charset= create_info->table_charset;
00245 
00246     save_cs= sql_field->charset;
00247     if ((sql_field->flags & BINCMP_FLAG) &&
00248   !(sql_field->charset= get_charset_by_csname(sql_field->charset->csname,
00249                 MY_CS_BINSORT,MYF(0))))
00250     {
00251       char tmp[64];
00252       strmake(strmake(tmp, save_cs->csname, sizeof(tmp)-4),
00253               STRING_WITH_LEN("_bin"));
00254       my_error(ER_UNKNOWN_COLLATION, MYF(0), tmp);
00255       DBUG_RETURN(TRUE);
00256     }
00257 
00258     /*
00259       Convert the default value from client character
00260       set into the column character set if necessary.
00261     */
00262     if (sql_field->def && 
00263         save_cs != sql_field->def->collation.collation &&
00264         (sql_field->sql_type == MYSQL_TYPE_VAR_STRING ||
00265          sql_field->sql_type == MYSQL_TYPE_STRING ||
00266          sql_field->sql_type == MYSQL_TYPE_SET ||
00267          sql_field->sql_type == MYSQL_TYPE_ENUM))
00268     {
00269       /*
00270         Starting from 5.1 we work here with a copy of Create_field
00271         created by the caller, not with the instance that was
00272         originally created during parsing. It's OK to create
00273         a temporary item and initialize with it a member of the
00274         copy -- this item will be thrown away along with the copy
00275         at the end of execution, and thus not introduce a dangling
00276         pointer in the parsed tree of a prepared statement or a
00277         stored procedure statement.
00278       */
00279       sql_field->def= sql_field->def->safe_charset_converter(save_cs);
00280 
00281       if (sql_field->def == NULL)
00282       {
00283         /* Could not convert */
00284         my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00285         DBUG_RETURN(TRUE);
00286       }
00287     }
00288 
00289     if (sql_field->sql_type == MYSQL_TYPE_SET ||
00290         sql_field->sql_type == MYSQL_TYPE_ENUM)
00291     {
00292       uint32 dummy;
00293       CHARSET_INFO *cs= sql_field->charset;
00294       TYPELIB *interval= sql_field->interval;
00295 
00296       /*
00297         Create typelib from interval_list, and if necessary
00298         convert strings from client character set to the
00299         column character set.
00300       */
00301       if (!interval)
00302       {
00303         /*
00304           Create the typelib in runtime memory - we will free the
00305           occupied memory at the same time when we free this
00306           sql_field -- at the end of execution.
00307         */
00308         interval= sql_field->interval= typelib(thd->mem_root,
00309                                                sql_field->interval_list);
00310         List<String>::iterator int_it(sql_field->interval_list);
00311         String conv, *tmp;
00312         char comma_buf[2];
00313         int comma_length= cs->cset->wc_mb(cs, ',', (uchar*) comma_buf,
00314                                           (uchar*) comma_buf + 
00315                                           sizeof(comma_buf));
00316         DBUG_ASSERT(comma_length > 0);
00317         for (uint i= 0; (tmp= int_it++); i++)
00318         {
00319           uint lengthsp;
00320           if (String::needs_conversion(tmp->length(), tmp->charset(),
00321                                        cs, &dummy))
00322           {
00323             uint cnv_errs;
00324             conv.copy(tmp->ptr(), tmp->length(), tmp->charset(), cs, &cnv_errs);
00325             interval->type_names[i]= strmake_root(thd->mem_root, conv.ptr(),
00326                                                   conv.length());
00327             interval->type_lengths[i]= conv.length();
00328           }
00329 
00330           // Strip trailing spaces.
00331           lengthsp= cs->cset->lengthsp(cs, interval->type_names[i],
00332                                        interval->type_lengths[i]);
00333           interval->type_lengths[i]= lengthsp;
00334           ((uchar *)interval->type_names[i])[lengthsp]= '\0';
00335           if (sql_field->sql_type == MYSQL_TYPE_SET)
00336           {
00337             if (cs->coll->instr(cs, interval->type_names[i], 
00338                                 interval->type_lengths[i], 
00339                                 comma_buf, comma_length, NULL, 0))
00340             {
00341               my_error(ER_ILLEGAL_VALUE_FOR_TYPE, MYF(0), "set", tmp->ptr());
00342               DBUG_RETURN(TRUE);
00343             }
00344           }
00345         }
00346         sql_field->interval_list.empty(); // Don't need interval_list anymore
00347       }
00348 
00349       if (sql_field->sql_type == MYSQL_TYPE_SET)
00350       {
00351         uint32 field_length;
00352         if (sql_field->def != NULL)
00353         {
00354           char *not_used;
00355           uint not_used2;
00356           bool not_found= 0;
00357           String str, *def= sql_field->def->val_str(&str);
00358           if (def == NULL) /* SQL "NULL" maps to NULL */
00359           {
00360             if ((sql_field->flags & NOT_NULL_FLAG) != 0)
00361             {
00362               my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00363               DBUG_RETURN(TRUE);
00364             }
00365 
00366             /* else, NULL is an allowed value */
00367             (void) find_set(interval, NULL, 0,
00368                             cs, &not_used, &not_used2, &not_found);
00369           }
00370           else /* not NULL */
00371           {
00372             (void) find_set(interval, def->ptr(), def->length(),
00373                             cs, &not_used, &not_used2, &not_found);
00374           }
00375 
00376           if (not_found)
00377           {
00378             my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00379             DBUG_RETURN(TRUE);
00380           }
00381         }
00382         calculate_interval_lengths(cs, interval, &dummy, &field_length);
00383         sql_field->length= field_length + (interval->count - 1);
00384       }
00385       else  /* MYSQL_TYPE_ENUM */
00386       {
00387         uint32 field_length;
00388         DBUG_ASSERT(sql_field->sql_type == MYSQL_TYPE_ENUM);
00389         if (sql_field->def != NULL)
00390         {
00391           String str, *def= sql_field->def->val_str(&str);
00392           if (def == NULL) /* SQL "NULL" maps to NULL */
00393           {
00394             if ((sql_field->flags & NOT_NULL_FLAG) != 0)
00395             {
00396               my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00397               DBUG_RETURN(TRUE);
00398             }
00399 
00400             /* else, the defaults yield the correct length for NULLs. */
00401           } 
00402           else /* not NULL */
00403           {
00404             def->length(cs->cset->lengthsp(cs, def->ptr(), def->length()));
00405             if (find_type2(interval, def->ptr(), def->length(), cs) == 0) /* not found */
00406             {
00407               my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00408               DBUG_RETURN(TRUE);
00409             }
00410           }
00411         }
00412         calculate_interval_lengths(cs, interval, &field_length, &dummy);
00413         sql_field->length= field_length;
00414       }
00415       set_if_smaller(sql_field->length, MAX_FIELD_WIDTH-1);
00416     }
00417 
00418     if (sql_field->sql_type == MYSQL_TYPE_BIT)
00419     { 
00420       sql_field->pack_flag= FIELDFLAG_NUMBER;
00421       if (file->ha_table_flags() & HA_CAN_BIT_FIELD)
00422         total_uneven_bit_length+= sql_field->length & 7;
00423       else
00424         sql_field->pack_flag|= FIELDFLAG_TREAT_BIT_AS_CHAR;
00425     }
00426 
00427     sql_field->create_length_to_internal_length();
00428     if (prepare_blob_field(thd, sql_field))
00429       DBUG_RETURN(TRUE);
00430 
00431     if (!(sql_field->flags & NOT_NULL_FLAG))
00432       null_fields++;
00433 
00434     if (check_column_name(sql_field->field_name))
00435     {
00436       my_error(ER_WRONG_COLUMN_NAME, MYF(0), sql_field->field_name);
00437       DBUG_RETURN(TRUE);
00438     }
00439 
00440     /* Check if we have used the same field name before */
00441     for (dup_no=0; (dup_field=it2++) != sql_field; dup_no++)
00442     {
00443       if (my_strcasecmp(system_charset_info,
00444       sql_field->field_name,
00445       dup_field->field_name) == 0)
00446       {
00447   /*
00448     If this was a CREATE ... SELECT statement, accept a field
00449     redefinition if we are changing a field in the SELECT part
00450   */
00451   if (field_no < select_field_pos || dup_no >= select_field_pos)
00452   {
00453     my_error(ER_DUP_FIELDNAME, MYF(0), sql_field->field_name);
00454     DBUG_RETURN(TRUE);
00455   }
00456   else
00457   {
00458     /* Field redefined */
00459     sql_field->def=   dup_field->def;
00460     sql_field->sql_type=    dup_field->sql_type;
00461     sql_field->charset=   (dup_field->charset ?
00462            dup_field->charset :
00463            create_info->default_table_charset);
00464     sql_field->length=    dup_field->char_length;
00465           sql_field->pack_length= dup_field->pack_length;
00466           sql_field->key_length=  dup_field->key_length;
00467     sql_field->decimals=    dup_field->decimals;
00468     sql_field->create_length_to_internal_length();
00469     sql_field->unireg_check=  dup_field->unireg_check;
00470           /* 
00471             We're making one field from two, the result field will have
00472             dup_field->flags as flags. If we've incremented null_fields
00473             because of sql_field->flags, decrement it back.
00474           */
00475           if (!(sql_field->flags & NOT_NULL_FLAG))
00476             null_fields--;
00477     sql_field->flags=   dup_field->flags;
00478           sql_field->interval=          dup_field->interval;
00479     it2.remove();     // Remove first (create) definition
00480     select_field_pos--;
00481     break;
00482   }
00483       }
00484     }
00485     /* Don't pack rows in old tables if the user has requested this */
00486     if ((sql_field->flags & BLOB_FLAG) ||
00487   sql_field->sql_type == MYSQL_TYPE_VARCHAR &&
00488   create_info->row_type != ROW_TYPE_FIXED)
00489       (*db_options)|= HA_OPTION_PACK_RECORD;
00490     it2= alter_info->create_list;
00491   }
00492 
00493   /* record_offset will be increased with 'length-of-null-bits' later */
00494   record_offset= 0;
00495   null_fields+= total_uneven_bit_length;
00496 
00497   it= alter_info->create_list;
00498   while ((sql_field=it++))
00499   {
00500     DBUG_ASSERT(sql_field->charset != 0);
00501 
00502     if (prepare_create_field(sql_field, &blob_columns, 
00503            &timestamps, &timestamps_with_niladic,
00504            file->ha_table_flags()))
00505       DBUG_RETURN(TRUE);
00506     if (sql_field->sql_type == MYSQL_TYPE_VARCHAR)
00507       create_info->varchar= TRUE;
00508     sql_field->offset= record_offset;
00509     if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
00510       auto_increment++;
00511     record_offset+= sql_field->pack_length;
00512   }
00513   if (timestamps_with_niladic > 1)
00514   {
00515     my_message(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS,
00516                ER(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS), MYF(0));
00517     DBUG_RETURN(TRUE);
00518   }
00519   if (auto_increment > 1)
00520   {
00521     my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
00522     DBUG_RETURN(TRUE);
00523   }
00524   if (auto_increment &&
00525       (file->ha_table_flags() & HA_NO_AUTO_INCREMENT))
00526   {
00527     my_message(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT,
00528                ER(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT), MYF(0));
00529     DBUG_RETURN(TRUE);
00530   }
00531 
00532   if (blob_columns && (file->ha_table_flags() & HA_NO_BLOBS))
00533   {
00534     my_message(ER_TABLE_CANT_HANDLE_BLOB, ER(ER_TABLE_CANT_HANDLE_BLOB),
00535                MYF(0));
00536     DBUG_RETURN(TRUE);
00537   }
00538 
00539   /* Create keys */
00540 
00541   List<Key>::iterator key_iterator(alter_info->key_list);
00542   List<Key>::iterator key_iterator2(alter_info->key_list);
00543   uint key_parts=0, fk_key_count=0;
00544   bool primary_key=0,unique_key=0;
00545   Key *key, *key2;
00546   uint tmp, key_number;
00547   /* special marker for keys to be ignored */
00548   static char ignore_key[1];
00549 
00550   /* Calculate number of key segments */
00551   *key_count= 0;
00552 
00553   while ((key=key_iterator++))
00554   {
00555     DBUG_PRINT("info", ("key name: '%s'  type: %d", key->DOT_STR(name) ? key->DOT_STR(name) :
00556                         "(none)" , key->type));
00557     LEX_STRING key_name_str;
00558     if (key->type == Key::FOREIGN_KEY)
00559     {
00560       fk_key_count++;
00561       Foreign_key *fk_key= (Foreign_key*) key;
00562       if (fk_key->ref_columns.elements &&
00563     fk_key->ref_columns.elements != fk_key->columns.elements)
00564       {
00565         my_error(ER_WRONG_FK_DEF, MYF(0),
00566                  (fk_key->DOT_STR(name) ?  fk_key->DOT_STR(name) : "foreign key without name"),
00567                  ER(ER_KEY_REF_DO_NOT_MATCH_TABLE_REF));
00568   DBUG_RETURN(TRUE);
00569       }
00570       continue;
00571     }
00572     (*key_count)++;
00573     tmp=file->max_key_parts();
00574     if (key->columns.elements > tmp)
00575     {
00576       my_error(ER_TOO_MANY_KEY_PARTS,MYF(0),tmp);
00577       DBUG_RETURN(TRUE);
00578     }
00579     key_name_str.str= (char*) key->DOT_STR(name);
00580     key_name_str.length= key->DOT_STR(name) ? strlen(key->DOT_STR(name)) : 0;
00581     if (check_string_char_length(&key_name_str, "", NAME_CHAR_LEN,
00582                                  system_charset_info, 1))
00583     {
00584       my_error(ER_TOO_LONG_IDENT, MYF(0), key->DOT_STR(name));
00585       DBUG_RETURN(TRUE);
00586     }
00587     key_iterator2= alter_info->key_list;
00588     if (key->type != Key::FOREIGN_KEY)
00589     {
00590       while ((key2 = key_iterator2++) != key)
00591       {
00592   /*
00593           foreign_key_prefix(key, key2) returns 0 if key or key2, or both, is
00594           'generated', and a generated key is a prefix of the other key.
00595           Then we do not need the generated shorter key.
00596         */
00597         if ((key2->type != Key::FOREIGN_KEY &&
00598              key2->DOT_STR(name) != ignore_key &&
00599              !foreign_key_prefix(key, key2)))
00600         {
00601           /* TODO: issue warning message */
00602           /* mark that the generated key should be ignored */
00603           if (!key2->generated ||
00604               (key->generated && key->columns.elements <
00605                key2->columns.elements))
00606             key->DOT_STR(name)= ignore_key;
00607           else
00608           {
00609             key2->DOT_STR(name)= ignore_key;
00610             key_parts-= key2->columns.elements;
00611             (*key_count)--;
00612           }
00613           break;
00614         }
00615       }
00616     }
00617     if (key->DOT_STR(name) != ignore_key)
00618       key_parts+=key->columns.elements;
00619     else
00620       (*key_count)--;
00621     if (key->DOT_STR(name) && !tmp_table && (key->type != Key::PRIMARY) &&
00622   !my_strcasecmp(system_charset_info,key->DOT_STR(name),primary_key_name))
00623     {
00624       my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->DOT_STR(name));
00625       DBUG_RETURN(TRUE);
00626     }
00627   }
00628   tmp=file->max_keys();
00629   if (*key_count > tmp)
00630   {
00631     my_error(ER_TOO_MANY_KEYS,MYF(0),tmp);
00632     DBUG_RETURN(TRUE);
00633   }
00634 
00635   (*key_info_buffer)= key_info= (KEY*) sql_calloc(sizeof(KEY) * (*key_count));
00636   key_part_info=(KEY_PART_INFO*) sql_calloc(sizeof(KEY_PART_INFO)*key_parts);
00637   if (!*key_info_buffer || ! key_part_info)
00638     DBUG_RETURN(TRUE);        // Out of memory
00639 
00640   key_iterator= alter_info->key_list;
00641   key_number=0;
00642   for (; (key=key_iterator++) ; key_number++)
00643   {
00644     uint key_length=0;
00645     Key_part_spec *column;
00646 
00647     if (key->DOT_STR(name) == ignore_key)
00648     {
00649       /* ignore redundant keys */
00650       do
00651   key=key_iterator++;
00652       while (key && key->DOT_STR(name) == ignore_key);
00653       if (!key)
00654   break;
00655     }
00656 
00657     switch (key->type) {
00658     case Key::MULTIPLE:
00659   key_info->flags= 0;
00660   break;
00661     case Key::FULLTEXT:
00662   key_info->flags= HA_FULLTEXT;
00663   if ((key_info->parser_name= &key->key_create_info.parser_name)->str)
00664           key_info->flags|= HA_USES_PARSER;
00665         else
00666           key_info->parser_name= 0;
00667   break;
00668     case Key::SPATIAL:
00669 #ifdef HAVE_SPATIAL
00670   key_info->flags= HA_SPATIAL;
00671   break;
00672 #else
00673   my_error(ER_FEATURE_DISABLED, MYF(0),
00674                  sym_group_geom.name, sym_group_geom.needed_define);
00675   DBUG_RETURN(TRUE);
00676 #endif
00677     case Key::FOREIGN_KEY:
00678       key_number--;       // Skip this key
00679       continue;
00680     default:
00681       key_info->flags = HA_NOSAME;
00682       break;
00683     }
00684     if (key->generated)
00685       key_info->flags|= HA_GENERATED_KEY;
00686 
00687     key_info->key_parts=(uint8) key->columns.elements;
00688     key_info->key_part=key_part_info;
00689     key_info->usable_key_parts= key_number;
00690     key_info->algorithm= key->key_create_info.algorithm;
00691 
00692     if (key->type == Key::FULLTEXT)
00693     {
00694       if (!(file->ha_table_flags() & HA_CAN_FULLTEXT))
00695       {
00696   my_message(ER_TABLE_CANT_HANDLE_FT, ER(ER_TABLE_CANT_HANDLE_FT),
00697                    MYF(0));
00698   DBUG_RETURN(TRUE);
00699       }
00700     }
00701     /*
00702        Make SPATIAL to be RTREE by default
00703        SPATIAL only on BLOB or at least BINARY, this
00704        actually should be replaced by special GEOM type
00705        in near future when new frm file is ready
00706        checking for proper key parts number:
00707     */
00708 
00709     /* TODO: Add proper checks if handler supports key_type and algorithm */
00710     if (key_info->flags & HA_SPATIAL)
00711     {
00712       if (!(file->ha_table_flags() & HA_CAN_RTREEKEYS))
00713       {
00714         my_message(ER_TABLE_CANT_HANDLE_SPKEYS, ER(ER_TABLE_CANT_HANDLE_SPKEYS),
00715                    MYF(0));
00716         DBUG_RETURN(TRUE);
00717       }
00718       if (key_info->key_parts != 1)
00719       {
00720   my_error(ER_WRONG_ARGUMENTS, MYF(0), "SPATIAL INDEX");
00721   DBUG_RETURN(TRUE);
00722       }
00723     }
00724     else if (key_info->algorithm == HA_KEY_ALG_RTREE)
00725     {
00726 #ifdef HAVE_RTREE_KEYS
00727       if ((key_info->key_parts & 1) == 1)
00728       {
00729   my_error(ER_WRONG_ARGUMENTS, MYF(0), "RTREE INDEX");
00730   DBUG_RETURN(TRUE);
00731       }
00732       /* TODO: To be deleted */
00733       my_error(ER_NOT_SUPPORTED_YET, MYF(0), "RTREE INDEX");
00734       DBUG_RETURN(TRUE);
00735 #else
00736       my_error(ER_FEATURE_DISABLED, MYF(0),
00737                sym_group_rtree.name, sym_group_rtree.needed_define);
00738       DBUG_RETURN(TRUE);
00739 #endif
00740     }
00741 
00742     /* Take block size from key part or table part */
00743     /*
00744       TODO: Add warning if block size changes. We can't do it here, as
00745       this may depend on the size of the key
00746     */
00747     key_info->block_size= (key->key_create_info.block_size ?
00748                            key->key_create_info.block_size :
00749                            create_info->key_block_size);
00750 
00751     if (key_info->block_size)
00752       key_info->flags|= HA_USES_BLOCK_SIZE;
00753 
00754     List<Key_part_spec>::iterator cols(key->columns);
00755     List<Key_part_spec>::iterator cols2(key->columns);
00756     CHARSET_INFO *ft_key_charset=0;  // for FULLTEXT
00757     for (uint column_nr=0 ; (column=cols++) ; column_nr++)
00758     {
00759       uint length;
00760       Key_part_spec *dup_column;
00761 
00762       it= alter_info->create_list;
00763       field=0;
00764       while ((sql_field=it++) &&
00765        my_strcasecmp(system_charset_info,
00766          column->DOT_STR(field_name),
00767          sql_field->field_name))
00768   field++;
00769       if (!sql_field)
00770       {
00771   my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name);
00772   DBUG_RETURN(TRUE);
00773       }
00774       while ((dup_column= cols2++) != column)
00775       {
00776         if (!my_strcasecmp(system_charset_info,
00777                    column->DOT_STR(field_name), dup_column->DOT_STR(field_name)))
00778   {
00779     my_printf_error(ER_DUP_FIELDNAME,
00780         ER(ER_DUP_FIELDNAME),MYF(0),
00781         column->field_name);
00782     DBUG_RETURN(TRUE);
00783   }
00784       }
00785       cols2= key->columns;
00786       if (key->type == Key::FULLTEXT)
00787       {
00788   if ((sql_field->sql_type != MYSQL_TYPE_STRING &&
00789        sql_field->sql_type != MYSQL_TYPE_VARCHAR &&
00790        !f_is_blob(sql_field->pack_flag)) ||
00791       sql_field->charset == &my_charset_bin ||
00792       sql_field->charset->mbminlen > 1 || // ucs2 doesn't work yet
00793       (ft_key_charset && sql_field->charset != ft_key_charset))
00794   {
00795       my_error(ER_BAD_FT_COLUMN, MYF(0), column->field_name);
00796       DBUG_RETURN(-1);
00797   }
00798   ft_key_charset=sql_field->charset;
00799   /*
00800     for fulltext keys keyseg length is 1 for blobs (it's ignored in ft
00801     code anyway, and 0 (set to column width later) for char's. it has
00802     to be correct col width for char's, as char data are not prefixed
00803     with length (unlike blobs, where ft code takes data length from a
00804     data prefix, ignoring column->length).
00805   */
00806   column->length=test(f_is_blob(sql_field->pack_flag));
00807       }
00808       else
00809       {
00810   column->length*= sql_field->charset->mbmaxlen;
00811 
00812         if (key->type == Key::SPATIAL && column->length)
00813         {
00814           my_error(ER_WRONG_SUB_KEY, MYF(0));
00815     DBUG_RETURN(TRUE);
00816   }
00817 
00818   if (f_is_blob(sql_field->pack_flag) ||
00819             (f_is_geom(sql_field->pack_flag) && key->type != Key::SPATIAL))
00820   {
00821     if (!(file->ha_table_flags() & HA_CAN_INDEX_BLOBS))
00822     {
00823       my_error(ER_BLOB_USED_AS_KEY, MYF(0), column->field_name);
00824       DBUG_RETURN(TRUE);
00825     }
00826           if (f_is_geom(sql_field->pack_flag) && sql_field->geom_type ==
00827               Field::GEOM_POINT)
00828             column->length= 25;
00829     if (!column->length)
00830     {
00831       my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), column->field_name);
00832       DBUG_RETURN(TRUE);
00833     }
00834   }
00835 #ifdef HAVE_SPATIAL
00836   if (key->type == Key::SPATIAL)
00837   {
00838     if (!column->length)
00839     {
00840       /*
00841               4 is: (Xmin,Xmax,Ymin,Ymax), this is for 2D case
00842               Lately we'll extend this code to support more dimensions
00843       */
00844       column->length= 4*sizeof(double);
00845     }
00846   }
00847 #endif
00848   if (!(sql_field->flags & NOT_NULL_FLAG))
00849   {
00850     if (key->type == Key::PRIMARY)
00851     {
00852       /* Implicitly set primary key fields to NOT NULL for ISO conf. */
00853       sql_field->flags|= NOT_NULL_FLAG;
00854       sql_field->pack_flag&= ~FIELDFLAG_MAYBE_NULL;
00855             null_fields--;
00856     }
00857     else
00858           {
00859             key_info->flags|= HA_NULL_PART_KEY;
00860             if (!(file->ha_table_flags() & HA_NULL_IN_KEY))
00861             {
00862               my_error(ER_NULL_COLUMN_IN_INDEX, MYF(0), column->field_name);
00863               DBUG_RETURN(TRUE);
00864             }
00865             if (key->type == Key::SPATIAL)
00866             {
00867               my_message(ER_SPATIAL_CANT_HAVE_NULL,
00868                          ER(ER_SPATIAL_CANT_HAVE_NULL), MYF(0));
00869               DBUG_RETURN(TRUE);
00870             }
00871           }
00872   }
00873   if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
00874   {
00875     if (column_nr == 0 || (file->ha_table_flags() & HA_AUTO_PART_KEY))
00876       auto_increment--;     // Field is used
00877   }
00878       }
00879 
00880       key_part_info->fieldnr= field;
00881       key_part_info->offset=  (uint16) sql_field->offset;
00882       key_part_info->key_type=sql_field->pack_flag;
00883       length= sql_field->key_length;
00884 
00885       if (column->length)
00886       {
00887   if (f_is_blob(sql_field->pack_flag))
00888   {
00889     if ((length=column->length) > max_key_length ||
00890         length > file->max_key_part_length())
00891     {
00892       length=min(max_key_length, file->max_key_part_length());
00893       if (key->type == Key::MULTIPLE)
00894       {
00895         /* not a critical problem */
00896         char warn_buff[MYSQL_ERRMSG_SIZE];
00897         my_snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
00898         length);
00899         push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
00900          ER_TOO_LONG_KEY, warn_buff);
00901               /* Align key length to multibyte char boundary */
00902               length-= length % sql_field->charset->mbmaxlen;
00903       }
00904       else
00905       {
00906         my_error(ER_TOO_LONG_KEY,MYF(0),length);
00907         DBUG_RETURN(TRUE);
00908       }
00909     }
00910   }
00911   else if (!f_is_geom(sql_field->pack_flag) &&
00912       (column->length > length ||
00913                    !Field::type_can_have_key_part (sql_field->sql_type) ||
00914        ((f_is_packed(sql_field->pack_flag) ||
00915          ((file->ha_table_flags() & HA_NO_PREFIX_CHAR_KEYS) &&
00916           (key_info->flags & HA_NOSAME))) &&
00917         column->length != length)))
00918   {
00919     my_message(ER_WRONG_SUB_KEY, ER(ER_WRONG_SUB_KEY), MYF(0));
00920     DBUG_RETURN(TRUE);
00921   }
00922   else if (!(file->ha_table_flags() & HA_NO_PREFIX_CHAR_KEYS))
00923     length=column->length;
00924       }
00925       else if (length == 0)
00926       {
00927   my_error(ER_WRONG_KEY_COLUMN, MYF(0), column->field_name);
00928     DBUG_RETURN(TRUE);
00929       }
00930       if (length > file->max_key_part_length() && key->type != Key::FULLTEXT)
00931       {
00932         length= file->max_key_part_length();
00933   if (key->type == Key::MULTIPLE)
00934   {
00935     /* not a critical problem */
00936     char warn_buff[MYSQL_ERRMSG_SIZE];
00937     my_snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
00938           length);
00939     push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
00940            ER_TOO_LONG_KEY, warn_buff);
00941           /* Align key length to multibyte char boundary */
00942           length-= length % sql_field->charset->mbmaxlen;
00943   }
00944   else
00945   {
00946     my_error(ER_TOO_LONG_KEY,MYF(0),length);
00947     DBUG_RETURN(TRUE);
00948   }
00949       }
00950       key_part_info->length=(uint16) length;
00951       /* Use packed keys for long strings on the first column */
00952       if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
00953     (length >= KEY_DEFAULT_PACK_LENGTH &&
00954      (sql_field->sql_type == MYSQL_TYPE_STRING ||
00955       sql_field->sql_type == MYSQL_TYPE_VARCHAR ||
00956       sql_field->pack_flag & FIELDFLAG_BLOB)))
00957       {
00958   if (column_nr == 0 && (sql_field->pack_flag & FIELDFLAG_BLOB) ||
00959             sql_field->sql_type == MYSQL_TYPE_VARCHAR)
00960     key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
00961   else
00962     key_info->flags|= HA_PACK_KEY;
00963       }
00964       /* Check if the key segment is partial, set the key flag accordingly */
00965       if (length != sql_field->key_length)
00966         key_info->flags|= HA_KEY_HAS_PART_KEY_SEG;
00967 
00968       key_length+=length;
00969       key_part_info++;
00970 
00971       /* Create the key name based on the first column (if not given) */
00972       if (column_nr == 0)
00973       {
00974   if (key->type == Key::PRIMARY)
00975   {
00976     if (primary_key)
00977     {
00978       my_message(ER_MULTIPLE_PRI_KEY, ER(ER_MULTIPLE_PRI_KEY),
00979                        MYF(0));
00980       DBUG_RETURN(TRUE);
00981     }
00982     key_name=primary_key_name;
00983     primary_key=1;
00984   }
00985   else if (!(key_name = key->DOT_STR(name)))
00986     key_name=make_unique_key_name(sql_field->field_name,
00987           *key_info_buffer, key_info);
00988   if (check_if_keyname_exists(key_name, *key_info_buffer, key_info))
00989   {
00990     my_error(ER_DUP_KEYNAME, MYF(0), key_name);
00991     DBUG_RETURN(TRUE);
00992   }
00993   key_info->name=(char*) key_name;
00994       }
00995     }
00996     if (!key_info->name || check_column_name(key_info->name))
00997     {
00998       my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key_info->name);
00999       DBUG_RETURN(TRUE);
01000     }
01001     if (!(key_info->flags & HA_NULL_PART_KEY))
01002       unique_key=1;
01003     key_info->key_length=(uint16) key_length;
01004     if (key_length > max_key_length && key->type != Key::FULLTEXT)
01005     {
01006       my_error(ER_TOO_LONG_KEY,MYF(0),max_key_length);
01007       DBUG_RETURN(TRUE);
01008     }
01009     key_info++;
01010   }
01011   if (!unique_key && !primary_key &&
01012       (file->ha_table_flags() & HA_REQUIRE_PRIMARY_KEY))
01013   {
01014     my_message(ER_REQUIRES_PRIMARY_KEY, ER(ER_REQUIRES_PRIMARY_KEY), MYF(0));
01015     DBUG_RETURN(TRUE);
01016   }
01017   if (auto_increment > 0)
01018   {
01019     my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
01020     DBUG_RETURN(TRUE);
01021   }
01022   /* Sort keys in optimized order */
01023   my_qsort((uchar*) *key_info_buffer, *key_count, sizeof(KEY),
01024      (qsort_cmp) sort_keys);
01025   create_info->null_bits= null_fields;
01026 
01027   /* Check fields. */
01028   it= alter_info->create_list;
01029   while ((sql_field=it++))
01030   {
01031     Field::utype type= (Field::utype) MTYP_TYPENR(sql_field->unireg_check);
01032 
01033     if (thd->variables.sql_mode & MODE_NO_ZERO_DATE &&
01034         !sql_field->def &&
01035         sql_field->sql_type == MYSQL_TYPE_TIMESTAMP &&
01036         (sql_field->flags & NOT_NULL_FLAG) &&
01037         (type == Field::NONE || type == Field::TIMESTAMP_UN_FIELD))
01038     {
01039       /*
01040         An error should be reported if:
01041           - NO_ZERO_DATE SQL mode is active;
01042           - there is no explicit DEFAULT clause (default column value);
01043           - this is a TIMESTAMP column;
01044           - the column is not NULL;
01045           - this is not the DEFAULT CURRENT_TIMESTAMP column.
01046 
01047         In other words, an error should be reported if
01048           - NO_ZERO_DATE SQL mode is active;
01049           - the column definition is equivalent to
01050             'column_name TIMESTAMP DEFAULT 0'.
01051       */
01052 
01053       my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
01054       DBUG_RETURN(TRUE);
01055     }
01056   }
01057 
01058   DBUG_RETURN(FALSE);
01059 }
01060 
01062 // mysql_create_table_no_lock() cut and pasted directly from sql_table.cc. (I did make it static after copying it.)
01063 
01064 static bool mysql_create_table_no_lock(THD *thd,
01065                                 const char *db, const char *table_name,
01066                                 HA_CREATE_INFO *create_info,
01067                                 Alter_info *alter_info,
01068                                 bool internal_tmp_table,
01069                                 uint select_field_count)
01070 {
01071   char      path[FN_REFLEN];
01072   uint          path_length;
01073   const char  *alias;
01074   uint      db_options, key_count;
01075   KEY     *key_info_buffer;
01076   handler   *file;
01077   bool      error= TRUE;
01078   DBUG_ENTER("mysql_create_table_no_lock");
01079   DBUG_PRINT("enter", ("db: '%s'  table: '%s'  tmp: %d",
01080                        db, table_name, internal_tmp_table));
01081 
01082 
01083   /* Check for duplicate fields and check type of table to create */
01084   if (!alter_info->create_list.elements)
01085   {
01086     my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
01087                MYF(0));
01088     DBUG_RETURN(TRUE);
01089   }
01090   if (check_engine(thd, table_name, create_info))
01091     DBUG_RETURN(TRUE);
01092   db_options= create_info->table_options;
01093   if (create_info->row_type == ROW_TYPE_DYNAMIC)
01094     db_options|=HA_OPTION_PACK_RECORD;
01095   alias= table_case_name(create_info, table_name);
01096 
01097   /* PMC - Done to avoid getting the partition handler by mistake! */
01098   if (!(file= new (thd->mem_root) ha_pbms(pbms_hton, NULL)))
01099   {
01100   mem_alloc_error(sizeof(handler));
01101   DBUG_RETURN(TRUE);
01102   }
01103 
01104   file->init();
01105   
01106   set_table_default_charset(thd, create_info, (char*) db);
01107 
01108   if (mysql_prepare_create_table(thd, create_info, alter_info,
01109                                  internal_tmp_table,
01110                                  &db_options, file,
01111                                  &key_info_buffer, &key_count,
01112                                  select_field_count))
01113     goto err;
01114 
01115       /* Check if table exists */
01116   if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
01117   {
01118     path_length= build_tmptable_filename(thd, path, sizeof(path));
01119     create_info->table_options|=HA_CREATE_DELAY_KEY_WRITE;
01120   }
01121   else  
01122   {
01123  #ifdef FN_DEVCHAR
01124     /* check if the table name contains FN_DEVCHAR when defined */
01125     if (strchr(alias, FN_DEVCHAR))
01126     {
01127       my_error(ER_WRONG_TABLE_NAME, MYF(0), alias);
01128       DBUG_RETURN(TRUE);
01129     }
01130 #endif
01131     path_length= build_table_filename(path, sizeof(path), db, alias, reg_ext,
01132                                       internal_tmp_table ? FN_IS_TMP : 0);
01133   }
01134 
01135   /* Check if table already exists */
01136   if ((create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
01137       find_temporary_table(thd, db, table_name))
01138   {
01139     if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
01140     {
01141       create_info->table_existed= 1;    // Mark that table existed
01142       push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
01143                           ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
01144                           alias);
01145       error= 0;
01146       goto err;
01147     }
01148     my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alias);
01149     goto err;
01150   }
01151 
01152   pthread_mutex_lock(&LOCK_open);
01153   if (!internal_tmp_table && !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
01154   {
01155     if (!access(path,F_OK))
01156     {
01157       if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
01158         goto warn;
01159       my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
01160       goto unlock_and_end;
01161     }
01162     /*
01163       We don't assert here, but check the result, because the table could be
01164       in the table definition cache and in the same time the .frm could be
01165       missing from the disk, in case of manual intervention which deletes
01166       the .frm file. The user has to use FLUSH TABLES; to clear the cache.
01167       Then she could create the table. This case is pretty obscure and
01168       therefore we don't introduce a new error message only for it.
01169     */
01170     if (get_cached_table_share(db, alias))
01171     {
01172       my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
01173       goto unlock_and_end;
01174     }
01175   }
01176 
01177   /*
01178     Check that table with given name does not already
01179     exist in any storage engine. In such a case it should
01180     be discovered and the error ER_TABLE_EXISTS_ERROR be returned
01181     unless user specified CREATE TABLE IF EXISTS
01182     The LOCK_open mutex has been locked to make sure no
01183     one else is attempting to discover the table. Since
01184     it's not on disk as a frm file, no one could be using it!
01185   */
01186   if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
01187   {
01188     bool create_if_not_exists =
01189       create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
01190     int retcode = ha_table_exists_in_engine(thd, db, table_name);
01191     DBUG_PRINT("info", ("exists_in_engine: %"PRIu32"",retcode));
01192     switch (retcode)
01193     {
01194       case HA_ERR_NO_SUCH_TABLE:
01195         /* Normal case, no table exists. we can go and create it */
01196         break;
01197       case HA_ERR_TABLE_EXIST:
01198         DBUG_PRINT("info", ("Table existed in handler"));
01199 
01200         if (create_if_not_exists)
01201           goto warn;
01202         my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
01203         goto unlock_and_end;
01204         break;
01205       default:
01206         DBUG_PRINT("info", ("error: %"PRIu32" from storage engine", retcode));
01207         my_error(retcode, MYF(0),table_name);
01208         goto unlock_and_end;
01209     }
01210   }
01211 
01212   thd_proc_info(thd, "creating table");
01213   create_info->table_existed= 0;    // Mark that table is created
01214 
01215   create_info->table_options=db_options;
01216 
01217   path[path_length - reg_ext_length]= '\0'; // Remove .frm extension
01218   if (rea_create_table(thd, path, db, table_name,
01219                        create_info, alter_info->create_list,
01220                        key_count, key_info_buffer, file))
01221     goto unlock_and_end;
01222 
01223   if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
01224   {
01225     /* Open table and put in temporary table list */
01226 #if MYSQL_VERSION_ID > 60005
01227     if (!(open_temporary_table(thd, path, db, table_name, 1, OTM_OPEN)))
01228 #else
01229     if (!(open_temporary_table(thd, path, db, table_name, 1)))
01230 #endif
01231     {
01232 #if MYSQL_VERSION_ID > 60005
01233       (void) rm_temporary_table(create_info->db_type, path, false);
01234 #else
01235       (void) rm_temporary_table(create_info->db_type, path);
01236 #endif
01237       goto unlock_and_end;
01238     }
01239     thd->thread_specific_used= TRUE;
01240   }
01241 
01242   /*
01243     Don't write statement if:
01244     - It is an internal temporary table,
01245     - Row-based logging is used and it we are creating a temporary table, or
01246     - The binary log is not open.
01247     Otherwise, the statement shall be binlogged.
01248    */
01249   if (!internal_tmp_table &&
01250       (!thd->current_stmt_binlog_row_based ||
01251        (thd->current_stmt_binlog_row_based &&
01252         !(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
01253 #if MYSQL_VERSION_ID > 50140
01254    write_bin_log(thd, TRUE, thd->query(), thd->query_length());
01255 #else
01256     write_bin_log(thd, TRUE, thd->query, thd->query_length);
01257 #endif
01258   error= FALSE;
01259 unlock_and_end:
01260   pthread_mutex_unlock(&LOCK_open);
01261 
01262 err:
01263   thd_proc_info(thd, "After create");
01264   delete file;
01265   DBUG_RETURN(error);
01266 
01267 warn:
01268   error= FALSE;
01269   push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
01270                       ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
01271                       alias);
01272   create_info->table_existed= 1;    // Mark that table existed
01273   goto unlock_and_end;
01274 }
01275 
01279 
01280 #endif // LOCK_OPEN_HACK_REQUIRED
01281 
01282 //#define HAVE_KEYS
01283 //------------------------------
01284 int ms_create_table_frm(handlerton *hton, THD* thd, const char *db, const char *name, DT_FIELD_INFO *info, DT_KEY_INFO *keys, uchar **frmblob, size_t *frmlen )
01285 {
01286   char file_name[FN_REFLEN];
01287   int err = 1, delete_frm = 0;
01288   char field_length_buffer[12], *field_length_ptr;
01289   LEX  *save_lex= thd->lex, mylex;
01290   
01291   memset(&mylex.create_info, 0, sizeof(HA_CREATE_INFO));
01292 
01293   thd->lex = &mylex;
01294     lex_start(thd);
01295   
01296   /* setup the create info */
01297   mylex.create_info.db_type = hton;
01298   mylex.create_info.frm_only = 1;
01299   mylex.create_info.default_table_charset = system_charset_info;
01300   
01301   /* setup the column info. */
01302   while (info->field_name) {    
01303      LEX_STRING field_name, comment;     
01304      field_name.str = (char*)(info->field_name);
01305      field_name.length = strlen(info->field_name);
01306      
01307      comment.str = (char*)(info->comment);
01308      comment.length = strlen(info->comment);
01309           
01310      if (info->field_length) {
01311       snprintf(field_length_buffer, 12, "%d", info->field_length);
01312       field_length_ptr = field_length_buffer;
01313      } else 
01314       field_length_ptr = NULL;
01315 
01316     if (add_field_to_list(thd, &field_name, info->field_type, field_length_ptr, info->field_decimal_length,
01317       info->field_flags,
01318 #if MYSQL_VERSION_ID > 60005
01319         HA_SM_DISK,
01320         COLUMN_FORMAT_TYPE_FIXED,
01321 #endif
01322            NULL /*default_value*/, NULL /*on_update_value*/, &comment, NULL /*change*/, 
01323            NULL /*interval_list*/, info->field_charset, 0 /*uint_geom_type*/)) 
01324       goto error;
01325 
01326 
01327     info++;
01328   }
01329 
01330   if (keys) {
01331 #ifdef HAVE_KEYS
01332     while (keys->key_name) {
01333       LEX_STRING lex;
01334       Key *key;
01335       enum Key::Keytype type;
01336       List<Key_part_spec> col_list;
01337       int i =0;
01338       while (keys->key_columns[i]) {
01339         lex.str = (char *)(keys->key_columns[i++]);
01340         lex.length = strlen(lex.str);
01341         col_list.push_back(new Key_part_spec(lex, 0));
01342         //col_list.push_back(new Key_part_spec(keys->key_columns[i++], 0));
01343       }
01344       
01345       switch (keys->key_type) {
01346         case PRI_KEY_FLAG:
01347           type = Key::PRIMARY;
01348           break;
01349         case UNIQUE_KEY_FLAG:
01350           type = Key::UNIQUE;
01351           break;
01352         case MULTIPLE_KEY_FLAG:
01353           type = Key::MULTIPLE;
01354           break;
01355       }
01356       
01357       key= new Key(type, keys->key_name, strlen(keys->key_name),
01358                 &default_key_create_info,
01359                 0, col_list);
01360       mylex.alter_info.key_list.push_back(key);
01361       col_list.empty();
01362       keys++;
01363     }
01364 #endif
01365   }
01366   
01367   /* Create an internal temp table */
01368   if (mysql_create_table_no_lock(thd, db, name, &mylex.create_info, &mylex.alter_info, 1, 0)) 
01369     goto error;
01370 
01371   delete_frm = 1;
01372   /* Read the FRM file. */
01373   build_table_filename(file_name, sizeof(file_name), db, name, "", FN_IS_TMP);
01374   if (readfrm(file_name, frmblob, frmlen)) 
01375     goto error;
01376 
01377   err = 0;
01378 
01379 error:
01380   lex_end(&mylex);
01381   thd->lex = save_lex;
01382   
01383   if (delete_frm) {
01384     build_table_filename(file_name, sizeof(file_name), db, name, reg_ext, FN_IS_TMP);
01385     my_delete(file_name, MYF(0));
01386   }
01387   return err;
01388 }
01389 
01390 #endif // DRIZZLED