Drizzled Public API Documentation

alter_table.cc
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2009 Sun Microsystems, Inc.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include <fcntl.h>
24 
25 #include <sstream>
26 
27 #include <drizzled/show.h>
28 #include <drizzled/lock.h>
29 #include <drizzled/session.h>
30 #include <drizzled/statement/alter_table.h>
31 #include <drizzled/charset.h>
32 #include <drizzled/gettext.h>
33 #include <drizzled/data_home.h>
34 #include <drizzled/sql_table.h>
35 #include <drizzled/table_proto.h>
36 #include <drizzled/optimizer/range.h>
37 #include <drizzled/time_functions.h>
38 #include <drizzled/records.h>
39 #include <drizzled/pthread_globals.h>
40 #include <drizzled/internal/my_sys.h>
41 #include <drizzled/internal/iocache.h>
42 #include <drizzled/plugin/storage_engine.h>
43 #include <drizzled/copy_field.h>
44 #include <drizzled/transaction_services.h>
45 #include <drizzled/filesort.h>
46 #include <drizzled/message.h>
47 #include <drizzled/message/alter_table.pb.h>
48 #include <drizzled/alter_column.h>
49 #include <drizzled/alter_info.h>
50 #include <drizzled/util/test.h>
51 #include <drizzled/open_tables_state.h>
52 #include <drizzled/table/cache.h>
53 #include <drizzled/create_field.h>
54 
55 using namespace std;
56 
57 namespace drizzled {
58 
59 extern pid_t current_pid;
60 
61 static int copy_data_between_tables(Session *session,
62  Table *from,Table *to,
63  List<CreateField> &create,
64  bool ignore,
65  uint32_t order_num,
66  Order *order,
67  ha_rows *copied,
68  ha_rows *deleted,
69  message::AlterTable &alter_table_message,
70  bool error_if_not_empty);
71 
72 static bool prepare_alter_table(Session *session,
73  Table *table,
74  HA_CREATE_INFO *create_info,
75  const message::Table &original_proto,
76  message::Table &table_message,
77  message::AlterTable &alter_table_message,
78  AlterInfo *alter_info);
79 
80 static Table *open_alter_table(Session *session, Table *table, identifier::Table &identifier);
81 
82 static int apply_online_alter_keys_onoff(Session *session,
83  Table* table,
84  const message::AlterTable::AlterKeysOnOff &op);
85 
86 static int apply_online_rename_table(Session *session,
87  Table *table,
88  plugin::StorageEngine *original_engine,
89  identifier::Table &original_table_identifier,
90  identifier::Table &new_table_identifier,
91  const message::AlterTable::RenameTable &alter_operation);
92 
93 namespace statement {
94 
95 AlterTable::AlterTable(Session *in_session, Table_ident *) :
96  CreateTable(in_session)
97 {
98  set_command(SQLCOM_ALTER_TABLE);
99 }
100 
101 } // namespace statement
102 
104 {
105  TableList *first_table= (TableList *) lex().select_lex.table_list.first;
106  TableList *all_tables= lex().query_tables;
107  assert(first_table == all_tables && first_table != 0);
108  Select_Lex *select_lex= &lex().select_lex;
109 
110  is_engine_set= not createTableMessage().engine().name().empty();
111 
112  if (is_engine_set)
113  {
114  create_info().db_type=
115  plugin::StorageEngine::findByName(session(), createTableMessage().engine().name());
116 
117  if (create_info().db_type == NULL)
118  {
119  my_error(createTableMessage().engine().name(), ER_UNKNOWN_STORAGE_ENGINE, MYF(0));
120 
121  return true;
122  }
123  }
124 
125  /* Must be set in the parser */
126  assert(select_lex->db);
127 
128  /* Chicken/Egg... we need to search for the table, to know if the table exists, so we can build a full identifier from it */
129  message::table::shared_ptr original_table_message;
130  {
131  identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName());
132  if (not (original_table_message= plugin::StorageEngine::getTableMessage(session(), identifier)))
133  {
134  my_error(ER_BAD_TABLE_ERROR, identifier);
135  return true;
136  }
137 
138  if (not create_info().db_type)
139  {
140  create_info().db_type=
141  plugin::StorageEngine::findByName(session(), original_table_message->engine().name());
142 
143  if (not create_info().db_type)
144  {
145  my_error(ER_BAD_TABLE_ERROR, identifier);
146  return true;
147  }
148  }
149  }
150 
151  if (not validateCreateTableOption())
152  return true;
153 
154  if (session().inTransaction())
155  {
156  my_error(ER_TRANSACTIONAL_DDL_NOT_SUPPORTED, MYF(0));
157  return true;
158  }
159 
160  if (session().wait_if_global_read_lock(0, 1))
161  return true;
162 
163  bool res;
164  if (original_table_message->type() == message::Table::STANDARD )
165  {
166  identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName());
167  identifier::Table new_identifier(select_lex->db ? select_lex->db : first_table->getSchemaName(),
168  lex().name.data() ? lex().name.data() : first_table->getTableName());
169 
170  res= alter_table(&session(),
171  identifier,
172  new_identifier,
173  &create_info(),
174  *original_table_message,
175  createTableMessage(),
176  first_table,
177  &alter_info,
178  select_lex->order_list.size(),
179  (Order *) select_lex->order_list.first,
180  lex().ignore);
181  }
182  else
183  {
184  identifier::Table catch22(first_table->getSchemaName(), first_table->getTableName());
185  Table *table= session().open_tables.find_temporary_table(catch22);
186  assert(table);
187  {
188  identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName(), table->getMutableShare()->getPath());
189  identifier::Table new_identifier(select_lex->db ? select_lex->db : first_table->getSchemaName(),
190  lex().name.data() ? lex().name.data() : first_table->getTableName(),
191  table->getMutableShare()->getPath());
192 
193  res= alter_table(&session(),
194  identifier,
195  new_identifier,
196  &create_info(),
197  *original_table_message,
198  createTableMessage(),
199  first_table,
200  &alter_info,
201  select_lex->order_list.size(),
202  (Order *) select_lex->order_list.first,
203  lex().ignore);
204  }
205  }
206 
207  /*
208  Release the protection against the global read lock and wake
209  everyone, who might want to set a global read lock.
210  */
211  session().startWaitingGlobalReadLock();
212 
213  return res;
214 }
215 
216 
257 static bool prepare_alter_table(Session *session,
258  Table *table,
259  HA_CREATE_INFO *create_info,
260  const message::Table &original_proto,
261  message::Table &table_message,
262  message::AlterTable &alter_table_message,
263  AlterInfo *alter_info)
264 {
265  uint32_t used_fields= create_info->used_fields;
266  vector<string> drop_keys;
267  vector<string> drop_columns;
268  vector<string> drop_fkeys;
269 
270  /* we use drop_(keys|columns|fkey) below to check that we can do all
271  operations we've been asked to do */
272  for (int operationnr= 0; operationnr < alter_table_message.operations_size();
273  operationnr++)
274  {
276  alter_table_message.operations(operationnr);
277 
278  switch (operation.operation())
279  {
280  case message::AlterTable::AlterTableOperation::DROP_KEY:
281  drop_keys.push_back(operation.drop_name());
282  break;
283  case message::AlterTable::AlterTableOperation::DROP_COLUMN:
284  drop_columns.push_back(operation.drop_name());
285  break;
286  case message::AlterTable::AlterTableOperation::DROP_FOREIGN_KEY:
287  drop_fkeys.push_back(operation.drop_name());
288  break;
289  default:
290  break;
291  }
292  }
293 
294  /* Let new create options override the old ones */
295  message::Table::TableOptions *table_options= table_message.mutable_options();
296 
297  if (not (used_fields & HA_CREATE_USED_DEFAULT_CHARSET))
298  create_info->default_table_charset= table->getShare()->table_charset;
299 
300  if (not (used_fields & HA_CREATE_USED_AUTO) && table->found_next_number_field)
301  {
302  /* Table has an autoincrement, copy value to new table */
303  table->cursor->info(HA_STATUS_AUTO);
304  create_info->auto_increment_value= table->cursor->stats.auto_increment_value;
305  if (create_info->auto_increment_value != original_proto.options().auto_increment_value())
306  table_options->set_has_user_set_auto_increment_value(false);
307  }
308 
309  table->restoreRecordAsDefault(); /* Empty record for DEFAULT */
310 
311  List<CreateField> new_create_list;
312  List<Key> new_key_list;
313  /* First collect all fields from table which isn't in drop_list */
314  Field *field;
315  for (Field **f_ptr= table->getFields(); (field= *f_ptr); f_ptr++)
316  {
317  /* Check if field should be dropped */
318  vector<string>::iterator it= drop_columns.begin();
319  while (it != drop_columns.end())
320  {
321  if (not system_charset_info->strcasecmp(field->field_name, it->c_str()))
322  {
323  /* Reset auto_increment value if it was dropped */
324  if (MTYP_TYPENR(field->unireg_check) == Field::NEXT_NUMBER &&
325  not (used_fields & HA_CREATE_USED_AUTO))
326  {
327  create_info->auto_increment_value= 0;
328  create_info->used_fields|= HA_CREATE_USED_AUTO;
329  }
330  break;
331  }
332  it++;
333  }
334 
335  if (it != drop_columns.end())
336  {
337  drop_columns.erase(it);
338  continue;
339  }
340 
341  /* Mark that we will read the field */
342  field->setReadSet();
343 
344  CreateField *def;
345  /* Check if field is changed */
346  List<CreateField>::iterator def_it= alter_info->create_list.begin();
347  while ((def= def_it++))
348  {
349  if (def->change &&
350  ! system_charset_info->strcasecmp(field->field_name, def->change))
351  break;
352  }
353 
354  if (def)
355  {
356  /* Field is changed */
357  def->field= field;
358  if (! def->after)
359  {
360  new_create_list.push_back(def);
361  def_it.remove();
362  }
363  }
364  else
365  {
366  /*
367  This field was not dropped and not changed, add it to the list
368  for the new table.
369  */
370  def= new CreateField(field, field);
371  new_create_list.push_back(def);
372  AlterInfo::alter_list_t::iterator alter(alter_info->alter_list.begin());
373 
374  for (; alter != alter_info->alter_list.end(); alter++)
375  {
376  if (not system_charset_info->strcasecmp(field->field_name, alter->name))
377  break;
378  }
379 
380  if (alter != alter_info->alter_list.end())
381  {
382  def->setDefaultValue(alter->def, NULL);
383 
384  alter_info->alter_list.erase(alter);
385  }
386  }
387  }
388 
389  CreateField *def;
390  List<CreateField>::iterator def_it= alter_info->create_list.begin();
391  while ((def= def_it++)) /* Add new columns */
392  {
393  if (def->change && ! def->field)
394  {
395  my_error(ER_BAD_FIELD_ERROR, MYF(0), def->change, table->getMutableShare()->getTableName());
396  return true;
397  }
398  /*
399  If we have been given a field which has no default value, and is not null then we need to bail.
400  */
401  if (not (~def->flags & (NO_DEFAULT_VALUE_FLAG | NOT_NULL_FLAG)) and not def->change)
402  {
403  alter_info->error_if_not_empty= true;
404  }
405  if (! def->after)
406  {
407  new_create_list.push_back(def);
408  }
409  else if (def->after == first_keyword)
410  {
411  new_create_list.push_front(def);
412  }
413  else
414  {
415  CreateField *find;
416  List<CreateField>::iterator find_it= new_create_list.begin();
417 
418  while ((find= find_it++)) /* Add new columns */
419  {
420  if (not system_charset_info->strcasecmp(def->after, find->field_name))
421  break;
422  }
423 
424  if (not find)
425  {
426  my_error(ER_BAD_FIELD_ERROR, MYF(0), def->after, table->getMutableShare()->getTableName());
427  return true;
428  }
429 
430  find_it.after(def); /* Put element after this */
431 
432  /*
433  XXX: hack for Bug#28427.
434  If column order has changed, force OFFLINE ALTER Table
435  without querying engine capabilities. If we ever have an
436  engine that supports online ALTER Table CHANGE COLUMN
437  <name> AFTER <name1> (Falcon?), this fix will effectively
438  disable the capability.
439  TODO: detect the situation in compare_tables, behave based
440  on engine capabilities.
441  */
442  if (alter_table_message.build_method() == message::AlterTable::BUILD_ONLINE)
443  {
444  my_error(*session->getQueryString(), ER_NOT_SUPPORTED_YET);
445  return true;
446  }
447  }
448  }
449 
450  if (not alter_info->alter_list.empty())
451  {
452  my_error(ER_BAD_FIELD_ERROR, MYF(0), alter_info->alter_list.front().name, table->getMutableShare()->getTableName());
453  return true;
454  }
455 
456  if (new_create_list.is_empty())
457  {
458  my_message(ER_CANT_REMOVE_ALL_FIELDS, ER(ER_CANT_REMOVE_ALL_FIELDS), MYF(0));
459  return true;
460  }
461 
462  /*
463  Collect all keys which isn't in drop list. Add only those
464  for which some fields exists.
465  */
466  KeyInfo *key_info= table->key_info;
467  for (uint32_t i= 0; i < table->getShare()->sizeKeys(); i++, key_info++)
468  {
469  const char *key_name= key_info->name;
470 
471  vector<string>::iterator it= drop_keys.begin();
472  while (it != drop_keys.end())
473  {
474  if (not system_charset_info->strcasecmp(key_name, it->c_str()))
475  break;
476  it++;
477  }
478 
479  if (it != drop_keys.end())
480  {
481  drop_keys.erase(it);
482  continue;
483  }
484 
485  KeyPartInfo *key_part= key_info->key_part;
486  List<Key_part_spec> key_parts;
487  for (uint32_t j= 0; j < key_info->key_parts; j++, key_part++)
488  {
489  if (not key_part->field)
490  continue; /* Wrong field (from UNIREG) */
491 
492  const char *key_part_name= key_part->field->field_name;
493  CreateField *cfield;
494  List<CreateField>::iterator field_it= new_create_list.begin();
495  while ((cfield= field_it++))
496  {
497  if (cfield->change)
498  {
499  if (not system_charset_info->strcasecmp(key_part_name, cfield->change))
500  break;
501  }
502  else if (not system_charset_info->strcasecmp(key_part_name, cfield->field_name))
503  break;
504  }
505 
506  if (not cfield)
507  continue; /* Field is removed */
508 
509  uint32_t key_part_length= key_part->length;
510  if (cfield->field) /* Not new field */
511  {
512  /*
513  If the field can't have only a part used in a key according to its
514  new type, or should not be used partially according to its
515  previous type, or the field length is less than the key part
516  length, unset the key part length.
517 
518  We also unset the key part length if it is the same as the
519  old field's length, so the whole new field will be used.
520 
521  BLOBs may have cfield->length == 0, which is why we test it before
522  checking whether cfield->length < key_part_length (in chars).
523  */
524  if (! Field::type_can_have_key_part(cfield->field->type()) ||
525  ! Field::type_can_have_key_part(cfield->sql_type) ||
526  (cfield->field->field_length == key_part_length) ||
527  (cfield->length &&
528  (cfield->length < key_part_length / key_part->field->charset()->mbmaxlen)))
529  key_part_length= 0; /* Use whole field */
530  }
531  key_part_length/= key_part->field->charset()->mbmaxlen;
532  key_parts.push_back(new Key_part_spec(str_ref(cfield->field_name), key_part_length));
533  }
534  if (key_parts.size())
535  {
536  KEY_CREATE_INFO key_create_info= default_key_create_info;
537  key_create_info.algorithm= key_info->algorithm;
538 
539  if (key_info->flags & HA_USES_BLOCK_SIZE)
540  key_create_info.block_size= key_info->block_size;
541 
542  if (key_info->flags & HA_USES_COMMENT)
543  key_create_info.comment= key_info->comment;
544 
545  Key::Keytype key_type= key_info->flags & HA_NOSAME
546  ? (is_primary_key(key_name) ? Key::PRIMARY : Key::UNIQUE)
547  : Key::MULTIPLE;
548  new_key_list.push_back(new Key(key_type, str_ref(key_name), &key_create_info, test(key_info->flags & HA_GENERATED_KEY), key_parts));
549  }
550  }
551 
552  /* Copy over existing foreign keys */
553  for (int32_t j= 0; j < original_proto.fk_constraint_size(); j++)
554  {
555  vector<string>::iterator it= drop_fkeys.begin();
556  while (it != drop_fkeys.end())
557  {
558  if (! system_charset_info->strcasecmp(original_proto.fk_constraint(j).name().c_str(), it->c_str()))
559  {
560  break;
561  }
562  it++;
563  }
564 
565  if (it != drop_fkeys.end())
566  {
567  drop_fkeys.erase(it);
568  continue;
569  }
570 
571  message::Table::ForeignKeyConstraint *pfkey= table_message.add_fk_constraint();
572  *pfkey= original_proto.fk_constraint(j);
573  }
574 
575  {
576  Key *key;
577  List<Key>::iterator key_it(alter_info->key_list.begin());
578  while ((key= key_it++)) /* Add new keys */
579  {
580  if (key->type == Key::FOREIGN_KEY)
581  {
582  if (((Foreign_key *)key)->validate(new_create_list))
583  {
584  return true;
585  }
586 
587  Foreign_key *fkey= (Foreign_key*)key;
588  add_foreign_key_to_table_message(&table_message,
589  fkey->name.data(),
590  fkey->columns,
591  fkey->ref_table,
592  fkey->ref_columns,
593  fkey->delete_opt,
594  fkey->update_opt,
595  fkey->match_opt);
596  }
597 
598  if (key->type != Key::FOREIGN_KEY)
599  new_key_list.push_back(key);
600 
601  if (key->name.data() && is_primary_key(key->name.data()))
602  {
603  my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->name.data());
604  return true;
605  }
606  }
607  }
608 
609  /* Fix names of foreign keys being added */
610  for (int j= 0; j < table_message.fk_constraint_size(); j++)
611  {
612  if (! table_message.fk_constraint(j).has_name())
613  {
614  std::string name(table->getMutableShare()->getTableName());
615  char number[20];
616 
617  name.append("_ibfk_");
618  snprintf(number, sizeof(number), "%d", j+1);
619  name.append(number);
620 
621  message::Table::ForeignKeyConstraint *pfkey= table_message.mutable_fk_constraint(j);
622  pfkey->set_name(name);
623  }
624  }
625 
626  if (drop_keys.size())
627  {
628  my_error(ER_CANT_DROP_FIELD_OR_KEY,
629  MYF(0),
630  drop_keys.front().c_str());
631  return true;
632  }
633 
634  if (drop_columns.size())
635  {
636  my_error(ER_CANT_DROP_FIELD_OR_KEY,
637  MYF(0),
638  drop_columns.front().c_str());
639  return true;
640  }
641 
642  if (drop_fkeys.size())
643  {
644  my_error(ER_CANT_DROP_FIELD_OR_KEY,
645  MYF(0),
646  drop_fkeys.front().c_str());
647  return true;
648  }
649 
650  if (not alter_info->alter_list.empty())
651  {
652  my_error(ER_CANT_DROP_FIELD_OR_KEY,
653  MYF(0),
654  alter_info->alter_list.front().name);
655  return true;
656  }
657 
658  if (not table_message.options().has_comment()
659  && table->getMutableShare()->hasComment())
660  {
661  table_options->set_comment(table->getMutableShare()->getComment());
662  }
663 
664  if (table->getShare()->getType())
665  {
666  table_message.set_type(message::Table::TEMPORARY);
667  }
668 
669  table_message.set_creation_timestamp(table->getShare()->getTableMessage()->creation_timestamp());
670  table_message.set_version(table->getShare()->getTableMessage()->version());
671  table_message.set_uuid(table->getShare()->getTableMessage()->uuid());
672 
673  alter_info->create_list.swap(new_create_list);
674  alter_info->key_list.swap(new_key_list);
675 
676  size_t num_engine_options= table_message.engine().options_size();
677  size_t original_num_engine_options= original_proto.engine().options_size();
678  for (size_t x= 0; x < original_num_engine_options; ++x)
679  {
680  bool found= false;
681 
682  for (size_t y= 0; y < num_engine_options; ++y)
683  {
684  found= not table_message.engine().options(y).name().compare(original_proto.engine().options(x).name());
685 
686  if (found)
687  break;
688  }
689 
690  if (not found)
691  {
692  message::Engine::Option *opt= table_message.mutable_engine()->add_options();
693 
694  opt->set_name(original_proto.engine().options(x).name());
695  opt->set_state(original_proto.engine().options(x).state());
696  }
697  }
698 
699  drizzled::message::update(table_message);
700 
701  return false;
702 }
703 
704 /* table_list should contain just one table */
705 static int discard_or_import_tablespace(Session *session,
706  TableList *table_list,
707  bool discard)
708 {
709  /*
710  Note that DISCARD/IMPORT TABLESPACE always is the only operation in an
711  ALTER Table
712  */
713  session->set_proc_info("discard_or_import_tablespace");
714 
715  /*
716  We set this flag so that ha_innobase::open and ::external_lock() do
717  not complain when we lock the table
718  */
719  session->setDoingTablespaceOperation(true);
720  Table* table= session->openTableLock(table_list, TL_WRITE);
721  if (not table)
722  {
723  session->setDoingTablespaceOperation(false);
724  return -1;
725  }
726 
727  int error;
728  do {
729  error= table->cursor->ha_discard_or_import_tablespace(discard);
730 
731  session->set_proc_info("end");
732 
733  if (error)
734  break;
735 
736  /* The ALTER Table is always in its own transaction */
737  error= TransactionServices::autocommitOrRollback(*session, false);
738  if (not session->endActiveTransaction())
739  error= 1;
740 
741  if (error)
742  break;
743 
745  *session->getQueryString(),
746  *session->schema());
747 
748  } while(0);
749 
750  (void) TransactionServices::autocommitOrRollback(*session, error);
751  session->setDoingTablespaceOperation(false);
752 
753  if (error == 0)
754  {
755  session->my_ok();
756  return 0;
757  }
758 
759  table->print_error(error, MYF(0));
760 
761  return -1;
762 }
763 
778 static bool alter_table_manage_keys(Session *session,
779  Table *table, int indexes_were_disabled,
780  const message::AlterTable &alter_table_message)
781 {
782  int error= 0;
783  if (alter_table_message.has_alter_keys_onoff()
784  && alter_table_message.alter_keys_onoff().enable())
785  {
786  error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
787  }
788 
789  if ((! alter_table_message.has_alter_keys_onoff() && indexes_were_disabled)
790  || (alter_table_message.has_alter_keys_onoff()
791  && ! alter_table_message.alter_keys_onoff().enable()))
792  {
793  error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
794  }
795 
796  if (error == HA_ERR_WRONG_COMMAND)
797  {
798  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
799  ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
800  table->getMutableShare()->getTableName());
801  error= 0;
802  }
803  else if (error)
804  {
805  table->print_error(error, MYF(0));
806  }
807 
808  return(error);
809 }
810 
811 static bool lockTableIfDifferent(Session &session,
812  identifier::Table &original_table_identifier,
813  identifier::Table &new_table_identifier,
814  Table *name_lock)
815 {
816  /* Check that we are not trying to rename to an existing table */
817  if (not (original_table_identifier == new_table_identifier))
818  {
819  if (original_table_identifier.isTmp())
820  {
821  if (session.open_tables.find_temporary_table(new_table_identifier))
822  {
823  my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
824  return false;
825  }
826  }
827  else
828  {
829  name_lock= session.lock_table_name_if_not_cached(new_table_identifier);
830  if (not name_lock)
831  {
832  my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
833  return false;
834  }
835 
836  if (plugin::StorageEngine::doesTableExist(session, new_table_identifier))
837  {
838  /* Table will be closed by Session::executeCommand() */
839  my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
840 
841  {
842  boost::mutex::scoped_lock scopedLock(table::Cache::mutex());
843  session.unlink_open_table(name_lock);
844  }
845 
846  return false;
847  }
848  }
849  }
850 
851  return true;
852 }
853 
896 static bool internal_alter_table(Session *session,
897  Table *table,
898  identifier::Table &original_table_identifier,
899  identifier::Table &new_table_identifier,
900  HA_CREATE_INFO *create_info,
901  const message::Table &original_proto,
902  message::Table &create_proto,
903  message::AlterTable &alter_table_message,
904  TableList *table_list,
905  AlterInfo *alter_info,
906  uint32_t order_num,
907  Order *order,
908  bool ignore)
909 {
910  int error= 0;
911  char tmp_name[80];
912  char old_name[32];
913  ha_rows copied= 0;
914  ha_rows deleted= 0;
915 
916  if (not original_table_identifier.isValid())
917  return true;
918 
919  if (not new_table_identifier.isValid())
920  return true;
921 
922  session->set_proc_info("init");
923 
924  table->use_all_columns();
925 
926  plugin::StorageEngine *new_engine;
927  plugin::StorageEngine *original_engine;
928 
929  original_engine= table->getMutableShare()->getEngine();
930 
931  if (not create_info->db_type)
932  {
933  create_info->db_type= original_engine;
934  }
935  new_engine= create_info->db_type;
936 
937 
938  create_proto.set_schema(new_table_identifier.getSchemaName());
939  create_proto.set_type(new_table_identifier.getType());
940 
945  if (new_engine != original_engine &&
946  not table->cursor->can_switch_engines())
947  {
948  assert(0);
949  my_error(ER_ROW_IS_REFERENCED, MYF(0));
950 
951  return true;
952  }
953 
954  if (original_engine->check_flag(HTON_BIT_ALTER_NOT_SUPPORTED) ||
955  new_engine->check_flag(HTON_BIT_ALTER_NOT_SUPPORTED))
956  {
957  my_error(ER_ILLEGAL_HA, new_table_identifier);
958 
959  return true;
960  }
961 
962  session->set_proc_info("setup");
963 
964  /* First we try for operations that do not require a copying
965  ALTER TABLE.
966 
967  In future there should be more operations, currently it's just a couple.
968  */
969 
970  if ((alter_table_message.has_rename()
971  || alter_table_message.has_alter_keys_onoff())
972  && alter_table_message.operations_size() == 0)
973  {
974  /*
975  * test if no other bits except ALTER_RENAME and ALTER_KEYS_ONOFF are set
976  */
977  bitset<32> tmp;
978 
979  tmp.set();
980  tmp.reset(ALTER_RENAME);
981  tmp.reset(ALTER_KEYS_ONOFF);
982  tmp&= alter_info->flags;
983 
984  if((not (tmp.any()) && not table->getShare()->getType())) // no need to touch frm
985  {
986  if (alter_table_message.has_alter_keys_onoff())
987  {
988  error= apply_online_alter_keys_onoff(session, table,
989  alter_table_message.alter_keys_onoff());
990 
991  if (error == HA_ERR_WRONG_COMMAND)
992  {
993  error= EE_OK;
994  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
995  ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
996  table->getAlias());
997  }
998  }
999 
1000  if (alter_table_message.has_rename())
1001  {
1002  error= apply_online_rename_table(session,
1003  table,
1004  original_engine,
1005  original_table_identifier,
1006  new_table_identifier,
1007  alter_table_message.rename());
1008 
1009  if (error == HA_ERR_WRONG_COMMAND)
1010  {
1011  error= EE_OK;
1012  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1013  ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
1014  table->getAlias());
1015  }
1016  }
1017 
1018  if (not error)
1019  {
1020  TransactionServices::allocateNewTransactionId();
1022  *session->getQueryString(),
1023  *session->schema());
1024  session->my_ok();
1025  }
1026  else if (error > EE_OK) // If we have already set the error, we pass along -1
1027  {
1028  table->print_error(error, MYF(0));
1029  }
1030 
1031  table_list->table= NULL;
1032 
1033  return error;
1034  }
1035  }
1036 
1037  if (alter_table_message.build_method() == message::AlterTable::BUILD_ONLINE)
1038  {
1039  my_error(*session->getQueryString(), ER_NOT_SUPPORTED_YET);
1040  return true;
1041  }
1042 
1043  /* We have to do full alter table. */
1044  new_engine= create_info->db_type;
1045 
1046  if (prepare_alter_table(session, table, create_info, original_proto, create_proto, alter_table_message, alter_info))
1047  {
1048  return true;
1049  }
1050 
1051  set_table_default_charset(create_info, new_table_identifier.getSchemaName().c_str());
1052 
1053  snprintf(tmp_name, sizeof(tmp_name), "%s-%lx_%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
1054 
1055  /* Create a temporary table with the new format */
1061  identifier::Table new_table_as_temporary(original_table_identifier.getSchemaName(),
1062  tmp_name,
1063  create_proto.type() != message::Table::TEMPORARY ? message::Table::INTERNAL :
1064  message::Table::TEMPORARY);
1065 
1066  /*
1067  Create a table with a temporary name.
1068  We don't log the statement, it will be logged later.
1069  */
1070  create_proto.set_name(new_table_as_temporary.getTableName());
1071  create_proto.mutable_engine()->set_name(create_info->db_type->getName());
1072 
1073  error= create_table(session,
1074  new_table_as_temporary,
1075  create_info, create_proto, alter_info, true, 0, false);
1076 
1077  if (error != 0)
1078  {
1079  return true;
1080  }
1081 
1082  /* Open the table so we need to copy the data to it. */
1083  Table *new_table= open_alter_table(session, table, new_table_as_temporary);
1084 
1085 
1086  if (not new_table)
1087  {
1088  plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
1089  return true;
1090  }
1091 
1092  /* Copy the data if necessary. */
1093  {
1094  /* We must not ignore bad input! */
1095  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL; // calc cuted fields
1096  session->cuted_fields= 0L;
1097  session->set_proc_info("copy to tmp table");
1098  copied= deleted= 0;
1099 
1100  /* We don't want update TIMESTAMP fields during ALTER Table. */
1101  new_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1102  new_table->next_number_field= new_table->found_next_number_field;
1103  error= copy_data_between_tables(session,
1104  table,
1105  new_table,
1106  alter_info->create_list,
1107  ignore,
1108  order_num,
1109  order,
1110  &copied,
1111  &deleted,
1112  alter_table_message,
1113  alter_info->error_if_not_empty);
1114 
1115  /* We must not ignore bad input! */
1116  assert(session->count_cuted_fields == CHECK_FIELD_ERROR_FOR_NULL);
1117  }
1118 
1119  /* Now we need to resolve what just happened with the data copy. */
1120 
1121  if (error)
1122  {
1123 
1124  /*
1125  No default value was provided for new fields.
1126  */
1127  if (alter_info->error_if_not_empty && session->row_count)
1128  {
1129  my_error(ER_INVALID_ALTER_TABLE_FOR_NOT_NULL, MYF(0));
1130  }
1131 
1132  if (original_table_identifier.isTmp())
1133  {
1134  if (new_table)
1135  {
1136  /* close_temporary_table() frees the new_table pointer. */
1137  session->open_tables.close_temporary_table(new_table);
1138  }
1139  else
1140  {
1141  plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
1142  }
1143 
1144  return true;
1145  }
1146  else
1147  {
1148  if (new_table)
1149  {
1150  /*
1151  Close the intermediate table that will be the new table.
1152  Note that MERGE tables do not have their children attached here.
1153  */
1154  new_table->intern_close_table();
1155  if (new_table->hasShare())
1156  {
1157  delete new_table->getMutableShare();
1158  }
1159 
1160  delete new_table;
1161  }
1162 
1163  boost::mutex::scoped_lock scopedLock(table::Cache::mutex());
1164 
1165  plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
1166 
1167  return true;
1168  }
1169  }
1170  // Temporary table and success
1171  else if (original_table_identifier.isTmp())
1172  {
1173  /* Close lock if this is a transactional table */
1174  if (session->open_tables.lock)
1175  {
1176  session->unlockTables(session->open_tables.lock);
1177  session->open_tables.lock= 0;
1178  }
1179 
1180  /* Remove link to old table and rename the new one */
1181  session->open_tables.close_temporary_table(table);
1182 
1183  /* Should pass the 'new_name' as we store table name in the cache */
1184  new_table->getMutableShare()->setIdentifier(new_table_identifier);
1185 
1186  new_table_identifier.setPath(new_table_as_temporary.getPath());
1187 
1188  if (rename_table(*session, new_engine, new_table_as_temporary, new_table_identifier) != 0)
1189  {
1190  return true;
1191  }
1192  }
1193  // Normal table success
1194  else
1195  {
1196  if (new_table)
1197  {
1198  /*
1199  Close the intermediate table that will be the new table.
1200  Note that MERGE tables do not have their children attached here.
1201  */
1202  new_table->intern_close_table();
1203 
1204  if (new_table->hasShare())
1205  {
1206  delete new_table->getMutableShare();
1207  }
1208 
1209  delete new_table;
1210  }
1211 
1212  {
1213  boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* ALTER TABLE */
1214  /*
1215  Data is copied. Now we:
1216  1) Wait until all other threads close old version of table.
1217  2) Close instances of table open by this thread and replace them
1218  with exclusive name-locks.
1219  3) Rename the old table to a temp name, rename the new one to the
1220  old name.
1221  4) If we are under LOCK TABLES and don't do ALTER Table ... RENAME
1222  we reopen new version of table.
1223  5) Write statement to the binary log.
1224  6) If we are under LOCK TABLES and do ALTER Table ... RENAME we
1225  remove name-locks from list of open tables and table cache.
1226  7) If we are not not under LOCK TABLES we rely on close_thread_tables()
1227  call to remove name-locks from table cache and list of open table.
1228  */
1229 
1230  session->set_proc_info("rename result table");
1231 
1232  snprintf(old_name, sizeof(old_name), "%s2-%lx-%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
1233 
1234  files_charset_info->casedn_str(old_name);
1235 
1236  wait_while_table_is_used(session, table, HA_EXTRA_PREPARE_FOR_RENAME);
1237  session->close_data_files_and_morph_locks(original_table_identifier);
1238 
1239  assert(not error);
1240 
1241  /*
1242  This leads to the storage engine (SE) not being notified for renames in
1243  rename_table(), because we just juggle with the FRM and nothing
1244  more. If we have an intermediate table, then we notify the SE that
1245  it should become the actual table. Later, we will recycle the old table.
1246  However, in case of ALTER Table RENAME there might be no intermediate
1247  table. This is when the old and new tables are compatible, according to
1248  compare_table(). Then, we need one additional call to
1249  */
1250  identifier::Table original_table_to_drop(original_table_identifier.getSchemaName(),
1251  old_name, create_proto.type() != message::Table::TEMPORARY ? message::Table::INTERNAL :
1252  message::Table::TEMPORARY);
1253 
1254  drizzled::error_t rename_error= EE_OK;
1255  if (rename_table(*session, original_engine, original_table_identifier, original_table_to_drop))
1256  {
1257  error= ER_ERROR_ON_RENAME;
1258  plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
1259  }
1260  else
1261  {
1262  if (rename_table(*session, new_engine, new_table_as_temporary, new_table_identifier) != 0)
1263  {
1264  /* Try to get everything back. */
1265  rename_error= ER_ERROR_ON_RENAME;
1266 
1267  plugin::StorageEngine::dropTable(*session, new_table_identifier);
1268 
1269  plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
1270 
1271  rename_table(*session, original_engine, original_table_to_drop, original_table_identifier);
1272  }
1273  else
1274  {
1275  plugin::StorageEngine::dropTable(*session, original_table_to_drop);
1276  }
1277  }
1278 
1279  if (rename_error)
1280  {
1281  /*
1282  An error happened while we were holding exclusive name-lock on table
1283  being altered. To be safe under LOCK TABLES we should remove placeholders
1284  from list of open tables list and table cache.
1285  */
1286  session->unlink_open_table(table);
1287 
1288  return true;
1289  }
1290  }
1291 
1292  session->set_proc_info("end");
1293 
1295  *session->getQueryString(),
1296  *session->schema());
1297  table_list->table= NULL;
1298  }
1299 
1300  /*
1301  * Field::store() may have called my_error(). If this is
1302  * the case, we must not send an ok packet, since
1303  * Diagnostics_area::is_set() will fail an assert.
1304  */
1305  if (session->is_error())
1306  {
1307  /* my_error() was called. Return true (which means error...) */
1308  return true;
1309  }
1310 
1311  snprintf(tmp_name, sizeof(tmp_name), ER(ER_INSERT_INFO),
1312  (ulong) (copied + deleted), (ulong) deleted,
1313  (ulong) session->cuted_fields);
1314  session->my_ok(copied + deleted, 0, 0L, tmp_name);
1315  return false;
1316 }
1317 
1318 static int apply_online_alter_keys_onoff(Session *session,
1319  Table* table,
1320  const message::AlterTable::AlterKeysOnOff &op)
1321 {
1322  int error= 0;
1323 
1324  if (op.enable())
1325  {
1326  /*
1327  wait_while_table_is_used() ensures that table being altered is
1328  opened only by this thread and that Table::TableShare::version
1329  of Table object corresponding to this table is 0.
1330  The latter guarantees that no DML statement will open this table
1331  until ALTER Table finishes (i.e. until close_thread_tables())
1332  while the fact that the table is still open gives us protection
1333  from concurrent DDL statements.
1334  */
1335  {
1336  boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* DDL wait for/blocker */
1337  wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
1338  }
1339  error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1340 
1341  /* COND_refresh will be signaled in close_thread_tables() */
1342  }
1343  else
1344  {
1345  {
1346  boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* DDL wait for/blocker */
1347  wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
1348  }
1349  error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
1350 
1351  /* COND_refresh will be signaled in close_thread_tables() */
1352  }
1353 
1354  return error;
1355 }
1356 
1357 static int apply_online_rename_table(Session *session,
1358  Table *table,
1359  plugin::StorageEngine *original_engine,
1360  identifier::Table &original_table_identifier,
1361  identifier::Table &new_table_identifier,
1362  const message::AlterTable::RenameTable &alter_operation)
1363 {
1364  int error= 0;
1365 
1366  boost::mutex::scoped_lock scopedLock(table::Cache::mutex()); /* Lock to remove all instances of table from table cache before ALTER */
1367  /*
1368  Unlike to the above case close_cached_table() below will remove ALL
1369  instances of Table from table cache (it will also remove table lock
1370  held by this thread).
1371  */
1372 
1373  if (not (original_table_identifier == new_table_identifier))
1374  {
1375  assert(alter_operation.to_name() == new_table_identifier.getTableName());
1376  assert(alter_operation.to_schema() == new_table_identifier.getSchemaName());
1377 
1378  session->set_proc_info("rename");
1379  /*
1380  Then do a 'simple' rename of the table. First we need to close all
1381  instances of 'source' table.
1382  */
1383  session->close_cached_table(table);
1384  /*
1385  Then, we want check once again that target table does not exist.
1386  Actually the order of these two steps does not matter since
1387  earlier we took name-lock on the target table, so we do them
1388  in this particular order only to be consistent with 5.0, in which
1389  we don't take this name-lock and where this order really matters.
1390  @todo Investigate if we need this access() check at all.
1391  */
1392  if (plugin::StorageEngine::doesTableExist(*session, new_table_identifier))
1393  {
1394  my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
1395  error= -1;
1396  }
1397  else
1398  {
1399  if (rename_table(*session, original_engine, original_table_identifier, new_table_identifier))
1400  {
1401  error= -1;
1402  }
1403  }
1404  }
1405  return error;
1406 }
1407 
1408 bool alter_table(Session *session,
1409  identifier::Table &original_table_identifier,
1410  identifier::Table &new_table_identifier,
1411  HA_CREATE_INFO *create_info,
1412  const message::Table &original_proto,
1413  message::Table &create_proto,
1414  TableList *table_list,
1415  AlterInfo *alter_info,
1416  uint32_t order_num,
1417  Order *order,
1418  bool ignore)
1419 {
1420  bool error;
1421  Table *table;
1422  message::AlterTable *alter_table_message= session->lex().alter_table();
1423 
1424  alter_table_message->set_catalog(original_table_identifier.getCatalogName());
1425  alter_table_message->set_schema(original_table_identifier.getSchemaName());
1426  alter_table_message->set_name(original_table_identifier.getTableName());
1427 
1428  if (alter_table_message->operations_size()
1429  && (alter_table_message->operations(0).operation()
1430  == message::AlterTable::AlterTableOperation::DISCARD_TABLESPACE
1431  || alter_table_message->operations(0).operation()
1432  == message::AlterTable::AlterTableOperation::IMPORT_TABLESPACE))
1433  {
1434  bool discard= (alter_table_message->operations(0).operation() ==
1435  message::AlterTable::AlterTableOperation::DISCARD_TABLESPACE);
1436  /* DISCARD/IMPORT TABLESPACE is always alone in an ALTER Table */
1437  return discard_or_import_tablespace(session, table_list, discard);
1438  }
1439 
1440  session->set_proc_info("init");
1441 
1442  if (not (table= session->openTableLock(table_list, TL_WRITE_ALLOW_READ)))
1443  return true;
1444 
1445  session->set_proc_info("gained write lock on table");
1446 
1447  /*
1448  Check that we are not trying to rename to an existing table,
1449  if one existed we get a lock, if we can't we error.
1450  */
1451  {
1452  Table *name_lock= NULL;
1453 
1454  if (not lockTableIfDifferent(*session, original_table_identifier, new_table_identifier, name_lock))
1455  {
1456  return true;
1457  }
1458 
1459  error= internal_alter_table(session,
1460  table,
1461  original_table_identifier,
1462  new_table_identifier,
1463  create_info,
1464  original_proto,
1465  create_proto,
1466  *alter_table_message,
1467  table_list,
1468  alter_info,
1469  order_num,
1470  order,
1471  ignore);
1472 
1473  if (name_lock)
1474  {
1475  boost::mutex::scoped_lock scopedLock(table::Cache::mutex());
1476  session->unlink_open_table(name_lock);
1477  }
1478  }
1479 
1480  return error;
1481 }
1482 /* alter_table */
1483 
1484 static int
1485 copy_data_between_tables(Session *session,
1486  Table *from, Table *to,
1487  List<CreateField> &create,
1488  bool ignore,
1489  uint32_t order_num, Order *order,
1490  ha_rows *copied,
1491  ha_rows *deleted,
1492  message::AlterTable &alter_table_message,
1493  bool error_if_not_empty)
1494 {
1495  int error= 0;
1496  CopyField *copy,*copy_end;
1497  ulong found_count,delete_count;
1498  uint32_t length= 0;
1499  SortField *sortorder;
1500  ReadRecord info;
1501  TableList tables;
1502  List<Item> fields;
1503  List<Item> all_fields;
1504  ha_rows examined_rows;
1505  bool auto_increment_field_copied= 0;
1506  uint64_t prev_insert_id;
1507 
1508  /*
1509  Turn off recovery logging since rollback of an alter table is to
1510  delete the new table so there is no need to log the changes to it.
1511 
1512  This needs to be done before external_lock
1513  */
1514 
1515  /*
1516  * LP Bug #552420
1517  *
1518  * Since open_temporary_table() doesn't invoke lockTables(), we
1519  * don't get the usual automatic call to StorageEngine::startStatement(), so
1520  * we manually call it here...
1521  */
1522  to->getMutableShare()->getEngine()->startStatement(session);
1523 
1524  copy= new CopyField[to->getShare()->sizeFields()];
1525 
1526  if (to->cursor->ha_external_lock(session, F_WRLCK))
1527  return -1;
1528 
1529  /* We need external lock before we can disable/enable keys */
1530  alter_table_manage_keys(session, to, from->cursor->indexes_are_disabled(),
1531  alter_table_message);
1532 
1533  /* We can abort alter table for any table type */
1534  session->setAbortOnWarning(not ignore);
1535 
1536  from->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
1537  to->cursor->ha_start_bulk_insert(from->cursor->stats.records);
1538 
1539  List<CreateField>::iterator it(create.begin());
1540  copy_end= copy;
1541  for (Field **ptr= to->getFields(); *ptr ; ptr++)
1542  {
1543  CreateField* def=it++;
1544  if (def->field)
1545  {
1546  if (*ptr == to->next_number_field)
1547  auto_increment_field_copied= true;
1548 
1549  (copy_end++)->set(*ptr,def->field,0);
1550  }
1551 
1552  }
1553 
1554  found_count=delete_count=0;
1555 
1556  do
1557  {
1558  if (order)
1559  {
1560  if (to->getShare()->hasPrimaryKey() && to->cursor->primary_key_is_clustered())
1561  {
1562  char warn_buff[DRIZZLE_ERRMSG_SIZE];
1563  snprintf(warn_buff, sizeof(warn_buff),
1564  _("order_st BY ignored because there is a user-defined clustered "
1565  "index in the table '%-.192s'"),
1566  from->getMutableShare()->getTableName());
1567  push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
1568  warn_buff);
1569  }
1570  else
1571  {
1572  FileSort filesort(*session);
1573  from->sort.io_cache= new internal::io_cache_st;
1574 
1575  tables.table= from;
1576  tables.setTableName(from->getMutableShare()->getTableName());
1577  tables.alias= tables.getTableName();
1578  tables.setSchemaName(from->getMutableShare()->getSchemaName());
1579  error= 1;
1580 
1581  session->lex().select_lex.setup_ref_array(session, order_num);
1582  if (setup_order(session, session->lex().select_lex.ref_pointer_array, &tables, fields, all_fields, order))
1583  break;
1584  sortorder= make_unireg_sortorder(order, &length, NULL);
1585  if ((from->sort.found_records= filesort.run(from, sortorder, length, (optimizer::SqlSelect *) 0, HA_POS_ERROR, 1, examined_rows)) == HA_POS_ERROR)
1586  break;
1587  }
1588  }
1589 
1590  /* Tell handler that we have values for all columns in the to table */
1591  to->use_all_columns();
1592 
1593  error= info.init_read_record(session, from, NULL, 1, true);
1594  if (error)
1595  {
1596  to->print_error(errno, MYF(0));
1597 
1598  break;
1599  }
1600 
1601  if (ignore)
1602  {
1603  to->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1604  }
1605 
1606  session->row_count= 0;
1607  to->restoreRecordAsDefault(); // Create empty record
1608  while (not (error=info.read_record(&info)))
1609  {
1610  if (session->getKilled())
1611  {
1612  session->send_kill_message();
1613  error= 1;
1614  break;
1615  }
1616  session->row_count++;
1617  /* Return error if source table isn't empty. */
1618  if (error_if_not_empty)
1619  {
1620  error= 1;
1621  break;
1622  }
1623  if (to->next_number_field)
1624  {
1625  if (auto_increment_field_copied)
1626  to->auto_increment_field_not_null= true;
1627  else
1628  to->next_number_field->reset();
1629  }
1630 
1631  for (CopyField *copy_ptr= copy; copy_ptr != copy_end ; copy_ptr++)
1632  {
1633  if (not copy->to_field->hasDefault() and copy->from_null_ptr and *copy->from_null_ptr & copy->from_bit)
1634  {
1635  copy->to_field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
1636  ER_WARN_DATA_TRUNCATED, 1);
1637  copy->to_field->reset();
1638  error= 1;
1639  break;
1640  }
1641 
1642  copy_ptr->do_copy(copy_ptr);
1643  }
1644 
1645  if (error)
1646  {
1647  break;
1648  }
1649 
1650  prev_insert_id= to->cursor->next_insert_id;
1651  error= to->cursor->insertRecord(to->record[0]);
1652  to->auto_increment_field_not_null= false;
1653 
1654  if (error)
1655  {
1656  if (!ignore || to->cursor->is_fatal_error(error, HA_CHECK_DUP))
1657  {
1658  to->print_error(error, MYF(0));
1659  break;
1660  }
1661  to->cursor->restore_auto_increment(prev_insert_id);
1662  delete_count++;
1663  }
1664  else
1665  {
1666  found_count++;
1667  }
1668  }
1669 
1670  info.end_read_record();
1671  from->free_io_cache();
1672  delete[] copy;
1673 
1674  if (to->cursor->ha_end_bulk_insert() && error <= 0)
1675  {
1676  to->print_error(errno, MYF(0));
1677  error= 1;
1678  }
1679  to->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1680 
1681  /*
1682  Ensure that the new table is saved properly to disk so that we
1683  can do a rename
1684  */
1685  if (TransactionServices::autocommitOrRollback(*session, false))
1686  error= 1;
1687 
1688  if (not session->endActiveTransaction())
1689  error= 1;
1690 
1691  } while (0);
1692 
1693  session->setAbortOnWarning(false);
1694  from->free_io_cache();
1695  *copied= found_count;
1696  *deleted=delete_count;
1697  to->cursor->ha_release_auto_increment();
1698 
1699  if (to->cursor->ha_external_lock(session, F_UNLCK))
1700  {
1701  error= 1;
1702  }
1703 
1704  return error > 0 ? -1 : 0;
1705 }
1706 
1707 static Table *open_alter_table(Session *session, Table *table, identifier::Table &identifier)
1708 {
1709  /* Open the table so we need to copy the data to it. */
1710  if (table->getShare()->getType())
1711  {
1712  TableList tbl;
1713  tbl.setSchemaName(identifier.getSchemaName().c_str());
1714  tbl.alias= identifier.getTableName().c_str();
1715  tbl.setTableName(identifier.getTableName().c_str());
1716 
1717  /* Table is in session->temporary_tables */
1718  return session->openTable(&tbl, NULL, DRIZZLE_LOCK_IGNORE_FLUSH);
1719  }
1720  else
1721  {
1722  /* Open our intermediate table */
1723  return session->open_temporary_table(identifier, false);
1724  }
1725 }
1726 
1727 } /* namespace drizzled */