28 #include <drizzled/error.h>
29 #include <drizzled/field/epoch.h>
30 #include <drizzled/gettext.h>
31 #include <drizzled/internal/my_sys.h>
32 #include <drizzled/item/empty_string.h>
33 #include <drizzled/item/int.h>
34 #include <drizzled/lock.h>
35 #include <drizzled/message/table.h>
36 #include <drizzled/optimizer/cost_vector.h>
37 #include <drizzled/plugin/client.h>
38 #include <drizzled/plugin/event_observer.h>
39 #include <drizzled/plugin/storage_engine.h>
40 #include <drizzled/probes.h>
41 #include <drizzled/session.h>
42 #include <drizzled/sql_base.h>
43 #include <drizzled/sql_parse.h>
44 #include <drizzled/transaction_services.h>
45 #include <drizzled/key.h>
46 #include <drizzled/sql_lex.h>
47 #include <drizzled/resource_context.h>
48 #include <drizzled/statistics_variables.h>
49 #include <drizzled/system_variables.h>
// Constructor (only part of the signature and initializer list is visible in
// this excerpt). The visible initializers clear the bulk-insert row estimate
// and the auto-increment bookkeeping, set key_used_on_scan and active_index
// to MAX_KEY, and size ref_length to sizeof(internal::my_off_t).
58 Cursor::Cursor(plugin::StorageEngine &engine_arg,
62 estimation_rows_to_insert(0),
64 key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
65 ref_length(sizeof(internal::my_off_t)),
68 next_insert_id(0), insert_id_for_cur_row(0)
// Creates a second cursor on the same table and opens it.
//
// mem_root  memory root the new cursor's row-reference buffer is taken from.
// Returns the new cursor, or NULL when ha_open() reports a non-zero result.
83 Cursor *Cursor::clone(memory::Root *mem_root)
// The new cursor is obtained from the storage engine of this table's share.
85 Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
// Room for two aligned references of ref_length bytes each, allocated from
// the caller's mem_root rather than from the table.
91 new_handler->ref= mem_root->alloc(ALIGN_SIZE(ref_length)*2);
92 identifier::Table identifier(*getTable());
93 return new_handler->ha_open(identifier, getTable()->getDBStat(), HA_OPEN_IGNORE_IF_LOCKED) ? NULL : new_handler;
// Accumulates the store_length of the leading key parts of key
// `key_position` that are selected by `keypart_map_arg`.
// (The declaration of `length` and the return statement are not visible in
// this excerpt.)
101 uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
// The map must have the form 2^n - 1: only contiguous low-order bits set.
104 assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
106 const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
107 const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
// Stop at the end of the key or as soon as no map bits remain; each pass
// shifts one bit out of the map.
110 while (key_part_found < end_key_part_found && keypart_map_arg)
112 length+= key_part_found->store_length;
113 keypart_map_arg >>= 1;
// Starts an index scan on index `idx` by delegating to doStartIndexScan().
// `sorted` is passed through unchanged. No scan may be active when this is
// called.
119 int Cursor::startIndexScan(uint32_t idx,
bool sorted)
122 assert(inited == NONE);
// A zero result from the engine is the success case (the statements guarded
// by this test are not visible in this excerpt).
123 if (!(result= doStartIndexScan(idx, sorted)))
// Finishes the active index scan and returns the result of doEndIndexScan().
// An index scan must be in progress.
129 int Cursor::endIndexScan()
131 assert(inited==INDEX);
134 return(doEndIndexScan());
// Starts a table scan by delegating to doStartTableScan().
// `scan` is passed through to the engine.
137 int Cursor::startTableScan(
bool scan)
// Starting again while a table scan is already active is accepted only when
// `scan` is true.
140 assert(inited==NONE || (inited==RND && scan));
// A non-zero engine result leaves the cursor in state NONE; zero moves it
// to RND.
141 inited= (result= doStartTableScan(scan)) ? NONE: RND;
// Finishes a table scan and returns the result of doEndTableScan().
// (Statements preceding the return are not visible in this excerpt.)
146 int Cursor::endTableScan()
150 return(doEndTableScan());
153 int Cursor::ha_index_or_rnd_end()
155 return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
158 void Cursor::ha_start_bulk_insert(ha_rows rows)
160 estimation_rows_to_insert= rows;
161 start_bulk_insert(rows);
164 int Cursor::ha_end_bulk_insert()
166 estimation_rows_to_insert= 0;
167 return end_bulk_insert();
170 const key_map *Cursor::keys_to_use_for_scanning()
172 return &key_map_empty;
175 bool Cursor::has_transactions()
177 return (getTable()->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
180 void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset)
const
182 (getTable()->in_use->status_var.*offset)++;
185 void **Cursor::ha_data(Session *session)
const
187 return session->getEngineData(getEngine());
// Classifies `error` as fatal or not, taking `flags` into account.
// Only part of the deciding condition is visible in this excerpt: the
// sub-expression below is true when HA_CHECK_DUP_KEY is set in `flags` and
// `error` is one of the two duplicate-key codes. How that sub-expression maps
// onto the returned value is not shown here.
190 bool Cursor::is_fatal_error(
int error, uint32_t flags)
193 ((flags & HA_CHECK_DUP_KEY) &&
194 (error == HA_ERR_FOUND_DUPP_KEY ||
195 error == HA_ERR_FOUND_DUPP_UNIQUE)))
201 ha_rows Cursor::records() {
return stats.records; }
202 uint64_t Cursor::tableSize() {
return stats.index_file_length + stats.data_file_length; }
203 uint64_t Cursor::rowSize() {
return getTable()->getRecordLength() + getTable()->sizeFields(); }
205 int Cursor::doOpen(
const identifier::Table &identifier,
int mode, uint32_t test_if_locked)
207 return open(identifier.getPath().c_str(), mode, test_if_locked);
// Body fragment of an open routine (presumably Cursor::ha_open; its
// signature is not visible in this excerpt).
// First attempt with the caller's mode.
222 if ((error= doOpen(identifier, mode, test_if_locked)))
// A read-write open refused with EACCES/EROFS is retried read-only, but only
// when the table allows it through HA_TRY_READ_ONLY; the table is then
// flagged HA_READ_ONLY.
224 if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
225 (getTable()->db_stat & HA_TRY_READ_ONLY))
227 getTable()->db_stat|=HA_READ_ONLY;
228 error= doOpen(identifier, O_RDONLY,test_if_locked);
// Tables whose share is marked HA_OPTION_READ_ONLY_DATA are flagged read-only
// as well.
237 if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
238 getTable()->db_stat|=HA_READ_ONLY;
// The result of this extra() call is deliberately ignored.
239 (void) extra(HA_EXTRA_NO_READCHECK);
// One allocation holds two aligned references: `ref` is the first slot and
// `dup_ref` the second.
243 ref= getTable()->alloc(ALIGN_SIZE(ref_length)*2);
244 dup_ref=ref+ALIGN_SIZE(ref_length);
// Reads the first row of the table into `buf`, choosing between a table scan
// and an index scan on `primary_key`.
// Increments the ha_read_first_count status counter.
255 int Cursor::read_first_row(
unsigned char * buf, uint32_t primary_key)
259 ha_statistic_increment(&system_status_var::ha_read_first_count);
// A table scan is used when fewer than 10 rows are recorded as deleted, when
// `primary_key` is not a valid key number, or when that index lacks
// HA_READ_ORDER.
266 if (stats.deleted < 10 || primary_key >= MAX_KEY ||
267 !(getTable()->index_flags(primary_key) & HA_READ_ORDER))
269 error= startTableScan(1);
// Skip over rows the engine reports as deleted.
272 while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
// The result of ending the scan is ignored; `error` keeps the read result.
273 (void) endTableScan();
// Otherwise the first entry of the primary key index is read.
279 error= startIndexScan(primary_key, 0);
282 error=index_first(buf);
283 (void) endIndexScan();
// Body fragment (presumably compute_next_insert_id(); the signature is not
// visible in this excerpt).
// An increment of 1 is handled separately (that branch's body is not shown).
303 if (variables->auto_increment_increment == 1)
// General case: nr becomes
//   ((nr + increment - offset) / increment) * increment + offset
// using integer division.
305 nr= (((nr+ variables->auto_increment_increment -
306 variables->auto_increment_offset)) /
307 (uint64_t) variables->auto_increment_increment);
308 return (nr* (uint64_t) variables->auto_increment_increment +
309 variables->auto_increment_offset);
// Moves next_insert_id forward after a row supplied the explicit value `nr`.
// Nothing changes unless next_insert_id is already set (> 0) and `nr` has
// reached or passed it; in that case the new value is computed by
// compute_next_insert_id() from `nr` and the session's variables.
313 void Cursor::adjust_next_insert_id_after_explicit_value(uint64_t nr)
320 if ((next_insert_id > 0) && (nr >= next_insert_id))
321 set_next_insert_id(compute_next_insert_id(nr, &getTable()->in_use->variables));
// Body fragment (presumably prev_insert_id(); the signature is not visible in
// this excerpt).
// A value below the configured offset is treated as the unlikely case (its
// handling is not shown).
343 if (unlikely(nr < variables->auto_increment_offset))
// An increment of 1 is handled separately (that branch's body is not shown).
352 if (variables->auto_increment_increment == 1)
// General case: the result is
//   ((nr - offset) / increment) * increment + offset
// using integer division.
354 nr= (((nr - variables->auto_increment_offset)) /
355 (uint64_t) variables->auto_increment_increment);
356 return (nr * (uint64_t) variables->auto_increment_increment +
357 variables->auto_increment_offset);
// Sizing constants for auto-increment interval reservation.
// Base number of values requested; it is multiplied by a power of two in
// update_auto_increment().
422 #define AUTO_INC_DEFAULT_NB_ROWS 1 // Some prefer 1024 here
// Exponent of the cap below; also the highest shift count used when scaling
// the request.
423 #define AUTO_INC_DEFAULT_NB_MAX_BITS 16
// Cap on the number of values requested at once: 2^16 - 1 = 65535.
424 #define AUTO_INC_DEFAULT_NB_MAX ((1 << AUTO_INC_DEFAULT_NB_MAX_BITS) - 1)
// Determines the auto-increment value for the row being written and stores
// it in the table's next_number_field.
// Error returns visible in this excerpt: HA_ERR_AUTOINC_READ_FAILED and
// HA_ERR_AUTOINC_ERANGE. Several statements of the function are not shown.
426 int Cursor::update_auto_increment()
428 uint64_t nr, nb_reserved_values;
430 Session *session= getTable()->in_use;
431 drizzle_system_variables *variables= &session->variables;
// next_insert_id must never lie below the start of the current interval.
437 assert(next_insert_id >= auto_inc_interval_for_cur_row.minimum());
// The column already carries a non-zero value, or the table is marked
// auto_increment_field_not_null.
443 if ((nr= getTable()->next_number_field->val_int()) != 0
444 || getTable()->auto_increment_field_not_null)
// Presumably inside the branch above: next_insert_id is adjusted for the
// explicit value and no generated id is recorded for this row.
452 adjust_next_insert_id_after_explicit_value(nr);
453 insert_id_for_cur_row= 0;
// The candidate value has reached the end of the current interval.
458 if ((nr= next_insert_id) >= auto_inc_interval_for_cur_row.maximum())
465 uint32_t nb_already_reserved_intervals= 0;
466 uint64_t nb_desired_values;
// With no interval reserved so far and a bulk-insert estimate available, the
// estimate is the number of values requested.
478 if (nb_already_reserved_intervals == 0 &&
479 (estimation_rows_to_insert > 0))
480 nb_desired_values= estimation_rows_to_insert;
// Otherwise the request is AUTO_INC_DEFAULT_NB_ROWS scaled by
// 2^(intervals already reserved), capped at AUTO_INC_DEFAULT_NB_MAX.
484 if (nb_already_reserved_intervals <= AUTO_INC_DEFAULT_NB_MAX_BITS)
486 nb_desired_values= AUTO_INC_DEFAULT_NB_ROWS *
487 (1 << nb_already_reserved_intervals);
488 set_if_smaller(nb_desired_values, (uint64_t)AUTO_INC_DEFAULT_NB_MAX);
491 nb_desired_values= AUTO_INC_DEFAULT_NB_MAX;
// Ask the engine for values; it reports the first value in `nr` and the
// count in `nb_reserved_values`.
494 get_auto_increment(variables->auto_increment_offset,
495 variables->auto_increment_increment,
496 nb_desired_values, &nr,
497 &nb_reserved_values);
// An all-ones value is treated as a failed read.
498 if (nr == ~(uint64_t) 0)
499 return HA_ERR_AUTOINC_READ_FAILED;
512 if (getTable()->getShare()->next_number_keypart == 0)
// store() returning non-zero is the unlikely case.
519 if (unlikely(getTable()->next_number_field->store((int64_t) nr,
true)))
// A session killed with KILL_BAD_DATA turns this into a range error.
524 if (session->getKilled() == Session::KILL_BAD_DATA)
525 return HA_ERR_AUTOINC_ERANGE;
// The value the field now holds is passed through prev_insert_id() and stored
// again; if that store also reports non-zero, `nr` is taken from what the
// field holds.
535 nr=
prev_insert_id(getTable()->next_number_field->val_int(), variables);
536 if (unlikely(getTable()->next_number_field->store((int64_t) nr,
true)))
537 nr= getTable()->next_number_field->val_int();
// Record the interval in use for this row.
540 auto_inc_interval_for_cur_row.replace(nr, nb_reserved_values, variables->auto_increment_increment);
// Remember the id generated for the current row.
549 insert_id_for_cur_row= nr;
576 void Cursor::ha_release_auto_increment()
578 release_auto_increment();
579 insert_id_for_cur_row= 0;
580 auto_inc_interval_for_cur_row.replace(0, 0, 0);
// drop_table(): takes no arguments and returns nothing. Its body is not
// visible in this excerpt.
584 void Cursor::drop_table()
// ha_check(): takes a Session pointer that is left unnamed in the signature
// and returns an int. Its body is not visible in this excerpt.
589 int Cursor::ha_check(Session*)
// Called by the ha_* wrappers and the row-changing entry points in this file
// before they invoke the engine. (The return type and most of the body are
// not visible in this excerpt.)
601 Cursor::setTransactionReadWrite()
// The table may have no session attached; that case is tested first (its
// handling is not shown).
608 if (not getTable()->in_use)
// Fetch the session's resource context for this cursor's engine.
619 ResourceContext& resource_context= getTable()->in_use->getResourceContext(*getEngine());
// Deletes every row of the table through delete_all_rows(), after calling
// setTransactionReadWrite(). (The return type line is not visible in this
// excerpt.)
636 Cursor::ha_delete_all_rows()
638 setTransactionReadWrite();
640 int result= delete_all_rows();
// The truncation is also reported to TransactionServices for the owning
// session (the condition guarding this, if any, is not shown).
650 Session& session= *getTable()->in_use;
651 TransactionServices::truncateTable(session, *getTable());
// Wrapper around reset_auto_increment(): calls setTransactionReadWrite()
// first, then forwards `value` and returns the result unchanged.
// (The return type line is not visible in this excerpt.)
665 Cursor::ha_reset_auto_increment(uint64_t value)
667 setTransactionReadWrite();
669 return reset_auto_increment(value);
// Body of a wrapper around analyze() (presumably Cursor::ha_analyze; the
// signature is not visible in this excerpt): calls setTransactionReadWrite()
// and returns analyze(session) unchanged.
682 setTransactionReadWrite();
684 return analyze(session);
// Wrapper around disable_indexes(): calls setTransactionReadWrite() first,
// then forwards `mode` and returns the result unchanged.
// (The return type line is not visible in this excerpt.)
694 Cursor::ha_disable_indexes(uint32_t mode)
696 setTransactionReadWrite();
698 return disable_indexes(mode);
// Wrapper around enable_indexes(): calls setTransactionReadWrite() first,
// then forwards `mode` and returns the result unchanged.
// (The return type line is not visible in this excerpt.)
709 Cursor::ha_enable_indexes(uint32_t mode)
711 setTransactionReadWrite();
713 return enable_indexes(mode);
723 int Cursor::ha_discard_or_import_tablespace(
bool discard)
725 setTransactionReadWrite();
726 return discard_or_import_tablespace(discard);
// Calls setTransactionReadWrite(); any statements that follow are not visible
// in this excerpt.
735 void Cursor::closeMarkForDelete()
737 setTransactionReadWrite();
// Advances to the next index entry with index_next() and may turn the result
// into HA_ERR_END_OF_FILE (presumably when the new row no longer matches
// `key`/`keylen`; the comparison itself is not visible in this excerpt).
//
// buf     row buffer the next row is read into.
// key     key bytes (their use is not visible in this excerpt).
// keylen  length of `key` (its use is not visible in this excerpt).
741 int Cursor::index_next_same(
unsigned char *buf,
const unsigned char *key, uint32_t keylen)
743 int error= index_next(buf);
// Distance between the caller's buffer and the table's insert record.
747 ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
748 unsigned char *save_record_0= NULL;
// Temporarily make `buf` the table's record[0] and shift every key part
// field of the active index by the same distance.
763 save_record_0= getTable()->getInsertRecord();
764 getTable()->record[0]= buf;
765 key_info= getTable()->key_info + active_index;
766 key_part= key_info->key_part;
767 key_part_end= key_part + key_info->key_parts;
768 for (; key_part < key_part_end; key_part++)
770 assert(key_part->field);
771 key_part->field->move_field_offset(ptrdiff);
// Table status and result reported when the scan is considered finished.
777 getTable()->status=STATUS_NOT_FOUND;
778 error= HA_ERR_END_OF_FILE;
// Undo the temporary redirection: restore record[0] and shift the fields
// back.
784 getTable()->record[0]= save_record_0;
785 for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
786 key_part->field->move_field_offset(-ptrdiff);
817 double Cursor::index_only_read_time(uint32_t keynr,
double key_records)
819 uint32_t keys_per_block= (stats.block_size/2/
820 (getTable()->key_info[keynr].key_length + ref_length) + 1);
821 return ((
double) (key_records + keys_per_block-1) /
822 (
double) keys_per_block);
// Fragment of a range-sequence cost estimator (presumably
// Cursor::multi_range_read_info_const; most of the signature is not visible
// in this excerpt). It walks every range produced by `seq`, accumulates a row
// estimate, and fills in `cost`.
863 void *seq_init_param,
869 ha_rows rows, total_rows= 0;
875 seq_it= seq->init(seq_init_param, n_ranges, *flags);
// next() returning zero means another range was delivered in `range`.
876 while (!seq->next(seq_it, &range))
// A zero-length endpoint is represented as "no bound" (NULL).
881 min_endp= range.start_key.length? &range.start_key : NULL;
882 max_endp= range.end_key.length? &range.end_key : NULL;
// Unique ranges that are not NULL ranges are handled separately (the branch
// body is not shown).
884 if ((range.range_flag & UNIQUE_RANGE) && !(range.range_flag & NULL_RANGE))
// HA_POS_ERROR from records_in_range() makes the whole estimate
// HA_POS_ERROR.
888 if (HA_POS_ERROR == (rows= this->records_in_range(keyno, min_endp,
892 total_rows= HA_POS_ERROR;
899 if (total_rows != HA_POS_ERROR)
// The default MRR implementation is always selected.
902 *flags |= HA_MRR_USE_DEFAULT_IMPL;
904 cost->setAvgIOCost(1);
// Index-only reads of more than two rows are costed with
// index_only_read_time(); everything else with read_time().
// NOTE(review): total_rows is narrowed to uint32_t here — confirm that is
// intended.
905 if ((*flags & HA_MRR_INDEX_ONLY) && total_rows > 2)
906 cost->setIOCount(index_only_read_time(keyno, (uint32_t)total_rows));
908 cost->setIOCount(read_time(keyno, n_ranges, total_rows));
// Fills in the cost of a multi-range read from the caller's range and row
// counts. (The remainder of the parameter list is not visible in this
// excerpt.)
949 int Cursor::multi_range_read_info(uint32_t keyno, uint32_t n_ranges, uint32_t n_rows,
// The default MRR implementation is always selected.
954 *flags |= HA_MRR_USE_DEFAULT_IMPL;
957 cost->setAvgIOCost(1);
// Index-only reads are costed with index_only_read_time(), all others with
// read_time().
960 if (*flags & HA_MRR_INDEX_ONLY)
961 cost->setIOCount(index_only_read_time(keyno, n_rows));
963 cost->setIOCount(read_time(keyno, n_ranges, n_rows));
// Prepares a multi-range read: starts the range sequence and records its
// state in the cursor. (The return type line is not visible in this excerpt.)
//
// seq_funcs       range-sequence callbacks; copied into mrr_funcs.
// seq_init_param  opaque argument handed to seq_funcs->init().
// n_ranges        number of ranges, handed to seq_funcs->init().
// mode            HA_MRR_* flags; HA_MRR_SORTED decides mrr_is_output_sorted.
1010 Cursor::multi_range_read_init(
RANGE_SEQ_IF *seq_funcs,
void *seq_init_param,
1011 uint32_t n_ranges, uint32_t mode)
1013 mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
1014 mrr_funcs= *seq_funcs;
1015 mrr_is_output_sorted= test(mode & HA_MRR_SORTED);
// No range has been fetched from the sequence yet.
1016 mrr_have_range=
false;
// Fetches the next row of a multi-range read, moving on to the next range
// when the current one is exhausted.
//
// range_info  out: receives the `ptr` of the range the row came from.
// (Several control-flow lines of this function are not visible in this
// excerpt.)
1035 int Cursor::multi_range_read_next(
char **range_info)
// First call after multi_range_read_init(): no range is loaded yet.
1040 if (not mrr_have_range)
1042 mrr_have_range=
true;
// Continue inside the current range unless it is flagged exactly
// UNIQUE_RANGE | EQ_RANGE (presumably because such a range yields at most
// one row — confirm).
1049 if (mrr_cur_range.range_flag != (UNIQUE_RANGE | EQ_RANGE))
1051 result= read_range_next();
1053 if (result != HA_ERR_END_OF_FILE)
1058 if (was_semi_consistent_read())
1064 result= HA_ERR_END_OF_FILE;
// Pull ranges from the sequence until one produces a row or the sequence
// ends (next() returns non-zero).
1069 while (!(range_res= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
// An endpoint with an empty keypart_map is passed as "no bound" (0).
1072 result= read_range_first(mrr_cur_range.start_key.keypart_map ?
1073 &mrr_cur_range.start_key : 0,
1074 mrr_cur_range.end_key.keypart_map ?
1075 &mrr_cur_range.end_key : 0,
1076 test(mrr_cur_range.range_flag & EQ_RANGE),
1077 mrr_is_output_sorted);
1078 if (result != HA_ERR_END_OF_FILE)
1082 while ((result == HA_ERR_END_OF_FILE) && !range_res);
1084 *range_info= mrr_cur_range.ptr;
// Body fragment (presumably Cursor::read_range_first; the signature is not
// visible in this excerpt). Positions the cursor on the first row of a key
// range and remembers the end of the range for later comparisons.
1118 eq_range= eq_range_arg;
// Keep a private copy of the end key so end_range stays valid.
1122 end_range= &save_end_range;
1123 save_end_range= *end_key;
// Result compare_key() uses when the row key equals the end key:
// 1 for HA_READ_BEFORE_KEY, -1 for HA_READ_AFTER_KEY, otherwise 0.
1124 key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
1125 (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
1127 range_key_part= getTable()->key_info[active_index].key_part;
// Either start at the first index entry or look up the start key (the
// condition selecting between the two is not shown).
1130 result= index_first(getTable()->getInsertRecord());
1132 result= index_read_map(getTable()->getInsertRecord(),
1134 start_key->keypart_map,
// "Key not found" is reported to the caller as end of file.
1137 return((result == HA_ERR_KEY_NOT_FOUND)
1138 ? HA_ERR_END_OF_FILE
// A row past the end of the range is reported as end of file.
1141 return (compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
// Reads the next row of the range set up by the range-first call.
// (The conditions selecting between the branches below are not visible in
// this excerpt.)
1158 int Cursor::read_range_next()
// One path continues with index_next_same() using the end key's length.
1165 return(index_next_same(getTable()->getInsertRecord(),
1167 end_range->length));
// The other path takes the next index entry and reports end of file once
// the row lies past end_range.
1169 result= index_next(getTable()->getInsertRecord());
1172 return(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
// Body fragment (presumably Cursor::compare_key; the signature is not visible
// in this excerpt). The comparison result comes from key_cmp() against the
// range's key bytes; key_compare_result_on_equal is assigned in a branch
// whose condition is not shown (presumably when key_cmp() reports equality).
1196 cmp=
key_cmp(range_key_part, range->key, range->length);
1198 cmp= key_compare_result_on_equal;
// Performs a one-off keyed read on index `index`: start the index scan, do
// the lookup, end the scan.
//
// buf          row buffer for the result.
// index        index number to read from.
// key          search key bytes.
// keypart_map  bitmap of the key parts present in `key`.
// find_flag    search mode passed to index_read_map().
// Returns the lookup error when non-zero, otherwise the result of ending the
// scan.
1202 int Cursor::index_read_idx_map(
unsigned char * buf, uint32_t index,
1203 const unsigned char * key,
1204 key_part_map keypart_map,
1205 enum ha_rkey_function find_flag)
// The do* hooks are called directly here rather than through
// startIndexScan()/endIndexScan().
1208 error= doStartIndexScan(index, 0);
1211 error= index_read_map(buf, key, keypart_map, find_flag);
1212 error1= doEndIndexScan();
1214 return error ? error : error1;
// Reports a row change to TransactionServices, choosing insert/update/delete
// according to the SQL command being executed and which of the two row images
// is present.
//
// table          table the row belongs to.
// before_record  row image before the change, or NULL.
// after_record   row image after the change, or NULL.
// (Case labels other than those shown, and the return statements, are not
// visible in this excerpt.)
1217 static bool log_row_for_replication(
Table* table,
1218 const unsigned char *before_record,
1219 const unsigned char *after_record)
// Tested first: a non-zero share type, or message construction being
// switched off (the branch body is not shown).
1223 if (table->getShare()->getType() || not TransactionServices::shouldConstructMessages())
1228 switch (session->lex().sql_command)
1230 case SQLCOM_CREATE_TABLE:
1240 result= TransactionServices::insertRecord(*session, *table);
1242 case SQLCOM_REPLACE:
1243 case SQLCOM_REPLACE_SELECT:
// No after-image: the call is a delete, made with an extra `true` argument.
1262 if (after_record == NULL)
1269 TransactionServices::deleteRecord(*session, *table,
true);
// No before-image means a plain insert; otherwise it is an update.
1279 if (before_record == NULL)
1280 result= TransactionServices::insertRecord(*session, *table);
1282 TransactionServices::updateRecord(*session, *table, before_record, after_record);
1286 case SQLCOM_INSERT_SELECT:
// Same insert-or-update decision as above.
1294 if (before_record == NULL)
1295 result= TransactionServices::insertRecord(*session, *table);
1297 TransactionServices::updateRecord(*session, *table, before_record, after_record);
// Remaining branches (their case labels are not shown): an update and a
// delete.
1301 TransactionServices::updateRecord(*session, *table, before_record, after_record);
1305 TransactionServices::deleteRecord(*session, *table);
// Wrapper around external_lock() that fires the matching START/DONE probe
// for read-lock, write-lock and unlock requests.
//
// session    session requesting the lock change.
// lock_type  F_RDLCK, F_WRLCK or F_UNLCK.
// (The return statement is not visible in this excerpt.)
1314 int Cursor::ha_external_lock(
Session *session,
int lock_type)
// No auto-increment value may be pending at this point.
1321 assert(next_insert_id == 0);
// The schema/table names are only fetched when at least one START probe is
// enabled.
1323 if (DRIZZLE_CURSOR_RDLOCK_START_ENABLED() ||
1324 DRIZZLE_CURSOR_WRLOCK_START_ENABLED() ||
1325 DRIZZLE_CURSOR_UNLOCK_START_ENABLED())
1327 if (lock_type == F_RDLCK)
1329 DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
1330 getTable()->getShare()->getTableName());
1332 else if (lock_type == F_WRLCK)
1334 DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
1335 getTable()->getShare()->getTableName());
1337 else if (lock_type == F_UNLCK)
1339 DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
1340 getTable()->getShare()->getTableName());
// The actual lock operation.
1349 int error= external_lock(session, lock_type);
// The DONE probes receive the lock result.
1351 if (DRIZZLE_CURSOR_RDLOCK_DONE_ENABLED() ||
1352 DRIZZLE_CURSOR_WRLOCK_DONE_ENABLED() ||
1353 DRIZZLE_CURSOR_UNLOCK_DONE_ENABLED())
1355 if (lock_type == F_RDLCK)
1357 DRIZZLE_CURSOR_RDLOCK_DONE(error);
1359 else if (lock_type == F_WRLCK)
1361 DRIZZLE_CURSOR_WRLOCK_DONE(error);
1363 else if (lock_type == F_UNLCK)
1365 DRIZZLE_CURSOR_UNLOCK_DONE(error);
// Resets table-level state held on behalf of this cursor. (The return
// statement is not visible in this excerpt.)
1376 int Cursor::ha_reset()
// Preconditions: the share's all_set bitmap is not empty, key-only reading is
// off, and no scan is active.
1379 assert(! getTable()->getShare()->all_set.none());
1380 assert(getTable()->key_read == 0);
1382 assert(inited == NONE);
// Release the table's IO cache and restore its default column bitmaps.
1384 getTable()->free_io_cache();
1386 getTable()->default_column_bitmaps();
// Inserts the row in `buf` through doInsertRecord(), surrounded by timestamp
// handling, event observers, probes, statistics and replication logging.
// Returns HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED when logging the row fails.
// (Other return statements are not visible in this excerpt.)
1391 int Cursor::insertRecord(
unsigned char *buf)
// Fill in the automatic timestamp column when the table requests it on
// insert.
1401 if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
1403 getTable()->timestamp_field->set_time();
1406 DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1407 setTransactionReadWrite();
// A "before" observer returning non-zero sets the error to
// ER_EVENT_OBSERVER_PLUGIN.
1409 if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
1411 error= ER_EVENT_OBSERVER_PLUGIN;
1415 error= doInsertRecord(buf);
// The "after" observer sees the engine result and can replace it with
// ER_EVENT_OBSERVER_PLUGIN.
1416 if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error)))
1418 error= ER_EVENT_OBSERVER_PLUGIN;
1422 ha_statistic_increment(&system_status_var::ha_write_count);
1424 DRIZZLE_INSERT_ROW_DONE(error);
1426 if (unlikely(error))
// An insert has no before-image, so NULL is passed for it.
1431 if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
1432 return HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED;
// Updates a row through doUpdateRecord(), surrounded by event observers,
// timestamp handling, probes, statistics and replication logging.
//
// old_data  row image before the update.
// new_data  row image after the update; must be the table's insert record.
// Returns HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED when logging the row fails.
// (Other return statements are not visible in this excerpt.)
1438 int Cursor::updateRecord(
const unsigned char *old_data,
unsigned char *new_data)
1446 assert(new_data == getTable()->getInsertRecord());
1448 DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1449 setTransactionReadWrite();
// A "before" observer returning non-zero sets the error to
// ER_EVENT_OBSERVER_PLUGIN.
1450 if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
1452 error= ER_EVENT_OBSERVER_PLUGIN;
// Refresh the automatic timestamp column when the table requests it on
// update.
1456 if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1458 getTable()->timestamp_field->set_time();
1461 error= doUpdateRecord(old_data, new_data);
// The "after" observer sees the engine result and can replace it with
// ER_EVENT_OBSERVER_PLUGIN.
1462 if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
1464 error= ER_EVENT_OBSERVER_PLUGIN;
1468 ha_statistic_increment(&system_status_var::ha_update_count);
1470 DRIZZLE_UPDATE_ROW_DONE(error);
1472 if (unlikely(error))
// Both row images are handed to the replication log.
1477 if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
1478 return HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED;
1482 TableShare *Cursor::getShare()
1484 return getTable()->getMutableShare();
// Deletes the row in `buf` through doDeleteRecord(), surrounded by event
// observers, probes, statistics and replication logging.
// Returns HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED when logging the row fails.
// (Other return statements are not visible in this excerpt.)
1487 int Cursor::deleteRecord(
const unsigned char *buf)
1491 DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1492 setTransactionReadWrite();
// A "before" observer returning non-zero sets the error to
// ER_EVENT_OBSERVER_PLUGIN.
1493 if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
1495 error= ER_EVENT_OBSERVER_PLUGIN;
1499 error= doDeleteRecord(buf);
// The "after" observer sees the engine result and can replace it with
// ER_EVENT_OBSERVER_PLUGIN.
1500 if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
1502 error= ER_EVENT_OBSERVER_PLUGIN;
1506 ha_statistic_increment(&system_status_var::ha_delete_count);
1508 DRIZZLE_DELETE_ROW_DONE(error);
1510 if (unlikely(error))
// A delete has no after-image, so NULL is passed for it.
1513 if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
1514 return HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED;