Drizzled Public API Documentation

sql_insert.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 
17 /* Insert of records */
18 
19 #include <config.h>
20 #include <cstdio>
21 #include <drizzled/sql_select.h>
22 #include <drizzled/show.h>
23 #include <drizzled/error.h>
24 #include <drizzled/name_resolution_context_state.h>
25 #include <drizzled/probes.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/sql_load.h>
28 #include <drizzled/field/epoch.h>
29 #include <drizzled/lock.h>
30 #include <drizzled/sql_table.h>
31 #include <drizzled/pthread_globals.h>
32 #include <drizzled/transaction_services.h>
33 #include <drizzled/plugin/transactional_storage_engine.h>
34 #include <drizzled/select_insert.h>
35 #include <drizzled/select_create.h>
36 #include <drizzled/table/shell.h>
37 #include <drizzled/alter_info.h>
38 #include <drizzled/sql_parse.h>
39 #include <drizzled/sql_lex.h>
40 #include <drizzled/statistics_variables.h>
41 #include <drizzled/session/transactions.h>
42 #include <drizzled/open_tables_state.h>
43 #include <drizzled/table/cache.h>
44 #include <drizzled/create_field.h>
45 
46 namespace drizzled {
47 
48 extern plugin::StorageEngine *heap_engine;
49 extern plugin::StorageEngine *myisam_engine;
50 
51 /*
52  Check if insert fields are correct.
53 
54  SYNOPSIS
55  check_insert_fields()
56  session The current thread.
57  table The table for insert.
58  fields The insert fields.
59  values The insert values.
60  check_unique If duplicate values should be rejected.
61 
62  NOTE
63  Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
64  or leaves it as is, depending on if timestamp should be updated or
65  not.
66 
67  RETURN
68  0 OK
69  -1 Error
70 */
71 
72 static int check_insert_fields(Session *session, TableList *table_list,
73  List<Item> &fields, List<Item> &values,
74  bool check_unique,
75  table_map *)
76 {
77  Table *table= table_list->table;
78 
79  if (fields.size() == 0 && values.size() != 0)
80  {
81  if (values.size() != table->getShare()->sizeFields())
82  {
83  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
84  return -1;
85  }
86  clear_timestamp_auto_bits(table->timestamp_field_type,
87  TIMESTAMP_AUTO_SET_ON_INSERT);
88  /*
89  No fields are provided so all fields must be provided in the values.
90  Thus we set all bits in the write set.
91  */
92  table->setWriteSet();
93  }
94  else
95  { // Part field list
96  Select_Lex *select_lex= &session->lex().select_lex;
97  Name_resolution_context *context= &select_lex->context;
98  Name_resolution_context_state ctx_state;
99  int res;
100 
101  if (fields.size() != values.size())
102  {
103  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
104  return -1;
105  }
106 
107  session->dup_field= 0;
108 
109  /* Save the state of the current name resolution context. */
110  ctx_state.save_state(context, table_list);
111 
112  /*
113  Perform name resolution only in the first table - 'table_list',
114  which is the table that is inserted into.
115  */
116  table_list->next_local= 0;
117  context->resolve_in_table_list_only(table_list);
118  res= setup_fields(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
119 
120  /* Restore the current context. */
121  ctx_state.restore_state(context, table_list);
122 
123  if (res)
124  return -1;
125 
126  if (check_unique && session->dup_field)
127  {
128  my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), session->dup_field->field_name);
129  return -1;
130  }
131  if (table->timestamp_field) // Don't automaticly set timestamp if used
132  {
133  if (table->timestamp_field->isWriteSet())
134  {
135  clear_timestamp_auto_bits(table->timestamp_field_type,
136  TIMESTAMP_AUTO_SET_ON_INSERT);
137  }
138  else
139  {
140  table->setWriteSet(table->timestamp_field->position());
141  }
142  }
143  }
144 
145  return 0;
146 }
147 
148 
149 /*
150  Check update fields for the timestamp field.
151 
152  SYNOPSIS
153  check_update_fields()
154  session The current thread.
155  insert_table_list The insert table list.
156  table The table for update.
157  update_fields The update fields.
158 
159  NOTE
160  If the update fields include the timestamp field,
161  remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.
162 
163  RETURN
164  0 OK
165  -1 Error
166 */
167 
168 static int check_update_fields(Session *session, TableList *insert_table_list,
169  List<Item> &update_fields,
170  table_map *)
171 {
172  Table *table= insert_table_list->table;
173  bool timestamp_mark= false;
174 
175  if (table->timestamp_field)
176  {
177  /*
178  Unmark the timestamp field so that we can check if this is modified
179  by update_fields
180  */
181  timestamp_mark= table->write_set->test(table->timestamp_field->position());
182  table->write_set->reset(table->timestamp_field->position());
183  }
184 
185  /* Check the fields we are going to modify */
186  if (setup_fields(session, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
187  return -1;
188 
189  if (table->timestamp_field)
190  {
191  /* Don't set timestamp column if this is modified. */
192  if (table->timestamp_field->isWriteSet())
193  {
194  clear_timestamp_auto_bits(table->timestamp_field_type,
195  TIMESTAMP_AUTO_SET_ON_UPDATE);
196  }
197 
198  if (timestamp_mark)
199  {
200  table->setWriteSet(table->timestamp_field->position());
201  }
202  }
203  return 0;
204 }
205 
206 
215 static
217  thr_lock_type *lock_type,
218  enum_duplicates duplic,
219  bool )
220 {
221  if (duplic == DUP_UPDATE ||
222  (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
223  {
224  *lock_type= TL_WRITE_DEFAULT;
225  return;
226  }
227 }
228 
229 
238 bool insert_query(Session *session,TableList *table_list,
239  List<Item> &fields,
240  List<List_item> &values_list,
241  List<Item> &update_fields,
242  List<Item> &update_values,
243  enum_duplicates duplic,
244  bool ignore)
245 {
246  int error;
247  bool transactional_table, joins_freed= false;
248  bool changed;
249  uint32_t value_count;
250  ulong counter = 1;
251  uint64_t id;
252  CopyInfo info;
253  Table *table= 0;
254  List<List_item>::iterator its(values_list.begin());
255  List_item *values;
256  Name_resolution_context *context;
258  Item *unused_conds= 0;
259 
260 
261  /*
262  Upgrade lock type if the requested lock is incompatible with
263  the current connection mode or table operation.
264  */
265  upgrade_lock_type(session, &table_list->lock_type, duplic,
266  values_list.size() > 1);
267 
268  if (session->openTablesLock(table_list))
269  {
270  DRIZZLE_INSERT_DONE(1, 0);
271  return true;
272  }
273 
274  session->set_proc_info("init");
275  session->used_tables=0;
276  values= its++;
277  value_count= values->size();
278 
279  if (prepare_insert(session, table_list, table, fields, values,
280  update_fields, update_values, duplic, &unused_conds,
281  false,
282  (fields.size() || !value_count ||
283  (0) != 0), !ignore))
284  {
285  if (table != NULL)
287  if (!joins_freed)
288  free_underlaid_joins(session, &session->lex().select_lex);
289  session->setAbortOnWarning(false);
290  DRIZZLE_INSERT_DONE(1, 0);
291  return true;
292  }
293 
294  /* mysql_prepare_insert set table_list->table if it was not set */
295  table= table_list->table;
296 
297  context= &session->lex().select_lex.context;
298  /*
299  These three asserts test the hypothesis that the resetting of the name
300  resolution context below is not necessary at all since the list of local
301  tables for INSERT always consists of one table.
302  */
303  assert(!table_list->next_local);
304  assert(!context->table_list->next_local);
306 
307  /* Save the state of the current name resolution context. */
308  ctx_state.save_state(context, table_list);
309 
310  /*
311  Perform name resolution only in the first table - 'table_list',
312  which is the table that is inserted into.
313  */
314  table_list->next_local= 0;
315  context->resolve_in_table_list_only(table_list);
316 
317  while ((values= its++))
318  {
319  counter++;
320  if (values->size() != value_count)
321  {
322  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
323 
324  if (table != NULL)
326  if (!joins_freed)
327  free_underlaid_joins(session, &session->lex().select_lex);
328  session->setAbortOnWarning(false);
329  DRIZZLE_INSERT_DONE(1, 0);
330 
331  return true;
332  }
333  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
334  {
335  if (table != NULL)
337  if (!joins_freed)
338  free_underlaid_joins(session, &session->lex().select_lex);
339  session->setAbortOnWarning(false);
340  DRIZZLE_INSERT_DONE(1, 0);
341  return true;
342  }
343  }
344  its= values_list.begin();
345 
346  /* Restore the current context. */
347  ctx_state.restore_state(context, table_list);
348 
349  /*
350  Fill in the given fields and dump it to the table cursor
351  */
352  info.ignore= ignore;
353  info.handle_duplicates=duplic;
354  info.update_fields= &update_fields;
355  info.update_values= &update_values;
356 
357  /*
358  Count warnings for all inserts.
359  For single line insert, generate an error if try to set a NOT NULL field
360  to NULL.
361  */
362  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
363 
364  session->cuted_fields = 0L;
366 
367  error=0;
368  session->set_proc_info("update");
369  if (duplic == DUP_REPLACE)
370  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
371  if (duplic == DUP_UPDATE)
372  table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
373  {
374  if (duplic != DUP_ERROR || ignore)
375  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
376  table->cursor->ha_start_bulk_insert(values_list.size());
377  }
378 
379 
380  session->setAbortOnWarning(not ignore);
381 
382  table->mark_columns_needed_for_insert();
383 
384  while ((values= its++))
385  {
386  if (fields.size() || !value_count)
387  {
388  table->restoreRecordAsDefault(); // Get empty record
389  if (fill_record(session, fields, *values))
390  {
391  if (values_list.size() != 1 && ! session->is_error())
392  {
393  info.records++;
394  continue;
395  }
396  /*
397  TODO: set session->abort_on_warning if values_list.elements == 1
398  and check that all items return warning in case of problem with
399  storing field.
400  */
401  error=1;
402  break;
403  }
404  }
405  else
406  {
407  table->restoreRecordAsDefault(); // Get empty record
408 
409  if (fill_record(session, table->getFields(), *values))
410  {
411  if (values_list.size() != 1 && ! session->is_error())
412  {
413  info.records++;
414  continue;
415  }
416  error=1;
417  break;
418  }
419  }
420 
421  // Release latches in case bulk insert takes a long time
423 
424  error=write_record(session, table ,&info);
425  if (error)
426  break;
427  session->row_count++;
428  }
429 
430  free_underlaid_joins(session, &session->lex().select_lex);
431  joins_freed= true;
432 
433  /*
434  Now all rows are inserted. Time to update logs and sends response to
435  user
436  */
437  {
438  /*
439  Do not do this release if this is a delayed insert, it would steal
440  auto_inc values from the delayed_insert thread as they share Table.
441  */
443  if (table->cursor->ha_end_bulk_insert() && !error)
444  {
445  table->print_error(errno,MYF(0));
446  error=1;
447  }
448  if (duplic != DUP_ERROR || ignore)
449  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
450 
451  transactional_table= table->cursor->has_transactions();
452 
453  changed= (info.copied || info.deleted || info.updated);
454  if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
455  {
456  if (session->transaction.stmt.hasModifiedNonTransData())
457  session->transaction.all.markModifiedNonTransData();
458  }
459  assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
460 
461  }
462  session->set_proc_info("end");
463  /*
464  We'll report to the client this id:
465  - if the table contains an autoincrement column and we successfully
466  inserted an autogenerated value, the autogenerated value.
467  - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
468  called, X.
469  - if the table contains an autoincrement column, and some rows were
470  inserted, the id of the last "inserted" row (if IGNORE, that value may not
471  have been really inserted but ignored).
472  */
473  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
475  (session->arg_of_last_insert_id_function ?
477  ((table->next_number_field && info.copied) ?
478  table->next_number_field->val_int() : 0));
479  table->next_number_field=0;
480  session->count_cuted_fields= CHECK_FIELD_IGNORE;
481  table->auto_increment_field_not_null= false;
482  if (duplic == DUP_REPLACE)
483  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
484 
485  if (error)
486  {
487  if (table != NULL)
489  if (!joins_freed)
490  free_underlaid_joins(session, &session->lex().select_lex);
491  session->setAbortOnWarning(false);
492  DRIZZLE_INSERT_DONE(1, 0);
493  return true;
494  }
495 
496  if (values_list.size() == 1 && (!(session->options & OPTION_WARNINGS) ||
497  !session->cuted_fields))
498  {
499  session->row_count_func= info.copied + info.deleted + info.updated;
500  session->my_ok((ulong) session->rowCount(),
501  info.copied + info.deleted + info.touched, id);
502  }
503  else
504  {
505  char buff[160];
506  if (ignore)
507  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
508  (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
509  else
510  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
511  (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
512  session->row_count_func= info.copied + info.deleted + info.updated;
513  session->my_ok((ulong) session->rowCount(),
514  info.copied + info.deleted + info.touched, id, buff);
515  }
516  session->status_var.inserted_row_count+= session->rowCount();
517  session->setAbortOnWarning(false);
518  DRIZZLE_INSERT_DONE(0, session->rowCount());
519 
520  return false;
521 }
522 
523 
524 /*
525  Check if table can be updated
526 
527  SYNOPSIS
528  prepare_insert_check_table()
529  session Thread handle
530  table_list Table list
531  fields List of fields to be updated
532  where Pointer to where clause
533  select_insert Check is making for SELECT ... INSERT
534 
535  RETURN
536  false ok
537  true ERROR
538 */
539 
540 static bool prepare_insert_check_table(Session *session, TableList *table_list,
541  List<Item> &,
542  bool select_insert)
543 {
544 
545 
546  /*
547  first table in list is the one we'll INSERT into, requires INSERT_ACL.
548  all others require SELECT_ACL only. the ACL requirement below is for
549  new leaves only anyway (view-constituents), so check for SELECT rather
550  than INSERT.
551  */
552 
553  return setup_tables_and_check_access(session, &session->lex().select_lex.context,
554  &session->lex().select_lex.top_join_list, table_list, &session->lex().select_lex.leaf_tables, select_insert);
555 }
556 
557 
558 /*
559  Prepare items in INSERT statement
560 
561  SYNOPSIS
562  prepare_insert()
563  session Thread handler
564  table_list Global/local table list
565  table Table to insert into (can be NULL if table should
566  be taken from table_list->table)
567  where Where clause (for insert ... select)
568  select_insert true if INSERT ... SELECT statement
569  check_fields true if need to check that all INSERT fields are
570  given values.
571  abort_on_warning whether to report if some INSERT field is not
572  assigned as an error (true) or as a warning (false).
573 
574  TODO (in far future)
575  In cases of:
576  INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
577  ON DUPLICATE KEY ...
578  we should be able to refer to sum1 in the ON DUPLICATE KEY part
579 
580  WARNING
581  You MUST set table->insert_values to 0 after calling this function
582  before releasing the table object.
583 
584  RETURN VALUE
585  false OK
586  true error
587 */
588 
589 bool prepare_insert(Session *session, TableList *table_list,
590  Table *table, List<Item> &fields, List_item *values,
591  List<Item> &update_fields, List<Item> &update_values,
592  enum_duplicates duplic,
593  COND **,
594  bool select_insert,
595  bool check_fields, bool abort_on_warning)
596 {
597  Select_Lex *select_lex= &session->lex().select_lex;
598  Name_resolution_context *context= &select_lex->context;
599  Name_resolution_context_state ctx_state;
600  bool insert_into_view= (0 != 0);
601  bool res= 0;
602  table_map map= 0;
603 
604  /* INSERT should have a SELECT or VALUES clause */
605  assert (!select_insert || !values);
606 
607  /*
608  For subqueries in VALUES() we should not see the table in which we are
609  inserting (for INSERT ... SELECT this is done by changing table_list,
610  because INSERT ... SELECT share Select_Lex it with SELECT.
611  */
612  if (not select_insert)
613  {
614  for (Select_Lex_Unit *un= select_lex->first_inner_unit();
615  un;
616  un= un->next_unit())
617  {
618  for (Select_Lex *sl= un->first_select();
619  sl;
620  sl= sl->next_select())
621  {
622  sl->context.outer_context= 0;
623  }
624  }
625  }
626 
627  if (duplic == DUP_UPDATE)
628  {
629  /* it should be allocated before Item::fix_fields() */
630  table_list->set_insert_values();
631  }
632 
633  if (prepare_insert_check_table(session, table_list, fields, select_insert))
634  return true;
635 
636 
637  /* Prepare the fields in the statement. */
638  if (values)
639  {
640  /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
641  assert (!select_lex->group_list.elements);
642 
643  /* Save the state of the current name resolution context. */
644  ctx_state.save_state(context, table_list);
645 
646  /*
647  Perform name resolution only in the first table - 'table_list',
648  which is the table that is inserted into.
649  */
650  table_list->next_local= 0;
651  context->resolve_in_table_list_only(table_list);
652 
653  res= check_insert_fields(session, context->table_list, fields, *values,
654  !insert_into_view, &map) ||
655  setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0);
656 
657  if (!res && check_fields)
658  {
659  bool saved_abort_on_warning= session->abortOnWarning();
660 
661  session->setAbortOnWarning(abort_on_warning);
662  res= check_that_all_fields_are_given_values(session,
663  table ? table :
664  context->table_list->table,
665  context->table_list);
666  session->setAbortOnWarning(saved_abort_on_warning);
667  }
668 
669  if (!res && duplic == DUP_UPDATE)
670  {
671  res= check_update_fields(session, context->table_list, update_fields, &map);
672  }
673 
674  /* Restore the current context. */
675  ctx_state.restore_state(context, table_list);
676 
677  if (not res)
678  res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
679  }
680 
681  if (res)
682  return res;
683 
684  if (not table)
685  table= table_list->table;
686 
687  if (not select_insert && unique_table(table_list, table_list->next_global, true))
688  {
689  my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
690  return true;
691  }
692 
693  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
694  table->prepare_for_position();
695 
696  return false;
697 }
698 
699 
700  /* Check if there is more uniq keys after field */
701 
702 static int last_uniq_key(Table *table,uint32_t keynr)
703 {
704  while (++keynr < table->getShare()->sizeKeys())
705  if (table->key_info[keynr].flags & HA_NOSAME)
706  return 0;
707  return 1;
708 }
709 
710 
711 /*
712  Write a record to table with optional deleting of conflicting records,
713  invoke proper triggers if needed.
714 
715  SYNOPSIS
716  write_record()
717  session - thread context
718  table - table to which record should be written
719  info - CopyInfo structure describing handling of duplicates
720  and which is used for counting number of records inserted
721  and deleted.
722 
723  NOTE
724  Once this record will be written to table after insert trigger will
725  be invoked. If instead of inserting new record we will update old one
726  then both on update triggers will work instead. Similarly both on
727  delete triggers will be invoked if we will delete conflicting records.
728 
729  Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
730  transactions.
731 
732  RETURN VALUE
733  0 - success
734  non-0 - error
735 */
736 
737 
738 int write_record(Session *session, Table *table,CopyInfo *info)
739 {
740  int error;
741  std::vector<unsigned char> key;
742  boost::dynamic_bitset<> *save_read_set, *save_write_set;
743  uint64_t prev_insert_id= table->cursor->next_insert_id;
744  uint64_t insert_id_for_cur_row= 0;
745 
746 
747  info->records++;
748  save_read_set= table->read_set;
749  save_write_set= table->write_set;
750 
751  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
752  {
753  while ((error=table->cursor->insertRecord(table->getInsertRecord())))
754  {
755  uint32_t key_nr;
756  /*
757  If we do more than one iteration of this loop, from the second one the
758  row will have an explicit value in the autoinc field, which was set at
759  the first call of handler::update_auto_increment(). So we must save
760  the autogenerated value to avoid session->insert_id_for_cur_row to become
761  0.
762  */
763  if (table->cursor->insert_id_for_cur_row > 0)
764  insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
765  else
766  table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
767  bool is_duplicate_key_error;
768  if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
769  goto err;
770  is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
771  if (!is_duplicate_key_error)
772  {
773  /*
774  We come here when we had an ignorable error which is not a duplicate
775  key error. In this we ignore error if ignore flag is set, otherwise
776  report error as usual. We will not do any duplicate key processing.
777  */
778  if (info->ignore)
779  goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
780  goto err;
781  }
782  if ((int) (key_nr = table->get_dup_key(error)) < 0)
783  {
784  error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
785  goto err;
786  }
787  /* Read all columns for the row we are going to replace */
788  table->use_all_columns();
789  /*
790  Don't allow REPLACE to replace a row when a auto_increment column
791  was used. This ensures that we don't get a problem when the
792  whole range of the key has been used.
793  */
794  if (info->handle_duplicates == DUP_REPLACE &&
795  table->next_number_field &&
796  key_nr == table->getShare()->next_number_index &&
797  (insert_id_for_cur_row > 0))
798  goto err;
799  if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
800  {
801  if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
802  goto err;
803  }
804  else
805  {
806  if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
807  {
808  error=errno;
809  goto err;
810  }
811 
812  if (not key.size())
813  {
814  key.resize(table->getShare()->max_unique_length);
815  }
816  key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
817  if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
818  &key[0], HA_WHOLE_KEY,
819  HA_READ_KEY_EXACT))))
820  goto err;
821  }
822  if (info->handle_duplicates == DUP_UPDATE)
823  {
824  /*
825  We don't check for other UNIQUE keys - the first row
826  that matches, is updated. If update causes a conflict again,
827  an error is returned
828  */
829  assert(table->insert_values.size());
830  table->storeRecordAsInsert();
831  table->restoreRecord();
832  assert(info->update_fields->size() ==
833  info->update_values->size());
834  if (fill_record(session, *info->update_fields,
835  *info->update_values,
836  info->ignore))
837  goto before_err;
838 
839  table->cursor->restore_auto_increment(prev_insert_id);
840  if (table->next_number_field)
841  table->cursor->adjust_next_insert_id_after_explicit_value(
842  table->next_number_field->val_int());
843  info->touched++;
844 
845  if (! table->records_are_comparable() || table->compare_records())
846  {
847  if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
848  table->getInsertRecord())) &&
849  error != HA_ERR_RECORD_IS_THE_SAME)
850  {
851  if (info->ignore &&
852  !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
853  {
854  goto gok_or_after_err;
855  }
856  goto err;
857  }
858 
859  if (error != HA_ERR_RECORD_IS_THE_SAME)
860  info->updated++;
861  else
862  error= 0;
863  /*
864  If ON DUP KEY UPDATE updates a row instead of inserting one, it's
865  like a regular UPDATE statement: it should not affect the value of a
866  next SELECT LAST_INSERT_ID() or insert_id().
867  Except if LAST_INSERT_ID(#) was in the INSERT query, which is
868  handled separately by Session::arg_of_last_insert_id_function.
869  */
870  insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
871  info->copied++;
872  }
873 
874  if (table->next_number_field)
875  table->cursor->adjust_next_insert_id_after_explicit_value(
876  table->next_number_field->val_int());
877  info->touched++;
878 
879  goto gok_or_after_err;
880  }
881  else /* DUP_REPLACE */
882  {
883  /*
884  The manual defines the REPLACE semantics that it is either
885  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
886  InnoDB do not function in the defined way if we allow MySQL
887  to convert the latter operation internally to an UPDATE.
888  We also should not perform this conversion if we have
889  timestamp field with ON UPDATE which is different from DEFAULT.
890  Another case when conversion should not be performed is when
891  we have ON DELETE trigger on table so user may notice that
892  we cheat here. Note that it is ok to do such conversion for
893  tables which have ON UPDATE but have no ON DELETE triggers,
894  we just should not expose this fact to users by invoking
895  ON UPDATE triggers.
896  */
897  if (last_uniq_key(table,key_nr) &&
898  !table->cursor->referenced_by_foreign_key() &&
899  (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
900  table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
901  {
902  if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
903  table->getInsertRecord())) &&
904  error != HA_ERR_RECORD_IS_THE_SAME)
905  goto err;
906  if (error != HA_ERR_RECORD_IS_THE_SAME)
907  info->deleted++;
908  else
909  error= 0;
910  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
911  /*
912  Since we pretend that we have done insert we should call
913  its after triggers.
914  */
915  goto after_n_copied_inc;
916  }
917  else
918  {
919  if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
920  goto err;
921  info->deleted++;
922  if (!table->cursor->has_transactions())
923  session->transaction.stmt.markModifiedNonTransData();
924  /* Let us attempt do write_row() once more */
925  }
926  }
927  }
928  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
929  /*
930  Restore column maps if they where replaced during an duplicate key
931  problem.
932  */
933  if (table->read_set != save_read_set ||
934  table->write_set != save_write_set)
935  table->column_bitmaps_set(*save_read_set, *save_write_set);
936  }
937  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
938  {
939  if (!info->ignore ||
940  table->cursor->is_fatal_error(error, HA_CHECK_DUP))
941  goto err;
942  table->cursor->restore_auto_increment(prev_insert_id);
943  goto gok_or_after_err;
944  }
945 
946 after_n_copied_inc:
947  info->copied++;
948  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
949 
950 gok_or_after_err:
951  if (!table->cursor->has_transactions())
952  session->transaction.stmt.markModifiedNonTransData();
953  return 0;
954 
955 err:
956  info->last_errno= error;
957  /* current_select is NULL if this is a delayed insert */
958  if (session->lex().current_select)
959  session->lex().current_select->no_error= 0; // Give error
960  table->print_error(error,MYF(0));
961 
962 before_err:
963  table->cursor->restore_auto_increment(prev_insert_id);
964  table->column_bitmaps_set(*save_read_set, *save_write_set);
965  return 1;
966 }
967 
968 
969 /******************************************************************************
970  Check that all fields with arn't null_fields are used
971 ******************************************************************************/
972 
973 int check_that_all_fields_are_given_values(Session *session, Table *entry,
974  TableList *)
975 {
976  int err= 0;
977 
978  for (Field **field=entry->getFields() ; *field ; field++)
979  {
980  if (not (*field)->isWriteSet())
981  {
982  /*
983  * If the field doesn't have any default value
984  * and there is no actual value specified in the
985  * INSERT statement, throw error ER_NO_DEFAULT_FOR_FIELD.
986  */
987  if (((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
988  ((*field)->real_type() != DRIZZLE_TYPE_ENUM))
989  {
990  my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), (*field)->field_name);
991  err= 1;
992  }
993  }
994  else
995  {
996  /*
997  * However, if an actual NULL value was specified
998  * for the field and the field is a NOT NULL field,
999  * throw ER_BAD_NULL_ERROR.
1000  *
1001  * Per the SQL standard, inserting NULL into a NOT NULL
1002  * field requires an error to be thrown.
1003  */
1004  if (((*field)->flags & NOT_NULL_FLAG) &&
1005  (*field)->is_null())
1006  {
1007  my_error(ER_BAD_NULL_ERROR, MYF(0), (*field)->field_name);
1008  err= 1;
1009  }
1010  }
1011  }
1012  return session->abortOnWarning() ? err : 0;
1013 }
1014 
1015 /***************************************************************************
1016  Store records in INSERT ... SELECT *
1017 ***************************************************************************/
1018 
1019 
1020 /*
1021  make insert specific preparation and checks after opening tables
1022 
1023  SYNOPSIS
1024  insert_select_prepare()
1025  session thread handler
1026 
1027  RETURN
1028  false OK
1029  true Error
1030 */
1031 
1032 bool insert_select_prepare(Session *session)
1033 {
1034  LEX *lex= &session->lex();
1035  Select_Lex *select_lex= &lex->select_lex;
1036 
1037  /*
1038  Select_Lex do not belong to INSERT statement, so we can't add WHERE
1039  clause if table is VIEW
1040  */
1041 
1042  if (prepare_insert(session, lex->query_tables,
1043  lex->query_tables->table, lex->field_list, 0,
1044  lex->update_list, lex->value_list,
1045  lex->duplicates,
1046  &select_lex->where, true, false, false))
1047  return true;
1048 
1049  /*
1050  exclude first table from leaf tables list, because it belong to
1051  INSERT
1052  */
1053  assert(select_lex->leaf_tables != 0);
1054  lex->leaf_tables_insert= select_lex->leaf_tables;
1055  /* skip all leaf tables belonged to view where we are insert */
1056  select_lex->leaf_tables= select_lex->leaf_tables->next_leaf;
1057  return false;
1058 }
1059 
1060 
1061 select_insert::select_insert(TableList *table_list_par, Table *table_par,
1062  List<Item> *fields_par,
1063  List<Item> *update_fields,
1064  List<Item> *update_values,
1065  enum_duplicates duplic,
1066  bool ignore_check_option_errors) :
1067  table_list(table_list_par), table(table_par), fields(fields_par),
1068  autoinc_value_of_last_inserted_row(0),
1069  insert_into_view(table_list_par && 0 != 0)
1070 {
1071  info.handle_duplicates= duplic;
1072  info.ignore= ignore_check_option_errors;
1073  info.update_fields= update_fields;
1074  info.update_values= update_values;
1075 }
1076 
1077 
1078 int
1079 select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
1080 {
1081  int res;
1082  table_map map= 0;
1083  Select_Lex *lex_current_select_save= session->lex().current_select;
1084 
1085 
1086  unit= u;
1087 
1088  /*
1089  Since table in which we are going to insert is added to the first
1090  select, LEX::current_select should point to the first select while
1091  we are fixing fields from insert list.
1092  */
1093  session->lex().current_select= &session->lex().select_lex;
1094  res= check_insert_fields(session, table_list, *fields, values,
1095  !insert_into_view, &map) ||
1096  setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0);
1097 
1098  if (!res && fields->size())
1099  {
1100  bool saved_abort_on_warning= session->abortOnWarning();
1101  session->setAbortOnWarning(not info.ignore);
1102  res= check_that_all_fields_are_given_values(session, table_list->table,
1103  table_list);
1104  session->setAbortOnWarning(saved_abort_on_warning);
1105  }
1106 
1107  if (info.handle_duplicates == DUP_UPDATE && !res)
1108  {
1109  Name_resolution_context *context= &session->lex().select_lex.context;
1110  Name_resolution_context_state ctx_state;
1111 
1112  /* Save the state of the current name resolution context. */
1113  ctx_state.save_state(context, table_list);
1114 
1115  /* Perform name resolution only in the first table - 'table_list'. */
1116  table_list->next_local= 0;
1117  context->resolve_in_table_list_only(table_list);
1118 
1119  res= res || check_update_fields(session, context->table_list,
1120  *info.update_fields, &map);
1121  /*
1122  When we are not using GROUP BY and there are no ungrouped aggregate functions
1123  we can refer to other tables in the ON DUPLICATE KEY part.
1124  We use next_name_resolution_table descructively, so check it first (views?)
1125  */
1126  assert (!table_list->next_name_resolution_table);
1127  if (session->lex().select_lex.group_list.elements == 0 and
1128  not session->lex().select_lex.with_sum_func)
1129  /*
1130  We must make a single context out of the two separate name resolution contexts :
1131  the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
1132  To do that we must concatenate the two lists
1133  */
1134  table_list->next_name_resolution_table=
1135  ctx_state.get_first_name_resolution_table();
1136 
1137  res= res || setup_fields(session, 0, *info.update_values,
1138  MARK_COLUMNS_READ, 0, 0);
1139  if (!res)
1140  {
1141  /*
1142  Traverse the update values list and substitute fields from the
1143  select for references (Item_ref objects) to them. This is done in
1144  order to get correct values from those fields when the select
1145  employs a temporary table.
1146  */
1147  List<Item>::iterator li(info.update_values->begin());
1148  Item *item;
1149 
1150  while ((item= li++))
1151  {
1152  item->transform(&Item::update_value_transformer,
1153  (unsigned char*)session->lex().current_select);
1154  }
1155  }
1156 
1157  /* Restore the current context. */
1158  ctx_state.restore_state(context, table_list);
1159  }
1160 
1161  session->lex().current_select= lex_current_select_save;
1162  if (res)
1163  return 1;
1164  /*
1165  if it is INSERT into join view then check_insert_fields already found
1166  real table for insert
1167  */
1168  table= table_list->table;
1169 
1170  /*
1171  Is table which we are changing used somewhere in other parts of
1172  query
1173  */
1174  if (unique_table(table_list, table_list->next_global))
1175  {
1176  /* Using same table for INSERT and SELECT */
1177  session->lex().current_select->options|= OPTION_BUFFER_RESULT;
1178  session->lex().current_select->join->select_options|= OPTION_BUFFER_RESULT;
1179  }
1180  else if (not (session->lex().current_select->options & OPTION_BUFFER_RESULT))
1181  {
1182  /*
1183  We must not yet prepare the result table if it is the same as one of the
1184  source tables (INSERT SELECT). The preparation may disable
1185  indexes on the result table, which may be used during the select, if it
1186  is the same table (Bug #6034). Do the preparation after the select phase
1187  in select_insert::prepare2().
1188  We won't start bulk inserts at all if this statement uses functions or
1189  should invoke triggers since they may access to the same table too.
1190  */
1191  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1192  }
1193  table->restoreRecordAsDefault(); // Get empty record
1194  table->next_number_field=table->found_next_number_field;
1195 
1196  session->cuted_fields=0;
1197 
1198  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1199  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1200 
1201  if (info.handle_duplicates == DUP_REPLACE)
1202  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1203 
1204  if (info.handle_duplicates == DUP_UPDATE)
1205  table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1206 
1207  session->setAbortOnWarning(not info.ignore);
1208  table->mark_columns_needed_for_insert();
1209 
1210 
1211  return res;
1212 }
1213 
1214 
1215 /*
1216  Finish the preparation of the result table.
1217 
1218  SYNOPSIS
1219  select_insert::prepare2()
1220  void
1221 
1222  DESCRIPTION
1223  If the result table is the same as one of the source tables (INSERT SELECT),
1224  the result table is not finally prepared at the join prepair phase.
1225  Do the final preparation now.
1226 
1227  RETURN
1228  0 OK
1229 */
1230 
1231 int select_insert::prepare2(void)
1232 {
1233  if (session->lex().current_select->options & OPTION_BUFFER_RESULT)
1234  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1235 
1236  return 0;
1237 }
1238 
1239 
1240 void select_insert::cleanup()
1241 {
1242  /* select_insert/select_create are never re-used in prepared statement */
1243  assert(0);
1244 }
1245 
1246 select_insert::~select_insert()
1247 {
1248 
1249  if (table)
1250  {
1251  table->next_number_field=0;
1252  table->auto_increment_field_not_null= false;
1253  table->cursor->ha_reset();
1254  }
1255  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1256  session->setAbortOnWarning(false);
1257  return;
1258 }
1259 
1260 
1261 bool select_insert::send_data(List<Item> &values)
1262 {
1263 
1264  bool error= false;
1265 
1266  if (unit->offset_limit_cnt)
1267  { // using limit offset,count
1268  unit->offset_limit_cnt--;
1269  return false;
1270  }
1271 
1272  session->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
1273  store_values(values);
1274  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1275  if (session->is_error())
1276  return true;
1277 
1278  // Release latches in case bulk insert takes a long time
1279  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
1280 
1281  error= write_record(session, table, &info);
1282  table->auto_increment_field_not_null= false;
1283 
1284  if (!error)
1285  {
1286  if (info.handle_duplicates == DUP_UPDATE)
1287  {
1288  /*
1289  Restore fields of the record since it is possible that they were
1290  changed by ON DUPLICATE KEY UPDATE clause.
1291 
1292  If triggers exist then whey can modify some fields which were not
1293  originally touched by INSERT ... SELECT, so we have to restore
1294  their original values for the next row.
1295  */
1296  table->restoreRecordAsDefault();
1297  }
1298  if (table->next_number_field)
1299  {
1300  /*
1301  If no value has been autogenerated so far, we need to remember the
1302  value we just saw, we may need to send it to client in the end.
1303  */
1304  if (session->first_successful_insert_id_in_cur_stmt == 0) // optimization
1305  autoinc_value_of_last_inserted_row=
1306  table->next_number_field->val_int();
1307  /*
1308  Clear auto-increment field for the next record, if triggers are used
1309  we will clear it twice, but this should be cheap.
1310  */
1311  table->next_number_field->reset();
1312  }
1313  }
1314  return(error);
1315 }
1316 
1317 
1318 void select_insert::store_values(List<Item> &values)
1319 {
1320  if (fields->size())
1321  fill_record(session, *fields, values, true);
1322  else
1323  fill_record(session, table->getFields(), values, true);
1324 }
1325 
1326 void select_insert::send_error(drizzled::error_t errcode,const char *err)
1327 {
1328  my_message(errcode, err, MYF(0));
1329 }
1330 
1331 
1332 bool select_insert::send_eof()
1333 {
1334  int error;
1335  bool const trans_table= table->cursor->has_transactions();
1336  uint64_t id;
1337  bool changed;
1338 
1339  error= table->cursor->ha_end_bulk_insert();
1340  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1341  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1342 
1343  if ((changed= (info.copied || info.deleted || info.updated)))
1344  {
1345  if (session->transaction.stmt.hasModifiedNonTransData())
1346  session->transaction.all.markModifiedNonTransData();
1347  }
1348  assert(trans_table || !changed ||
1349  session->transaction.stmt.hasModifiedNonTransData());
1350 
1351  table->cursor->ha_release_auto_increment();
1352 
1353  if (error)
1354  {
1355  table->print_error(error,MYF(0));
1356  DRIZZLE_INSERT_SELECT_DONE(error, 0);
1357  return 1;
1358  }
1359  char buff[160];
1360  if (info.ignore)
1361  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1362  (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1363  else
1364  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1365  (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1366  session->row_count_func= info.copied + info.deleted + info.updated;
1367 
1368  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1369  session->first_successful_insert_id_in_cur_stmt :
1370  (session->arg_of_last_insert_id_function ?
1371  session->first_successful_insert_id_in_prev_stmt :
1372  (info.copied ? autoinc_value_of_last_inserted_row : 0));
1373  session->my_ok((ulong) session->rowCount(),
1374  info.copied + info.deleted + info.touched, id, buff);
1375  session->status_var.inserted_row_count+= session->rowCount();
1376  DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1377  return 0;
1378 }
1379 
1380 void select_insert::abort() {
1381 
1382 
1383  /*
1384  If the creation of the table failed (due to a syntax error, for
1385  example), no table will have been opened and therefore 'table'
1386  will be NULL. In that case, we still need to execute the rollback
1387  and the end of the function.
1388  */
1389  if (table)
1390  {
1391  bool changed, transactional_table;
1392 
1393  table->cursor->ha_end_bulk_insert();
1394 
1395  changed= (info.copied || info.deleted || info.updated);
1396  transactional_table= table->cursor->has_transactions();
1397  assert(transactional_table || !changed ||
1398  session->transaction.stmt.hasModifiedNonTransData());
1399  table->cursor->ha_release_auto_increment();
1400  }
1401 
1402  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1403  {
1404  DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
1405  }
1406 
1407  return;
1408 }
1409 
1410 
1411 /***************************************************************************
1412  CREATE TABLE (SELECT) ...
1413 ***************************************************************************/
1414 
1415 /*
1416  Create table from lists of fields and items (or just return Table
1417  object for pre-opened existing table).
1418 
1419  SYNOPSIS
1420  create_table_from_items()
1421  session in Thread object
1422  create_info in Create information (like MAX_ROWS, ENGINE or
1423  temporary table flag)
1424  create_table in Pointer to TableList object providing database
1425  and name for table to be created or to be open
1426  alter_info in/out Initial list of columns and indexes for the table
1427  to be created
1428  items in List of items which should be used to produce rest
1429  of fields for the table (corresponding fields will
1430  be added to the end of alter_info->create_list)
1431  lock out Pointer to the DrizzleLock object for table created
1432  (or open temporary table) will be returned in this
1433  parameter. Since this table is not included in
1434  Session::lock caller is responsible for explicitly
1435  unlocking this table.
1436  hooks
1437 
1438  NOTES
1439  This function behaves differently for base and temporary tables:
1440  - For base table we assume that either table exists and was pre-opened
1441  and locked at openTablesLock() stage (and in this case we just
1442  emit error or warning and return pre-opened Table object) or special
1443  placeholder was put in table cache that guarantees that this table
1444  won't be created or opened until the placeholder will be removed
1445  (so there is an exclusive lock on this table).
1446  - We don't pre-open existing temporary table, instead we either open
1447  or create and then open table in this function.
1448 
1449  Since this function contains some logic specific to CREATE TABLE ...
1450  SELECT it should be changed before it can be used in other contexts.
1451 
1452  RETURN VALUES
1453  non-zero Pointer to Table object for table created or opened
1454  0 Error
1455 */
1456 
1457 static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1458  TableList *create_table,
1459  message::Table &table_proto,
1460  AlterInfo *alter_info,
1461  List<Item> *items,
1462  bool is_if_not_exists,
1463  DrizzleLock **lock,
1464  const identifier::Table& identifier)
1465 {
1466  TableShare share(message::Table::INTERNAL);
1467  uint32_t select_field_count= items->size();
1468  /* Add selected items to field list */
1469  List<Item>::iterator it(items->begin());
1470  Item *item;
1471  Field *tmp_field;
1472 
1473  if (not (identifier.isTmp()) && create_table->table->db_stat)
1474  {
1475  /* Table already exists and was open at openTablesLock() stage. */
1476  if (is_if_not_exists)
1477  {
1478  create_info->table_existed= 1; // Mark that table existed
1479  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1480  ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1481  create_table->getTableName());
1482  return create_table->table;
1483  }
1484 
1485  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1486  return NULL;
1487  }
1488 
1489  {
1490  table::Shell tmp_table(share); // Used during 'CreateField()'
1491 
1492  if (not table_proto.engine().name().compare("MyISAM"))
1493  tmp_table.getMutableShare()->db_low_byte_first= true;
1494  else if (not table_proto.engine().name().compare("MEMORY"))
1495  tmp_table.getMutableShare()->db_low_byte_first= true;
1496 
1497  tmp_table.in_use= session;
1498 
1499  while ((item=it++))
1500  {
1501  CreateField *cr_field;
1502  Field *field, *def_field;
1503  if (item->type() == Item::FUNC_ITEM)
1504  {
1505  if (item->result_type() != STRING_RESULT)
1506  {
1507  field= item->tmp_table_field(&tmp_table);
1508  }
1509  else
1510  {
1511  field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1512  }
1513  }
1514  else
1515  {
1516  field= create_tmp_field(session, &tmp_table, item, item->type(),
1517  (Item ***) 0, &tmp_field, &def_field, false,
1518  false, false, 0);
1519  }
1520 
1521  if (!field ||
1522  !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1523  ((Item_field *)item)->field :
1524  (Field*) 0))))
1525  {
1526  return NULL;
1527  }
1528 
1529  if (item->maybe_null)
1530  {
1531  cr_field->flags &= ~NOT_NULL_FLAG;
1532  }
1533 
1534  alter_info->create_list.push_back(cr_field);
1535  }
1536  }
1537 
1538  /*
1539  Create and lock table.
1540 
1541  Note that we either creating (or opening existing) temporary table or
1542  creating base table on which name we have exclusive lock. So code below
1543  should not cause deadlocks or races.
1544  */
1545  Table *table= 0;
1546  {
1547  if (not create_table_no_lock(session,
1548  identifier,
1549  create_info,
1550  table_proto,
1551  alter_info,
1552  false,
1553  select_field_count,
1554  is_if_not_exists))
1555  {
1556  if (create_info->table_existed && not identifier.isTmp())
1557  {
1558  /*
1559  This means that someone created table underneath server
1560  or it was created via different mysqld front-end to the
1561  cluster. We don't have much options but throw an error.
1562  */
1563  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1564  return NULL;
1565  }
1566 
1567  if (not identifier.isTmp())
1568  {
1569  /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1570  boost::mutex::scoped_lock scopedLock(table::Cache::mutex());
1571 
1572  if (create_table->table)
1573  {
1574  table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1575 
1576  if (concurrent_table->reopen_name_locked_table(create_table, session))
1577  {
1578  (void)plugin::StorageEngine::dropTable(*session, identifier);
1579  }
1580  else
1581  {
1582  table= create_table->table;
1583  }
1584  }
1585  else
1586  {
1587  (void)plugin::StorageEngine::dropTable(*session, identifier);
1588  }
1589  }
1590  else
1591  {
1592  if (not (table= session->openTable(create_table, (bool*) 0,
1593  DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1594  not create_info->table_existed)
1595  {
1596  /*
1597  This shouldn't happen as creation of temporary table should make
1598  it preparable for open. But let us do close_temporary_table() here
1599  just in case.
1600  */
1601  session->open_tables.drop_temporary_table(identifier);
1602  }
1603  }
1604  }
1605  if (not table) // open failed
1606  return NULL;
1607  }
1608 
1609  table->reginfo.lock_type=TL_WRITE;
1610  if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
1611  {
1612  if (*lock)
1613  {
1614  session->unlockTables(*lock);
1615  *lock= 0;
1616  }
1617 
1618  if (not create_info->table_existed)
1619  session->drop_open_table(table, identifier);
1620 
1621  return NULL;
1622  }
1623 
1624  return table;
1625 }
1626 
1627 
1628 int
1629 select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1630 {
1631  DrizzleLock *extra_lock= NULL;
1632  /*
1633  For replication, the CREATE-SELECT statement is written
1634  in two pieces: the first transaction messsage contains
1635  the CREATE TABLE statement as a CreateTableStatement message
1636  necessary to create the table.
1637 
1638  The second transaction message contains all the InsertStatement
1639  and associated InsertRecords that should go into the table.
1640  */
1641 
1642  unit= u;
1643 
1644  if (not (table= create_table_from_items(session, create_info, create_table,
1645  table_proto,
1646  alter_info, &values,
1647  is_if_not_exists,
1648  &extra_lock, identifier)))
1649  {
1650  return(-1); // abort() deletes table
1651  }
1652 
1653  if (extra_lock)
1654  {
1655  assert(m_plock == NULL);
1656 
1657  if (identifier.isTmp())
1658  m_plock= &m_lock;
1659  else
1660  m_plock= &session->open_tables.extra_lock;
1661 
1662  *m_plock= extra_lock;
1663  }
1664 
1665  if (table->getShare()->sizeFields() < values.size())
1666  {
1667  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1668  return(-1);
1669  }
1670 
1671  /* First field to copy */
1672  field= table->getFields() + table->getShare()->sizeFields() - values.size();
1673 
1674  /* Mark all fields that are given values */
1675  for (Field **f= field ; *f ; f++)
1676  {
1677  table->setWriteSet((*f)->position());
1678  }
1679 
1680  /* Don't set timestamp if used */
1681  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1682  table->next_number_field=table->found_next_number_field;
1683 
1684  table->restoreRecordAsDefault(); // Get empty record
1685  session->cuted_fields=0;
1686  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1687  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1688 
1689  if (info.handle_duplicates == DUP_REPLACE)
1690  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1691 
1692  if (info.handle_duplicates == DUP_UPDATE)
1693  table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1694 
1695  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1696  session->setAbortOnWarning(not info.ignore);
1697  if (check_that_all_fields_are_given_values(session, table, table_list))
1698  return 1;
1699 
1700  table->mark_columns_needed_for_insert();
1701  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
1702  return 0;
1703 }
1704 
1705 void select_create::store_values(List<Item> &values)
1706 {
1707  fill_record(session, field, values, true);
1708 }
1709 
1710 
1711 void select_create::send_error(drizzled::error_t errcode,const char *err)
1712 {
1713  /*
1714  This will execute any rollbacks that are necessary before writing
1715  the transcation cache.
1716 
1717  We disable the binary log since nothing should be written to the
1718  binary log. This disabling is important, since we potentially do
1719  a "roll back" of non-transactional tables by removing the table,
1720  and the actual rollback might generate events that should not be
1721  written to the binary log.
1722 
1723  */
1724  select_insert::send_error(errcode, err);
1725 }
1726 
1727 
1728 bool select_create::send_eof()
1729 {
1730  bool tmp=select_insert::send_eof();
1731  if (tmp)
1732  abort();
1733  else
1734  {
1735  /*
1736  Do an implicit commit at end of statement for non-temporary
1737  tables. This can fail, but we should unlock the table
1738  nevertheless.
1739  */
1740  if (!table->getShare()->getType())
1741  {
1742  TransactionServices::autocommitOrRollback(*session, 0);
1743  (void) session->endActiveTransaction();
1744  }
1745 
1746  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1747  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1748  if (m_plock)
1749  {
1750  session->unlockTables(*m_plock);
1751  *m_plock= NULL;
1752  m_plock= NULL;
1753  }
1754  }
1755  return tmp;
1756 }
1757 
1758 
1759 void select_create::abort()
1760 {
1761  /*
1762  In select_insert::abort() we roll back the statement, including
1763  truncating the transaction cache of the binary log. To do this, we
1764  pretend that the statement is transactional, even though it might
1765  be the case that it was not.
1766 
1767  We roll back the statement prior to deleting the table and prior
1768  to releasing the lock on the table, since there might be potential
1769  for failure if the rollback is executed after the drop or after
1770  unlocking the table.
1771 
1772  We also roll back the statement regardless of whether the creation
1773  of the table succeeded or not, since we need to reset the binary
1774  log state.
1775  */
1776  select_insert::abort();
1777 
1778  if (m_plock)
1779  {
1780  session->unlockTables(*m_plock);
1781  *m_plock= NULL;
1782  m_plock= NULL;
1783  }
1784 
1785  if (table)
1786  {
1787  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1788  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1789  if (not create_info->table_existed)
1790  session->drop_open_table(table, identifier);
1791  table= NULL; // Safety
1792  }
1793 }
1794 
1795 } /* namespace drizzled */