Drizzled Public API Documentation

page0cur.cc
00001 /*****************************************************************************
00002 
00003 Copyright (C) 1994, 2009, Innobase Oy. All Rights Reserved.
00004 
00005 This program is free software; you can redistribute it and/or modify it under
00006 the terms of the GNU General Public License as published by the Free Software
00007 Foundation; version 2 of the License.
00008 
00009 This program is distributed in the hope that it will be useful, but WITHOUT
00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
00012 
00013 You should have received a copy of the GNU General Public License along with
00014 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
00015 St, Fifth Floor, Boston, MA 02110-1301 USA
00016 
00017 *****************************************************************************/
00018 
00019 /********************************************************************/
00026 #include "page0cur.h"
00027 #ifdef UNIV_NONINL
00028 #include "page0cur.ic"
00029 #endif
00030 
00031 #include "page0zip.h"
00032 #include "mtr0log.h"
00033 #include "log0recv.h"
00034 #include "ut0ut.h"
00035 #ifndef UNIV_HOTBACKUP
00036 #include "rem0cmp.h"
00037 
00038 #ifdef PAGE_CUR_ADAPT
00039 # ifdef UNIV_SEARCH_PERF_STAT
00040 static ulint  page_cur_short_succ = 0;
00041 # endif /* UNIV_SEARCH_PERF_STAT */
00042 
00043 /*******************************************************************/
00055 static
00056 ib_uint64_t
00057 page_cur_lcg_prng(void)
00058 /*===================*/
00059 {
00060 #define LCG_a 1103515245
00061 #define LCG_c 12345
00062   static ib_uint64_t  lcg_current = 0;
00063   static ibool    initialized = FALSE;
00064 
00065   if (!initialized) {
00066     lcg_current = (ib_uint64_t) ut_time_us(NULL);
00067     initialized = TRUE;
00068   }
00069 
00070   /* no need to "% 2^64" explicitly because lcg_current is
00071   64 bit and this will be done anyway */
00072   lcg_current = LCG_a * lcg_current + LCG_c;
00073 
00074   return(lcg_current);
00075 }
00076 
00077 /****************************************************************/
00080 UNIV_INLINE
00081 ibool
00082 page_cur_try_search_shortcut(
00083 /*=========================*/
00084   const buf_block_t*  block,  
00085   const dict_index_t* index,  
00086   const dtuple_t*   tuple,  
00087   ulint*      iup_matched_fields,
00090   ulint*      iup_matched_bytes,
00094   ulint*      ilow_matched_fields,
00097   ulint*      ilow_matched_bytes,
00101   page_cur_t*   cursor) 
00102 {
00103   const rec_t*  rec;
00104   const rec_t*  next_rec;
00105   ulint   low_match;
00106   ulint   low_bytes;
00107   ulint   up_match;
00108   ulint   up_bytes;
00109 #ifdef UNIV_SEARCH_DEBUG
00110   page_cur_t  cursor2;
00111 #endif
00112   ibool   success   = FALSE;
00113   const page_t* page    = buf_block_get_frame(block);
00114   mem_heap_t* heap    = NULL;
00115   ulint   offsets_[REC_OFFS_NORMAL_SIZE];
00116   ulint*    offsets   = offsets_;
00117   rec_offs_init(offsets_);
00118 
00119   ut_ad(dtuple_check_typed(tuple));
00120 
00121   rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
00122   offsets = rec_get_offsets(rec, index, offsets,
00123           dtuple_get_n_fields(tuple), &heap);
00124 
00125   ut_ad(rec);
00126   ut_ad(page_rec_is_user_rec(rec));
00127 
00128   ut_pair_min(&low_match, &low_bytes,
00129         *ilow_matched_fields, *ilow_matched_bytes,
00130         *iup_matched_fields, *iup_matched_bytes);
00131 
00132   up_match = low_match;
00133   up_bytes = low_bytes;
00134 
00135   if (page_cmp_dtuple_rec_with_match(tuple, rec, offsets,
00136              &low_match, &low_bytes) < 0) {
00137     goto exit_func;
00138   }
00139 
00140   next_rec = page_rec_get_next_const(rec);
00141   offsets = rec_get_offsets(next_rec, index, offsets,
00142           dtuple_get_n_fields(tuple), &heap);
00143 
00144   if (page_cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
00145              &up_match, &up_bytes) >= 0) {
00146     goto exit_func;
00147   }
00148 
00149   page_cur_position(rec, block, cursor);
00150 
00151 #ifdef UNIV_SEARCH_DEBUG
00152   page_cur_search_with_match(block, index, tuple, PAGE_CUR_DBG,
00153            iup_matched_fields,
00154            iup_matched_bytes,
00155            ilow_matched_fields,
00156            ilow_matched_bytes,
00157            &cursor2);
00158   ut_a(cursor2.rec == cursor->rec);
00159 
00160   if (!page_rec_is_supremum(next_rec)) {
00161 
00162     ut_a(*iup_matched_fields == up_match);
00163     ut_a(*iup_matched_bytes == up_bytes);
00164   }
00165 
00166   ut_a(*ilow_matched_fields == low_match);
00167   ut_a(*ilow_matched_bytes == low_bytes);
00168 #endif
00169   if (!page_rec_is_supremum(next_rec)) {
00170 
00171     *iup_matched_fields = up_match;
00172     *iup_matched_bytes = up_bytes;
00173   }
00174 
00175   *ilow_matched_fields = low_match;
00176   *ilow_matched_bytes = low_bytes;
00177 
00178 #ifdef UNIV_SEARCH_PERF_STAT
00179   page_cur_short_succ++;
00180 #endif
00181   success = TRUE;
00182 exit_func:
00183   if (UNIV_LIKELY_NULL(heap)) {
00184     mem_heap_free(heap);
00185   }
00186   return(success);
00187 }
00188 
00189 #endif
00190 
00191 #ifdef PAGE_CUR_LE_OR_EXTENDS
00192 /****************************************************************/
00197 static
00198 ibool
00199 page_cur_rec_field_extends(
00200 /*=======================*/
00201   const dtuple_t* tuple,  
00202   const rec_t*  rec,  
00203   const ulint*  offsets,
00204   ulint   n)  
00205 {
00206   const dtype_t*  type;
00207   const dfield_t* dfield;
00208   const byte* rec_f;
00209   ulint   rec_f_len;
00210 
00211   ut_ad(rec_offs_validate(rec, NULL, offsets));
00212   dfield = dtuple_get_nth_field(tuple, n);
00213 
00214   type = dfield_get_type(dfield);
00215 
00216   rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
00217 
00218   if (type->mtype == DATA_VARCHAR
00219       || type->mtype == DATA_CHAR
00220       || type->mtype == DATA_FIXBINARY
00221       || type->mtype == DATA_BINARY
00222       || type->mtype == DATA_BLOB
00223       || type->mtype == DATA_VARMYSQL
00224       || type->mtype == DATA_MYSQL) {
00225 
00226     if (dfield_get_len(dfield) != UNIV_SQL_NULL
00227         && rec_f_len != UNIV_SQL_NULL
00228         && rec_f_len >= dfield_get_len(dfield)
00229         && !cmp_data_data_slow(type->mtype, type->prtype,
00230              dfield_get_data(dfield),
00231              dfield_get_len(dfield),
00232              rec_f, dfield_get_len(dfield))) {
00233 
00234       return(TRUE);
00235     }
00236   }
00237 
00238   return(FALSE);
00239 }
00240 #endif /* PAGE_CUR_LE_OR_EXTENDS */
00241 
00242 /****************************************************************/
00244 UNIV_INTERN
00245 void
00246 page_cur_search_with_match(
00247 /*=======================*/
00248   const buf_block_t*  block,  
00249   const dict_index_t* index,  
00250   const dtuple_t*   tuple,  
00251   ulint     mode, 
00254   ulint*      iup_matched_fields,
00257   ulint*      iup_matched_bytes,
00261   ulint*      ilow_matched_fields,
00264   ulint*      ilow_matched_bytes,
00268   page_cur_t*   cursor) 
00269 {
00270   ulint   up;
00271   ulint   low;
00272   ulint   mid;
00273   const page_t* page;
00274   const page_dir_slot_t* slot;
00275   const rec_t*  up_rec;
00276   const rec_t*  low_rec;
00277   const rec_t*  mid_rec;
00278   ulint   up_matched_fields;
00279   ulint   up_matched_bytes;
00280   ulint   low_matched_fields;
00281   ulint   low_matched_bytes;
00282   ulint   cur_matched_fields;
00283   ulint   cur_matched_bytes;
00284   int   cmp;
00285 #ifdef UNIV_SEARCH_DEBUG
00286   int   dbg_cmp;
00287   ulint   dbg_matched_fields;
00288   ulint   dbg_matched_bytes;
00289 #endif
00290 #ifdef UNIV_ZIP_DEBUG
00291   const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
00292 #endif /* UNIV_ZIP_DEBUG */
00293   mem_heap_t* heap    = NULL;
00294   ulint   offsets_[REC_OFFS_NORMAL_SIZE];
00295   ulint*    offsets   = offsets_;
00296   rec_offs_init(offsets_);
00297 
00298   ut_ad(block && tuple && iup_matched_fields && iup_matched_bytes
00299         && ilow_matched_fields && ilow_matched_bytes && cursor);
00300   ut_ad(dtuple_validate(tuple));
00301 #ifdef UNIV_DEBUG
00302 # ifdef PAGE_CUR_DBG
00303   if (mode != PAGE_CUR_DBG)
00304 # endif /* PAGE_CUR_DBG */
00305 # ifdef PAGE_CUR_LE_OR_EXTENDS
00306     if (mode != PAGE_CUR_LE_OR_EXTENDS)
00307 # endif /* PAGE_CUR_LE_OR_EXTENDS */
00308       ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
00309             || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
00310 #endif /* UNIV_DEBUG */
00311   page = buf_block_get_frame(block);
00312 #ifdef UNIV_ZIP_DEBUG
00313   ut_a(!page_zip || page_zip_validate(page_zip, page));
00314 #endif /* UNIV_ZIP_DEBUG */
00315 
00316   page_check_dir(page);
00317 
00318 #ifdef PAGE_CUR_ADAPT
00319   if (page_is_leaf(page)
00320       && (mode == PAGE_CUR_LE)
00321       && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
00322       && (page_header_get_ptr(page, PAGE_LAST_INSERT))
00323       && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {
00324 
00325     if (page_cur_try_search_shortcut(
00326           block, index, tuple,
00327           iup_matched_fields, iup_matched_bytes,
00328           ilow_matched_fields, ilow_matched_bytes,
00329           cursor)) {
00330       return;
00331     }
00332   }
00333 # ifdef PAGE_CUR_DBG
00334   if (mode == PAGE_CUR_DBG) {
00335     mode = PAGE_CUR_LE;
00336   }
00337 # endif
00338 #endif
00339 
00340   /* The following flag does not work for non-latin1 char sets because
00341   cmp_full_field does not tell how many bytes matched */
00342 #ifdef PAGE_CUR_LE_OR_EXTENDS
00343   ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
00344 #endif /* PAGE_CUR_LE_OR_EXTENDS */
00345 
00346   /* If mode PAGE_CUR_G is specified, we are trying to position the
00347   cursor to answer a query of the form "tuple < X", where tuple is
00348   the input parameter, and X denotes an arbitrary physical record on
00349   the page. We want to position the cursor on the first X which
00350   satisfies the condition. */
00351 
00352   up_matched_fields  = *iup_matched_fields;
00353   up_matched_bytes   = *iup_matched_bytes;
00354   low_matched_fields = *ilow_matched_fields;
00355   low_matched_bytes  = *ilow_matched_bytes;
00356 
00357   /* Perform binary search. First the search is done through the page
00358   directory, after that as a linear search in the list of records
00359   owned by the upper limit directory slot. */
00360 
00361   low = 0;
00362   up = page_dir_get_n_slots(page) - 1;
00363 
00364   /* Perform binary search until the lower and upper limit directory
00365   slots come to the distance 1 of each other */
00366 
00367   while (up - low > 1) {
00368     mid = (low + up) / 2;
00369     slot = page_dir_get_nth_slot(page, mid);
00370     mid_rec = page_dir_slot_get_rec(slot);
00371 
00372     ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
00373           low_matched_fields, low_matched_bytes,
00374           up_matched_fields, up_matched_bytes);
00375 
00376     offsets = rec_get_offsets(mid_rec, index, offsets,
00377             dtuple_get_n_fields_cmp(tuple),
00378             &heap);
00379 
00380     cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
00381             &cur_matched_fields,
00382             &cur_matched_bytes);
00383     if (UNIV_LIKELY(cmp > 0)) {
00384 low_slot_match:
00385       low = mid;
00386       low_matched_fields = cur_matched_fields;
00387       low_matched_bytes = cur_matched_bytes;
00388 
00389     } else if (UNIV_EXPECT(cmp, -1)) {
00390 #ifdef PAGE_CUR_LE_OR_EXTENDS
00391       if (mode == PAGE_CUR_LE_OR_EXTENDS
00392           && page_cur_rec_field_extends(
00393             tuple, mid_rec, offsets,
00394             cur_matched_fields)) {
00395 
00396         goto low_slot_match;
00397       }
00398 #endif /* PAGE_CUR_LE_OR_EXTENDS */
00399 up_slot_match:
00400       up = mid;
00401       up_matched_fields = cur_matched_fields;
00402       up_matched_bytes = cur_matched_bytes;
00403 
00404     } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
00405 #ifdef PAGE_CUR_LE_OR_EXTENDS
00406          || mode == PAGE_CUR_LE_OR_EXTENDS
00407 #endif /* PAGE_CUR_LE_OR_EXTENDS */
00408          ) {
00409 
00410       goto low_slot_match;
00411     } else {
00412 
00413       goto up_slot_match;
00414     }
00415   }
00416 
00417   slot = page_dir_get_nth_slot(page, low);
00418   low_rec = page_dir_slot_get_rec(slot);
00419   slot = page_dir_get_nth_slot(page, up);
00420   up_rec = page_dir_slot_get_rec(slot);
00421 
00422   /* Perform linear search until the upper and lower records come to
00423   distance 1 of each other. */
00424 
00425   while (page_rec_get_next_const(low_rec) != up_rec) {
00426 
00427     mid_rec = page_rec_get_next_const(low_rec);
00428 
00429     ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
00430           low_matched_fields, low_matched_bytes,
00431           up_matched_fields, up_matched_bytes);
00432 
00433     offsets = rec_get_offsets(mid_rec, index, offsets,
00434             dtuple_get_n_fields_cmp(tuple),
00435             &heap);
00436 
00437     cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
00438             &cur_matched_fields,
00439             &cur_matched_bytes);
00440     if (UNIV_LIKELY(cmp > 0)) {
00441 low_rec_match:
00442       low_rec = mid_rec;
00443       low_matched_fields = cur_matched_fields;
00444       low_matched_bytes = cur_matched_bytes;
00445 
00446     } else if (UNIV_EXPECT(cmp, -1)) {
00447 #ifdef PAGE_CUR_LE_OR_EXTENDS
00448       if (mode == PAGE_CUR_LE_OR_EXTENDS
00449           && page_cur_rec_field_extends(
00450             tuple, mid_rec, offsets,
00451             cur_matched_fields)) {
00452 
00453         goto low_rec_match;
00454       }
00455 #endif /* PAGE_CUR_LE_OR_EXTENDS */
00456 up_rec_match:
00457       up_rec = mid_rec;
00458       up_matched_fields = cur_matched_fields;
00459       up_matched_bytes = cur_matched_bytes;
00460     } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
00461 #ifdef PAGE_CUR_LE_OR_EXTENDS
00462          || mode == PAGE_CUR_LE_OR_EXTENDS
00463 #endif /* PAGE_CUR_LE_OR_EXTENDS */
00464          ) {
00465 
00466       goto low_rec_match;
00467     } else {
00468 
00469       goto up_rec_match;
00470     }
00471   }
00472 
00473 #ifdef UNIV_SEARCH_DEBUG
00474 
00475   /* Check that the lower and upper limit records have the
00476   right alphabetical order compared to tuple. */
00477   dbg_matched_fields = 0;
00478   dbg_matched_bytes = 0;
00479 
00480   offsets = rec_get_offsets(low_rec, index, offsets,
00481           ULINT_UNDEFINED, &heap);
00482   dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, low_rec, offsets,
00483              &dbg_matched_fields,
00484              &dbg_matched_bytes);
00485   if (mode == PAGE_CUR_G) {
00486     ut_a(dbg_cmp >= 0);
00487   } else if (mode == PAGE_CUR_GE) {
00488     ut_a(dbg_cmp == 1);
00489   } else if (mode == PAGE_CUR_L) {
00490     ut_a(dbg_cmp == 1);
00491   } else if (mode == PAGE_CUR_LE) {
00492     ut_a(dbg_cmp >= 0);
00493   }
00494 
00495   if (!page_rec_is_infimum(low_rec)) {
00496 
00497     ut_a(low_matched_fields == dbg_matched_fields);
00498     ut_a(low_matched_bytes == dbg_matched_bytes);
00499   }
00500 
00501   dbg_matched_fields = 0;
00502   dbg_matched_bytes = 0;
00503 
00504   offsets = rec_get_offsets(up_rec, index, offsets,
00505           ULINT_UNDEFINED, &heap);
00506   dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, up_rec, offsets,
00507              &dbg_matched_fields,
00508              &dbg_matched_bytes);
00509   if (mode == PAGE_CUR_G) {
00510     ut_a(dbg_cmp == -1);
00511   } else if (mode == PAGE_CUR_GE) {
00512     ut_a(dbg_cmp <= 0);
00513   } else if (mode == PAGE_CUR_L) {
00514     ut_a(dbg_cmp <= 0);
00515   } else if (mode == PAGE_CUR_LE) {
00516     ut_a(dbg_cmp == -1);
00517   }
00518 
00519   if (!page_rec_is_supremum(up_rec)) {
00520 
00521     ut_a(up_matched_fields == dbg_matched_fields);
00522     ut_a(up_matched_bytes == dbg_matched_bytes);
00523   }
00524 #endif
00525   if (mode <= PAGE_CUR_GE) {
00526     page_cur_position(up_rec, block, cursor);
00527   } else {
00528     page_cur_position(low_rec, block, cursor);
00529   }
00530 
00531   *iup_matched_fields  = up_matched_fields;
00532   *iup_matched_bytes   = up_matched_bytes;
00533   *ilow_matched_fields = low_matched_fields;
00534   *ilow_matched_bytes  = low_matched_bytes;
00535   if (UNIV_LIKELY_NULL(heap)) {
00536     mem_heap_free(heap);
00537   }
00538 }
00539 
00540 /***********************************************************/
00543 UNIV_INTERN
00544 void
00545 page_cur_open_on_rnd_user_rec(
00546 /*==========================*/
00547   buf_block_t*  block,  
00548   page_cur_t* cursor) 
00549 {
00550   ulint rnd;
00551   ulint n_recs = page_get_n_recs(buf_block_get_frame(block));
00552 
00553   page_cur_set_before_first(block, cursor);
00554 
00555   if (UNIV_UNLIKELY(n_recs == 0)) {
00556 
00557     return;
00558   }
00559 
00560   rnd = (ulint) (page_cur_lcg_prng() % n_recs);
00561 
00562   do {
00563     page_cur_move_to_next(cursor);
00564   } while (rnd--);
00565 }
00566 
00567 /***********************************************************/
00569 static
00570 void
00571 page_cur_insert_rec_write_log(
00572 /*==========================*/
00573   rec_t*    insert_rec, 
00574   ulint   rec_size, 
00575   rec_t*    cursor_rec, 
00577   dict_index_t* index,    
00578   mtr_t*    mtr)    
00579 {
00580   ulint cur_rec_size;
00581   ulint extra_size;
00582   ulint cur_extra_size;
00583   const byte* ins_ptr;
00584   byte* log_ptr;
00585   const byte* log_end;
00586   ulint i;
00587 
00588   ut_a(rec_size < UNIV_PAGE_SIZE);
00589   ut_ad(page_align(insert_rec) == page_align(cursor_rec));
00590   ut_ad(!page_rec_is_comp(insert_rec)
00591         == !dict_table_is_comp(index->table));
00592 
00593   {
00594     mem_heap_t* heap    = NULL;
00595     ulint   cur_offs_[REC_OFFS_NORMAL_SIZE];
00596     ulint   ins_offs_[REC_OFFS_NORMAL_SIZE];
00597 
00598     ulint*    cur_offs;
00599     ulint*    ins_offs;
00600 
00601     rec_offs_init(cur_offs_);
00602     rec_offs_init(ins_offs_);
00603 
00604     cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
00605              ULINT_UNDEFINED, &heap);
00606     ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
00607              ULINT_UNDEFINED, &heap);
00608 
00609     extra_size = rec_offs_extra_size(ins_offs);
00610     cur_extra_size = rec_offs_extra_size(cur_offs);
00611     ut_ad(rec_size == rec_offs_size(ins_offs));
00612     cur_rec_size = rec_offs_size(cur_offs);
00613 
00614     if (UNIV_LIKELY_NULL(heap)) {
00615       mem_heap_free(heap);
00616     }
00617   }
00618 
00619   ins_ptr = insert_rec - extra_size;
00620 
00621   i = 0;
00622 
00623   if (cur_extra_size == extra_size) {
00624     ulint   min_rec_size = ut_min(cur_rec_size, rec_size);
00625 
00626     const byte* cur_ptr = cursor_rec - cur_extra_size;
00627 
00628     /* Find out the first byte in insert_rec which differs from
00629     cursor_rec; skip the bytes in the record info */
00630 
00631     do {
00632       if (*ins_ptr == *cur_ptr) {
00633         i++;
00634         ins_ptr++;
00635         cur_ptr++;
00636       } else if ((i < extra_size)
00637            && (i >= extra_size
00638                - page_rec_get_base_extra_size
00639                (insert_rec))) {
00640         i = extra_size;
00641         ins_ptr = insert_rec;
00642         cur_ptr = cursor_rec;
00643       } else {
00644         break;
00645       }
00646     } while (i < min_rec_size);
00647   }
00648 
00649   if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
00650 
00651     if (page_rec_is_comp(insert_rec)) {
00652       log_ptr = mlog_open_and_write_index(
00653         mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
00654         2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
00655       if (UNIV_UNLIKELY(!log_ptr)) {
00656         /* Logging in mtr is switched off
00657         during crash recovery: in that case
00658         mlog_open returns NULL */
00659         return;
00660       }
00661     } else {
00662       log_ptr = mlog_open(mtr, 11
00663               + 2 + 5 + 1 + 5 + 5
00664               + MLOG_BUF_MARGIN);
00665       if (UNIV_UNLIKELY(!log_ptr)) {
00666         /* Logging in mtr is switched off
00667         during crash recovery: in that case
00668         mlog_open returns NULL */
00669         return;
00670       }
00671 
00672       log_ptr = mlog_write_initial_log_record_fast(
00673         insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
00674     }
00675 
00676     log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
00677     /* Write the cursor rec offset as a 2-byte ulint */
00678     mach_write_to_2(log_ptr, page_offset(cursor_rec));
00679     log_ptr += 2;
00680   } else {
00681     log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
00682     if (!log_ptr) {
00683       /* Logging in mtr is switched off during crash
00684       recovery: in that case mlog_open returns NULL */
00685       return;
00686     }
00687     log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
00688   }
00689 
00690   if (page_rec_is_comp(insert_rec)) {
00691     if (UNIV_UNLIKELY
00692         (rec_get_info_and_status_bits(insert_rec, TRUE)
00693          != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
00694 
00695       goto need_extra_info;
00696     }
00697   } else {
00698     if (UNIV_UNLIKELY
00699         (rec_get_info_and_status_bits(insert_rec, FALSE)
00700          != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
00701 
00702       goto need_extra_info;
00703     }
00704   }
00705 
00706   if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
00707 need_extra_info:
00708     /* Write the record end segment length
00709     and the extra info storage flag */
00710     log_ptr += mach_write_compressed(log_ptr,
00711              2 * (rec_size - i) + 1);
00712 
00713     /* Write the info bits */
00714     mach_write_to_1(log_ptr,
00715         rec_get_info_and_status_bits(
00716           insert_rec,
00717           page_rec_is_comp(insert_rec)));
00718     log_ptr++;
00719 
00720     /* Write the record origin offset */
00721     log_ptr += mach_write_compressed(log_ptr, extra_size);
00722 
00723     /* Write the mismatch index */
00724     log_ptr += mach_write_compressed(log_ptr, i);
00725 
00726     ut_a(i < UNIV_PAGE_SIZE);
00727     ut_a(extra_size < UNIV_PAGE_SIZE);
00728   } else {
00729     /* Write the record end segment length
00730     and the extra info storage flag */
00731     log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
00732   }
00733 
00734   /* Write to the log the inserted index record end segment which
00735   differs from the cursor record */
00736 
00737   rec_size -= i;
00738 
00739   if (log_ptr + rec_size <= log_end) {
00740     memcpy(log_ptr, ins_ptr, rec_size);
00741     mlog_close(mtr, log_ptr + rec_size);
00742   } else {
00743     mlog_close(mtr, log_ptr);
00744     ut_a(rec_size < UNIV_PAGE_SIZE);
00745     mlog_catenate_string(mtr, ins_ptr, rec_size);
00746   }
00747 }
00748 #else /* !UNIV_HOTBACKUP */
00749 # define page_cur_insert_rec_write_log(ins_rec,size,cur,index,mtr) ((void) 0)
00750 #endif /* !UNIV_HOTBACKUP */
00751 
00752 /***********************************************************/
00755 UNIV_INTERN
00756 byte*
00757 page_cur_parse_insert_rec(
00758 /*======================*/
00759   ibool   is_short,
00760   byte*   ptr,  
00761   byte*   end_ptr,
00762   buf_block_t*  block,  
00763   dict_index_t* index,  
00764   mtr_t*    mtr)  
00765 {
00766   ulint origin_offset;
00767   ulint end_seg_len;
00768   ulint mismatch_index;
00769   page_t* page;
00770   rec_t*  cursor_rec;
00771   byte  buf1[1024];
00772   byte* buf;
00773   byte* ptr2      = ptr;
00774   ulint info_and_status_bits = 0; /* remove warning */
00775   page_cur_t cursor;
00776   mem_heap_t* heap    = NULL;
00777   ulint   offsets_[REC_OFFS_NORMAL_SIZE];
00778   ulint*    offsets   = offsets_;
00779   rec_offs_init(offsets_);
00780 
00781   page = block ? buf_block_get_frame(block) : NULL;
00782 
00783   if (is_short) {
00784     cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
00785   } else {
00786     ulint offset;
00787 
00788     /* Read the cursor rec offset as a 2-byte ulint */
00789 
00790     if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
00791 
00792       return(NULL);
00793     }
00794 
00795     offset = mach_read_from_2(ptr);
00796     ptr += 2;
00797 
00798     cursor_rec = page + offset;
00799 
00800     if (UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)) {
00801 
00802       recv_sys->found_corrupt_log = TRUE;
00803 
00804       return(NULL);
00805     }
00806   }
00807 
00808   ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
00809 
00810   if (ptr == NULL) {
00811 
00812     return(NULL);
00813   }
00814 
00815   if (UNIV_UNLIKELY(end_seg_len >= UNIV_PAGE_SIZE << 1)) {
00816     recv_sys->found_corrupt_log = TRUE;
00817 
00818     return(NULL);
00819   }
00820 
00821   if (end_seg_len & 0x1UL) {
00822     /* Read the info bits */
00823 
00824     if (end_ptr < ptr + 1) {
00825 
00826       return(NULL);
00827     }
00828 
00829     info_and_status_bits = mach_read_from_1(ptr);
00830     ptr++;
00831 
00832     ptr = mach_parse_compressed(ptr, end_ptr, &origin_offset);
00833 
00834     if (ptr == NULL) {
00835 
00836       return(NULL);
00837     }
00838 
00839     ut_a(origin_offset < UNIV_PAGE_SIZE);
00840 
00841     ptr = mach_parse_compressed(ptr, end_ptr, &mismatch_index);
00842 
00843     if (ptr == NULL) {
00844 
00845       return(NULL);
00846     }
00847 
00848     ut_a(mismatch_index < UNIV_PAGE_SIZE);
00849   }
00850 
00851   if (UNIV_UNLIKELY(end_ptr < ptr + (end_seg_len >> 1))) {
00852 
00853     return(NULL);
00854   }
00855 
00856   if (!block) {
00857 
00858     return(ptr + (end_seg_len >> 1));
00859   }
00860 
00861   ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
00862   ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
00863 
00864   /* Read from the log the inserted index record end segment which
00865   differs from the cursor record */
00866 
00867   offsets = rec_get_offsets(cursor_rec, index, offsets,
00868           ULINT_UNDEFINED, &heap);
00869 
00870   if (!(end_seg_len & 0x1UL)) {
00871     info_and_status_bits = rec_get_info_and_status_bits(
00872       cursor_rec, page_is_comp(page));
00873     origin_offset = rec_offs_extra_size(offsets);
00874     mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
00875   }
00876 
00877   end_seg_len >>= 1;
00878 
00879   if (mismatch_index + end_seg_len < sizeof buf1) {
00880     buf = buf1;
00881   } else {
00882     buf = static_cast<byte *>(mem_alloc(mismatch_index + end_seg_len));
00883   }
00884 
00885   /* Build the inserted record to buf */
00886 
00887         if (UNIV_UNLIKELY(mismatch_index >= UNIV_PAGE_SIZE)) {
00888     fprintf(stderr,
00889       "Is short %lu, info_and_status_bits %lu, offset %lu, "
00890       "o_offset %lu\n"
00891       "mismatch index %lu, end_seg_len %lu\n"
00892       "parsed len %lu\n",
00893       (ulong) is_short, (ulong) info_and_status_bits,
00894       (ulong) page_offset(cursor_rec),
00895       (ulong) origin_offset,
00896       (ulong) mismatch_index, (ulong) end_seg_len,
00897       (ulong) (ptr - ptr2));
00898 
00899     fputs("Dump of 300 bytes of log:\n", stderr);
00900     ut_print_buf(stderr, ptr2, 300);
00901     putc('\n', stderr);
00902 
00903     buf_page_print(page, 0);
00904 
00905     ut_error;
00906   }
00907 
00908   ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
00909   ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
00910 
00911   if (page_is_comp(page)) {
00912     rec_set_info_and_status_bits(buf + origin_offset,
00913              info_and_status_bits);
00914   } else {
00915     rec_set_info_bits_old(buf + origin_offset,
00916               info_and_status_bits);
00917   }
00918 
00919   page_cur_position(cursor_rec, block, &cursor);
00920 
00921   offsets = rec_get_offsets(buf + origin_offset, index, offsets,
00922           ULINT_UNDEFINED, &heap);
00923   if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
00924                  buf + origin_offset,
00925                  index, offsets, mtr))) {
00926     /* The redo log record should only have been written
00927     after the write was successful. */
00928     ut_error;
00929   }
00930 
00931   if (buf != buf1) {
00932 
00933     mem_free(buf);
00934   }
00935 
00936   if (UNIV_LIKELY_NULL(heap)) {
00937     mem_heap_free(heap);
00938   }
00939 
00940   return(ptr + end_seg_len);
00941 }
00942 
00943 /***********************************************************/
00948 UNIV_INTERN
00949 rec_t*
00950 page_cur_insert_rec_low(
00951 /*====================*/
00952   rec_t*    current_rec,
00954   dict_index_t* index,  
00955   const rec_t*  rec,  
00956   ulint*    offsets,
00957   mtr_t*    mtr)  
00958 {
00959   byte*   insert_buf;
00960   ulint   rec_size;
00961   page_t*   page;   
00962   rec_t*    last_insert;  
00964   rec_t*    free_rec; 
00966   rec_t*    insert_rec; 
00967   ulint   heap_no;  
00970   ut_ad(rec_offs_validate(rec, index, offsets));
00971 
00972   page = page_align(current_rec);
00973   ut_ad(dict_table_is_comp(index->table)
00974         == (ibool) !!page_is_comp(page));
00975 
00976   ut_ad(!page_rec_is_supremum(current_rec));
00977 
00978   /* 1. Get the size of the physical record in the page */
00979   rec_size = rec_offs_size(offsets);
00980 
00981 #ifdef UNIV_DEBUG_VALGRIND
00982   {
00983     const void* rec_start
00984       = rec - rec_offs_extra_size(offsets);
00985     ulint   extra_size
00986       = rec_offs_extra_size(offsets)
00987       - (rec_offs_comp(offsets)
00988          ? REC_N_NEW_EXTRA_BYTES
00989          : REC_N_OLD_EXTRA_BYTES);
00990 
00991     /* All data bytes of the record must be valid. */
00992     UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
00993     /* The variable-length header must be valid. */
00994     UNIV_MEM_ASSERT_RW(rec_start, extra_size);
00995   }
00996 #endif /* UNIV_DEBUG_VALGRIND */
00997 
00998   /* 2. Try to find suitable space from page memory management */
00999 
01000   free_rec = page_header_get_ptr(page, PAGE_FREE);
01001   if (UNIV_LIKELY_NULL(free_rec)) {
01002     /* Try to allocate from the head of the free list. */
01003     ulint   foffsets_[REC_OFFS_NORMAL_SIZE];
01004     ulint*    foffsets  = foffsets_;
01005     mem_heap_t* heap    = NULL;
01006 
01007     rec_offs_init(foffsets_);
01008 
01009     foffsets = rec_get_offsets(free_rec, index, foffsets,
01010           ULINT_UNDEFINED, &heap);
01011     if (rec_offs_size(foffsets) < rec_size) {
01012       if (UNIV_LIKELY_NULL(heap)) {
01013         mem_heap_free(heap);
01014       }
01015 
01016       goto use_heap;
01017     }
01018 
01019     insert_buf = free_rec - rec_offs_extra_size(foffsets);
01020 
01021     if (page_is_comp(page)) {
01022       heap_no = rec_get_heap_no_new(free_rec);
01023       page_mem_alloc_free(page, NULL,
01024           rec_get_next_ptr(free_rec, TRUE),
01025           rec_size);
01026     } else {
01027       heap_no = rec_get_heap_no_old(free_rec);
01028       page_mem_alloc_free(page, NULL,
01029           rec_get_next_ptr(free_rec, FALSE),
01030           rec_size);
01031     }
01032 
01033     if (UNIV_LIKELY_NULL(heap)) {
01034       mem_heap_free(heap);
01035     }
01036   } else {
01037 use_heap:
01038     free_rec = NULL;
01039     insert_buf = page_mem_alloc_heap(page, NULL,
01040              rec_size, &heap_no);
01041 
01042     if (UNIV_UNLIKELY(insert_buf == NULL)) {
01043       return(NULL);
01044     }
01045   }
01046 
01047   /* 3. Create the record */
01048   insert_rec = rec_copy(insert_buf, rec, offsets);
01049   rec_offs_make_valid(insert_rec, index, offsets);
01050 
01051   /* 4. Insert the record in the linked list of records */
01052   ut_ad(current_rec != insert_rec);
01053 
01054   {
01055     /* next record after current before the insertion */
01056     rec_t*  next_rec = page_rec_get_next(current_rec);
01057 #ifdef UNIV_DEBUG
01058     if (page_is_comp(page)) {
01059       ut_ad(rec_get_status(current_rec)
01060         <= REC_STATUS_INFIMUM);
01061       ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
01062       ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
01063     }
01064 #endif
01065     page_rec_set_next(insert_rec, next_rec);
01066     page_rec_set_next(current_rec, insert_rec);
01067   }
01068 
01069   page_header_set_field(page, NULL, PAGE_N_RECS,
01070             1 + page_get_n_recs(page));
01071 
01072   /* 5. Set the n_owned field in the inserted record to zero,
01073   and set the heap_no field */
01074   if (page_is_comp(page)) {
01075     rec_set_n_owned_new(insert_rec, NULL, 0);
01076     rec_set_heap_no_new(insert_rec, heap_no);
01077   } else {
01078     rec_set_n_owned_old(insert_rec, 0);
01079     rec_set_heap_no_old(insert_rec, heap_no);
01080   }
01081 
01082   UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
01083          rec_offs_size(offsets));
01084   /* 6. Update the last insertion info in page header */
01085 
01086   last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
01087   ut_ad(!last_insert || !page_is_comp(page)
01088         || rec_get_node_ptr_flag(last_insert)
01089         == rec_get_node_ptr_flag(insert_rec));
01090 
01091   if (UNIV_UNLIKELY(last_insert == NULL)) {
01092     page_header_set_field(page, NULL, PAGE_DIRECTION,
01093               PAGE_NO_DIRECTION);
01094     page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
01095 
01096   } else if ((last_insert == current_rec)
01097        && (page_header_get_field(page, PAGE_DIRECTION)
01098            != PAGE_LEFT)) {
01099 
01100     page_header_set_field(page, NULL, PAGE_DIRECTION,
01101               PAGE_RIGHT);
01102     page_header_set_field(page, NULL, PAGE_N_DIRECTION,
01103               page_header_get_field(
01104                 page, PAGE_N_DIRECTION) + 1);
01105 
01106   } else if ((page_rec_get_next(insert_rec) == last_insert)
01107        && (page_header_get_field(page, PAGE_DIRECTION)
01108            != PAGE_RIGHT)) {
01109 
01110     page_header_set_field(page, NULL, PAGE_DIRECTION,
01111               PAGE_LEFT);
01112     page_header_set_field(page, NULL, PAGE_N_DIRECTION,
01113               page_header_get_field(
01114                 page, PAGE_N_DIRECTION) + 1);
01115   } else {
01116     page_header_set_field(page, NULL, PAGE_DIRECTION,
01117               PAGE_NO_DIRECTION);
01118     page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
01119   }
01120 
01121   page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
01122 
01123   /* 7. It remains to update the owner record. */
01124   {
01125     rec_t*  owner_rec = page_rec_find_owner_rec(insert_rec);
01126     ulint n_owned;
01127     if (page_is_comp(page)) {
01128       n_owned = rec_get_n_owned_new(owner_rec);
01129       rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
01130     } else {
01131       n_owned = rec_get_n_owned_old(owner_rec);
01132       rec_set_n_owned_old(owner_rec, n_owned + 1);
01133     }
01134 
01135     /* 8. Now we have incremented the n_owned field of the owner
01136     record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
01137     we have to split the corresponding directory slot in two. */
01138 
01139     if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
01140       page_dir_split_slot(
01141         page, NULL,
01142         page_dir_find_owner_slot(owner_rec));
01143     }
01144   }
01145 
01146   /* 9. Write log record of the insert */
01147   if (UNIV_LIKELY(mtr != NULL)) {
01148     page_cur_insert_rec_write_log(insert_rec, rec_size,
01149                 current_rec, index, mtr);
01150   }
01151 
01152   return(insert_rec);
01153 }
01154 
01155 /***********************************************************/
01158 static
01159 rec_t*
01160 page_cur_insert_rec_zip_reorg(
01161 /*==========================*/
01162   rec_t**   current_rec,
01164   buf_block_t*  block,  
01165   dict_index_t* index,  
01166   rec_t*    rec,  
01167   page_t*   page, 
01168   page_zip_des_t* page_zip,
01169   mtr_t*    mtr)  
01170 {
01171   ulint   pos;
01172 
01173   /* Recompress or reorganize and recompress the page. */
01174   if (UNIV_LIKELY(page_zip_compress(page_zip, page, index, mtr))) {
01175     return(rec);
01176   }
01177 
01178   /* Before trying to reorganize the page,
01179   store the number of preceding records on the page. */
01180   pos = page_rec_get_n_recs_before(rec);
01181 
01182   if (page_zip_reorganize(block, index, mtr)) {
01183     /* The page was reorganized: Find rec by seeking to pos,
01184     and update *current_rec. */
01185     rec = page + PAGE_NEW_INFIMUM;
01186 
01187     while (--pos) {
01188       rec = page + rec_get_next_offs(rec, TRUE);
01189     }
01190 
01191     *current_rec = rec;
01192     rec = page + rec_get_next_offs(rec, TRUE);
01193 
01194     return(rec);
01195   }
01196 
01197   /* Out of space: restore the page */
01198   if (!page_zip_decompress(page_zip, page, FALSE)) {
01199     ut_error; /* Memory corrupted? */
01200   }
01201   ut_ad(page_validate(page, index));
01202   return(NULL);
01203 }
01204 
01205 /***********************************************************/
01211 UNIV_INTERN
01212 rec_t*
01213 page_cur_insert_rec_zip(
01214 /*====================*/
01215   rec_t**   current_rec,
01217   buf_block_t*  block,  
01218   dict_index_t* index,  
01219   const rec_t*  rec,  
01220   ulint*    offsets,
01221   mtr_t*    mtr)  
01222 {
01223   byte*   insert_buf;
01224   ulint   rec_size;
01225   page_t*   page;   
01226   rec_t*    last_insert;  
01228   rec_t*    free_rec; 
01230   rec_t*    insert_rec; 
01231   ulint   heap_no;  
01233   page_zip_des_t* page_zip;
01234 
01235   page_zip = buf_block_get_page_zip(block);
01236   ut_ad(page_zip);
01237 
01238   ut_ad(rec_offs_validate(rec, index, offsets));
01239 
01240   page = page_align(*current_rec);
01241   ut_ad(dict_table_is_comp(index->table));
01242   ut_ad(page_is_comp(page));
01243 
01244   ut_ad(!page_rec_is_supremum(*current_rec));
01245 #ifdef UNIV_ZIP_DEBUG
01246   ut_a(page_zip_validate(page_zip, page));
01247 #endif /* UNIV_ZIP_DEBUG */
01248 
01249   /* 1. Get the size of the physical record in the page */
01250   rec_size = rec_offs_size(offsets);
01251 
01252 #ifdef UNIV_DEBUG_VALGRIND
01253   {
01254     const void* rec_start
01255       = rec - rec_offs_extra_size(offsets);
01256     ulint   extra_size
01257       = rec_offs_extra_size(offsets)
01258       - (rec_offs_comp(offsets)
01259          ? REC_N_NEW_EXTRA_BYTES
01260          : REC_N_OLD_EXTRA_BYTES);
01261 
01262     /* All data bytes of the record must be valid. */
01263     UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
01264     /* The variable-length header must be valid. */
01265     UNIV_MEM_ASSERT_RW(rec_start, extra_size);
01266   }
01267 #endif /* UNIV_DEBUG_VALGRIND */
01268 
01269   /* 2. Try to find suitable space from page memory management */
01270   if (!page_zip_available(page_zip, dict_index_is_clust(index),
01271         rec_size, 1)) {
01272 
01273     /* Try compressing the whole page afterwards. */
01274     insert_rec = page_cur_insert_rec_low(*current_rec,
01275                  index, rec, offsets,
01276                  NULL);
01277 
01278     if (UNIV_LIKELY(insert_rec != NULL)) {
01279       insert_rec = page_cur_insert_rec_zip_reorg(
01280         current_rec, block, index, insert_rec,
01281         page, page_zip, mtr);
01282     }
01283 
01284     return(insert_rec);
01285   }
01286 
01287   free_rec = page_header_get_ptr(page, PAGE_FREE);
01288   if (UNIV_LIKELY_NULL(free_rec)) {
01289     /* Try to allocate from the head of the free list. */
01290     lint  extra_size_diff;
01291     ulint   foffsets_[REC_OFFS_NORMAL_SIZE];
01292     ulint*    foffsets  = foffsets_;
01293     mem_heap_t* heap    = NULL;
01294 
01295     rec_offs_init(foffsets_);
01296 
01297     foffsets = rec_get_offsets(free_rec, index, foffsets,
01298           ULINT_UNDEFINED, &heap);
01299     if (rec_offs_size(foffsets) < rec_size) {
01300 too_small:
01301       if (UNIV_LIKELY_NULL(heap)) {
01302         mem_heap_free(heap);
01303       }
01304 
01305       goto use_heap;
01306     }
01307 
01308     insert_buf = free_rec - rec_offs_extra_size(foffsets);
01309 
01310     /* On compressed pages, do not relocate records from
01311     the free list.  If extra_size would grow, use the heap. */
01312     extra_size_diff
01313       = rec_offs_extra_size(offsets)
01314       - rec_offs_extra_size(foffsets);
01315 
01316     if (UNIV_UNLIKELY(extra_size_diff < 0)) {
01317       /* Add an offset to the extra_size. */
01318       if (rec_offs_size(foffsets)
01319           < rec_size - extra_size_diff) {
01320 
01321         goto too_small;
01322       }
01323 
01324       insert_buf -= extra_size_diff;
01325     } else if (UNIV_UNLIKELY(extra_size_diff)) {
01326       /* Do not allow extra_size to grow */
01327 
01328       goto too_small;
01329     }
01330 
01331     heap_no = rec_get_heap_no_new(free_rec);
01332     page_mem_alloc_free(page, page_zip,
01333             rec_get_next_ptr(free_rec, TRUE),
01334             rec_size);
01335 
01336     if (!page_is_leaf(page)) {
01337       /* Zero out the node pointer of free_rec,
01338       in case it will not be overwritten by
01339       insert_rec. */
01340 
01341       ut_ad(rec_size > REC_NODE_PTR_SIZE);
01342 
01343       if (rec_offs_extra_size(foffsets)
01344           + rec_offs_data_size(foffsets) > rec_size) {
01345 
01346         memset(rec_get_end(free_rec, foffsets)
01347                - REC_NODE_PTR_SIZE, 0,
01348                REC_NODE_PTR_SIZE);
01349       }
01350     } else if (dict_index_is_clust(index)) {
01351       /* Zero out the DB_TRX_ID and DB_ROLL_PTR
01352       columns of free_rec, in case it will not be
01353       overwritten by insert_rec. */
01354 
01355       ulint trx_id_col;
01356       ulint trx_id_offs;
01357       ulint len;
01358 
01359       trx_id_col = dict_index_get_sys_col_pos(index,
01360                 DATA_TRX_ID);
01361       ut_ad(trx_id_col > 0);
01362       ut_ad(trx_id_col != ULINT_UNDEFINED);
01363 
01364       trx_id_offs = rec_get_nth_field_offs(foffsets,
01365                    trx_id_col, &len);
01366       ut_ad(len == DATA_TRX_ID_LEN);
01367 
01368       if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
01369           + rec_offs_extra_size(foffsets) > rec_size) {
01370         /* We will have to zero out the
01371         DB_TRX_ID and DB_ROLL_PTR, because
01372         they will not be fully overwritten by
01373         insert_rec. */
01374 
01375         memset(free_rec + trx_id_offs, 0,
01376                DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
01377       }
01378 
01379       ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
01380             == rec_get_nth_field(free_rec, foffsets,
01381                trx_id_col + 1, &len));
01382       ut_ad(len == DATA_ROLL_PTR_LEN);
01383     }
01384 
01385     if (UNIV_LIKELY_NULL(heap)) {
01386       mem_heap_free(heap);
01387     }
01388   } else {
01389 use_heap:
01390     free_rec = NULL;
01391     insert_buf = page_mem_alloc_heap(page, page_zip,
01392              rec_size, &heap_no);
01393 
01394     if (UNIV_UNLIKELY(insert_buf == NULL)) {
01395       return(NULL);
01396     }
01397 
01398     page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
01399   }
01400 
01401   /* 3. Create the record */
01402   insert_rec = rec_copy(insert_buf, rec, offsets);
01403   rec_offs_make_valid(insert_rec, index, offsets);
01404 
01405   /* 4. Insert the record in the linked list of records */
01406   ut_ad(*current_rec != insert_rec);
01407 
01408   {
01409     /* next record after current before the insertion */
01410     rec_t*  next_rec = page_rec_get_next(*current_rec);
01411     ut_ad(rec_get_status(*current_rec)
01412           <= REC_STATUS_INFIMUM);
01413     ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
01414     ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
01415 
01416     page_rec_set_next(insert_rec, next_rec);
01417     page_rec_set_next(*current_rec, insert_rec);
01418   }
01419 
01420   page_header_set_field(page, page_zip, PAGE_N_RECS,
01421             1 + page_get_n_recs(page));
01422 
01423   /* 5. Set the n_owned field in the inserted record to zero,
01424   and set the heap_no field */
01425   rec_set_n_owned_new(insert_rec, NULL, 0);
01426   rec_set_heap_no_new(insert_rec, heap_no);
01427 
01428   UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
01429          rec_offs_size(offsets));
01430 
01431   page_zip_dir_insert(page_zip, *current_rec, free_rec, insert_rec);
01432 
01433   /* 6. Update the last insertion info in page header */
01434 
01435   last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
01436   ut_ad(!last_insert
01437         || rec_get_node_ptr_flag(last_insert)
01438         == rec_get_node_ptr_flag(insert_rec));
01439 
01440   if (UNIV_UNLIKELY(last_insert == NULL)) {
01441     page_header_set_field(page, page_zip, PAGE_DIRECTION,
01442               PAGE_NO_DIRECTION);
01443     page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
01444 
01445   } else if ((last_insert == *current_rec)
01446        && (page_header_get_field(page, PAGE_DIRECTION)
01447            != PAGE_LEFT)) {
01448 
01449     page_header_set_field(page, page_zip, PAGE_DIRECTION,
01450               PAGE_RIGHT);
01451     page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
01452               page_header_get_field(
01453                 page, PAGE_N_DIRECTION) + 1);
01454 
01455   } else if ((page_rec_get_next(insert_rec) == last_insert)
01456        && (page_header_get_field(page, PAGE_DIRECTION)
01457            != PAGE_RIGHT)) {
01458 
01459     page_header_set_field(page, page_zip, PAGE_DIRECTION,
01460               PAGE_LEFT);
01461     page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
01462               page_header_get_field(
01463                 page, PAGE_N_DIRECTION) + 1);
01464   } else {
01465     page_header_set_field(page, page_zip, PAGE_DIRECTION,
01466               PAGE_NO_DIRECTION);
01467     page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
01468   }
01469 
01470   page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
01471 
01472   /* 7. It remains to update the owner record. */
01473   {
01474     rec_t*  owner_rec = page_rec_find_owner_rec(insert_rec);
01475     ulint n_owned;
01476 
01477     n_owned = rec_get_n_owned_new(owner_rec);
01478     rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
01479 
01480     /* 8. Now we have incremented the n_owned field of the owner
01481     record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
01482     we have to split the corresponding directory slot in two. */
01483 
01484     if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
01485       page_dir_split_slot(
01486         page, page_zip,
01487         page_dir_find_owner_slot(owner_rec));
01488     }
01489   }
01490 
01491   page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
01492 
01493   /* 9. Write log record of the insert */
01494   if (UNIV_LIKELY(mtr != NULL)) {
01495     page_cur_insert_rec_write_log(insert_rec, rec_size,
01496                 *current_rec, index, mtr);
01497   }
01498 
01499   return(insert_rec);
01500 }
01501 
01502 #ifndef UNIV_HOTBACKUP
01503 /**********************************************************/
01507 UNIV_INLINE
01508 byte*
01509 page_copy_rec_list_to_created_page_write_log(
01510 /*=========================================*/
01511   page_t*   page, 
01512   dict_index_t* index,  
01513   mtr_t*    mtr)  
01514 {
01515   byte* log_ptr;
01516 
01517   ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
01518 
01519   log_ptr = mlog_open_and_write_index(mtr, page, index,
01520               page_is_comp(page)
01521               ? MLOG_COMP_LIST_END_COPY_CREATED
01522               : MLOG_LIST_END_COPY_CREATED, 4);
01523   if (UNIV_LIKELY(log_ptr != NULL)) {
01524     mlog_close(mtr, log_ptr + 4);
01525   }
01526 
01527   return(log_ptr);
01528 }
01529 #endif /* !UNIV_HOTBACKUP */
01530 
01531 /**********************************************************/
01534 UNIV_INTERN
01535 byte*
01536 page_parse_copy_rec_list_to_created_page(
01537 /*=====================================*/
01538   byte*   ptr,  
01539   byte*   end_ptr,
01540   buf_block_t*  block,  
01541   dict_index_t* index,  
01542   mtr_t*    mtr)  
01543 {
01544   byte*   rec_end;
01545   ulint   log_data_len;
01546   page_t*   page;
01547   page_zip_des_t* page_zip;
01548 
01549   if (ptr + 4 > end_ptr) {
01550 
01551     return(NULL);
01552   }
01553 
01554   log_data_len = mach_read_from_4(ptr);
01555   ptr += 4;
01556 
01557   rec_end = ptr + log_data_len;
01558 
01559   if (rec_end > end_ptr) {
01560 
01561     return(NULL);
01562   }
01563 
01564   if (!block) {
01565 
01566     return(rec_end);
01567   }
01568 
01569   while (ptr < rec_end) {
01570     ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
01571             block, index, mtr);
01572   }
01573 
01574   ut_a(ptr == rec_end);
01575 
01576   page = buf_block_get_frame(block);
01577   page_zip = buf_block_get_page_zip(block);
01578 
01579   page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
01580   page_header_set_field(page, page_zip, PAGE_DIRECTION,
01581               PAGE_NO_DIRECTION);
01582   page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
01583 
01584   return(rec_end);
01585 }
01586 
01587 #ifndef UNIV_HOTBACKUP
01588 /*************************************************************/
01591 UNIV_INTERN
01592 void
01593 page_copy_rec_list_end_to_created_page(
01594 /*===================================*/
01595   page_t*   new_page, 
01596   rec_t*    rec,    
01597   dict_index_t* index,    
01598   mtr_t*    mtr)    
01599 {
01600   page_dir_slot_t* slot = 0; /* remove warning */
01601   byte* heap_top;
01602   rec_t*  insert_rec = 0; /* remove warning */
01603   rec_t*  prev_rec;
01604   ulint count;
01605   ulint n_recs;
01606   ulint slot_index;
01607   ulint rec_size;
01608   ulint log_mode;
01609   byte* log_ptr;
01610   ulint log_data_len;
01611   mem_heap_t* heap    = NULL;
01612   ulint   offsets_[REC_OFFS_NORMAL_SIZE];
01613   ulint*    offsets   = offsets_;
01614   rec_offs_init(offsets_);
01615 
01616   ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
01617   ut_ad(page_align(rec) != new_page);
01618   ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
01619 
01620   if (page_rec_is_infimum(rec)) {
01621 
01622     rec = page_rec_get_next(rec);
01623   }
01624 
01625   if (page_rec_is_supremum(rec)) {
01626 
01627     return;
01628   }
01629 
01630 #ifdef UNIV_DEBUG
01631   /* To pass the debug tests we have to set these dummy values
01632   in the debug version */
01633   page_dir_set_n_slots(new_page, NULL, UNIV_PAGE_SIZE / 2);
01634   page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
01635           new_page + UNIV_PAGE_SIZE - 1);
01636 #endif
01637 
01638   log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
01639                      index, mtr);
01640 
01641   log_data_len = dyn_array_get_data_size(&(mtr->log));
01642 
01643   /* Individual inserts are logged in a shorter form */
01644 
01645   log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
01646 
01647   prev_rec = page_get_infimum_rec(new_page);
01648   if (page_is_comp(new_page)) {
01649     heap_top = new_page + PAGE_NEW_SUPREMUM_END;
01650   } else {
01651     heap_top = new_page + PAGE_OLD_SUPREMUM_END;
01652   }
01653   count = 0;
01654   slot_index = 0;
01655   n_recs = 0;
01656 
01657   do {
01658     offsets = rec_get_offsets(rec, index, offsets,
01659             ULINT_UNDEFINED, &heap);
01660     insert_rec = rec_copy(heap_top, rec, offsets);
01661 
01662     if (page_is_comp(new_page)) {
01663       rec_set_next_offs_new(prev_rec,
01664                 page_offset(insert_rec));
01665 
01666       rec_set_n_owned_new(insert_rec, NULL, 0);
01667       rec_set_heap_no_new(insert_rec,
01668               PAGE_HEAP_NO_USER_LOW + n_recs);
01669     } else {
01670       rec_set_next_offs_old(prev_rec,
01671                 page_offset(insert_rec));
01672 
01673       rec_set_n_owned_old(insert_rec, 0);
01674       rec_set_heap_no_old(insert_rec,
01675               PAGE_HEAP_NO_USER_LOW + n_recs);
01676     }
01677 
01678     count++;
01679     n_recs++;
01680 
01681     if (UNIV_UNLIKELY
01682         (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {
01683 
01684       slot_index++;
01685 
01686       slot = page_dir_get_nth_slot(new_page, slot_index);
01687 
01688       page_dir_slot_set_rec(slot, insert_rec);
01689       page_dir_slot_set_n_owned(slot, NULL, count);
01690 
01691       count = 0;
01692     }
01693 
01694     rec_size = rec_offs_size(offsets);
01695 
01696     ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);
01697 
01698     heap_top += rec_size;
01699 
01700     page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
01701                 index, mtr);
01702     prev_rec = insert_rec;
01703     rec = page_rec_get_next(rec);
01704   } while (!page_rec_is_supremum(rec));
01705 
01706   if ((slot_index > 0) && (count + 1
01707          + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
01708          <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
01709     /* We can merge the two last dir slots. This operation is
01710     here to make this function imitate exactly the equivalent
01711     task made using page_cur_insert_rec, which we use in database
01712     recovery to reproduce the task performed by this function.
01713     To be able to check the correctness of recovery, it is good
01714     that it imitates exactly. */
01715 
01716     count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
01717 
01718     page_dir_slot_set_n_owned(slot, NULL, 0);
01719 
01720     slot_index--;
01721   }
01722 
01723   if (UNIV_LIKELY_NULL(heap)) {
01724     mem_heap_free(heap);
01725   }
01726 
01727   log_data_len = dyn_array_get_data_size(&(mtr->log)) - log_data_len;
01728 
01729   ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);
01730 
01731   if (UNIV_LIKELY(log_ptr != NULL)) {
01732     mach_write_to_4(log_ptr, log_data_len);
01733   }
01734 
01735   if (page_is_comp(new_page)) {
01736     rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
01737   } else {
01738     rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
01739   }
01740 
01741   slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
01742 
01743   page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
01744   page_dir_slot_set_n_owned(slot, NULL, count + 1);
01745 
01746   page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
01747   page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
01748   page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
01749   page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
01750 
01751   page_header_set_ptr(new_page, NULL, PAGE_LAST_INSERT, NULL);
01752   page_header_set_field(new_page, NULL, PAGE_DIRECTION,
01753               PAGE_NO_DIRECTION);
01754   page_header_set_field(new_page, NULL, PAGE_N_DIRECTION, 0);
01755 
01756   /* Restore the log mode */
01757 
01758   mtr_set_log_mode(mtr, log_mode);
01759 }
01760 
01761 /***********************************************************/
01763 UNIV_INLINE
01764 void
01765 page_cur_delete_rec_write_log(
01766 /*==========================*/
01767   rec_t*    rec,  
01768   dict_index_t* index,  
01769   mtr_t*    mtr)  
01770 {
01771   byte* log_ptr;
01772 
01773   ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
01774 
01775   log_ptr = mlog_open_and_write_index(mtr, rec, index,
01776               page_rec_is_comp(rec)
01777               ? MLOG_COMP_REC_DELETE
01778               : MLOG_REC_DELETE, 2);
01779 
01780   if (!log_ptr) {
01781     /* Logging in mtr is switched off during crash recovery:
01782     in that case mlog_open returns NULL */
01783     return;
01784   }
01785 
01786   /* Write the cursor rec offset as a 2-byte ulint */
01787   mach_write_to_2(log_ptr, page_offset(rec));
01788 
01789   mlog_close(mtr, log_ptr + 2);
01790 }
01791 #else /* !UNIV_HOTBACKUP */
01792 # define page_cur_delete_rec_write_log(rec,index,mtr) ((void) 0)
01793 #endif /* !UNIV_HOTBACKUP */
01794 
01795 /***********************************************************/
01798 UNIV_INTERN
01799 byte*
01800 page_cur_parse_delete_rec(
01801 /*======================*/
01802   byte*   ptr,  
01803   byte*   end_ptr,
01804   buf_block_t*  block,  
01805   dict_index_t* index,  
01806   mtr_t*    mtr)  
01807 {
01808   ulint   offset;
01809   page_cur_t  cursor;
01810 
01811   if (end_ptr < ptr + 2) {
01812 
01813     return(NULL);
01814   }
01815 
01816   /* Read the cursor rec offset as a 2-byte ulint */
01817   offset = mach_read_from_2(ptr);
01818   ptr += 2;
01819 
01820   ut_a(offset <= UNIV_PAGE_SIZE);
01821 
01822   if (block) {
01823     page_t*   page    = buf_block_get_frame(block);
01824     mem_heap_t* heap    = NULL;
01825     ulint   offsets_[REC_OFFS_NORMAL_SIZE];
01826     rec_t*    rec   = page + offset;
01827     rec_offs_init(offsets_);
01828 
01829     page_cur_position(rec, block, &cursor);
01830     ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
01831 
01832     page_cur_delete_rec(&cursor, index,
01833             rec_get_offsets(rec, index, offsets_,
01834                 ULINT_UNDEFINED, &heap),
01835             mtr);
01836     if (UNIV_LIKELY_NULL(heap)) {
01837       mem_heap_free(heap);
01838     }
01839   }
01840 
01841   return(ptr);
01842 }
01843 
01844 /***********************************************************/
01847 UNIV_INTERN
01848 void
01849 page_cur_delete_rec(
01850 /*================*/
01851   page_cur_t* cursor, 
01852   dict_index_t* index,  
01853   const ulint*  offsets,
01854   mtr_t*    mtr)  
01855 {
01856   page_dir_slot_t* cur_dir_slot;
01857   page_dir_slot_t* prev_slot;
01858   page_t*   page;
01859   page_zip_des_t* page_zip;
01860   rec_t*    current_rec;
01861   rec_t*    prev_rec  = NULL;
01862   rec_t*    next_rec;
01863   ulint   cur_slot_no;
01864   ulint   cur_n_owned;
01865   rec_t*    rec;
01866 
01867   ut_ad(cursor && mtr);
01868 
01869   page = page_cur_get_page(cursor);
01870   page_zip = page_cur_get_page_zip(cursor);
01871 
01872   /* page_zip_validate() will fail here when
01873   btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
01874   Then, both "page_zip" and "page" would have the min-rec-mark
01875   set on the smallest user record, but "page" would additionally
01876   have it set on the smallest-but-one record.  Because sloppy
01877   page_zip_validate_low() only ignores min-rec-flag differences
01878   in the smallest user record, it cannot be used here either. */
01879 
01880   current_rec = cursor->rec;
01881   ut_ad(rec_offs_validate(current_rec, index, offsets));
01882   ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
01883 
01884   /* The record must not be the supremum or infimum record. */
01885   ut_ad(page_rec_is_user_rec(current_rec));
01886 
01887   /* Save to local variables some data associated with current_rec */
01888   cur_slot_no = page_dir_find_owner_slot(current_rec);
01889   cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
01890   cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
01891 
01892   /* 0. Write the log record */
01893   page_cur_delete_rec_write_log(current_rec, index, mtr);
01894 
01895   /* 1. Reset the last insert info in the page header and increment
01896   the modify clock for the frame */
01897 
01898   page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
01899 
01900   /* The page gets invalid for optimistic searches: increment the
01901   frame modify clock */
01902 
01903   buf_block_modify_clock_inc(page_cur_get_block(cursor));
01904 
01905   /* 2. Find the next and the previous record. Note that the cursor is
01906   left at the next record. */
01907 
01908   ut_ad(cur_slot_no > 0);
01909   prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
01910 
01911   rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
01912 
01913   /* rec now points to the record of the previous directory slot. Look
01914   for the immediate predecessor of current_rec in a loop. */
01915 
01916   while(current_rec != rec) {
01917     prev_rec = rec;
01918     rec = page_rec_get_next(rec);
01919   }
01920 
01921   page_cur_move_to_next(cursor);
01922   next_rec = cursor->rec;
01923 
01924   /* 3. Remove the record from the linked list of records */
01925 
01926   page_rec_set_next(prev_rec, next_rec);
01927 
01928   /* 4. If the deleted record is pointed to by a dir slot, update the
01929   record pointer in slot. In the following if-clause we assume that
01930   prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
01931   >= 2. */
01932 
01933 #if PAGE_DIR_SLOT_MIN_N_OWNED < 2
01934 # error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
01935 #endif
01936   ut_ad(cur_n_owned > 1);
01937 
01938   if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
01939     page_dir_slot_set_rec(cur_dir_slot, prev_rec);
01940   }
01941 
01942   /* 5. Update the number of owned records of the slot */
01943 
01944   page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
01945 
01946   /* 6. Free the memory occupied by the record */
01947   page_mem_free(page, page_zip, current_rec, index, offsets);
01948 
01949   /* 7. Now we have decremented the number of owned records of the slot.
01950   If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
01951   slots. */
01952 
01953   if (UNIV_UNLIKELY(cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED)) {
01954     page_dir_balance_slot(page, page_zip, cur_slot_no);
01955   }
01956 
01957 #ifdef UNIV_ZIP_DEBUG
01958   ut_a(!page_zip || page_zip_validate(page_zip, page));
01959 #endif /* UNIV_ZIP_DEBUG */
01960 }
01961 
01962 #ifdef UNIV_COMPILE_TEST_FUNCS
01963 
01964 /*******************************************************************/
01967 void
01968 test_page_cur_lcg_prng(
01969 /*===================*/
01970   int n)  
01971 {
01972   int     i;
01973   unsigned long long  rnd;
01974 
01975   for (i = 0; i < n; i++) {
01976     rnd = page_cur_lcg_prng();
01977     printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
01978            rnd,
01979            rnd % 2,
01980            rnd % 3,
01981            rnd % 5,
01982            rnd % 7,
01983            rnd % 11);
01984   }
01985 }
01986 
01987 #endif /* UNIV_COMPILE_TEST_FUNCS */