83 #include <drizzled/error.h>
84 #include <drizzled/thr_lock.h>
85 #include <drizzled/session.h>
86 #include <drizzled/session/times.h>
87 #include <drizzled/sql_base.h>
88 #include <drizzled/lock.h>
89 #include <drizzled/pthread_globals.h>
90 #include <drizzled/internal/my_sys.h>
91 #include <drizzled/pthread_globals.h>
92 #include <drizzled/plugin/storage_engine.h>
93 #include <drizzled/util/test.h>
94 #include <drizzled/open_tables_state.h>
95 #include <drizzled/table/cache.h>
100 #include <functional>
102 #include <boost/thread/shared_mutex.hpp>
103 #include <boost/thread/condition_variable.hpp>
110 static boost::mutex LOCK_global_read_lock;
111 static boost::condition_variable_any COND_global_read_lock;
118 static void print_lock_error(
int error,
const char *);
142 static drizzled::error_t thr_lock_errno_to_mysql[]=
143 { EE_OK, EE_ERROR_FIRST, ER_LOCK_WAIT_TIMEOUT, ER_LOCK_DEADLOCK };
173 DrizzleLock *Session::lockTables(Table **tables, uint32_t count, uint32_t flags)
175 DrizzleLock *sql_lock;
176 Table *write_lock_used;
177 vector<plugin::StorageEngine *> involved_engines;
181 if (! (sql_lock= get_lock_data(tables, count,
true, &write_lock_used)))
184 if (global_read_lock && write_lock_used and (not (flags & DRIZZLE_LOCK_IGNORE_GLOBAL_READ_LOCK)))
190 if (wait_if_global_read_lock(1, 1))
197 if (open_tables.version != g_refresh_version)
205 set_proc_info(
"Notify start statement");
210 if (sql_lock->sizeTable())
212 size_t num_tables= sql_lock->sizeTable();
213 plugin::StorageEngine *engine;
214 std::set<size_t> involved_slots;
216 for (
size_t x= 1; x <= num_tables; x++, tables++)
218 engine= (*tables)->cursor->getEngine();
220 if (involved_slots.count(engine->getId()) > 0)
223 involved_engines.push_back(engine);
224 involved_slots.insert(engine->getId());
227 for_each(involved_engines.begin(),
228 involved_engines.end(),
229 bind2nd(mem_fun(&plugin::StorageEngine::startStatement),
this));
232 set_proc_info(
"External lock");
239 if (sql_lock->sizeTable() && lock_external(sql_lock->getTable(), sql_lock->sizeTable()))
245 set_proc_info(
"Table lock");
247 memcpy(sql_lock->getLocks() + sql_lock->sizeLock(),
248 sql_lock->getLocks(),
249 sql_lock->sizeLock() *
sizeof(*sql_lock->getLocks()));
252 drizzled::error_t rc;
253 rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(*
this,
254 sql_lock->getLocks() +
255 sql_lock->sizeLock(),
256 sql_lock->sizeLock(),
260 if (sql_lock->sizeTable())
261 unlock_external(sql_lock->getTable(), sql_lock->sizeTable());
263 my_error(rc, MYF(0));
273 unlockTables(sql_lock);
278 times.set_time_after_lock();
284 int Session::lock_external(Table **tables, uint32_t count)
287 for (uint32_t i= 1 ; i <= count ; i++, tables++)
289 assert((*tables)->reginfo.lock_type >= TL_READ);
291 if ((*tables)->db_stat & HA_READ_ONLY ||
292 ((*tables)->reginfo.lock_type >= TL_READ &&
293 (*tables)->reginfo.lock_type <= TL_READ_NO_INSERT))
296 if ((error=(*tables)->cursor->ha_external_lock(
this,lock_type)))
298 print_lock_error(error, (*tables)->cursor->getEngine()->getName().c_str());
302 (*tables)->cursor->ha_external_lock(
this, F_UNLCK);
303 (*tables)->current_lock=F_UNLCK;
309 (*tables)->db_stat &= ~ HA_BLOCK_LOCK;
310 (*tables)->current_lock= lock_type;
317 void Session::unlockTables(DrizzleLock *sql_lock)
319 if (sql_lock->sizeLock())
320 sql_lock->unlock(sql_lock->sizeLock());
321 if (sql_lock->sizeTable())
322 unlock_external(sql_lock->getTable(), sql_lock->sizeTable());
332 void Session::unlockSomeTables(
Table **table, uint32_t count)
335 Table *write_lock_used;
336 if ((sql_lock= get_lock_data(table, count,
false,
338 unlockTables(sql_lock);
352 for (i=found=0 ; i < sql_lock->sizeLock(); i++)
354 if (sql_lock->getLocks()[i]->type >= TL_WRITE_ALLOW_READ)
356 std::swap(*lock_local, sql_lock->getLocks()[i]);
364 thr_multi_unlock(lock_local, i - found);
365 sql_lock->setLock(found);
370 Table **table= sql_lock->getTable();
371 for (i=found=0 ; i < sql_lock->sizeTable() ; i++)
374 if ((uint32_t) sql_lock->getTable()[i]->reginfo.lock_type >= TL_WRITE_ALLOW_READ)
376 std::swap(*table, sql_lock->getTable()[i]);
384 unlock_external(table, i - found);
385 sql_lock->resizeTable(found);
388 table= sql_lock->getTable();
390 for (i= 0; i < sql_lock->sizeTable(); i++)
421 void Session::removeLock(
Table *table)
423 unlockSomeTables(&table, 1);
429 void Session::abortLock(
Table *table)
432 Table *write_lock_used;
434 if ((locked= get_lock_data(&table, 1,
false,
437 for (uint32_t x= 0; x < locked->sizeLock(); x++)
438 locked->getLocks()[x]->lock->abort_locks();
456 bool Session::abortLockForThread(
Table *table)
459 Table* write_lock_used;
460 if (
DrizzleLock* locked= get_lock_data(&table, 1,
false, &write_lock_used))
462 for (uint32_t i= 0; i < locked->sizeLock(); i++)
464 if (locked->getLocks()[i]->lock->abort_locks_for_thread(table->
in_use->thread_id))
474 int Session::unlock_external(
Table **table, uint32_t count)
481 if ((*table)->current_lock != F_UNLCK)
484 if ((error=(*table)->cursor->ha_external_lock(
this, F_UNLCK)))
487 print_lock_error(error_code, (*table)->cursor->getEngine()->getName().c_str());
508 bool should_lock,
Table **write_lock_used)
512 Table **to, **table_buf;
515 for (uint32_t i= lock_count= 0 ; i < count ; i++)
517 Table *t= table_ptr[i];
519 if (! (t->getEngine()->check_flag(HTON_BIT_SKIP_STORE_LOCK)))
536 locks= locks_buf= sql_lock->getLocks();
537 to= table_buf= sql_lock->getTable();
539 for (uint32_t i= 0; i < count ; i++)
542 thr_lock_type lock_type;
544 if (table_ptr[i]->getEngine()->check_flag(HTON_BIT_SKIP_STORE_LOCK))
548 lock_type= table->reginfo.lock_type;
549 assert (lock_type != TL_WRITE_DEFAULT);
550 if (lock_type >= TL_WRITE_ALLOW_WRITE)
552 *write_lock_used=table;
553 if (table->
db_stat & HA_READ_ONLY)
555 my_error(ER_OPEN_AS_READONLY, MYF(0), table->getAlias());
557 sql_lock->setLock(locks - sql_lock->getLocks());
563 locks= table->
cursor->
store_lock(
this, locks, should_lock ? lock_type : TL_IGNORE);
568 table->
lock_count= (uint32_t) (locks - locks_start);
587 sql_lock->setLock(locks - locks_buf);
621 identifier::Table identifier(table_list->getSchemaName(), table_list->getTableName());
624 table::CacheRange ppp= table::getCache().equal_range(identifier.getKey());
625 for (table::CacheMap::const_iterator iter= ppp.first; iter != ppp.second; ++iter)
627 Table *table= iter->second;
628 if (table->reginfo.lock_type < TL_WRITE)
630 if (table->
in_use ==
this)
632 table->getMutableShare()->resetVersion();
633 table->locked_by_name=
true;
640 table_list->
table=
reinterpret_cast<Table*
>(table);
643 return (test(table::Cache::removeTable(*
this, identifier, RTFC_NO_FLAG)));
647 void TableList::unlock_table_name()
651 table::remove_table(static_cast<table::Concurrent *>(table));
657 static bool locked_named_table(TableList *table_list)
659 for (; table_list; table_list=table_list->next_local)
661 Table *table= table_list->table;
664 Table *save_next= table->getNext();
665 table->setNext(NULL);
666 bool result= table::Cache::areTablesUsed(table_list->table, 0);
667 table->setNext(save_next);
676 bool Session::wait_for_locked_table_names(TableList *table_list)
681 assert(ownership of table::Cache::mutex());
684 while (locked_named_table(table_list))
691 wait_for_condition(table::Cache::mutex(), COND_refresh);
692 table::Cache::mutex().lock();
714 bool got_all_locks=
true;
724 got_all_locks=
false;
728 if (not got_all_locks && wait_for_locked_table_names(table_list))
755 bool Session::lock_table_names_exclusively(
TableList *table_list)
757 if (lock_table_names(table_list))
766 table->table->open_placeholder= 1;
794 void TableList::unlock_table_names(
TableList *last_table)
796 for (
TableList *table_iter=
this; table_iter != last_table; table_iter= table_iter->
next_local)
798 table_iter->unlock_table_name();
804 static void print_lock_error(
int error,
const char *table)
806 drizzled::error_t textno;
809 case HA_ERR_LOCK_WAIT_TIMEOUT:
810 textno=ER_LOCK_WAIT_TIMEOUT;
812 case HA_ERR_READ_ONLY_TRANSACTION:
813 textno=ER_READ_ONLY_TRANSACTION;
815 case HA_ERR_LOCK_DEADLOCK:
816 textno=ER_LOCK_DEADLOCK;
818 case HA_ERR_WRONG_COMMAND:
819 textno=ER_ILLEGAL_HA;
826 if ( textno == ER_ILLEGAL_HA )
827 my_error(textno, MYF(ME_BELL+ME_OLDWIN+ME_WAITTANG), table);
829 my_error(textno, MYF(ME_BELL+ME_OLDWIN+ME_WAITTANG), error);
908 volatile uint32_t global_read_lock=0;
909 volatile uint32_t global_read_lock_blocks_commit=0;
910 static volatile uint32_t protect_against_global_read_lock=0;
911 static volatile uint32_t waiting_for_read_lock=0;
913 bool Session::lockGlobalReadLock()
915 if (isGlobalReadLock() == Session::NONE)
917 const char *old_message;
918 LOCK_global_read_lock.lock();
919 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
920 "Waiting to get readlock");
922 waiting_for_read_lock++;
923 boost::mutex::scoped_lock scopedLock(LOCK_global_read_lock, boost::adopt_lock_t());
924 while (protect_against_global_read_lock && not getKilled())
925 COND_global_read_lock.wait(scopedLock);
926 waiting_for_read_lock--;
927 scopedLock.release();
930 exit_cond(old_message);
933 setGlobalReadLock(Session::GOT_GLOBAL_READ_LOCK);
935 exit_cond(old_message);
950 void Session::unlockGlobalReadLock(
void)
954 if (not isGlobalReadLock())
958 boost::mutex::scoped_lock scopedLock(LOCK_global_read_lock);
959 tmp= --global_read_lock;
960 if (isGlobalReadLock() == Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT)
961 --global_read_lock_blocks_commit;
966 COND_global_read_lock.notify_all();
968 setGlobalReadLock(Session::NONE);
971 static inline bool must_wait(
bool is_not_commit)
973 return (global_read_lock &&
975 global_read_lock_blocks_commit));
978 bool Session::wait_if_global_read_lock(
bool abort_on_refresh,
bool is_not_commit)
980 const char *old_message= NULL;
981 bool result= 0, need_exit_cond;
988 safe_mutex_assert_not_owner(table::Cache::mutex().native_handle());
990 LOCK_global_read_lock.lock();
991 if ((need_exit_cond= must_wait(is_not_commit)))
993 if (isGlobalReadLock())
996 my_message(ER_CANT_UPDATE_WITH_READLOCK,
997 ER(ER_CANT_UPDATE_WITH_READLOCK), MYF(0));
998 LOCK_global_read_lock.unlock();
1004 return is_not_commit;
1006 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
1007 "Waiting for release of readlock");
1009 while (must_wait(is_not_commit) && not getKilled() &&
1010 (!abort_on_refresh || open_tables.version == g_refresh_version))
1012 boost::mutex::scoped_lock scoped(LOCK_global_read_lock, boost::adopt_lock_t());
1013 COND_global_read_lock.wait(scoped);
1020 if (not abort_on_refresh && not result)
1021 protect_against_global_read_lock++;
1027 if (unlikely(need_exit_cond))
1029 exit_cond(old_message);
1033 LOCK_global_read_lock.unlock();
1040 void Session::startWaitingGlobalReadLock()
1042 if (unlikely(isGlobalReadLock()))
1045 LOCK_global_read_lock.lock();
1046 bool tmp= (!--protect_against_global_read_lock && (waiting_for_read_lock || global_read_lock_blocks_commit));
1047 LOCK_global_read_lock.unlock();
1050 COND_global_read_lock.notify_all();
1054 bool Session::makeGlobalReadLockBlockCommit()
1057 const char *old_message;
1062 if (isGlobalReadLock() != Session::GOT_GLOBAL_READ_LOCK)
1064 LOCK_global_read_lock.lock();
1066 global_read_lock_blocks_commit++;
1067 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
1068 "Waiting for all running commits to finish");
1069 while (protect_against_global_read_lock && not getKilled())
1071 boost::mutex::scoped_lock scopedLock(LOCK_global_read_lock, boost::adopt_lock_t());
1072 COND_global_read_lock.wait(scopedLock);
1073 scopedLock.release();
1075 if ((error= test(getKilled())))
1077 global_read_lock_blocks_commit--;
1081 setGlobalReadLock(Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT);
1084 exit_cond(old_message);
1111 COND_refresh.notify_all();
1112 COND_global_read_lock.notify_all();