Drizzled Public API Documentation

sql_table.cc
1 /* Copyright (C) 2000-2004 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /* drop and alter of tables */
17 
18 #include <config.h>
19 #include <plugin/myisam/myisam.h>
20 #include <drizzled/show.h>
21 #include <drizzled/error.h>
22 #include <drizzled/gettext.h>
23 #include <drizzled/data_home.h>
24 #include <drizzled/sql_parse.h>
25 #include <drizzled/sql_lex.h>
26 #include <drizzled/session.h>
27 #include <drizzled/sql_base.h>
28 #include <drizzled/lock.h>
29 #include <drizzled/item/int.h>
30 #include <drizzled/item/empty_string.h>
31 #include <drizzled/transaction_services.h>
32 #include <drizzled/transaction_services.h>
33 #include <drizzled/table_proto.h>
34 #include <drizzled/plugin/client.h>
35 #include <drizzled/identifier.h>
36 #include <drizzled/internal/m_string.h>
37 #include <drizzled/charset.h>
38 #include <drizzled/definition/cache.h>
39 #include <drizzled/system_variables.h>
40 #include <drizzled/statement/alter_table.h>
41 #include <drizzled/sql_table.h>
42 #include <drizzled/pthread_globals.h>
43 #include <drizzled/typelib.h>
44 #include <drizzled/plugin/storage_engine.h>
45 #include <drizzled/diagnostics_area.h>
46 #include <drizzled/open_tables_state.h>
47 #include <drizzled/table/cache.h>
48 #include <drizzled/create_field.h>
49 
50 #include <algorithm>
51 #include <sstream>
52 
53 #include <boost/unordered_set.hpp>
54 
55 using namespace std;
56 
57 namespace drizzled {
58 
59 bool is_primary_key(const char* name)
60 {
61  return strcmp(name, "PRIMARY") == 0;
62 }
63 
64 static bool check_if_keyname_exists(const char *name,KeyInfo *start, KeyInfo *end);
65 static const char *make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end);
66 static bool prepare_blob_field(Session *session, CreateField *sql_field);
67 
68 void set_table_default_charset(HA_CREATE_INFO *create_info, const char *db)
69 {
70  /*
71  If the table character set was not given explicitly,
72  let's fetch the database default character set and
73  apply it to the table.
74  */
75  if (not create_info->default_table_charset)
76  create_info->default_table_charset= plugin::StorageEngine::getSchemaCollation(identifier::Schema(str_ref(db)));
77 }
78 
79 /*
80  Execute the drop of a normal or temporary table
81 
82  SYNOPSIS
83  rm_table_part2()
84  session Thread Cursor
85  tables Tables to drop
86  if_exists If set, don't give an error if table doesn't exists.
87  In this case we give an warning of level 'NOTE'
88  drop_temporary Only drop temporary tables
89 
90  @todo
91  When logging to the binary log, we should log
92  tmp_tables and transactional tables as separate statements if we
93  are in a transaction; This is needed to get these tables into the
94  cached binary log that is only written on COMMIT.
95 
96  The current code only writes DROP statements that only uses temporary
97  tables to the cache binary log. This should be ok on most cases, but
98  not all.
99 
100  RETURN
101  0 ok
102  1 Error
103  -1 Thread was killed
104 */
105 
106 int rm_table_part2(Session *session, TableList *tables, bool if_exists,
107  bool drop_temporary)
108 {
109  TableList *table;
110  util::string::vector wrong_tables;
111  int error= 0;
112  bool foreign_key_error= false;
113 
114  do
115  {
116  boost::mutex::scoped_lock scopedLock(table::Cache::mutex());
117 
118  if (not drop_temporary && session->lock_table_names_exclusively(tables))
119  {
120  return 1;
121  }
122 
123  /* Don't give warnings for not found errors, as we already generate notes */
124  session->no_warnings_for_error= 1;
125 
126  for (table= tables; table; table= table->next_local)
127  {
128  identifier::Table tmp_identifier(table->getSchemaName(), table->getTableName());
129 
130  error= session->open_tables.drop_temporary_table(tmp_identifier);
131 
132  switch (error) {
133  case 0:
134  // removed temporary table
135  continue;
136  case -1:
137  error= 1;
138  break;
139  default:
140  // temporary table not found
141  error= 0;
142  }
143 
144  if (drop_temporary == false)
145  {
146  abort_locked_tables(session, tmp_identifier);
147  table::Cache::removeTable(*session, tmp_identifier, RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG);
148  /*
149  If the table was used in lock tables, remember it so that
150  unlock_table_names can free it
151  */
152  Table *locked_table= drop_locked_tables(session, tmp_identifier);
153  if (locked_table)
154  table->table= locked_table;
155 
156  if (session->getKilled())
157  {
158  error= -1;
159  break;
160  }
161  }
162  identifier::Table identifier(table->getSchemaName(), table->getTableName(), table->getInternalTmpTable() ? message::Table::INTERNAL : message::Table::STANDARD);
163 
164  message::table::shared_ptr message= plugin::StorageEngine::getTableMessage(*session, identifier, true);
165 
166  if (drop_temporary || not plugin::StorageEngine::doesTableExist(*session, identifier))
167  {
168  // Table was not found on disk and table can't be created from engine
169  if (if_exists)
170  {
171  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
172  ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
173  table->getTableName());
174  }
175  else
176  {
177  error= 1;
178  }
179  }
180  else
181  {
182  drizzled::error_t local_error;
183 
184  /* Generate transaction event ONLY when we successfully drop */
185  if (plugin::StorageEngine::dropTable(*session, identifier, local_error))
186  {
187  if (message) // If we have no definition, we don't know if the table should have been replicated
188  {
189  TransactionServices::dropTable(*session, identifier, *message, if_exists);
190  }
191  }
192  else
193  {
194  if (local_error == HA_ERR_NO_SUCH_TABLE and if_exists)
195  {
196  error= 0;
197  session->clear_error();
198  }
199 
200  if (local_error == HA_ERR_ROW_IS_REFERENCED)
201  {
202  /* the table is referenced by a foreign key constraint */
203  foreign_key_error= true;
204  }
205  error= local_error;
206  }
207  }
208 
209  if (error)
210  {
211  wrong_tables.push_back(table->getTableName());
212  }
213  }
214 
215  tables->unlock_table_names();
216 
217  } while (0);
218 
219  if (wrong_tables.size())
220  {
221  if (not foreign_key_error)
222  {
223  std::string table_error;
224 
225  BOOST_FOREACH(util::string::vector::reference iter, wrong_tables)
226  {
227  table_error+= iter;
228  table_error+= ',';
229  }
230  table_error.resize(table_error.size() -1);
231 
232  my_printf_error(ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR), MYF(0),
233  table_error.c_str());
234  }
235  else
236  {
237  my_message(ER_ROW_IS_REFERENCED, ER(ER_ROW_IS_REFERENCED), MYF(0));
238  }
239  error= 1;
240  }
241 
242  session->no_warnings_for_error= 0;
243 
244  return error;
245 }
246 
247 /*
248  Sort keys in the following order:
249  - PRIMARY KEY
250  - UNIQUE keys where all column are NOT NULL
251  - UNIQUE keys that don't contain partial segments
252  - Other UNIQUE keys
253  - Normal keys
254  - Fulltext keys
255 
256  This will make checking for duplicated keys faster and ensure that
257  PRIMARY keys are prioritized.
258 */
259 
260 static int sort_keys(KeyInfo *a, KeyInfo *b)
261 {
262  ulong a_flags= a->flags, b_flags= b->flags;
263 
264  if (a_flags & HA_NOSAME)
265  {
266  if (!(b_flags & HA_NOSAME))
267  return -1;
268  if ((a_flags ^ b_flags) & (HA_NULL_PART_KEY))
269  {
270  /* Sort NOT NULL keys before other keys */
271  return (a_flags & (HA_NULL_PART_KEY)) ? 1 : -1;
272  }
273  if (is_primary_key(a->name))
274  return -1;
275  if (is_primary_key(b->name))
276  return 1;
277  /* Sort keys don't containing partial segments before others */
278  if ((a_flags ^ b_flags) & HA_KEY_HAS_PART_KEY_SEG)
279  return (a_flags & HA_KEY_HAS_PART_KEY_SEG) ? 1 : -1;
280  }
281  else if (b_flags & HA_NOSAME)
282  return 1; // Prefer b
283 
284  /*
285  Prefer original key order. usable_key_parts contains here
286  the original key position.
287  */
288  return ((a->usable_key_parts < b->usable_key_parts) ? -1 :
289  (a->usable_key_parts > b->usable_key_parts) ? 1 :
290  0);
291 }
292 
293 /*
294  Check TYPELIB (set or enum) for duplicates
295 
296  SYNOPSIS
297  check_duplicates_in_interval()
298  set_or_name "SET" or "ENUM" string for warning message
299  name name of the checked column
300  typelib list of values for the column
301  dup_val_count returns count of duplicate elements
302 
303  DESCRIPTION
304  This function prints an warning for each value in list
305  which has some duplicates on its right
306 
307  RETURN VALUES
308  0 ok
309  1 Error
310 */
311 
313 {
314 public:
315  string s;
316  const charset_info_st * const cs;
317 
318  typelib_set_member(const char* value, unsigned int length,
319  const charset_info_st * const charset)
320  : s(value, length),
321  cs(charset)
322  {}
323 };
324 
325 static bool operator==(typelib_set_member const& a, typelib_set_member const& b)
326 {
327  return (my_strnncoll(a.cs,
328  (const unsigned char*)a.s.c_str(), a.s.length(),
329  (const unsigned char*)b.s.c_str(), b.s.length())==0);
330 }
331 
332 
333 namespace
334 {
335 class typelib_set_member_hasher
336 {
337  boost::hash<string> hasher;
338 public:
339  std::size_t operator()(const typelib_set_member& t) const
340  {
341  return hasher(t.s);
342  }
343 };
344 }
345 
346 static bool check_duplicates_in_interval(const char *set_or_name,
347  const char *name, TYPELIB *typelib,
348  const charset_info_st * const cs,
349  unsigned int *dup_val_count)
350 {
351  TYPELIB tmp= *typelib;
352  const char **cur_value= typelib->type_names;
353  unsigned int *cur_length= typelib->type_lengths;
354  *dup_val_count= 0;
355 
356  boost::unordered_set<typelib_set_member, typelib_set_member_hasher> interval_set;
357 
358  for ( ; tmp.count > 0; cur_value++, cur_length++)
359  {
360  tmp.type_names++;
361  tmp.type_lengths++;
362  tmp.count--;
363  if (interval_set.count(typelib_set_member(*cur_value, *cur_length, cs)))
364  {
365  my_error(ER_DUPLICATED_VALUE_IN_TYPE, MYF(0),
366  name,*cur_value,set_or_name);
367  return 1;
368  }
369  else
370  interval_set.insert(typelib_set_member(*cur_value, *cur_length, cs));
371  }
372  return 0;
373 }
374 
375 
376 /*
377  Check TYPELIB (set or enum) max and total lengths
378 
379  SYNOPSIS
380  calculate_interval_lengths()
381  cs charset+collation pair of the interval
382  typelib list of values for the column
383  max_length length of the longest item
384  tot_length sum of the item lengths
385 
386  DESCRIPTION
387  After this function call:
388  - ENUM uses max_length
389  - SET uses tot_length.
390 
391  RETURN VALUES
392  void
393 */
394 static void calculate_interval_lengths(const charset_info_st * const cs,
395  TYPELIB *interval,
396  uint32_t *max_length,
397  uint32_t *tot_length)
398 {
399  const char **pos;
400  uint32_t *len;
401  *max_length= *tot_length= 0;
402  for (pos= interval->type_names, len= interval->type_lengths;
403  *pos ; pos++, len++)
404  {
405  uint32_t length= cs->cset->numchars(cs, *pos, *pos + *len);
406  *tot_length+= length;
407  set_if_bigger(*max_length, (uint32_t)length);
408  }
409 }
410 
411 /*
412  Prepare a create_table instance for packing
413 
414  SYNOPSIS
415  prepare_create_field()
416  sql_field field to prepare for packing
417  blob_columns count for BLOBs
418  timestamps count for timestamps
419 
420  DESCRIPTION
421  This function prepares a CreateField instance.
422  Fields such as pack_flag are valid after this call.
423 
424  RETURN VALUES
425  0 ok
426  1 Error
427 */
428 int prepare_create_field(CreateField *sql_field,
429  uint32_t *blob_columns,
430  int *timestamps,
431  int *timestamps_with_niladic)
432 {
433  unsigned int dup_val_count;
434 
435  /*
436  This code came from mysql_prepare_create_table.
437  Indent preserved to make patching easier
438  */
439  assert(sql_field->charset);
440 
441  switch (sql_field->sql_type) {
442  case DRIZZLE_TYPE_BLOB:
443  sql_field->length= 8; // Unireg field length
444  (*blob_columns)++;
445  break;
446 
447  case DRIZZLE_TYPE_ENUM:
448  {
449  if (check_duplicates_in_interval("ENUM",
450  sql_field->field_name,
451  sql_field->interval,
452  sql_field->charset,
453  &dup_val_count))
454  {
455  return 1;
456  }
457  }
458  break;
459 
460  case DRIZZLE_TYPE_MICROTIME:
461  case DRIZZLE_TYPE_TIMESTAMP:
462  /* We should replace old TIMESTAMP fields with their newer analogs */
463  if (sql_field->unireg_check == Field::TIMESTAMP_OLD_FIELD)
464  {
465  if (!*timestamps)
466  {
467  sql_field->unireg_check= Field::TIMESTAMP_DNUN_FIELD;
468  (*timestamps_with_niladic)++;
469  }
470  else
471  {
472  sql_field->unireg_check= Field::NONE;
473  }
474  }
475  else if (sql_field->unireg_check != Field::NONE)
476  {
477  (*timestamps_with_niladic)++;
478  }
479 
480  (*timestamps)++;
481 
482  break;
483 
484  case DRIZZLE_TYPE_BOOLEAN:
485  case DRIZZLE_TYPE_DATE: // Rest of string types
486  case DRIZZLE_TYPE_DATETIME:
487  case DRIZZLE_TYPE_DECIMAL:
488  case DRIZZLE_TYPE_DOUBLE:
489  case DRIZZLE_TYPE_LONG:
490  case DRIZZLE_TYPE_LONGLONG:
491  case DRIZZLE_TYPE_NULL:
492  case DRIZZLE_TYPE_TIME:
493  case DRIZZLE_TYPE_UUID:
494  case DRIZZLE_TYPE_IPV6:
495  case DRIZZLE_TYPE_VARCHAR:
496  break;
497  }
498 
499  return 0;
500 }
501 
502 static int prepare_create_table(Session *session,
503  HA_CREATE_INFO *create_info,
504  message::Table &create_proto,
505  AlterInfo *alter_info,
506  bool tmp_table,
507  uint32_t *db_options,
508  KeyInfo **key_info_buffer,
509  uint32_t *key_count,
510  int select_field_count)
511 {
512  const char *key_name;
513  CreateField *sql_field,*dup_field;
514  uint field,null_fields,blob_columns,max_key_length;
515  ulong record_offset= 0;
516  KeyInfo *key_info;
517  KeyPartInfo *key_part_info;
518  int timestamps= 0, timestamps_with_niladic= 0;
519  int dup_no;
520  int select_field_pos,auto_increment=0;
521  List<CreateField>::iterator it(alter_info->create_list.begin());
522  List<CreateField>::iterator it2(alter_info->create_list.begin());
523  uint32_t total_uneven_bit_length= 0;
524 
525  plugin::StorageEngine *engine= plugin::StorageEngine::findByName(create_proto.engine().name());
526 
527  select_field_pos= alter_info->create_list.size() - select_field_count;
528  null_fields=blob_columns=0;
529  max_key_length= engine->max_key_length();
530 
531  for (int32_t field_no=0; (sql_field=it++) ; field_no++)
532  {
533  const charset_info_st *save_cs;
534 
535  /*
536  Initialize length from its original value (number of characters),
537  which was set in the parser. This is necessary if we're
538  executing a prepared statement for the second time.
539  */
540  sql_field->length= sql_field->char_length;
541 
542  if (!sql_field->charset)
543  sql_field->charset= create_info->default_table_charset;
544 
545  /*
546  table_charset is set in ALTER Table if we want change character set
547  for all varchar/char columns.
548  But the table charset must not affect the BLOB fields, so don't
549  allow to change my_charset_bin to somethig else.
550  */
551  if (create_info->table_charset && sql_field->charset != &my_charset_bin)
552  sql_field->charset= create_info->table_charset;
553 
554  save_cs= sql_field->charset;
555  if ((sql_field->flags & BINCMP_FLAG) &&
556  !(sql_field->charset= get_charset_by_csname(sql_field->charset->csname, MY_CS_BINSORT)))
557  {
558  char tmp[64];
559  char *tmp_pos= tmp;
560  strncpy(tmp_pos, save_cs->csname, sizeof(tmp)-4);
561  tmp_pos+= strlen(tmp);
562  strncpy(tmp_pos, STRING_WITH_LEN("_bin"));
563  my_error(ER_UNKNOWN_COLLATION, MYF(0), tmp);
564  return true;
565  }
566 
567  /*
568  Convert the default value from client character
569  set into the column character set if necessary.
570  */
571  if (sql_field->def &&
572  save_cs != sql_field->def->collation.collation &&
573  (sql_field->sql_type == DRIZZLE_TYPE_ENUM))
574  {
575  /*
576  Starting from 5.1 we work here with a copy of CreateField
577  created by the caller, not with the instance that was
578  originally created during parsing. It's OK to create
579  a temporary item and initialize with it a member of the
580  copy -- this item will be thrown away along with the copy
581  at the end of execution, and thus not introduce a dangling
582  pointer in the parsed tree of a prepared statement or a
583  stored procedure statement.
584  */
585  sql_field->def= sql_field->def->safe_charset_converter(save_cs);
586 
587  if (sql_field->def == NULL)
588  {
589  /* Could not convert */
590  my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
591  return true;
592  }
593  }
594 
595  if (sql_field->sql_type == DRIZZLE_TYPE_ENUM)
596  {
597  const charset_info_st * const cs= sql_field->charset;
598  TYPELIB *interval= sql_field->interval;
599 
600  /*
601  Create typelib from interval_list, and if necessary
602  convert strings from client character set to the
603  column character set.
604  */
605  if (!interval)
606  {
607  /*
608  Create the typelib in runtime memory - we will free the
609  occupied memory at the same time when we free this
610  sql_field -- at the end of execution.
611  */
612  interval= sql_field->interval= typelib(*session->mem_root, sql_field->interval_list);
613 
614  List<String>::iterator int_it(sql_field->interval_list.begin());
615  String conv, *tmp;
616  char comma_buf[4];
617  int comma_length= cs->cset->wc_mb(cs, ',', (unsigned char*) comma_buf, (unsigned char*) comma_buf + sizeof(comma_buf));
618  assert(comma_length > 0);
619 
620  for (uint32_t i= 0; (tmp= int_it++); i++)
621  {
622  uint32_t lengthsp;
623  if (String::needs_conversion(tmp->length(), tmp->charset(), cs))
624  {
625  conv.copy(tmp->ptr(), tmp->length(), cs);
626  interval->type_names[i]= session->mem.strdup(conv);
627  interval->type_lengths[i]= conv.length();
628  }
629 
630  // Strip trailing spaces.
631  lengthsp= cs->cset->lengthsp(cs, interval->type_names[i], interval->type_lengths[i]);
632  interval->type_lengths[i]= lengthsp;
633  ((unsigned char *)interval->type_names[i])[lengthsp]= '\0';
634  }
635  sql_field->interval_list.clear(); // Don't need interval_list anymore
636  }
637 
638  /* DRIZZLE_TYPE_ENUM */
639  {
640  uint32_t field_length;
641  assert(sql_field->sql_type == DRIZZLE_TYPE_ENUM);
642  if (sql_field->def != NULL)
643  {
644  String str, *def= sql_field->def->val_str(&str);
645  if (def == NULL) /* SQL "NULL" maps to NULL */
646  {
647  if (sql_field->flags & NOT_NULL_FLAG)
648  {
649  my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
650  return true;
651  }
652 
653  /* else, the defaults yield the correct length for NULLs. */
654  }
655  else /* not NULL */
656  {
657  def->length(cs->cset->lengthsp(cs, def->ptr(), def->length()));
658  if (interval->find_type2(def->ptr(), def->length(), cs) == 0) /* not found */
659  {
660  my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
661  return true;
662  }
663  }
664  }
665  uint32_t new_dummy;
666  calculate_interval_lengths(cs, interval, &field_length, &new_dummy);
667  sql_field->length= field_length;
668  }
669  set_if_smaller(sql_field->length, (uint32_t)MAX_FIELD_WIDTH-1);
670  }
671 
673  if (prepare_blob_field(session, sql_field))
674  return true;
675 
676  if (!(sql_field->flags & NOT_NULL_FLAG))
677  null_fields++;
678 
679  if (check_column_name(sql_field->field_name))
680  {
681  my_error(ER_WRONG_COLUMN_NAME, MYF(0), sql_field->field_name);
682  return true;
683  }
684 
685  /* Check if we have used the same field name before */
686  for (dup_no=0; (dup_field=it2++) != sql_field; dup_no++)
687  {
688  if (system_charset_info->strcasecmp(sql_field->field_name, dup_field->field_name) == 0)
689  {
690  /*
691  If this was a CREATE ... SELECT statement, accept a field
692  redefinition if we are changing a field in the SELECT part
693  */
694  if (field_no < select_field_pos || dup_no >= select_field_pos)
695  {
696  my_error(ER_DUP_FIELDNAME, MYF(0), sql_field->field_name);
697  return true;
698  }
699  else
700  {
701  /* Field redefined */
702  sql_field->def= dup_field->def;
703  sql_field->sql_type= dup_field->sql_type;
704  sql_field->charset= (dup_field->charset ?
705  dup_field->charset :
706  create_info->default_table_charset);
707  sql_field->length= dup_field->char_length;
708  sql_field->pack_length= dup_field->pack_length;
709  sql_field->key_length= dup_field->key_length;
710  sql_field->decimals= dup_field->decimals;
712  sql_field->unireg_check= dup_field->unireg_check;
713  /*
714  We're making one field from two, the result field will have
715  dup_field->flags as flags. If we've incremented null_fields
716  because of sql_field->flags, decrement it back.
717  */
718  if (!(sql_field->flags & NOT_NULL_FLAG))
719  null_fields--;
720  sql_field->flags= dup_field->flags;
721  sql_field->interval= dup_field->interval;
722  it2.remove(); // Remove first (create) definition
723  select_field_pos--;
724  break;
725  }
726  }
727  }
728 
730  if (not create_proto.engine().name().compare("MyISAM") &&
731  ((sql_field->flags & BLOB_FLAG) ||
732  (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)))
733  {
734  (*db_options)|= HA_OPTION_PACK_RECORD;
735  }
736 
737  it2= alter_info->create_list.begin();
738  }
739 
740  /* record_offset will be increased with 'length-of-null-bits' later */
741  record_offset= 0;
742  null_fields+= total_uneven_bit_length;
743 
744  it= alter_info->create_list.begin();
745  while ((sql_field=it++))
746  {
747  assert(sql_field->charset != 0);
748 
749  if (prepare_create_field(sql_field, &blob_columns,
750  &timestamps, &timestamps_with_niladic))
751  return true;
752  sql_field->offset= record_offset;
753  if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
754  auto_increment++;
755  }
756  if (timestamps_with_niladic > 1)
757  {
758  my_message(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS,
759  ER(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS), MYF(0));
760  return true;
761  }
762  if (auto_increment > 1)
763  {
764  my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
765  return true;
766  }
767  if (auto_increment &&
768  (engine->check_flag(HTON_BIT_NO_AUTO_INCREMENT)))
769  {
770  my_message(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT,
771  ER(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT), MYF(0));
772  return true;
773  }
774 
775  if (blob_columns && (engine->check_flag(HTON_BIT_NO_BLOBS)))
776  {
777  my_message(ER_TABLE_CANT_HANDLE_BLOB, ER(ER_TABLE_CANT_HANDLE_BLOB),
778  MYF(0));
779  return true;
780  }
781 
782  /* Create keys */
783 
784  List<Key>::iterator key_iterator(alter_info->key_list.begin());
785  List<Key>::iterator key_iterator2(alter_info->key_list.begin());
786  uint32_t key_parts=0, fk_key_count=0;
787  bool primary_key=0,unique_key=0;
788  Key *key, *key2;
789  uint32_t tmp, key_number;
790  /* special marker for keys to be ignored */
791  static char ignore_key[1];
792 
793  /* Calculate number of key segements */
794  *key_count= 0;
795 
796  while ((key=key_iterator++))
797  {
798  if (key->type == Key::FOREIGN_KEY)
799  {
800  fk_key_count++;
801  if (((Foreign_key *)key)->validate(alter_info->create_list))
802  return true;
803 
804  Foreign_key *fk_key= (Foreign_key*) key;
805 
806  add_foreign_key_to_table_message(&create_proto,
807  fk_key->name.data(),
808  fk_key->columns,
809  fk_key->ref_table,
810  fk_key->ref_columns,
811  fk_key->delete_opt,
812  fk_key->update_opt,
813  fk_key->match_opt);
814 
815  if (fk_key->ref_columns.size() &&
816  fk_key->ref_columns.size() != fk_key->columns.size())
817  {
818  my_error(ER_WRONG_FK_DEF, MYF(0),
819  (fk_key->name.data() ? fk_key->name.data() : "foreign key without name"),
820  ER(ER_KEY_REF_DO_NOT_MATCH_TABLE_REF));
821  return true;
822  }
823  continue;
824  }
825  (*key_count)++;
826  tmp= engine->max_key_parts();
827  if (key->columns.size() > tmp)
828  {
829  my_error(ER_TOO_MANY_KEY_PARTS,MYF(0),tmp);
830  return true;
831  }
832  if (check_identifier_name(key->name, ER_TOO_LONG_IDENT))
833  return true;
834  key_iterator2= alter_info->key_list.begin();
835  if (key->type != Key::FOREIGN_KEY)
836  {
837  while ((key2 = key_iterator2++) != key)
838  {
839  /*
840  foreign_key_prefix(key, key2) returns 0 if key or key2, or both, is
841  'generated', and a generated key is a prefix of the other key.
842  Then we do not need the generated shorter key.
843  */
844  if ((key2->type != Key::FOREIGN_KEY &&
845  key2->name.data() != ignore_key &&
846  !foreign_key_prefix(key, key2)))
847  {
848  /* @todo issue warning message */
849  /* mark that the generated key should be ignored */
850  if (!key2->generated ||
851  (key->generated && key->columns.size() <
852  key2->columns.size()))
853  key->name.assign(ignore_key, 1);
854  else
855  {
856  key2->name.assign(ignore_key, 1);
857  key_parts-= key2->columns.size();
858  (*key_count)--;
859  }
860  break;
861  }
862  }
863  }
864  if (key->name.data() != ignore_key)
865  key_parts+=key->columns.size();
866  else
867  (*key_count)--;
868  if (key->name.data() && !tmp_table && (key->type != Key::PRIMARY) && is_primary_key(key->name.data()))
869  {
870  my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->name.data());
871  return true;
872  }
873  }
874  tmp= engine->max_keys();
875  if (*key_count > tmp)
876  {
877  my_error(ER_TOO_MANY_KEYS,MYF(0),tmp);
878  return true;
879  }
880 
881  (*key_info_buffer)= key_info= (KeyInfo*) memory::sql_calloc(sizeof(KeyInfo) * (*key_count));
882  key_part_info=(KeyPartInfo*) memory::sql_calloc(sizeof(KeyPartInfo)*key_parts);
883 
884  key_iterator= alter_info->key_list.begin();
885  key_number=0;
886  for (; (key=key_iterator++) ; key_number++)
887  {
888  uint32_t key_length=0;
889  Key_part_spec *column;
890 
891  if (key->name.data() == ignore_key)
892  {
893  /* ignore redundant keys */
894  do
895  key=key_iterator++;
896  while (key && key->name.data() == ignore_key);
897  if (!key)
898  break;
899  }
900 
901  switch (key->type) {
902  case Key::MULTIPLE:
903  key_info->flags= 0;
904  break;
905  case Key::FOREIGN_KEY:
906  key_number--; // Skip this key
907  continue;
908  default:
909  key_info->flags = HA_NOSAME;
910  break;
911  }
912  if (key->generated)
913  key_info->flags|= HA_GENERATED_KEY;
914 
915  key_info->key_parts=(uint8_t) key->columns.size();
916  key_info->key_part=key_part_info;
917  key_info->usable_key_parts= key_number;
918  key_info->algorithm= key->key_create_info.algorithm;
919 
920  uint32_t tmp_len= system_charset_info->cset->charpos(system_charset_info,
921  key->key_create_info.comment.begin(),
922  key->key_create_info.comment.end(),
923  INDEX_COMMENT_MAXLEN);
924 
925  if (tmp_len < key->key_create_info.comment.size())
926  {
927  my_error(ER_WRONG_STRING_LENGTH, MYF(0), key->key_create_info.comment.data(), "INDEX COMMENT", (uint32_t) INDEX_COMMENT_MAXLEN);
928  return -1;
929  }
930 
931  key_info->comment.assign(key_info->comment.data(), key->key_create_info.comment.size());
932  if (key_info->comment.size() > 0)
933  {
934  key_info->flags|= HA_USES_COMMENT;
935  key_info->comment.assign(key->key_create_info.comment.data(), key_info->comment.size()); // weird
936  }
937 
938  message::Table::Field *protofield= NULL;
939 
940  List<Key_part_spec>::iterator cols(key->columns.begin());
941  List<Key_part_spec>::iterator cols2(key->columns.begin());
942  for (uint32_t column_nr=0 ; (column=cols++) ; column_nr++)
943  {
944  uint32_t length;
945  Key_part_spec *dup_column;
946  int proto_field_nr= 0;
947 
948  it= alter_info->create_list.begin();
949  field=0;
950  while ((sql_field=it++) && ++proto_field_nr &&
951  system_charset_info->strcasecmp(column->field_name.data(), sql_field->field_name))
952  {
953  field++;
954  }
955 
956  if (!sql_field)
957  {
958  my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.data());
959  return true;
960  }
961 
962  while ((dup_column= cols2++) != column)
963  {
964  if (!system_charset_info->strcasecmp(column->field_name.data(), dup_column->field_name.data()))
965  {
966  my_printf_error(ER_DUP_FIELDNAME,
967  ER(ER_DUP_FIELDNAME),MYF(0),
968  column->field_name.data());
969  return true;
970  }
971  }
972  cols2= key->columns.begin();
973 
974  if (create_proto.field_size() > 0)
975  protofield= create_proto.mutable_field(proto_field_nr - 1);
976 
977  {
978  column->length*= sql_field->charset->mbmaxlen;
979 
980  if (sql_field->sql_type == DRIZZLE_TYPE_BLOB)
981  {
982  if (! (engine->check_flag(HTON_BIT_CAN_INDEX_BLOBS)))
983  {
984  my_error(ER_BLOB_USED_AS_KEY, MYF(0), column->field_name.data());
985  return true;
986  }
987  if (! column->length)
988  {
989  my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), column->field_name.data());
990  return true;
991  }
992  }
993 
994  if (! (sql_field->flags & NOT_NULL_FLAG))
995  {
996  if (key->type == Key::PRIMARY)
997  {
998  /* Implicitly set primary key fields to NOT NULL for ISO conf. */
999  sql_field->flags|= NOT_NULL_FLAG;
1000  null_fields--;
1001 
1002  if (protofield)
1003  {
1005  constraints= protofield->mutable_constraints();
1006  constraints->set_is_notnull(true);
1007  }
1008 
1009  }
1010  else
1011  {
1012  key_info->flags|= HA_NULL_PART_KEY;
1013  if (! (engine->check_flag(HTON_BIT_NULL_IN_KEY)))
1014  {
1015  my_error(ER_NULL_COLUMN_IN_INDEX, MYF(0), column->field_name.data());
1016  return true;
1017  }
1018  }
1019  }
1020 
1021  if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
1022  {
1023  if (column_nr == 0 || (engine->check_flag(HTON_BIT_AUTO_PART_KEY)))
1024  auto_increment--; // Field is used
1025  }
1026  }
1027 
1028  key_part_info->fieldnr= field;
1029  key_part_info->offset= (uint16_t) sql_field->offset;
1030  key_part_info->key_type= 0;
1031  length= sql_field->key_length;
1032 
1033  if (column->length)
1034  {
1035  if (sql_field->sql_type == DRIZZLE_TYPE_BLOB)
1036  {
1037  if ((length=column->length) > max_key_length ||
1038  length > engine->max_key_part_length())
1039  {
1040  length= min(max_key_length, engine->max_key_part_length());
1041  if (key->type == Key::MULTIPLE)
1042  {
1043  /* not a critical problem */
1044  char warn_buff[DRIZZLE_ERRMSG_SIZE];
1045  snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
1046  length);
1047  push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1048  ER_TOO_LONG_KEY, warn_buff);
1049  /* Align key length to multibyte char boundary */
1050  length-= length % sql_field->charset->mbmaxlen;
1051  }
1052  else
1053  {
1054  my_error(ER_TOO_LONG_KEY,MYF(0),length);
1055  return true;
1056  }
1057  }
1058  }
1059  else if ((column->length > length ||
1060  ! Field::type_can_have_key_part(sql_field->sql_type)))
1061  {
1062  my_message(ER_WRONG_SUB_KEY, ER(ER_WRONG_SUB_KEY), MYF(0));
1063  return true;
1064  }
1065  else if (! (engine->check_flag(HTON_BIT_NO_PREFIX_CHAR_KEYS)))
1066  {
1067  length=column->length;
1068  }
1069  }
1070  else if (length == 0)
1071  {
1072  my_error(ER_WRONG_KEY_COLUMN, MYF(0), column->field_name.data());
1073  return true;
1074  }
1075  if (length > engine->max_key_part_length())
1076  {
1077  length= engine->max_key_part_length();
1078  if (key->type == Key::MULTIPLE)
1079  {
1080  /* not a critical problem */
1081  char warn_buff[DRIZZLE_ERRMSG_SIZE];
1082  snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
1083  length);
1084  push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1085  ER_TOO_LONG_KEY, warn_buff);
1086  /* Align key length to multibyte char boundary */
1087  length-= length % sql_field->charset->mbmaxlen;
1088  }
1089  else
1090  {
1091  my_error(ER_TOO_LONG_KEY,MYF(0),length);
1092  return true;
1093  }
1094  }
1095  key_part_info->length=(uint16_t) length;
1096  /* Use packed keys for long strings on the first column */
1097  if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
1098  (length >= KEY_DEFAULT_PACK_LENGTH &&
1099  (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR ||
1100  sql_field->sql_type == DRIZZLE_TYPE_BLOB)))
1101  {
1102  if ((column_nr == 0 && sql_field->sql_type == DRIZZLE_TYPE_BLOB) ||
1103  sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)
1104  {
1105  key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
1106  }
1107  else
1108  {
1109  key_info->flags|= HA_PACK_KEY;
1110  }
1111  }
1112  /* Check if the key segment is partial, set the key flag accordingly */
1113  if (length != sql_field->key_length)
1114  key_info->flags|= HA_KEY_HAS_PART_KEY_SEG;
1115 
1116  key_length+=length;
1117  key_part_info++;
1118 
1119  /* Create the key name based on the first column (if not given) */
1120  if (column_nr == 0)
1121  {
1122  if (key->type == Key::PRIMARY)
1123  {
1124  if (primary_key)
1125  {
1126  my_message(ER_MULTIPLE_PRI_KEY, ER(ER_MULTIPLE_PRI_KEY),
1127  MYF(0));
1128  return true;
1129  }
1130  static const char pkey_name[]= "PRIMARY";
1131  key_name=pkey_name;
1132  primary_key=1;
1133  }
1134  else if (!(key_name= key->name.data()))
1135  key_name=make_unique_key_name(sql_field->field_name,
1136  *key_info_buffer, key_info);
1137  if (check_if_keyname_exists(key_name, *key_info_buffer, key_info))
1138  {
1139  my_error(ER_DUP_KEYNAME, MYF(0), key_name);
1140  return true;
1141  }
1142  key_info->name=(char*) key_name;
1143  }
1144  }
1145 
1146  if (!key_info->name || check_column_name(key_info->name))
1147  {
1148  my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key_info->name);
1149  return true;
1150  }
1151 
1152  if (!(key_info->flags & HA_NULL_PART_KEY))
1153  {
1154  unique_key=1;
1155  }
1156 
1157  key_info->key_length=(uint16_t) key_length;
1158 
1159  if (key_length > max_key_length)
1160  {
1161  my_error(ER_TOO_LONG_KEY,MYF(0),max_key_length);
1162  return true;
1163  }
1164 
1165  key_info++;
1166  }
1167 
1168  if (!unique_key && !primary_key &&
1169  (engine->check_flag(HTON_BIT_REQUIRE_PRIMARY_KEY)))
1170  {
1171  my_message(ER_REQUIRES_PRIMARY_KEY, ER(ER_REQUIRES_PRIMARY_KEY), MYF(0));
1172  return true;
1173  }
1174 
1175  if (auto_increment > 0)
1176  {
1177  my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
1178  return true;
1179  }
1180  /* Sort keys in optimized order */
1181  internal::my_qsort((unsigned char*) *key_info_buffer, *key_count, sizeof(KeyInfo),
1182  (qsort_cmp) sort_keys);
1183 
1184  /* Check fields. */
1185  it= alter_info->create_list.begin();
1186  while ((sql_field=it++))
1187  {
1188  Field::utype type= (Field::utype) MTYP_TYPENR(sql_field->unireg_check);
1189 
1190  if (session->variables.sql_mode & MODE_NO_ZERO_DATE &&
1191  !sql_field->def &&
1192  (sql_field->sql_type == DRIZZLE_TYPE_TIMESTAMP or sql_field->sql_type == DRIZZLE_TYPE_MICROTIME) &&
1193  (sql_field->flags & NOT_NULL_FLAG) &&
1194  (type == Field::NONE || type == Field::TIMESTAMP_UN_FIELD))
1195  {
1196  /*
1197  An error should be reported if:
1198  - NO_ZERO_DATE SQL mode is active;
1199  - there is no explicit DEFAULT clause (default column value);
1200  - this is a TIMESTAMP column;
1201  - the column is not NULL;
1202  - this is not the DEFAULT CURRENT_TIMESTAMP column.
1203 
1204  In other words, an error should be reported if
1205  - NO_ZERO_DATE SQL mode is active;
1206  - the column definition is equivalent to
1207  'column_name TIMESTAMP DEFAULT 0'.
1208  */
1209 
1210  my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
1211  return true;
1212  }
1213  }
1214 
1215  return false;
1216 }
1217 
1218 /*
1219  Extend long VARCHAR fields to blob & prepare field if it's a blob
1220 
1221  SYNOPSIS
1222  prepare_blob_field()
1223  sql_field Field to check
1224 
1225  RETURN
1226  0 ok
1227  1 Error (sql_field can't be converted to blob)
1228  In this case the error is given
1229 */
1230 
1231 static bool prepare_blob_field(Session *,
1232  CreateField *sql_field)
1233 {
1234 
1235  if (sql_field->length > MAX_FIELD_VARCHARLENGTH &&
1236  !(sql_field->flags & BLOB_FLAG))
1237  {
1238  my_error(ER_TOO_BIG_FIELDLENGTH, MYF(0), sql_field->field_name,
1239  MAX_FIELD_VARCHARLENGTH / sql_field->charset->mbmaxlen);
1240  return 1;
1241  }
1242 
1243  if ((sql_field->flags & BLOB_FLAG) && sql_field->length)
1244  {
1245  if (sql_field->sql_type == DRIZZLE_TYPE_BLOB)
1246  {
1247  /* The user has given a length to the blob column */
1248  sql_field->pack_length= calc_pack_length(sql_field->sql_type, 0);
1249  }
1250  sql_field->length= 0;
1251  }
1252  return 0;
1253 }
1254 
1255 static bool locked_create_event(Session *session,
1256  const identifier::Table &identifier,
1257  HA_CREATE_INFO *create_info,
1258  message::Table &table_proto,
1259  AlterInfo *alter_info,
1260  bool is_if_not_exists,
1261  bool internal_tmp_table,
1262  uint db_options,
1263  uint key_count,
1264  KeyInfo *key_info_buffer)
1265 {
1266  bool error= true;
1267 
1268  {
1269 
1270  /*
1271  @note if we are building a temp table we need to check to see if a temp table
1272  already exists, otherwise we just need to find out if a normal table exists (aka it is fine
1273  to create a table under a temporary table.
1274  */
1275  bool exists=
1276  plugin::StorageEngine::doesTableExist(*session, identifier,
1277  identifier.getType() != message::Table::STANDARD );
1278 
1279  if (exists)
1280  {
1281  if (is_if_not_exists)
1282  {
1283  error= false;
1284  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1285  ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1286  identifier.getTableName().c_str());
1287  create_info->table_existed= 1; // Mark that table existed
1288  return error;
1289  }
1290 
1291  my_error(ER_TABLE_EXISTS_ERROR, identifier);
1292 
1293  return error;
1294  }
1295 
1296  if (identifier.getType() == message::Table::STANDARD) // We have a real table
1297  {
1298  /*
1299  We don't assert here, but check the result, because the table could be
1300  in the table definition cache and in the same time the .frm could be
1301  missing from the disk, in case of manual intervention which deletes
1302  the .frm cursor. The user has to use FLUSH TABLES; to clear the cache.
1303  Then she could create the table. This case is pretty obscure and
1304  therefore we don't introduce a new error message only for it.
1305  */
1306  /*
1307  @todo improve this error condition.
1308  */
1309  if (definition::Cache::find(identifier.getKey()))
1310  {
1311  my_error(ER_TABLE_EXISTS_ERROR, identifier);
1312 
1313  return error;
1314  }
1315  }
1316  }
1317 
1318  session->set_proc_info("creating table");
1319  create_info->table_existed= 0; // Mark that table is created
1320 
1321  create_info->table_options= db_options;
1322 
1323  if (not rea_create_table(session, identifier,
1324  table_proto,
1325  create_info, alter_info->create_list,
1326  key_count, key_info_buffer))
1327  {
1328  return error;
1329  }
1330 
1331  if (identifier.getType() == message::Table::TEMPORARY)
1332  {
1333  /* Open table and put in temporary table list */
1334  if (not (session->open_temporary_table(identifier)))
1335  {
1336  (void) session->open_tables.rm_temporary_table(identifier);
1337  return error;
1338  }
1339  }
1340 
1341  /*
1342  We keep this behind the lock to make sure ordering is correct for a table.
1343  This is a very unlikely problem where before we would write out to the
1344  trans log, someone would do a delete/create operation.
1345  */
1346 
1347  if (table_proto.type() == message::Table::STANDARD && not internal_tmp_table)
1348  {
1349  TransactionServices::createTable(*session, table_proto);
1350  }
1351 
1352  return false;
1353 }
1354 
1355 
1356 /*
1357  Ignore the name of this function... it locks :(
1358 
1359  Create a table
1360 
1361  SYNOPSIS
1362  create_table_no_lock()
1363  session Thread object
1364  db Database
1365  table_name Table name
1366  create_info Create information (like MAX_ROWS)
1367  fields List of fields to create
1368  keys List of keys to create
1369  internal_tmp_table Set to 1 if this is an internal temporary table
1370  (From ALTER Table)
1371  select_field_count
1372 
1373  DESCRIPTION
1374  If one creates a temporary table, this is automatically opened
1375 
1376  Note that this function assumes that caller already have taken
1377  name-lock on table being created or used some other way to ensure
1378  that concurrent operations won't intervene. create_table()
1379  is a wrapper that can be used for this.
1380 
1381  RETURN VALUES
1382  false OK
1383  true error
1384 */
1385 
1386 bool create_table_no_lock(Session *session,
1387  const identifier::Table &identifier,
1388  HA_CREATE_INFO *create_info,
1389  message::Table &table_proto,
1390  AlterInfo *alter_info,
1391  bool internal_tmp_table,
1392  uint32_t select_field_count,
1393  bool is_if_not_exists)
1394 {
1395  uint db_options, key_count;
1396  KeyInfo *key_info_buffer;
1397  bool error= true;
1398 
1399  /* Check for duplicate fields and check type of table to create */
1400  if (not alter_info->create_list.size())
1401  {
1402  my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
1403  MYF(0));
1404  return true;
1405  }
1406  assert(identifier.getTableName() == table_proto.name());
1407  db_options= create_info->table_options;
1408 
1409  set_table_default_charset(create_info, identifier.getSchemaName().c_str());
1410 
1411  /* Build a Table object to pass down to the engine, and the do the actual create. */
1412  if (not prepare_create_table(session, create_info, table_proto, alter_info,
1413  internal_tmp_table,
1414  &db_options,
1415  &key_info_buffer, &key_count,
1416  select_field_count))
1417  {
1418  boost::mutex::scoped_lock lock(table::Cache::mutex()); /* CREATE TABLE (some confussion on naming, double check) */
1419  error= locked_create_event(session,
1420  identifier,
1421  create_info,
1422  table_proto,
1423  alter_info,
1424  is_if_not_exists,
1425  internal_tmp_table,
1426  db_options, key_count,
1427  key_info_buffer);
1428  }
1429 
1430  session->set_proc_info("After create");
1431 
1432  return(error);
1433 }
1434 
1438 static bool drizzle_create_table(Session *session,
1439  const identifier::Table &identifier,
1440  HA_CREATE_INFO *create_info,
1441  message::Table &table_proto,
1442  AlterInfo *alter_info,
1443  bool internal_tmp_table,
1444  uint32_t select_field_count,
1445  bool is_if_not_exists)
1446 {
1447  Table *name_lock= session->lock_table_name_if_not_cached(identifier);
1448  bool result;
1449  if (name_lock == NULL)
1450  {
1451  if (is_if_not_exists)
1452  {
1453  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1454  ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1455  identifier.getTableName().c_str());
1456  create_info->table_existed= 1;
1457  result= false;
1458  }
1459  else
1460  {
1461  my_error(ER_TABLE_EXISTS_ERROR, identifier);
1462  result= true;
1463  }
1464  }
1465  else
1466  {
1467  result= create_table_no_lock(session,
1468  identifier,
1469  create_info,
1470  table_proto,
1471  alter_info,
1472  internal_tmp_table,
1473  select_field_count,
1474  is_if_not_exists);
1475  }
1476 
1477  if (name_lock)
1478  {
1479  boost::mutex::scoped_lock lock(table::Cache::mutex()); /* Lock for removing name_lock during table create */
1480  session->unlink_open_table(name_lock);
1481  }
1482 
1483  return(result);
1484 }
1485 
1486 
1487 /*
1488  Database locking aware wrapper for create_table_no_lock(),
1489 */
1490 bool create_table(Session *session,
1491  const identifier::Table &identifier,
1492  HA_CREATE_INFO *create_info,
1493  message::Table &table_proto,
1494  AlterInfo *alter_info,
1495  bool internal_tmp_table,
1496  uint32_t select_field_count,
1497  bool is_if_not_exists)
1498 {
1499  if (identifier.isTmp())
1500  {
1501  return create_table_no_lock(session,
1502  identifier,
1503  create_info,
1504  table_proto,
1505  alter_info,
1506  internal_tmp_table,
1507  select_field_count,
1508  is_if_not_exists);
1509  }
1510 
1511  return drizzle_create_table(session,
1512  identifier,
1513  create_info,
1514  table_proto,
1515  alter_info,
1516  internal_tmp_table,
1517  select_field_count,
1518  is_if_not_exists);
1519 }
1520 
1521 
1522 /*
1523 ** Give the key name after the first field with an optional '_#' after
1524 **/
1525 
1526 static bool
1527 check_if_keyname_exists(const char *name, KeyInfo *start, KeyInfo *end)
1528 {
1529  for (KeyInfo *key= start; key != end; key++)
1530  {
1531  if (!system_charset_info->strcasecmp(name, key->name))
1532  return 1;
1533  }
1534  return 0;
1535 }
1536 
1537 
1538 static const char*
1539 make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end)
1540 {
1541  char buff[MAX_FIELD_NAME],*buff_end;
1542 
1543  if (not check_if_keyname_exists(field_name,start,end) && not is_primary_key(field_name))
1544  {
1545  return field_name; // Use fieldname
1546  }
1547 
1548  buff_end= strncpy(buff, field_name, sizeof(buff)-4);
1549  buff_end+= strlen(buff);
1550 
1551  /*
1552  Only 3 chars + '\0' left, so need to limit to 2 digit
1553  This is ok as we can't have more than 100 keys anyway
1554  */
1555  for (uint32_t i=2 ; i< 100; i++)
1556  {
1557  *buff_end= '_';
1558  internal::int10_to_str(i, buff_end+1, 10);
1559  if (!check_if_keyname_exists(buff,start,end))
1560  return memory::sql_strdup(buff);
1561  }
1562  return (char*) "not_specified"; // Should never happen
1563 }
1564 
1565 
1566 /****************************************************************************
1567 ** Alter a table definition
1568 ****************************************************************************/
1569 
1570 /*
1571  Rename a table.
1572 
1573  SYNOPSIS
1574  rename_table()
1575  session
1576  base The plugin::StorageEngine handle.
1577  old_db The old database name.
1578  old_name The old table name.
1579  new_db The new database name.
1580  new_name The new table name.
1581 
1582  RETURN
1583  false OK
1584  true Error
1585 */
1586 
1587 bool
1588 rename_table(Session &session,
1589  plugin::StorageEngine *base,
1590  const identifier::Table &from,
1591  const identifier::Table &to)
1592 {
1593  if (not plugin::StorageEngine::doesSchemaExist(to))
1594  {
1595  my_error(ER_NO_DB_ERROR, MYF(0), to.getSchemaName().c_str());
1596  return true;
1597  }
1598 
1599  int error= base->renameTable(session, from, to);
1600  if (error == HA_ERR_WRONG_COMMAND)
1601  my_error(ER_NOT_SUPPORTED_YET, MYF(0), "ALTER Table");
1602  else if (error)
1603  {
1604  my_error(ER_ERROR_ON_RENAME, MYF(0),
1605  from.isTmp() ? "#sql-temporary" : from.getSQLPath().c_str(),
1606  to.isTmp() ? "#sql-temporary" : to.getSQLPath().c_str(), error);
1607  }
1608  return error;
1609 }
1610 
1611 
1612 /*
1613  Force all other threads to stop using the table
1614 
1615  SYNOPSIS
1616  wait_while_table_is_used()
1617  session Thread Cursor
1618  table Table to remove from cache
1619  function HA_EXTRA_PREPARE_FOR_DROP if table is to be deleted
1620  HA_EXTRA_FORCE_REOPEN if table is not be used
1621  HA_EXTRA_PREPARE_FOR_RENAME if table is to be renamed
1622  NOTES
1623  When returning, the table will be unusable for other threads until
1624  the table is closed.
1625 
1626  PREREQUISITES
1627  Lock on table::Cache::mutex()
1628  Win32 clients must also have a WRITE LOCK on the table !
1629 */
1630 
1631 void wait_while_table_is_used(Session *session, Table *table,
1632  enum ha_extra_function function)
1633 {
1634 
1635  safe_mutex_assert_owner(table::Cache::mutex().native_handle());
1636 
1637  table->cursor->extra(function);
1638  /* Mark all tables that are in use as 'old' */
1639  session->abortLock(table); /* end threads waiting on lock */
1640 
1641  /* Wait until all there are no other threads that has this table open */
1642  identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName());
1643  table::Cache::removeTable(*session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG);
1644 }
1645 
1646 /*
1647  Close a cached table
1648 
1649  SYNOPSIS
1650  close_cached_table()
1651  session Thread Cursor
1652  table Table to remove from cache
1653 
1654  NOTES
1655  Function ends by signaling threads waiting for the table to try to
1656  reopen the table.
1657 
1658  PREREQUISITES
1659  Lock on table::Cache::mutex()
1660  Win32 clients must also have a WRITE LOCK on the table !
1661 */
1662 
1663 void Session::close_cached_table(Table *table)
1664 {
1665 
1666  wait_while_table_is_used(this, table, HA_EXTRA_FORCE_REOPEN);
1667  /* Close lock if this is not got with LOCK TABLES */
1668  if (open_tables.lock)
1669  {
1670  unlockTables(open_tables.lock);
1671  open_tables.lock= NULL; // Start locked threads
1672  }
1673  /* Close all copies of 'table'. This also frees all LOCK TABLES lock */
1674  unlink_open_table(table);
1675 
1676  /* When lock on table::Cache::mutex() is freed other threads can continue */
1678 }
1679 
1680 /*
1681  RETURN VALUES
1682  false Message sent to net (admin operation went ok)
1683  true Message should be sent by caller
1684  (admin operation or network communication failed)
1685 */
1686 static bool admin_table(Session* session, TableList* tables,
1687  const char *operator_name,
1688  thr_lock_type lock_type,
1689  bool open_for_modify,
1690  int (Cursor::*operator_func)(Session*))
1691 {
1692  TableList *table;
1693  Select_Lex *select= &session->lex().select_lex;
1694  List<Item> field_list;
1695  Item *item;
1696  int result_code= 0;
1697  const charset_info_st * const cs= system_charset_info;
1698 
1699  if (! session->endActiveTransaction())
1700  return 1;
1701 
1702  field_list.push_back(item = new Item_empty_string("Table", NAME_CHAR_LEN * 2, cs));
1703  item->maybe_null = 1;
1704  field_list.push_back(item = new Item_empty_string("Op", 10, cs));
1705  item->maybe_null = 1;
1706  field_list.push_back(item = new Item_empty_string("Msg_type", 10, cs));
1707  item->maybe_null = 1;
1708  field_list.push_back(item = new Item_empty_string("Msg_text", 255, cs));
1709  item->maybe_null = 1;
1710  session->getClient()->sendFields(field_list);
1711 
1712  for (table= tables; table; table= table->next_local)
1713  {
1714  identifier::Table table_identifier(table->getSchemaName(), table->getTableName());
1715  bool fatal_error=0;
1716 
1717  std::string table_name = table_identifier.getSQLPath();
1718 
1719  table->lock_type= lock_type;
1720  /* open only one table from local list of command */
1721  {
1722  TableList *save_next_global, *save_next_local;
1723  save_next_global= table->next_global;
1724  table->next_global= 0;
1725  save_next_local= table->next_local;
1726  table->next_local= 0;
1727  select->table_list.first= (unsigned char*)table;
1728  /*
1729  Time zone tables and SP tables can be add to lex->query_tables list,
1730  so it have to be prepared.
1731  @todo Investigate if we can put extra tables into argument instead of using lex->query_tables
1732  */
1733  session->lex().query_tables= table;
1734  session->lex().query_tables_last= &table->next_global;
1735  session->lex().query_tables_own_last= 0;
1736  session->no_warnings_for_error= 0;
1737 
1738  session->openTablesLock(table);
1739  session->no_warnings_for_error= 0;
1740  table->next_global= save_next_global;
1741  table->next_local= save_next_local;
1742  }
1743 
1744  /*
1745  CHECK Table command is only command where VIEW allowed here and this
1746  command use only temporary teble method for VIEWs resolving => there
1747  can't be VIEW tree substitition of join view => if opening table
1748  succeed then table->table will have real Table pointer as value (in
1749  case of join view substitution table->table can be 0, but here it is
1750  impossible)
1751  */
1752  if (!table->table)
1753  {
1754  if (!session->main_da().m_warn_list.size())
1755  push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
1756  ER_CHECK_NO_SUCH_TABLE, ER(ER_CHECK_NO_SUCH_TABLE));
1757  result_code= HA_ADMIN_CORRUPT;
1758  goto send_result;
1759  }
1760 
1761  if ((table->table->db_stat & HA_READ_ONLY) && open_for_modify)
1762  {
1763  char buff[FN_REFLEN + DRIZZLE_ERRMSG_SIZE];
1764  uint32_t length;
1765  session->getClient()->store(table_name.c_str());
1766  session->getClient()->store(operator_name);
1767  session->getClient()->store(STRING_WITH_LEN("error"));
1768  length= snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),
1769  table_name.c_str());
1770  session->getClient()->store(buff, length);
1771  TransactionServices::autocommitOrRollback(*session, false);
1772  session->endTransaction(COMMIT);
1773  session->close_thread_tables();
1774  session->lex().reset_query_tables_list(false);
1775  table->table=0; // For query cache
1776  if (session->getClient()->flush())
1777  goto err;
1778  continue;
1779  }
1780 
1781  /* Close all instances of the table to allow repair to rename files */
1782  if (lock_type == TL_WRITE && table->table->getShare()->getVersion())
1783  {
1784  table::Cache::mutex().lock(); /* Lock type is TL_WRITE and we lock to repair the table */
1785  const char *old_message=session->enter_cond(COND_refresh, table::Cache::mutex(),
1786  "Waiting to get writelock");
1787  session->abortLock(table->table);
1788  identifier::Table identifier(table->table->getShare()->getSchemaName(), table->table->getShare()->getTableName());
1789  table::Cache::removeTable(*session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG);
1790  session->exit_cond(old_message);
1791  if (session->getKilled())
1792  goto err;
1793  open_for_modify= 0;
1794  }
1795 
1796  result_code = (table->table->cursor->*operator_func)(session);
1797 
1798 send_result:
1799 
1800  session->lex().cleanup_after_one_table_open();
1801  session->clear_error(); // these errors shouldn't get client
1802  {
1803  BOOST_FOREACH(DRIZZLE_ERROR* err, session->main_da().m_warn_list)
1804  {
1805  session->getClient()->store(table_name.c_str());
1806  session->getClient()->store(operator_name);
1807  session->getClient()->store(warning_level_names[err->level].data(), warning_level_names[err->level].size());
1808  session->getClient()->store(err->msg);
1809  if (session->getClient()->flush())
1810  goto err;
1811  }
1812  drizzle_reset_errors(*session, true);
1813  }
1814  session->getClient()->store(table_name.c_str());
1815  session->getClient()->store(operator_name);
1816 
1817  switch (result_code) {
1818  case HA_ADMIN_NOT_IMPLEMENTED:
1819  {
1820  char buf[ERRMSGSIZE+20];
1821  uint32_t length=snprintf(buf, ERRMSGSIZE,
1822  ER(ER_CHECK_NOT_IMPLEMENTED), operator_name);
1823  session->getClient()->store(STRING_WITH_LEN("note"));
1824  session->getClient()->store(buf, length);
1825  }
1826  break;
1827 
1828  case HA_ADMIN_OK:
1829  session->getClient()->store(STRING_WITH_LEN("status"));
1830  session->getClient()->store(STRING_WITH_LEN("OK"));
1831  break;
1832 
1833  case HA_ADMIN_FAILED:
1834  session->getClient()->store(STRING_WITH_LEN("status"));
1835  session->getClient()->store(STRING_WITH_LEN("Operation failed"));
1836  break;
1837 
1838  case HA_ADMIN_REJECT:
1839  session->getClient()->store(STRING_WITH_LEN("status"));
1840  session->getClient()->store(STRING_WITH_LEN("Operation need committed state"));
1841  open_for_modify= false;
1842  break;
1843 
1844  case HA_ADMIN_ALREADY_DONE:
1845  session->getClient()->store(STRING_WITH_LEN("status"));
1846  session->getClient()->store(STRING_WITH_LEN("Table is already up to date"));
1847  break;
1848 
1849  case HA_ADMIN_CORRUPT:
1850  session->getClient()->store(STRING_WITH_LEN("error"));
1851  session->getClient()->store(STRING_WITH_LEN("Corrupt"));
1852  fatal_error=1;
1853  break;
1854 
1855  case HA_ADMIN_INVALID:
1856  session->getClient()->store(STRING_WITH_LEN("error"));
1857  session->getClient()->store(STRING_WITH_LEN("Invalid argument"));
1858  break;
1859 
1860  default: // Probably HA_ADMIN_INTERNAL_ERROR
1861  {
1862  char buf[ERRMSGSIZE+20];
1863  uint32_t length=snprintf(buf, ERRMSGSIZE,
1864  _("Unknown - internal error %d during operation"),
1865  result_code);
1866  session->getClient()->store(STRING_WITH_LEN("error"));
1867  session->getClient()->store(buf, length);
1868  fatal_error=1;
1869  break;
1870  }
1871  }
1872  if (table->table)
1873  {
1874  if (fatal_error)
1875  {
1876  table->table->getMutableShare()->resetVersion(); // Force close of table
1877  }
1878  else if (open_for_modify)
1879  {
1880  if (table->table->getShare()->getType())
1881  {
1882  table->table->cursor->info(HA_STATUS_CONST);
1883  }
1884  else
1885  {
1886  boost::unique_lock<boost::mutex> lock(table::Cache::mutex());
1887  identifier::Table identifier(table->table->getShare()->getSchemaName(), table->table->getShare()->getTableName());
1888  table::Cache::removeTable(*session, identifier, RTFC_NO_FLAG);
1889  }
1890  }
1891  }
1892  TransactionServices::autocommitOrRollback(*session, false);
1893  session->endTransaction(COMMIT);
1894  session->close_thread_tables();
1895  table->table=0; // For query cache
1896  if (session->getClient()->flush())
1897  goto err;
1898  }
1899 
1900  session->my_eof();
1901  return false;
1902 
1903 err:
1904  TransactionServices::autocommitOrRollback(*session, true);
1905  session->endTransaction(ROLLBACK);
1906  session->close_thread_tables(); // Shouldn't be needed
1907  if (table)
1908  table->table=0;
1909  return true;
1910 }
1911 
1912  /*
1913  Create a new table by copying from source table
1914 
1915  Altough exclusive name-lock on target table protects us from concurrent
1916  DML and DDL operations on it we still want to wrap .FRM creation and call
1917  to plugin::StorageEngine::createTable() in critical section protected by
1918  table::Cache::mutex() in order to provide minimal atomicity against operations which
1919  disregard name-locks, like I_S implementation, for example. This is a
1920  temporary and should not be copied. Instead we should fix our code to
1921  always honor name-locks.
1922 
1923  Also some engines (e.g. NDB cluster) require that table::Cache::mutex() should be held
1924  during the call to plugin::StorageEngine::createTable().
1925  See bug #28614 for more info.
1926  */
1927 static bool create_table_wrapper(Session &session,
1928  const message::Table& create_table_proto,
1929  const identifier::Table& destination_identifier,
1930  const identifier::Table& source_identifier,
1931  bool is_engine_set)
1932 {
1933  // We require an additional table message because during parsing we used
1934  // a "new" message and it will not have all of the information that the
1935  // source table message would have.
1936  message::Table new_table_message;
1937 
1938  message::table::shared_ptr source_table_message= plugin::StorageEngine::getTableMessage(session, source_identifier);
1939 
1940  if (not source_table_message)
1941  {
1942  my_error(ER_TABLE_UNKNOWN, source_identifier);
1943  return false;
1944  }
1945 
1946  new_table_message.CopyFrom(*source_table_message);
1947  new_table_message.set_type(destination_identifier.isTmp() ? message::Table::TEMPORARY : message::Table::STANDARD);
1948 
1949  if (is_engine_set)
1950  {
1951  new_table_message.mutable_engine()->set_name(create_table_proto.engine().name());
1952  }
1953 
1954  { // We now do a selective copy of elements on to the new table.
1955  new_table_message.set_name(create_table_proto.name());
1956  new_table_message.set_schema(create_table_proto.schema());
1957  new_table_message.set_catalog(create_table_proto.catalog());
1958  }
1959 
1960  /* Fix names of foreign keys being added */
1961  for (int32_t j= 0; j < new_table_message.fk_constraint_size(); j++)
1962  {
1963  if (new_table_message.fk_constraint(j).has_name())
1964  {
1965  std::string name(new_table_message.name());
1966  char number[20];
1967 
1968  name.append("_ibfk_");
1969  snprintf(number, sizeof(number), "%d", j+1);
1970  name.append(number);
1971 
1972  message::Table::ForeignKeyConstraint *pfkey= new_table_message.mutable_fk_constraint(j);
1973  pfkey->set_name(name);
1974  }
1975  }
1976 
1977  /*
1978  As mysql_truncate don't work on a new table at this stage of
1979  creation, instead create the table directly (for both normal and temporary tables).
1980  */
1981  bool success= plugin::StorageEngine::createTable(session, destination_identifier, new_table_message);
1982 
1983  if (success && not destination_identifier.isTmp())
1984  {
1985  TransactionServices::createTable(session, new_table_message);
1986  }
1987 
1988  return success;
1989 }
1990 
1991 /*
1992  Create a table identical to the specified table
1993 
1994  SYNOPSIS
1995  create_like_table()
1996  session Thread object
1997  table Table list element for target table
1998  src_table Table list element for source table
1999  create_info Create info
2000 
2001  RETURN VALUES
2002  false OK
2003  true error
2004 */
2005 
2006 bool create_like_table(Session* session,
2007  const identifier::Table& destination_identifier,
2008  const identifier::Table& source_identifier,
2009  message::Table &create_table_proto,
2010  bool is_if_not_exists,
2011  bool is_engine_set)
2012 {
2013  bool res= true;
2014  bool table_exists= false;
2015 
2016  /*
2017  Check that destination tables does not exist. Note that its name
2018  was already checked when it was added to the table list.
2019 
2020  For temporary tables we don't aim to grab locks.
2021  */
2022  if (destination_identifier.isTmp())
2023  {
2024  if (session->open_tables.find_temporary_table(destination_identifier))
2025  {
2026  table_exists= true;
2027  }
2028  else
2029  {
2030  bool was_created= create_table_wrapper(*session,
2031  create_table_proto,
2032  destination_identifier,
2033  source_identifier,
2034  is_engine_set);
2035  if (not was_created) // This is pretty paranoid, but we assume something might not clean up after itself
2036  {
2037  (void) session->open_tables.rm_temporary_table(destination_identifier, true);
2038  }
2039  else if (not session->open_temporary_table(destination_identifier))
2040  {
2041  // We created, but we can't open... also, a hack.
2042  (void) session->open_tables.rm_temporary_table(destination_identifier, true);
2043  }
2044  else
2045  {
2046  res= false;
2047  }
2048  }
2049  }
2050  else // Standard table which will require locks.
2051  {
2052  Table *name_lock= session->lock_table_name_if_not_cached(destination_identifier);
2053  if (not name_lock)
2054  {
2055  table_exists= true;
2056  }
2057  else if (plugin::StorageEngine::doesTableExist(*session, destination_identifier))
2058  {
2059  table_exists= true;
2060  }
2061  else // Otherwise we create the table
2062  {
2063  bool was_created;
2064  {
2065  boost::mutex::scoped_lock lock(table::Cache::mutex()); /* We lock for CREATE TABLE LIKE to copy table definition */
2066  was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
2067  source_identifier, is_engine_set);
2068  }
2069 
2070  // So we blew the creation of the table, and we scramble to clean up
2071  // anything that might have been created (read... it is a hack)
2072  if (not was_created)
2073  {
2074  plugin::StorageEngine::dropTable(*session, destination_identifier);
2075  }
2076  else
2077  {
2078  res= false;
2079  }
2080  }
2081 
2082  if (name_lock)
2083  {
2084  boost::mutex::scoped_lock lock(table::Cache::mutex()); /* unlink open tables for create table like*/
2085  session->unlink_open_table(name_lock);
2086  }
2087  }
2088 
2089  if (table_exists)
2090  {
2091  if (is_if_not_exists)
2092  {
2093  char warn_buff[DRIZZLE_ERRMSG_SIZE];
2094  snprintf(warn_buff, sizeof(warn_buff),
2095  ER(ER_TABLE_EXISTS_ERROR), destination_identifier.getTableName().c_str());
2096  push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
2097  ER_TABLE_EXISTS_ERROR, warn_buff);
2098  return false;
2099  }
2100 
2101  my_error(ER_TABLE_EXISTS_ERROR, destination_identifier);
2102 
2103  return true;
2104  }
2105 
2106  return res;
2107 }
2108 
2109 
2110 bool analyze_table(Session* session, TableList* tables)
2111 {
2112  return admin_table(session, tables, "analyze", TL_READ_NO_INSERT, true, &Cursor::ha_analyze);
2113 }
2114 
2115 
2116 bool check_table(Session* session, TableList* tables)
2117 {
2118  return admin_table(session, tables, "check", TL_READ_NO_INSERT, false, &Cursor::ha_check);
2119 }
2120 
2121 } /* namespace drizzled */