Drizzled Public API Documentation

thr_lock.cc
00001 /* Copyright (C) 2000 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 /*
00017 Read and write locks for POSIX threads. Every thread must acquire
00018 all the locks it needs through thr_multi_lock() to avoid deadlocks.
00019 A lock consists of a master lock (THR_LOCK), and lock instances
00020 (THR_LOCK_DATA).
00021 Any thread can have any number of lock instances (read and write:s) on
00022 any lock. All lock instances must be freed.
00023 Locks are prioritized according to:
00024 
00025 The current lock types are:
00026 
00027 TL_READ     # Low priority read
00028 TL_READ_WITH_SHARED_LOCKS
00029 TL_READ_NO_INSERT # Read without concurrent inserts
00030 TL_WRITE_ALLOW_WRITE  # Write lock that allows other writers
00031 TL_WRITE_ALLOW_READ # Write lock, but allow reading
00032 TL_WRITE_CONCURRENT_INSERT
00033       # Insert that can be mixed with selects
00034 TL_WRITE    # High priority write
00035 TL_WRITE_ONLY   # High priority write
00036       # Abort all new lock request with an error
00037 
00038 Locks are prioritized according to:
00039 
00040 WRITE_ALLOW_WRITE, WRITE_ALLOW_READ, WRITE_CONCURRENT_INSERT, WRITE_DELAYED,
00041 WRITE_LOW_PRIORITY, READ, WRITE, READ_HIGH_PRIORITY and WRITE_ONLY
00042 
00043 Locks in the same privilege level are scheduled in first-in-first-out order.
00044 
00045 To allow concurrent read/writes locks, with 'WRITE_CONCURRENT_INSERT' one
00046 should put a pointer to the following functions in the lock structure:
00047 (If the pointer is zero (default), the function is not called)
00048 
00049 
00050 The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
00051 TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
00052 
00053 */
00054 
00055 #include <config.h>
00056 #include <drizzled/internal/my_sys.h>
00057 #include <drizzled/internal/thread_var.h>
00058 #include <drizzled/statistics_variables.h>
00059 #include <drizzled/pthread_globals.h>
00060 
00061 #include <drizzled/session.h>
00062 
00063 #include "thr_lock.h"
00064 #include <drizzled/internal/m_string.h>
00065 #include <errno.h>
00066 #include <list>
00067 
00068 #if TIME_WITH_SYS_TIME
00069 # include <sys/time.h>
00070 # include <time.h>
00071 #else
00072 # if HAVE_SYS_TIME_H
00073 #  include <sys/time.h>
00074 # else
00075 #  include <time.h>
00076 # endif
00077 #endif
00078 
00079 #include <drizzled/util/test.h>
00080 
00081 #include <boost/interprocess/sync/lock_options.hpp>
00082 
00083 using namespace std;
00084 
00085 namespace drizzled
00086 {
00087 
00088 uint64_t table_lock_wait_timeout;
00089 static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
00090 
00091 
00092 uint64_t max_write_lock_count= UINT64_MAX;
00093 
00094 /*
00095 ** For the future (now the thread specific cond is alloced by my_pthread.c)
00096 */
00097 
00098 static inline bool
00099 thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
00100 {
00101   return rhs == lhs;
00102 }
00103 
00104 
00105   /* Initialize a lock */
00106 
00107 void thr_lock_init(THR_LOCK *lock)
00108 {
00109   lock->init();
00110   lock->read.last= &lock->read.data;
00111   lock->read_wait.last= &lock->read_wait.data;
00112   lock->write_wait.last= &lock->write_wait.data;
00113   lock->write.last= &lock->write.data;
00114 }
00115 
00116 
00117 void THR_LOCK_INFO::init()
00118 {
00119   internal::st_my_thread_var *tmp= my_thread_var;
00120   thread_id= tmp->id;
00121   n_cursors= 0;
00122 }
00123 
00124   /* Initialize a lock instance */
00125 
00126 void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
00127 {
00128   lock= lock_arg;
00129   type= TL_UNLOCK;
00130   owner= NULL;                               /* no owner yet */
00131   status_param= param_arg;
00132   cond= NULL;
00133 }
00134 
00135 
00136 static inline bool
00137 have_old_read_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner)
00138 {
00139   for ( ; data ; data=data->next)
00140   {
00141     if (thr_lock_owner_equal(data->owner, owner))
00142       return true;          /* Already locked by thread */
00143   }
00144   return false;
00145 }
00146 
00147 static void wake_up_waiters(THR_LOCK *lock);
00148 
00149 
00150 static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
00151 {
00152   internal::st_my_thread_var *thread_var= session.getThreadVar();
00153 
00154   boost::condition_variable_any *cond= &thread_var->suspend;
00155   enum enum_thr_lock_result result= THR_LOCK_ABORTED;
00156   bool can_deadlock= test(data->owner->info->n_cursors);
00157 
00158   {
00159     (*wait->last)=data;       /* Wait for lock */
00160     data->prev= wait->last;
00161     wait->last= &data->next;
00162   }
00163 
00164   current_global_counters.locks_waited++;
00165 
00166   /* Set up control struct to allow others to abort locks */
00167   thread_var->current_mutex= data->lock->native_handle();
00168   thread_var->current_cond=  &thread_var->suspend;
00169   data->cond= &thread_var->suspend;;
00170 
00171   while (not thread_var->abort)
00172   {
00173     boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
00174 
00175     if (can_deadlock)
00176     {
00177       boost::xtime xt; 
00178       xtime_get(&xt, boost::TIME_UTC); 
00179       xt.sec += table_lock_wait_timeout; 
00180       if (not cond->timed_wait(scoped, xt))
00181       {
00182         result= THR_LOCK_WAIT_TIMEOUT;
00183         scoped.release();
00184         break;
00185       }
00186     }
00187     else
00188     {
00189       cond->wait(scoped);
00190     }
00191     /*
00192       We must break the wait if one of the following occurs:
00193       - the connection has been aborted (!thread_var->abort), but
00194         this is not a delayed insert thread (in_wait_list). For a delayed
00195         insert thread the proper action at shutdown is, apparently, to
00196         acquire the lock and complete the insert.
00197       - the lock has been granted (data->cond is set to NULL by the granter),
00198         or the waiting has been aborted (additionally data->type is set to
00199         TL_UNLOCK).
00200       - the wait has timed out (rc == ETIMEDOUT)
00201       Order of checks below is important to not report about timeout
00202       if the predicate is true.
00203     */
00204     if (data->cond == NULL)
00205     {
00206       scoped.release();
00207       break;
00208     }
00209     scoped.release();
00210   }
00211   if (data->cond || data->type == TL_UNLOCK)
00212   {
00213     if (data->cond)                             /* aborted or timed out */
00214     {
00215       if (((*data->prev)=data->next))   /* remove from wait-list */
00216   data->next->prev= data->prev;
00217       else
00218   wait->last=data->prev;
00219       data->type= TL_UNLOCK;                    /* No lock */
00220       wake_up_waiters(data->lock);
00221     }
00222   }
00223   else
00224   {
00225     result= THR_LOCK_SUCCESS;
00226   }
00227   data->lock->unlock();
00228 
00229   /* The following must be done after unlock of lock->mutex */
00230   boost_unique_lock_t scopedLock(thread_var->mutex);
00231   thread_var->current_mutex= NULL;
00232   thread_var->current_cond= NULL;
00233   return(result);
00234 }
00235 
00236 
00237 static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
00238 {
00239   THR_LOCK *lock= data->lock;
00240   enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
00241   struct st_lock_list *wait_queue;
00242   THR_LOCK_DATA *lock_owner;
00243 
00244   data->next=0;
00245   data->cond=0;         /* safety */
00246   data->type=lock_type;
00247   data->owner= owner;                           /* Must be reset ! */
00248   lock->lock();
00249   if ((int) lock_type <= (int) TL_READ_NO_INSERT)
00250   {
00251     /* Request for READ lock */
00252     if (lock->write.data)
00253     {
00254       /* We can allow a read lock even if there is already a write lock
00255    on the table in one the following cases:
00256    - This thread alread have a write lock on the table
00257    - The write lock is TL_WRITE_ALLOW_READ or TL_WRITE_DELAYED
00258            and the read lock is TL_READ_HIGH_PRIORITY or TL_READ
00259          - The write lock is TL_WRITE_CONCURRENT_INSERT or TL_WRITE_ALLOW_WRITE
00260      and the read lock is not TL_READ_NO_INSERT
00261       */
00262 
00263       if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
00264     (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
00265      (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
00266       (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
00267        lock->write.data->type != TL_WRITE_ALLOW_READ))))
00268       {           /* Already got a write lock */
00269   (*lock->read.last)=data;    /* Add to running FIFO */
00270   data->prev=lock->read.last;
00271   lock->read.last= &data->next;
00272   if (lock_type == TL_READ_NO_INSERT)
00273     lock->read_no_write_count++;
00274         current_global_counters.locks_immediate++;
00275   goto end;
00276       }
00277       if (lock->write.data->type == TL_WRITE_ONLY)
00278       {
00279   /* We are not allowed to get a READ lock in this case */
00280   data->type=TL_UNLOCK;
00281         result= THR_LOCK_ABORTED;               /* Can't wait for this one */
00282   goto end;
00283       }
00284     }
00285     else if (!lock->write_wait.data ||
00286        lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
00287        have_old_read_lock(lock->read.data, data->owner))
00288     {           /* No important write-locks */
00289       (*lock->read.last)=data;      /* Add to running FIFO */
00290       data->prev=lock->read.last;
00291       lock->read.last= &data->next;
00292       if (lock_type == TL_READ_NO_INSERT)
00293   lock->read_no_write_count++;
00294       current_global_counters.locks_immediate++;
00295       goto end;
00296     }
00297     /*
00298       We're here if there is an active write lock or no write
00299       lock but a high priority write waiting in the write_wait queue.
00300       In the latter case we should yield the lock to the writer.
00301     */
00302     wait_queue= &lock->read_wait;
00303   }
00304   else            /* Request for WRITE lock */
00305   {
00306     if (lock_type == TL_WRITE_CONCURRENT_INSERT)
00307       data->type=lock_type= thr_upgraded_concurrent_insert_lock;
00308 
00309     if (lock->write.data)     /* If there is a write lock */
00310     {
00311       if (lock->write.data->type == TL_WRITE_ONLY)
00312       {
00313         /* Allow lock owner to bypass TL_WRITE_ONLY. */
00314         if (!thr_lock_owner_equal(data->owner, lock->write.data->owner))
00315         {
00316           /* We are not allowed to get a lock in this case */
00317           data->type=TL_UNLOCK;
00318           result= THR_LOCK_ABORTED;               /* Can't wait for this one */
00319           goto end;
00320         }
00321       }
00322 
00323       /*
00324   The following test will not work if the old lock was a
00325   TL_WRITE_ALLOW_WRITE, TL_WRITE_ALLOW_READ or TL_WRITE_DELAYED in
00326   the same thread, but this will never happen within MySQL.
00327       */
00328       if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
00329     (lock_type == TL_WRITE_ALLOW_WRITE &&
00330      !lock->write_wait.data &&
00331      lock->write.data->type == TL_WRITE_ALLOW_WRITE))
00332       {
00333   /*
00334           We have already got a write lock or all locks are
00335           TL_WRITE_ALLOW_WRITE
00336         */
00337 
00338   (*lock->write.last)=data; /* Add to running fifo */
00339   data->prev=lock->write.last;
00340   lock->write.last= &data->next;
00341         current_global_counters.locks_immediate++;
00342   goto end;
00343       }
00344     }
00345     else
00346     {
00347       if (!lock->write_wait.data)
00348       {           /* no scheduled write locks */
00349         bool concurrent_insert= 0;
00350   if (lock_type == TL_WRITE_CONCURRENT_INSERT)
00351         {
00352           concurrent_insert= 1;
00353         }
00354 
00355   if (!lock->read.data ||
00356       (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
00357        ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
00358          lock_type != TL_WRITE_ALLOW_WRITE) ||
00359         !lock->read_no_write_count)))
00360   {
00361     (*lock->write.last)=data;   /* Add as current write lock */
00362     data->prev=lock->write.last;
00363     lock->write.last= &data->next;
00364           current_global_counters.locks_immediate++;
00365     goto end;
00366   }
00367       }
00368     }
00369     wait_queue= &lock->write_wait;
00370   }
00371   /*
00372     Try to detect a trivial deadlock when using cursors: attempt to
00373     lock a table that is already locked by an open cursor within the
00374     same connection. lock_owner can be zero if we succumbed to a high
00375     priority writer in the write_wait queue.
00376   */
00377   lock_owner= lock->read.data ? lock->read.data : lock->write.data;
00378   if (lock_owner && lock_owner->owner->info == owner->info)
00379   {
00380     result= THR_LOCK_DEADLOCK;
00381     goto end;
00382   }
00383 
00384   /* Can't get lock yet;  Wait for it */
00385   return(wait_for_lock(session, wait_queue, data));
00386 end:
00387   lock->unlock();
00388 
00389   return(result);
00390 }
00391 
00392 
00393 static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
00394 {
00395   THR_LOCK_DATA *data= lock->read_wait.data;
00396 
00397   /* move all locks from read_wait list to read list */
00398   (*lock->read.last)=data;
00399   data->prev=lock->read.last;
00400   lock->read.last=lock->read_wait.last;
00401 
00402   /* Clear read_wait list */
00403   lock->read_wait.last= &lock->read_wait.data;
00404 
00405   do
00406   {
00407     boost::condition_variable_any *cond= data->cond;
00408     if ((int) data->type == (int) TL_READ_NO_INSERT)
00409     {
00410       if (using_concurrent_insert)
00411       {
00412   /*
00413     We can't free this lock;
00414     Link lock away from read chain back into read_wait chain
00415   */
00416   if (((*data->prev)=data->next))
00417     data->next->prev=data->prev;
00418   else
00419     lock->read.last=data->prev;
00420   *lock->read_wait.last= data;
00421   data->prev= lock->read_wait.last;
00422   lock->read_wait.last= &data->next;
00423   continue;
00424       }
00425       lock->read_no_write_count++;
00426     }
00427     data->cond= NULL;       /* Mark thread free */
00428     cond->notify_one();
00429   } while ((data=data->next));
00430   *lock->read_wait.last=0;
00431   if (!lock->read_wait.data)
00432     lock->write_lock_count=0;
00433 }
00434 
00435 /* Unlock lock and free next thread on same lock */
00436 
00437 static void thr_unlock(THR_LOCK_DATA *data)
00438 {
00439   THR_LOCK *lock=data->lock;
00440   enum thr_lock_type lock_type=data->type;
00441   lock->lock();
00442 
00443   if (((*data->prev)=data->next))   /* remove from lock-list */
00444     data->next->prev= data->prev;
00445   else if (lock_type <= TL_READ_NO_INSERT)
00446     lock->read.last=data->prev;
00447   else
00448     lock->write.last=data->prev;
00449   if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
00450   { }
00451   else
00452   { }
00453   if (lock_type == TL_READ_NO_INSERT)
00454     lock->read_no_write_count--;
00455   data->type=TL_UNLOCK;       /* Mark unlocked */
00456   wake_up_waiters(lock);
00457   lock->unlock();
00458 }
00459 
00460 
00469 static void wake_up_waiters(THR_LOCK *lock)
00470 {
00471   THR_LOCK_DATA *data;
00472   enum thr_lock_type lock_type;
00473 
00474   if (!lock->write.data)      /* If no active write locks */
00475   {
00476     data=lock->write_wait.data;
00477     if (!lock->read.data)     /* If no more locks in use */
00478     {
00479       /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
00480       if (data &&
00481     (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
00482       {
00483   if (lock->write_lock_count++ > max_write_lock_count)
00484   {
00485     /* Too many write locks in a row;  Release all waiting read locks */
00486     lock->write_lock_count=0;
00487     if (lock->read_wait.data)
00488     {
00489       free_all_read_locks(lock,0);
00490       goto end;
00491     }
00492   }
00493   for (;;)
00494   {
00495     if (((*data->prev)=data->next)) /* remove from wait-list */
00496       data->next->prev= data->prev;
00497     else
00498       lock->write_wait.last=data->prev;
00499     (*lock->write.last)=data;   /* Put in execute list */
00500     data->prev=lock->write.last;
00501     data->next=0;
00502     lock->write.last= &data->next;
00503 
00504     {
00505             boost::condition_variable_any *cond= data->cond;
00506       data->cond= NULL;       /* Mark thread free */
00507             cond->notify_one(); /* Start waiting thred */
00508     }
00509     if (data->type != TL_WRITE_ALLOW_WRITE ||
00510         !lock->write_wait.data ||
00511         lock->write_wait.data->type != TL_WRITE_ALLOW_WRITE)
00512       break;
00513     data=lock->write_wait.data;   /* Free this too */
00514   }
00515   if (data->type >= TL_WRITE)
00516           goto end;
00517   /* Release possible read locks together with the write lock */
00518       }
00519       if (lock->read_wait.data)
00520   free_all_read_locks(lock,
00521           data &&
00522           (data->type == TL_WRITE_CONCURRENT_INSERT ||
00523            data->type == TL_WRITE_ALLOW_WRITE));
00524     }
00525     else if (data &&
00526        (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
00527        ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
00528          lock_type != TL_WRITE_ALLOW_WRITE) ||
00529         !lock->read_no_write_count))
00530     {
00531       do {
00532         boost::condition_variable_any *cond= data->cond;
00533   if (((*data->prev)=data->next))   /* remove from wait-list */
00534     data->next->prev= data->prev;
00535   else
00536     lock->write_wait.last=data->prev;
00537   (*lock->write.last)=data;   /* Put in execute list */
00538   data->prev=lock->write.last;
00539   lock->write.last= &data->next;
00540   data->next=0;       /* Only one write lock */
00541   data->cond= NULL;       /* Mark thread free */
00542         cond->notify_one(); /* Start waiting thread */
00543       } while (lock_type == TL_WRITE_ALLOW_WRITE &&
00544          (data=lock->write_wait.data) &&
00545          data->type == TL_WRITE_ALLOW_WRITE);
00546       if (lock->read_wait.data)
00547   free_all_read_locks(lock,
00548           (lock_type == TL_WRITE_CONCURRENT_INSERT ||
00549            lock_type == TL_WRITE_ALLOW_WRITE));
00550     }
00551     else if (!data && lock->read_wait.data)
00552     {
00553       free_all_read_locks(lock,0);
00554     }
00555   }
00556 end:
00557   return;
00558 }
00559 
00560 
00561 /*
00562 ** Get all locks in a specific order to avoid dead-locks
00563 ** Sort according to lock position and put write locks before read locks
00564 ** when both refer to the same lock.
00565 */
00566 
00567 
00568 #define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint32_t) ((A)->type) < (unsigned char*) (B->lock)- (uint32_t) ((B)->type))
00569 
00570 static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
00571 {
00572   THR_LOCK_DATA **pos,**end,**prev,*tmp;
00573 
00574   /* Sort locks with insertion sort (fast because almost always few locks) */
00575 
00576   for (pos=data+1,end=data+count; pos < end ; pos++)
00577   {
00578     tmp= *pos;
00579     if (LOCK_CMP(tmp,pos[-1]))
00580     {
00581       prev=pos;
00582       do {
00583   prev[0]=prev[-1];
00584       } while (--prev != data && LOCK_CMP(tmp,prev[-1]));
00585       prev[0]=tmp;
00586     }
00587   }
00588 }
00589 
00590 
00591 enum enum_thr_lock_result
00592 thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
00593 {
00594   THR_LOCK_DATA **pos,**end;
00595   if (count > 1)
00596     sort_locks(data,count);
00597   /* lock everything */
00598   for (pos=data,end=data+count; pos < end ; pos++)
00599   {
00600     enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
00601     if (result != THR_LOCK_SUCCESS)
00602     {           /* Aborted */
00603       thr_multi_unlock(data,(uint32_t) (pos-data));
00604       return(result);
00605     }
00606   }
00607   /*
00608     Ensure that all get_locks() have the same status
00609     If we lock the same table multiple times, we must use the same
00610     status_param!
00611   */
00612 #if !defined(DONT_USE_RW_LOCKS)
00613   if (count > 1)
00614   {
00615     THR_LOCK_DATA *last_lock= end[-1];
00616     pos=end-1;
00617     do
00618     {
00619       pos--;
00620       last_lock=(*pos);
00621     } while (pos != data);
00622   }
00623 #endif
00624   return(THR_LOCK_SUCCESS);
00625 }
00626 
00627   /* free all locks */
00628 
00629 void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
00630 {
00631   THR_LOCK_DATA **pos,**end;
00632 
00633   for (pos=data,end=data+count; pos < end ; pos++)
00634   {
00635     if ((*pos)->type != TL_UNLOCK)
00636       thr_unlock(*pos);
00637   }
00638   return;
00639 }
00640 
00641 void DrizzleLock::unlock(uint32_t count)
00642 {
00643   THR_LOCK_DATA **pos,**end;
00644 
00645   for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
00646   {
00647     if ((*pos)->type != TL_UNLOCK)
00648       thr_unlock(*pos);
00649   }
00650 }
00651 
00652 /*
00653   Abort all threads waiting for a lock. The lock will be upgraded to
00654   TL_WRITE_ONLY to abort any new accesses to the lock
00655 */
00656 
00657 void THR_LOCK::abort_locks()
00658 {
00659   boost_unique_lock_t scopedLock(mutex);
00660 
00661   for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
00662   {
00663     local_data->type= TL_UNLOCK;      /* Mark killed */
00664     /* It's safe to signal the cond first: we're still holding the mutex. */
00665     local_data->cond->notify_one();
00666     local_data->cond= NULL;       /* Removed from list */
00667   }
00668   for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
00669   {
00670     local_data->type= TL_UNLOCK;
00671     local_data->cond->notify_one();
00672     local_data->cond= NULL;
00673   }
00674   read_wait.last= &read_wait.data;
00675   write_wait.last= &write_wait.data;
00676   read_wait.data= write_wait.data=0;
00677   if (write.data)
00678     write.data->type=TL_WRITE_ONLY;
00679 }
00680 
00681 
00682 /*
00683   Abort all locks for specific table/thread combination
00684 
00685   This is used to abort all locks for a specific thread
00686 */
00687 
00688 bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
00689 {
00690   bool found= false;
00691 
00692   boost_unique_lock_t scopedLock(mutex);
00693   for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
00694   {
00695     if (local_data->owner->info->thread_id == thread_id_arg)
00696     {
00697       local_data->type= TL_UNLOCK;      /* Mark killed */
00698       /* It's safe to signal the cond first: we're still holding the mutex. */
00699       found= true;
00700       local_data->cond->notify_one();
00701       local_data->cond= 0;        /* Removed from list */
00702 
00703       if (((*local_data->prev)= local_data->next))
00704   local_data->next->prev= local_data->prev;
00705       else
00706   read_wait.last= local_data->prev;
00707     }
00708   }
00709   for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
00710   {
00711     if (local_data->owner->info->thread_id == thread_id_arg)
00712     {
00713       local_data->type= TL_UNLOCK;
00714       found= true;
00715       local_data->cond->notify_one();
00716       local_data->cond= NULL;
00717 
00718       if (((*local_data->prev)= local_data->next))
00719   local_data->next->prev= local_data->prev;
00720       else
00721   write_wait.last= local_data->prev;
00722     }
00723   }
00724   wake_up_waiters(this);
00725 
00726   return found;
00727 }
00728 
00729 } /* namespace drizzled */