Drizzled Public API Documentation

page0cur.cc
1 /*****************************************************************************
2 
3 Copyright (C) 1994, 2009, Innobase Oy. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
15 St, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 *****************************************************************************/
18 
19 /********************************************************************//**
20 @file page/page0cur.cc
21 The page cursor
22 *************************************************************************/
26 #include "page0cur.h"
27 #ifdef UNIV_NONINL
28 #include "page0cur.ic"
29 #endif
30 
31 #include "page0zip.h"
32 #include "mtr0log.h"
33 #include "log0recv.h"
34 #include "ut0ut.h"
35 #ifndef UNIV_HOTBACKUP
36 #include "rem0cmp.h"
37 
38 #ifdef PAGE_CUR_ADAPT
39 # ifdef UNIV_SEARCH_PERF_STAT
40 static ulint page_cur_short_succ = 0;
41 # endif /* UNIV_SEARCH_PERF_STAT */
42 
43 /*******************************************************************//**
44 This is a linear congruential generator PRNG. It returns a pseudo random
45 number between 0 and 2^64-1 inclusive. The recurrence used is
46 X[n+1] = (LCG_a * X[n] + LCG_c) mod 2^64, seeded with X[0] = ut_time_us(NULL).
47 @return number between 0 and 2^64-1 */
55 static
56 ib_uint64_t
57 page_cur_lcg_prng(void)
58 /*===================*/
59 {
60 #define LCG_a 1103515245
61 #define LCG_c 12345
62  static ib_uint64_t lcg_current = 0;
63  static ibool initialized = FALSE;
64 
65  if (!initialized) {
66  lcg_current = (ib_uint64_t) ut_time_us(NULL);
67  initialized = TRUE;
68  }
69 
70  /* no need to "% 2^64" explicitly because lcg_current is
71  64 bit and this will be done anyway */
72  lcg_current = LCG_a * lcg_current + LCG_c;
73 
74  return(lcg_current);
75 }
76 
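The following standalone sketch reproduces the recurrence implemented above (X[n+1] = LCG_a * X[n] + LCG_c, with the modulo 2^64 coming from 64-bit overflow) and shows how a caller such as page_cur_open_on_rnd_user_rec() later in this file reduces the result with % n_recs. All demo_* names are illustrative only, not part of the InnoDB/Drizzle API, and the seed here uses time() instead of ut_time_us().

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Same constants as LCG_a / LCG_c above; the modulus 2^64 is implicit
   in the width of uint64_t. */
static uint64_t demo_lcg_next(uint64_t *state)
{
        *state = 1103515245ULL * *state + 12345ULL;
        return *state;
}

int main(void)
{
        uint64_t state = (uint64_t) time(NULL);  /* crude time-based seed */
        unsigned long n_recs = 7;                /* pretend the page holds 7 records */

        /* Pick pseudo-random record positions, as the cursor code does
           with page_cur_lcg_prng() % n_recs. */
        for (int i = 0; i < 5; i++) {
                printf("pick record %lu\n",
                       (unsigned long) (demo_lcg_next(&state) % n_recs));
        }
        return 0;
}
/* --- end of sketch ------------------------------------------------------ */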
77 /****************************************************************//**
78 Tries a search shortcut based on the last insert.
79 @return TRUE on success */
80 UNIV_INLINE
81 ibool
82 page_cur_try_search_shortcut(
83 /*=========================*/
84  const buf_block_t* block,
85  const dict_index_t* index,
86  const dtuple_t* tuple,
87  ulint* iup_matched_fields,
90  ulint* iup_matched_bytes,
94  ulint* ilow_matched_fields,
97  ulint* ilow_matched_bytes,
101  page_cur_t* cursor)
102 {
103  const rec_t* rec;
104  const rec_t* next_rec;
105  ulint low_match;
106  ulint low_bytes;
107  ulint up_match;
108  ulint up_bytes;
109 #ifdef UNIV_SEARCH_DEBUG
110  page_cur_t cursor2;
111 #endif
112  ibool success = FALSE;
113  const page_t* page = buf_block_get_frame(block);
114  mem_heap_t* heap = NULL;
115  ulint offsets_[REC_OFFS_NORMAL_SIZE];
116  ulint* offsets = offsets_;
117  rec_offs_init(offsets_);
118 
119  ut_ad(dtuple_check_typed(tuple));
120 
121  rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
122  offsets = rec_get_offsets(rec, index, offsets,
123  dtuple_get_n_fields(tuple), &heap);
124 
125  ut_ad(rec);
127 
128  ut_pair_min(&low_match, &low_bytes,
129  *ilow_matched_fields, *ilow_matched_bytes,
130  *iup_matched_fields, *iup_matched_bytes);
131 
132  up_match = low_match;
133  up_bytes = low_bytes;
134 
135  if (page_cmp_dtuple_rec_with_match(tuple, rec, offsets,
136  &low_match, &low_bytes) < 0) {
137  goto exit_func;
138  }
139 
140  next_rec = page_rec_get_next_const(rec);
141  offsets = rec_get_offsets(next_rec, index, offsets,
142  dtuple_get_n_fields(tuple), &heap);
143 
144  if (page_cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
145  &up_match, &up_bytes) >= 0) {
146  goto exit_func;
147  }
148 
149  page_cur_position(rec, block, cursor);
150 
151 #ifdef UNIV_SEARCH_DEBUG
152  page_cur_search_with_match(block, index, tuple, PAGE_CUR_DBG,
153  iup_matched_fields,
154  iup_matched_bytes,
155  ilow_matched_fields,
156  ilow_matched_bytes,
157  &cursor2);
158  ut_a(cursor2.rec == cursor->rec);
159 
160  if (!page_rec_is_supremum(next_rec)) {
161 
162  ut_a(*iup_matched_fields == up_match);
163  ut_a(*iup_matched_bytes == up_bytes);
164  }
165 
166  ut_a(*ilow_matched_fields == low_match);
167  ut_a(*ilow_matched_bytes == low_bytes);
168 #endif
169  if (!page_rec_is_supremum(next_rec)) {
170 
171  *iup_matched_fields = up_match;
172  *iup_matched_bytes = up_bytes;
173  }
174 
175  *ilow_matched_fields = low_match;
176  *ilow_matched_bytes = low_bytes;
177 
178 #ifdef UNIV_SEARCH_PERF_STAT
179  page_cur_short_succ++;
180 #endif
181  success = TRUE;
182 exit_func:
183  if (UNIV_LIKELY_NULL(heap)) {
184  mem_heap_free(heap);
185  }
186  return(success);
187 }
188 
189 #endif
190 
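page_cur_try_search_shortcut() above probes the record stored in PAGE_LAST_INSERT and its successor: if the search tuple sorts at or after that record and strictly before the next one, the cursor can be positioned without any binary search. A self-contained sketch of the same hint-then-fallback pattern over a plain sorted array follows; the function and variable names are made up for illustration.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stddef.h>

/* Returns the index of the greatest element <= key, or -1 if none.
   'hint' is the position of the previous hit (e.g. the last insert). */
static long find_le_with_hint(const int *a, size_t n, int key, size_t hint)
{
        /* Shortcut: does the key land on the hinted element, i.e.
           a[hint] <= key < a[hint + 1] (or hint is the last element)? */
        if (hint < n && a[hint] <= key
            && (hint + 1 == n || key < a[hint + 1])) {
                return (long) hint;
        }

        /* Fall back to ordinary binary search;
           invariant: a[lo] <= key < a[hi] with virtual sentinels. */
        long lo = -1;
        long hi = (long) n;
        while (hi - lo > 1) {
                long mid = lo + (hi - lo) / 2;
                if (a[mid] <= key) {
                        lo = mid;
                } else {
                        hi = mid;
                }
        }
        return lo;
}
/* --- end of sketch ------------------------------------------------------ */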
191 #ifdef PAGE_CUR_LE_OR_EXTENDS
192 /****************************************************************/
197 static
198 ibool
199 page_cur_rec_field_extends(
200 /*=======================*/
201  const dtuple_t* tuple,
202  const rec_t* rec,
203  const ulint* offsets,
204  ulint n)
205 {
206  const dtype_t* type;
207  const dfield_t* dfield;
208  const byte* rec_f;
209  ulint rec_f_len;
210 
211  ut_ad(rec_offs_validate(rec, NULL, offsets));
212  dfield = dtuple_get_nth_field(tuple, n);
213 
214  type = dfield_get_type(dfield);
215 
216  rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
217 
218  if (type->mtype == DATA_VARCHAR
219  || type->mtype == DATA_CHAR
220  || type->mtype == DATA_FIXBINARY
221  || type->mtype == DATA_BINARY
222  || type->mtype == DATA_BLOB
223  || type->mtype == DATA_VARMYSQL
224  || type->mtype == DATA_MYSQL) {
225 
226  if (dfield_get_len(dfield) != UNIV_SQL_NULL
227  && rec_f_len != UNIV_SQL_NULL
228  && rec_f_len >= dfield_get_len(dfield)
229  && !cmp_data_data_slow(type->mtype, type->prtype,
230  dfield_get_data(dfield),
231  dfield_get_len(dfield),
232  rec_f, dfield_get_len(dfield))) {
233 
234  return(TRUE);
235  }
236  }
237 
238  return(FALSE);
239 }
240 #endif /* PAGE_CUR_LE_OR_EXTENDS */
241 
242 /****************************************************************//**
243 Searches the right position for a page cursor. */
244 UNIV_INTERN
245 void
246 page_cur_search_with_match(
247 /*=======================*/
248  const buf_block_t* block, /*!< in: buffer block */
249  const dict_index_t* index, /*!< in: record descriptor */
250  const dtuple_t* tuple, /*!< in: data tuple */
251  ulint mode, /*!< in: PAGE_CUR_L, PAGE_CUR_LE, PAGE_CUR_G, or PAGE_CUR_GE */
254  ulint* iup_matched_fields, /*!< in/out: already matched fields in upper limit record */
257  ulint* iup_matched_bytes, /*!< in/out: already matched bytes in a field not yet completely matched */
261  ulint* ilow_matched_fields, /*!< in/out: already matched fields in lower limit record */
264  ulint* ilow_matched_bytes, /*!< in/out: already matched bytes in a field not yet completely matched */
268  page_cur_t* cursor) /*!< out: page cursor */
269 {
270  ulint up;
271  ulint low;
272  ulint mid;
273  const page_t* page;
274  const page_dir_slot_t* slot;
275  const rec_t* up_rec;
276  const rec_t* low_rec;
277  const rec_t* mid_rec;
278  ulint up_matched_fields;
279  ulint up_matched_bytes;
280  ulint low_matched_fields;
281  ulint low_matched_bytes;
282  ulint cur_matched_fields;
283  ulint cur_matched_bytes;
284  int cmp;
285 #ifdef UNIV_SEARCH_DEBUG
286  int dbg_cmp;
287  ulint dbg_matched_fields;
288  ulint dbg_matched_bytes;
289 #endif
290 #ifdef UNIV_ZIP_DEBUG
291  const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
292 #endif /* UNIV_ZIP_DEBUG */
293  mem_heap_t* heap = NULL;
294  ulint offsets_[REC_OFFS_NORMAL_SIZE];
295  ulint* offsets = offsets_;
296  rec_offs_init(offsets_);
297 
298  ut_ad(block && tuple && iup_matched_fields && iup_matched_bytes
299  && ilow_matched_fields && ilow_matched_bytes && cursor);
300  ut_ad(dtuple_validate(tuple));
301 #ifdef UNIV_DEBUG
302 # ifdef PAGE_CUR_DBG
303  if (mode != PAGE_CUR_DBG)
304 # endif /* PAGE_CUR_DBG */
305 # ifdef PAGE_CUR_LE_OR_EXTENDS
306  if (mode != PAGE_CUR_LE_OR_EXTENDS)
307 # endif /* PAGE_CUR_LE_OR_EXTENDS */
308  ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
309  || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
310 #endif /* UNIV_DEBUG */
311  page = buf_block_get_frame(block);
312 #ifdef UNIV_ZIP_DEBUG
313  ut_a(!page_zip || page_zip_validate(page_zip, page));
314 #endif /* UNIV_ZIP_DEBUG */
315 
316  page_check_dir(page);
317 
318 #ifdef PAGE_CUR_ADAPT
319  if (page_is_leaf(page)
320  && (mode == PAGE_CUR_LE)
321  && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
322  && (page_header_get_ptr(page, PAGE_LAST_INSERT))
323  && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {
324 
325  if (page_cur_try_search_shortcut(
326  block, index, tuple,
327  iup_matched_fields, iup_matched_bytes,
328  ilow_matched_fields, ilow_matched_bytes,
329  cursor)) {
330  return;
331  }
332  }
333 # ifdef PAGE_CUR_DBG
334  if (mode == PAGE_CUR_DBG) {
335  mode = PAGE_CUR_LE;
336  }
337 # endif
338 #endif
339 
340  /* The following flag does not work for non-latin1 char sets because
341  cmp_full_field does not tell how many bytes matched */
342 #ifdef PAGE_CUR_LE_OR_EXTENDS
343  ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
344 #endif /* PAGE_CUR_LE_OR_EXTENDS */
345 
346  /* If mode PAGE_CUR_G is specified, we are trying to position the
347  cursor to answer a query of the form "tuple < X", where tuple is
348  the input parameter, and X denotes an arbitrary physical record on
349  the page. We want to position the cursor on the first X which
350  satisfies the condition. */
351 
352  up_matched_fields = *iup_matched_fields;
353  up_matched_bytes = *iup_matched_bytes;
354  low_matched_fields = *ilow_matched_fields;
355  low_matched_bytes = *ilow_matched_bytes;
356 
357  /* Perform binary search. First the search is done through the page
358  directory, after that as a linear search in the list of records
359  owned by the upper limit directory slot. */
360 
361  low = 0;
362  up = page_dir_get_n_slots(page) - 1;
363 
364  /* Perform binary search until the lower and upper limit directory
365  slots come to the distance 1 of each other */
366 
367  while (up - low > 1) {
368  mid = (low + up) / 2;
369  slot = page_dir_get_nth_slot(page, mid);
370  mid_rec = page_dir_slot_get_rec(slot);
371 
372  ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
373  low_matched_fields, low_matched_bytes,
374  up_matched_fields, up_matched_bytes);
375 
376  offsets = rec_get_offsets(mid_rec, index, offsets,
377  dtuple_get_n_fields_cmp(tuple),
378  &heap);
379 
380  cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
381  &cur_matched_fields,
382  &cur_matched_bytes);
383  if (UNIV_LIKELY(cmp > 0)) {
384 low_slot_match:
385  low = mid;
386  low_matched_fields = cur_matched_fields;
387  low_matched_bytes = cur_matched_bytes;
388 
389  } else if (UNIV_EXPECT(cmp, -1)) {
390 #ifdef PAGE_CUR_LE_OR_EXTENDS
391  if (mode == PAGE_CUR_LE_OR_EXTENDS
392  && page_cur_rec_field_extends(
393  tuple, mid_rec, offsets,
394  cur_matched_fields)) {
395 
396  goto low_slot_match;
397  }
398 #endif /* PAGE_CUR_LE_OR_EXTENDS */
399 up_slot_match:
400  up = mid;
401  up_matched_fields = cur_matched_fields;
402  up_matched_bytes = cur_matched_bytes;
403 
404  } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
405 #ifdef PAGE_CUR_LE_OR_EXTENDS
406  || mode == PAGE_CUR_LE_OR_EXTENDS
407 #endif /* PAGE_CUR_LE_OR_EXTENDS */
408  ) {
409 
410  goto low_slot_match;
411  } else {
412 
413  goto up_slot_match;
414  }
415  }
416 
417  slot = page_dir_get_nth_slot(page, low);
418  low_rec = page_dir_slot_get_rec(slot);
419  slot = page_dir_get_nth_slot(page, up);
420  up_rec = page_dir_slot_get_rec(slot);
421 
422  /* Perform linear search until the upper and lower records come to
423  distance 1 of each other. */
424 
425  while (page_rec_get_next_const(low_rec) != up_rec) {
426 
427  mid_rec = page_rec_get_next_const(low_rec);
428 
429  ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
430  low_matched_fields, low_matched_bytes,
431  up_matched_fields, up_matched_bytes);
432 
433  offsets = rec_get_offsets(mid_rec, index, offsets,
434  dtuple_get_n_fields_cmp(tuple),
435  &heap);
436 
437  cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
438  &cur_matched_fields,
439  &cur_matched_bytes);
440  if (UNIV_LIKELY(cmp > 0)) {
441 low_rec_match:
442  low_rec = mid_rec;
443  low_matched_fields = cur_matched_fields;
444  low_matched_bytes = cur_matched_bytes;
445 
446  } else if (UNIV_EXPECT(cmp, -1)) {
447 #ifdef PAGE_CUR_LE_OR_EXTENDS
448  if (mode == PAGE_CUR_LE_OR_EXTENDS
449  && page_cur_rec_field_extends(
450  tuple, mid_rec, offsets,
451  cur_matched_fields)) {
452 
453  goto low_rec_match;
454  }
455 #endif /* PAGE_CUR_LE_OR_EXTENDS */
456 up_rec_match:
457  up_rec = mid_rec;
458  up_matched_fields = cur_matched_fields;
459  up_matched_bytes = cur_matched_bytes;
460  } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
461 #ifdef PAGE_CUR_LE_OR_EXTENDS
462  || mode == PAGE_CUR_LE_OR_EXTENDS
463 #endif /* PAGE_CUR_LE_OR_EXTENDS */
464  ) {
465 
466  goto low_rec_match;
467  } else {
468 
469  goto up_rec_match;
470  }
471  }
472 
473 #ifdef UNIV_SEARCH_DEBUG
474 
475  /* Check that the lower and upper limit records have the
476  right alphabetical order compared to tuple. */
477  dbg_matched_fields = 0;
478  dbg_matched_bytes = 0;
479 
480  offsets = rec_get_offsets(low_rec, index, offsets,
481  ULINT_UNDEFINED, &heap);
482  dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, low_rec, offsets,
483  &dbg_matched_fields,
484  &dbg_matched_bytes);
485  if (mode == PAGE_CUR_G) {
486  ut_a(dbg_cmp >= 0);
487  } else if (mode == PAGE_CUR_GE) {
488  ut_a(dbg_cmp == 1);
489  } else if (mode == PAGE_CUR_L) {
490  ut_a(dbg_cmp == 1);
491  } else if (mode == PAGE_CUR_LE) {
492  ut_a(dbg_cmp >= 0);
493  }
494 
495  if (!page_rec_is_infimum(low_rec)) {
496 
497  ut_a(low_matched_fields == dbg_matched_fields);
498  ut_a(low_matched_bytes == dbg_matched_bytes);
499  }
500 
501  dbg_matched_fields = 0;
502  dbg_matched_bytes = 0;
503 
504  offsets = rec_get_offsets(up_rec, index, offsets,
505  ULINT_UNDEFINED, &heap);
506  dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, up_rec, offsets,
507  &dbg_matched_fields,
508  &dbg_matched_bytes);
509  if (mode == PAGE_CUR_G) {
510  ut_a(dbg_cmp == -1);
511  } else if (mode == PAGE_CUR_GE) {
512  ut_a(dbg_cmp <= 0);
513  } else if (mode == PAGE_CUR_L) {
514  ut_a(dbg_cmp <= 0);
515  } else if (mode == PAGE_CUR_LE) {
516  ut_a(dbg_cmp == -1);
517  }
518 
519  if (!page_rec_is_supremum(up_rec)) {
520 
521  ut_a(up_matched_fields == dbg_matched_fields);
522  ut_a(up_matched_bytes == dbg_matched_bytes);
523  }
524 #endif
525  if (mode <= PAGE_CUR_GE) {
526  page_cur_position(up_rec, block, cursor);
527  } else {
528  page_cur_position(low_rec, block, cursor);
529  }
530 
531  *iup_matched_fields = up_matched_fields;
532  *iup_matched_bytes = up_matched_bytes;
533  *ilow_matched_fields = low_matched_fields;
534  *ilow_matched_bytes = low_matched_bytes;
535  if (UNIV_LIKELY_NULL(heap)) {
536  mem_heap_free(heap);
537  }
538 }
539 
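page_cur_search_with_match() above works in two phases: a binary search over the sparse page directory (each slot owns a small group of records), then a linear walk through the records between the two neighbouring slots. The sketch below shows the same idea on an array with an explicit directory of indexes; it is a simplified illustration (no matched-fields bookkeeping, no search modes), and the helper name is hypothetical.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stddef.h>

/* dir[i] is the index into 'a' of the record owned by directory slot i;
   dir[n_slots - 1] must point at the last element. Assumes n_slots >= 2
   and a[dir[0]] <= key. Returns the index of the greatest element <= key. */
static size_t two_phase_search(const int *a, const size_t *dir,
                               size_t n_slots, int key)
{
        /* Phase 1: binary search over directory slots until the lower and
           upper slots are adjacent, as in the page_dir slot loop above. */
        size_t low = 0;
        size_t up = n_slots - 1;
        while (up - low > 1) {
                size_t mid = (low + up) / 2;
                if (a[dir[mid]] <= key) {
                        low = mid;
                } else {
                        up = mid;
                }
        }

        /* Phase 2: linear scan of the records between the two slots. */
        size_t i = dir[low];
        while (i + 1 <= dir[up] && a[i + 1] <= key) {
                i++;
        }
        return i;
}
/* --- end of sketch ------------------------------------------------------ */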
540 /***********************************************************//**
541 Positions the cursor on a randomly chosen user record on a page. If there
542 are no user records, sets the cursor on the infimum record. */
543 UNIV_INTERN
544 void
545 page_cur_open_on_rnd_user_rec(
546 /*==========================*/
547  buf_block_t* block, /*!< in: page */
548  page_cur_t* cursor) /*!< out: page cursor */
549 {
550  ulint rnd;
551  ulint n_recs = page_get_n_recs(buf_block_get_frame(block));
552 
553  page_cur_set_before_first(block, cursor);
554 
555  if (UNIV_UNLIKELY(n_recs == 0)) {
556 
557  return;
558  }
559 
560  rnd = (ulint) (page_cur_lcg_prng() % n_recs);
561 
562  do {
563  page_cur_move_to_next(cursor);
564  } while (rnd--);
565 }
566 
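page_cur_open_on_rnd_user_rec() above positions the cursor before the first user record and then advances it rnd + 1 times, with rnd drawn uniformly from [0, n_recs - 1], so every user record is equally likely. A minimal linked-list version of that walk follows, with a hypothetical node type standing in for page records.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
struct node { struct node *next; };

/* 'before_first' is a dummy head node (the analogue of the infimum record);
   rnd must be in [0, n_recs - 1] and the list must hold n_recs nodes. */
static struct node *nth_after_head(struct node *before_first,
                                   unsigned long rnd)
{
        struct node *cur = before_first;
        do {
                cur = cur->next;        /* page_cur_move_to_next() */
        } while (rnd--);
        return cur;
}
/* --- end of sketch ------------------------------------------------------ */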
567 /***********************************************************//**
568 Writes the log record of a record insert on a page. */
569 static
570 void
571 page_cur_insert_rec_write_log(
572 /*==========================*/
573  rec_t* insert_rec,
574  ulint rec_size,
575  rec_t* cursor_rec,
577  dict_index_t* index,
578  mtr_t* mtr)
579 {
580  ulint cur_rec_size;
581  ulint extra_size;
582  ulint cur_extra_size;
583  const byte* ins_ptr;
584  byte* log_ptr;
585  const byte* log_end;
586  ulint i;
587 
588  ut_a(rec_size < UNIV_PAGE_SIZE);
589  ut_ad(page_align(insert_rec) == page_align(cursor_rec));
590  ut_ad(!page_rec_is_comp(insert_rec)
591  == !dict_table_is_comp(index->table));
592 
593  {
594  mem_heap_t* heap = NULL;
595  ulint cur_offs_[REC_OFFS_NORMAL_SIZE];
596  ulint ins_offs_[REC_OFFS_NORMAL_SIZE];
597 
598  ulint* cur_offs;
599  ulint* ins_offs;
600 
601  rec_offs_init(cur_offs_);
602  rec_offs_init(ins_offs_);
603 
604  cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
605  ULINT_UNDEFINED, &heap);
606  ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
607  ULINT_UNDEFINED, &heap);
608 
609  extra_size = rec_offs_extra_size(ins_offs);
610  cur_extra_size = rec_offs_extra_size(cur_offs);
611  ut_ad(rec_size == rec_offs_size(ins_offs));
612  cur_rec_size = rec_offs_size(cur_offs);
613 
614  if (UNIV_LIKELY_NULL(heap)) {
615  mem_heap_free(heap);
616  }
617  }
618 
619  ins_ptr = insert_rec - extra_size;
620 
621  i = 0;
622 
623  if (cur_extra_size == extra_size) {
624  ulint min_rec_size = ut_min(cur_rec_size, rec_size);
625 
626  const byte* cur_ptr = cursor_rec - cur_extra_size;
627 
628  /* Find out the first byte in insert_rec which differs from
629  cursor_rec; skip the bytes in the record info */
630 
631  do {
632  if (*ins_ptr == *cur_ptr) {
633  i++;
634  ins_ptr++;
635  cur_ptr++;
636  } else if ((i < extra_size)
637  && (i >= extra_size
638  - page_rec_get_base_extra_size
639  (insert_rec))) {
640  i = extra_size;
641  ins_ptr = insert_rec;
642  cur_ptr = cursor_rec;
643  } else {
644  break;
645  }
646  } while (i < min_rec_size);
647  }
648 
649  if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
650 
651  if (page_rec_is_comp(insert_rec)) {
652  log_ptr = mlog_open_and_write_index(
653  mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
654  2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
655  if (UNIV_UNLIKELY(!log_ptr)) {
656  /* Logging in mtr is switched off
657  during crash recovery: in that case
658  mlog_open returns NULL */
659  return;
660  }
661  } else {
662  log_ptr = mlog_open(mtr, 11
663  + 2 + 5 + 1 + 5 + 5
664  + MLOG_BUF_MARGIN);
665  if (UNIV_UNLIKELY(!log_ptr)) {
666  /* Logging in mtr is switched off
667  during crash recovery: in that case
668  mlog_open returns NULL */
669  return;
670  }
671 
672  log_ptr = mlog_write_initial_log_record_fast(
673  insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
674  }
675 
676  log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
677  /* Write the cursor rec offset as a 2-byte ulint */
678  mach_write_to_2(log_ptr, page_offset(cursor_rec));
679  log_ptr += 2;
680  } else {
681  log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
682  if (!log_ptr) {
683  /* Logging in mtr is switched off during crash
684  recovery: in that case mlog_open returns NULL */
685  return;
686  }
687  log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
688  }
689 
690  if (page_rec_is_comp(insert_rec)) {
691  if (UNIV_UNLIKELY
692  (rec_get_info_and_status_bits(insert_rec, TRUE)
693  != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
694 
695  goto need_extra_info;
696  }
697  } else {
698  if (UNIV_UNLIKELY
699  (rec_get_info_and_status_bits(insert_rec, FALSE)
700  != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
701 
702  goto need_extra_info;
703  }
704  }
705 
706  if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
707 need_extra_info:
708  /* Write the record end segment length
709  and the extra info storage flag */
710  log_ptr += mach_write_compressed(log_ptr,
711  2 * (rec_size - i) + 1);
712 
713  /* Write the info bits */
714  mach_write_to_1(log_ptr,
715  rec_get_info_and_status_bits(
716  insert_rec,
717  page_rec_is_comp(insert_rec)));
718  log_ptr++;
719 
720  /* Write the record origin offset */
721  log_ptr += mach_write_compressed(log_ptr, extra_size);
722 
723  /* Write the mismatch index */
724  log_ptr += mach_write_compressed(log_ptr, i);
725 
726  ut_a(i < UNIV_PAGE_SIZE);
727  ut_a(extra_size < UNIV_PAGE_SIZE);
728  } else {
729  /* Write the record end segment length
730  and the extra info storage flag */
731  log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
732  }
733 
734  /* Write to the log the inserted index record end segment which
735  differs from the cursor record */
736 
737  rec_size -= i;
738 
739  if (log_ptr + rec_size <= log_end) {
740  memcpy(log_ptr, ins_ptr, rec_size);
741  mlog_close(mtr, log_ptr + rec_size);
742  } else {
743  mlog_close(mtr, log_ptr);
744  ut_a(rec_size < UNIV_PAGE_SIZE);
745  mlog_catenate_string(mtr, ins_ptr, rec_size);
746  }
747 }
748 #else /* !UNIV_HOTBACKUP */
749 # define page_cur_insert_rec_write_log(ins_rec,size,cur,index,mtr) ((void) 0)
750 #endif /* !UNIV_HOTBACKUP */
751 
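page_cur_insert_rec_write_log() above logs only the tail of the inserted record that differs from the cursor record: it computes the mismatch index i and then writes 2 * (rec_size - i), using the low bit as the "extra info follows" flag. A simplified, self-contained version of that length-and-flag encoding is sketched below; it omits the header-relative rebasing that the real code performs when the extra sizes match, and all names are illustrative.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stddef.h>
#include <string.h>

/* Length of the common byte prefix of two buffers. */
static size_t common_prefix(const unsigned char *a, const unsigned char *b,
                            size_t max_len)
{
        size_t i = 0;
        while (i < max_len && a[i] == b[i]) {
                i++;
        }
        return i;
}

/* Encode the "end segment length + extra info flag" field and copy the
   differing tail; returns the number of tail bytes to log. */
static size_t encode_insert_delta(const unsigned char *cursor_rec,
                                  size_t cursor_size,
                                  const unsigned char *insert_rec,
                                  size_t insert_size,
                                  int need_extra_info,
                                  unsigned long *len_and_flag,
                                  unsigned char *tail_out)
{
        size_t min_size = cursor_size < insert_size ? cursor_size : insert_size;
        size_t i = common_prefix(cursor_rec, insert_rec, min_size);

        /* Low bit flags whether info bits, origin offset and mismatch
           index follow, as in the code above. */
        *len_and_flag = 2 * (unsigned long) (insert_size - i)
                + (need_extra_info ? 1 : 0);

        memcpy(tail_out, insert_rec + i, insert_size - i);
        return insert_size - i;
}
/* --- end of sketch ------------------------------------------------------ */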
752 /***********************************************************//**
753 Parses a log record of a record insert on a page.
754 @return end of log record or NULL */
755 UNIV_INTERN
756 byte*
757 page_cur_parse_insert_rec(
758 /*======================*/
759  ibool is_short, /*!< in: TRUE if short inserts */
760  byte* ptr, /*!< in: buffer */
761  byte* end_ptr, /*!< in: buffer end */
762  buf_block_t* block, /*!< in: page or NULL */
763  dict_index_t* index, /*!< in: record descriptor */
764  mtr_t* mtr) /*!< in: mtr or NULL */
765 {
766  ulint origin_offset;
767  ulint end_seg_len;
768  ulint mismatch_index;
769  page_t* page;
770  rec_t* cursor_rec;
771  byte buf1[1024];
772  byte* buf;
773  byte* ptr2 = ptr;
774  ulint info_and_status_bits = 0; /* remove warning */
775  page_cur_t cursor;
776  mem_heap_t* heap = NULL;
777  ulint offsets_[REC_OFFS_NORMAL_SIZE];
778  ulint* offsets = offsets_;
779  rec_offs_init(offsets_);
780 
781  page = block ? buf_block_get_frame(block) : NULL;
782 
783  if (is_short) {
784  cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
785  } else {
786  ulint offset;
787 
788  /* Read the cursor rec offset as a 2-byte ulint */
789 
790  if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
791 
792  return(NULL);
793  }
794 
795  offset = mach_read_from_2(ptr);
796  ptr += 2;
797 
798  cursor_rec = page + offset;
799 
800  if (UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)) {
801 
802  recv_sys->found_corrupt_log = TRUE;
803 
804  return(NULL);
805  }
806  }
807 
808  ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
809 
810  if (ptr == NULL) {
811 
812  return(NULL);
813  }
814 
815  if (UNIV_UNLIKELY(end_seg_len >= UNIV_PAGE_SIZE << 1)) {
816  recv_sys->found_corrupt_log = TRUE;
817 
818  return(NULL);
819  }
820 
821  if (end_seg_len & 0x1UL) {
822  /* Read the info bits */
823 
824  if (end_ptr < ptr + 1) {
825 
826  return(NULL);
827  }
828 
829  info_and_status_bits = mach_read_from_1(ptr);
830  ptr++;
831 
832  ptr = mach_parse_compressed(ptr, end_ptr, &origin_offset);
833 
834  if (ptr == NULL) {
835 
836  return(NULL);
837  }
838 
839  ut_a(origin_offset < UNIV_PAGE_SIZE);
840 
841  ptr = mach_parse_compressed(ptr, end_ptr, &mismatch_index);
842 
843  if (ptr == NULL) {
844 
845  return(NULL);
846  }
847 
848  ut_a(mismatch_index < UNIV_PAGE_SIZE);
849  }
850 
851  if (UNIV_UNLIKELY(end_ptr < ptr + (end_seg_len >> 1))) {
852 
853  return(NULL);
854  }
855 
856  if (!block) {
857 
858  return(ptr + (end_seg_len >> 1));
859  }
860 
861  ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
862  ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
863 
864  /* Read from the log the inserted index record end segment which
865  differs from the cursor record */
866 
867  offsets = rec_get_offsets(cursor_rec, index, offsets,
868  ULINT_UNDEFINED, &heap);
869 
870  if (!(end_seg_len & 0x1UL)) {
871  info_and_status_bits = rec_get_info_and_status_bits(
872  cursor_rec, page_is_comp(page));
873  origin_offset = rec_offs_extra_size(offsets);
874  mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
875  }
876 
877  end_seg_len >>= 1;
878 
879  if (mismatch_index + end_seg_len < sizeof buf1) {
880  buf = buf1;
881  } else {
882  buf = static_cast<byte *>(mem_alloc(mismatch_index + end_seg_len));
883  }
884 
885  /* Build the inserted record to buf */
886 
887  if (UNIV_UNLIKELY(mismatch_index >= UNIV_PAGE_SIZE)) {
888  fprintf(stderr,
889  "Is short %lu, info_and_status_bits %lu, offset %lu, "
890  "o_offset %lu\n"
891  "mismatch index %lu, end_seg_len %lu\n"
892  "parsed len %lu\n",
893  (ulong) is_short, (ulong) info_and_status_bits,
894  (ulong) page_offset(cursor_rec),
895  (ulong) origin_offset,
896  (ulong) mismatch_index, (ulong) end_seg_len,
897  (ulong) (ptr - ptr2));
898 
899  fputs("Dump of 300 bytes of log:\n", stderr);
900  ut_print_buf(stderr, ptr2, 300);
901  putc('\n', stderr);
902 
903  buf_page_print(page, 0);
904 
905  ut_error;
906  }
907 
908  ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
909  ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
910 
911  if (page_is_comp(page)) {
912  rec_set_info_and_status_bits(buf + origin_offset,
913  info_and_status_bits);
914  } else {
915  rec_set_info_bits_old(buf + origin_offset,
916  info_and_status_bits);
917  }
918 
919  page_cur_position(cursor_rec, block, &cursor);
920 
921  offsets = rec_get_offsets(buf + origin_offset, index, offsets,
922  ULINT_UNDEFINED, &heap);
923  if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
924  buf + origin_offset,
925  index, offsets, mtr))) {
926  /* The redo log record should only have been written
927  after the write was successful. */
928  ut_error;
929  }
930 
931  if (buf != buf1) {
932 
933  mem_free(buf);
934  }
935 
936  if (UNIV_LIKELY_NULL(heap)) {
937  mem_heap_free(heap);
938  }
939 
940  return(ptr + end_seg_len);
941 }
942 
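The parser above leans on mach_parse_compressed(), which decodes a variable-length unsigned integer and signals a truncated buffer by returning NULL. InnoDB's actual compressed-integer layout is defined in mach0data; the sketch below only demonstrates the same bounds-checked parsing pattern using an ordinary base-128 varint, which is an assumption for illustration and not the real wire format.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stdint.h>

/* Parse a base-128 varint from [ptr, end_ptr). Returns the position after
   the value, or NULL if the buffer is too short (mirroring how
   page_cur_parse_insert_rec bails out on a NULL return). */
static const unsigned char *parse_varint(const unsigned char *ptr,
                                         const unsigned char *end_ptr,
                                         uint64_t *val)
{
        uint64_t v = 0;
        unsigned shift = 0;

        while (ptr < end_ptr) {
                unsigned char b = *ptr++;
                v |= (uint64_t) (b & 0x7F) << shift;
                if (!(b & 0x80)) {
                        *val = v;
                        return ptr;     /* complete value read */
                }
                shift += 7;
                if (shift >= 64) {
                        return 0;       /* corrupt: value too long */
                }
        }
        return 0;                       /* ran past end of buffer */
}
/* --- end of sketch ------------------------------------------------------ */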
943 /***********************************************************//**
944 Inserts a record next to page cursor on an uncompressed page.
945 Returns pointer to inserted record if succeed, i.e., enough
946 space available, NULL otherwise. The cursor stays at the same position.
947 @return pointer to record if succeed, NULL otherwise */
948 UNIV_INTERN
949 rec_t*
950 page_cur_insert_rec_low(
951 /*====================*/
952  rec_t* current_rec, /*!< in: pointer to current record after which the new record is inserted */
954  dict_index_t* index, /*!< in: record descriptor */
955  const rec_t* rec, /*!< in: pointer to a physical record */
956  ulint* offsets, /*!< in/out: rec_get_offsets(rec, index) */
957  mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
958 {
959  byte* insert_buf;
960  ulint rec_size;
961  page_t* page;
962  rec_t* last_insert;
964  rec_t* free_rec;
966  rec_t* insert_rec;
967  ulint heap_no;
970  ut_ad(rec_offs_validate(rec, index, offsets));
971 
972  page = page_align(current_rec);
973  ut_ad(dict_table_is_comp(index->table)
974  == (ibool) !!page_is_comp(page));
975 
976  ut_ad(!page_rec_is_supremum(current_rec));
977 
978  /* 1. Get the size of the physical record in the page */
979  rec_size = rec_offs_size(offsets);
980 
981 #ifdef UNIV_DEBUG_VALGRIND
982  {
983  const void* rec_start
984  = rec - rec_offs_extra_size(offsets);
985  ulint extra_size
986  = rec_offs_extra_size(offsets)
987  - (rec_offs_comp(offsets)
988  ? REC_N_NEW_EXTRA_BYTES
989  : REC_N_OLD_EXTRA_BYTES);
990 
991  /* All data bytes of the record must be valid. */
992  UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
993  /* The variable-length header must be valid. */
994  UNIV_MEM_ASSERT_RW(rec_start, extra_size);
995  }
996 #endif /* UNIV_DEBUG_VALGRIND */
997 
998  /* 2. Try to find suitable space from page memory management */
999 
1000  free_rec = page_header_get_ptr(page, PAGE_FREE);
1001  if (UNIV_LIKELY_NULL(free_rec)) {
1002  /* Try to allocate from the head of the free list. */
1003  ulint foffsets_[REC_OFFS_NORMAL_SIZE];
1004  ulint* foffsets = foffsets_;
1005  mem_heap_t* heap = NULL;
1006 
1007  rec_offs_init(foffsets_);
1008 
1009  foffsets = rec_get_offsets(free_rec, index, foffsets,
1010  ULINT_UNDEFINED, &heap);
1011  if (rec_offs_size(foffsets) < rec_size) {
1012  if (UNIV_LIKELY_NULL(heap)) {
1013  mem_heap_free(heap);
1014  }
1015 
1016  goto use_heap;
1017  }
1018 
1019  insert_buf = free_rec - rec_offs_extra_size(foffsets);
1020 
1021  if (page_is_comp(page)) {
1022  heap_no = rec_get_heap_no_new(free_rec);
1023  page_mem_alloc_free(page, NULL,
1024  rec_get_next_ptr(free_rec, TRUE),
1025  rec_size);
1026  } else {
1027  heap_no = rec_get_heap_no_old(free_rec);
1028  page_mem_alloc_free(page, NULL,
1029  rec_get_next_ptr(free_rec, FALSE),
1030  rec_size);
1031  }
1032 
1033  if (UNIV_LIKELY_NULL(heap)) {
1034  mem_heap_free(heap);
1035  }
1036  } else {
1037 use_heap:
1038  free_rec = NULL;
1039  insert_buf = page_mem_alloc_heap(page, NULL,
1040  rec_size, &heap_no);
1041 
1042  if (UNIV_UNLIKELY(insert_buf == NULL)) {
1043  return(NULL);
1044  }
1045  }
1046 
1047  /* 3. Create the record */
1048  insert_rec = rec_copy(insert_buf, rec, offsets);
1049  rec_offs_make_valid(insert_rec, index, offsets);
1050 
1051  /* 4. Insert the record in the linked list of records */
1052  ut_ad(current_rec != insert_rec);
1053 
1054  {
1055  /* next record after current before the insertion */
1056  rec_t* next_rec = page_rec_get_next(current_rec);
1057 #ifdef UNIV_DEBUG
1058  if (page_is_comp(page)) {
1059  ut_ad(rec_get_status(current_rec)
1060  <= REC_STATUS_INFIMUM);
1061  ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1062  ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1063  }
1064 #endif
1065  page_rec_set_next(insert_rec, next_rec);
1066  page_rec_set_next(current_rec, insert_rec);
1067  }
1068 
1069  page_header_set_field(page, NULL, PAGE_N_RECS,
1070  1 + page_get_n_recs(page));
1071 
1072  /* 5. Set the n_owned field in the inserted record to zero,
1073  and set the heap_no field */
1074  if (page_is_comp(page)) {
1075  rec_set_n_owned_new(insert_rec, NULL, 0);
1076  rec_set_heap_no_new(insert_rec, heap_no);
1077  } else {
1078  rec_set_n_owned_old(insert_rec, 0);
1079  rec_set_heap_no_old(insert_rec, heap_no);
1080  }
1081 
1082  UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
1083  rec_offs_size(offsets));
1084  /* 6. Update the last insertion info in page header */
1085 
1086  last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1087  ut_ad(!last_insert || !page_is_comp(page)
1088  || rec_get_node_ptr_flag(last_insert)
1089  == rec_get_node_ptr_flag(insert_rec));
1090 
1091  if (UNIV_UNLIKELY(last_insert == NULL)) {
1092  page_header_set_field(page, NULL, PAGE_DIRECTION,
1093  PAGE_NO_DIRECTION);
1094  page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
1095 
1096  } else if ((last_insert == current_rec)
1097  && (page_header_get_field(page, PAGE_DIRECTION)
1098  != PAGE_LEFT)) {
1099 
1100  page_header_set_field(page, NULL, PAGE_DIRECTION,
1101  PAGE_RIGHT);
1102  page_header_set_field(page, NULL, PAGE_N_DIRECTION,
1103  page_header_get_field(
1104  page, PAGE_N_DIRECTION) + 1);
1105 
1106  } else if ((page_rec_get_next(insert_rec) == last_insert)
1107  && (page_header_get_field(page, PAGE_DIRECTION)
1108  != PAGE_RIGHT)) {
1109 
1110  page_header_set_field(page, NULL, PAGE_DIRECTION,
1111  PAGE_LEFT);
1112  page_header_set_field(page, NULL, PAGE_N_DIRECTION,
1113  page_header_get_field(
1114  page, PAGE_N_DIRECTION) + 1);
1115  } else {
1116  page_header_set_field(page, NULL, PAGE_DIRECTION,
1117  PAGE_NO_DIRECTION);
1118  page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
1119  }
1120 
1121  page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
1122 
1123  /* 7. It remains to update the owner record. */
1124  {
1125  rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1126  ulint n_owned;
1127  if (page_is_comp(page)) {
1128  n_owned = rec_get_n_owned_new(owner_rec);
1129  rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
1130  } else {
1131  n_owned = rec_get_n_owned_old(owner_rec);
1132  rec_set_n_owned_old(owner_rec, n_owned + 1);
1133  }
1134 
1135  /* 8. Now we have incremented the n_owned field of the owner
1136  record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1137  we have to split the corresponding directory slot in two. */
1138 
1139  if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1140  page_dir_split_slot(
1141  page, NULL,
1142  page_dir_find_owner_slot(owner_rec));
1143  }
1144  }
1145 
1146  /* 9. Write log record of the insert */
1147  if (UNIV_LIKELY(mtr != NULL)) {
1148  page_cur_insert_rec_write_log(insert_rec, rec_size,
1149  current_rec, index, mtr);
1150  }
1151 
1152  return(insert_rec);
1153 }
1154 
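Step 6 of page_cur_insert_rec_low() above maintains the PAGE_DIRECTION and PAGE_N_DIRECTION hints: repeated inserts right after the previous insert point count as PAGE_RIGHT, inserts just before it as PAGE_LEFT, and anything else resets the counter. The same bookkeeping is shown below as a standalone state update; the struct and enum are stand-ins for the page header fields, not real InnoDB types.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stddef.h>

enum dir { DIR_NONE, DIR_LEFT, DIR_RIGHT };

struct insert_stats {
        enum dir      direction;    /* PAGE_DIRECTION analogue */
        unsigned long n_direction;  /* PAGE_N_DIRECTION analogue */
        const void   *last_insert;  /* PAGE_LAST_INSERT analogue */
};

static void update_direction(struct insert_stats *s,
                             const void *cursor_rec, /* insert went after this */
                             const void *next_rec,   /* old successor of cursor_rec */
                             const void *insert_rec)
{
        if (s->last_insert == NULL) {
                s->direction = DIR_NONE;
                s->n_direction = 0;
        } else if (s->last_insert == cursor_rec && s->direction != DIR_LEFT) {
                s->direction = DIR_RIGHT;       /* ascending insert pattern */
                s->n_direction++;
        } else if (s->last_insert == next_rec && s->direction != DIR_RIGHT) {
                s->direction = DIR_LEFT;        /* descending insert pattern */
                s->n_direction++;
        } else {
                s->direction = DIR_NONE;
                s->n_direction = 0;
        }
        s->last_insert = insert_rec;
}
/* --- end of sketch ------------------------------------------------------ */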
1155 /***********************************************************//**
1156 Compresses or reorganizes a page after an optimistic insert.
1157 @return rec if succeeded, NULL otherwise (page reorganization was impossible) */
1158 static
1159 rec_t*
1160 page_cur_insert_rec_zip_reorg(
1161 /*==========================*/
1162  rec_t** current_rec,
1164  buf_block_t* block,
1165  dict_index_t* index,
1166  rec_t* rec,
1167  page_t* page,
1168  page_zip_des_t* page_zip,
1169  mtr_t* mtr)
1170 {
1171  ulint pos;
1172 
1173  /* Recompress or reorganize and recompress the page. */
1174  if (UNIV_LIKELY(page_zip_compress(page_zip, page, index, mtr))) {
1175  return(rec);
1176  }
1177 
1178  /* Before trying to reorganize the page,
1179  store the number of preceding records on the page. */
1180  pos = page_rec_get_n_recs_before(rec);
1181 
1182  if (page_zip_reorganize(block, index, mtr)) {
1183  /* The page was reorganized: Find rec by seeking to pos,
1184  and update *current_rec. */
1185  rec = page + PAGE_NEW_INFIMUM;
1186 
1187  while (--pos) {
1188  rec = page + rec_get_next_offs(rec, TRUE);
1189  }
1190 
1191  *current_rec = rec;
1192  rec = page + rec_get_next_offs(rec, TRUE);
1193 
1194  return(rec);
1195  }
1196 
1197  /* Out of space: restore the page */
1198  if (!page_zip_decompress(page_zip, page, FALSE)) {
1199  ut_error; /* Memory corrupted? */
1200  }
1201  ut_ad(page_validate(page, index));
1202  return(NULL);
1203 }
1204 
1205 /***********************************************************//**
1206 Inserts a record next to page cursor on a compressed and uncompressed
1207 page. Returns pointer to inserted record if succeed, i.e.,
1208 enough space available, NULL otherwise.
1209 The cursor stays at the same position.
1210 @return pointer to record if succeed, NULL otherwise */
1211 UNIV_INTERN
1212 rec_t*
1213 page_cur_insert_rec_zip(
1214 /*====================*/
1215  rec_t** current_rec, /*!< in/out: pointer to current record after which the new record is inserted */
1217  buf_block_t* block, /*!< in: buffer block of *current_rec */
1218  dict_index_t* index, /*!< in: record descriptor */
1219  const rec_t* rec, /*!< in: pointer to a physical record */
1220  ulint* offsets, /*!< in/out: rec_get_offsets(rec, index) */
1221  mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1222 {
1223  byte* insert_buf;
1224  ulint rec_size;
1225  page_t* page;
1226  rec_t* last_insert;
1228  rec_t* free_rec;
1230  rec_t* insert_rec;
1231  ulint heap_no;
1233  page_zip_des_t* page_zip;
1234 
1235  page_zip = buf_block_get_page_zip(block);
1236  ut_ad(page_zip);
1237 
1238  ut_ad(rec_offs_validate(rec, index, offsets));
1239 
1240  page = page_align(*current_rec);
1241  ut_ad(dict_table_is_comp(index->table));
1242  ut_ad(page_is_comp(page));
1243 
1244  ut_ad(!page_rec_is_supremum(*current_rec));
1245 #ifdef UNIV_ZIP_DEBUG
1246  ut_a(page_zip_validate(page_zip, page));
1247 #endif /* UNIV_ZIP_DEBUG */
1248 
1249  /* 1. Get the size of the physical record in the page */
1250  rec_size = rec_offs_size(offsets);
1251 
1252 #ifdef UNIV_DEBUG_VALGRIND
1253  {
1254  const void* rec_start
1255  = rec - rec_offs_extra_size(offsets);
1256  ulint extra_size
1257  = rec_offs_extra_size(offsets)
1258  - (rec_offs_comp(offsets)
1259  ? REC_N_NEW_EXTRA_BYTES
1260  : REC_N_OLD_EXTRA_BYTES);
1261 
1262  /* All data bytes of the record must be valid. */
1263  UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
1264  /* The variable-length header must be valid. */
1265  UNIV_MEM_ASSERT_RW(rec_start, extra_size);
1266  }
1267 #endif /* UNIV_DEBUG_VALGRIND */
1268 
1269  /* 2. Try to find suitable space from page memory management */
1270  if (!page_zip_available(page_zip, dict_index_is_clust(index),
1271  rec_size, 1)) {
1272 
1273  /* Try compressing the whole page afterwards. */
1274  insert_rec = page_cur_insert_rec_low(*current_rec,
1275  index, rec, offsets,
1276  NULL);
1277 
1278  if (UNIV_LIKELY(insert_rec != NULL)) {
1279  insert_rec = page_cur_insert_rec_zip_reorg(
1280  current_rec, block, index, insert_rec,
1281  page, page_zip, mtr);
1282  }
1283 
1284  return(insert_rec);
1285  }
1286 
1287  free_rec = page_header_get_ptr(page, PAGE_FREE);
1288  if (UNIV_LIKELY_NULL(free_rec)) {
1289  /* Try to allocate from the head of the free list. */
1290  lint extra_size_diff;
1291  ulint foffsets_[REC_OFFS_NORMAL_SIZE];
1292  ulint* foffsets = foffsets_;
1293  mem_heap_t* heap = NULL;
1294 
1295  rec_offs_init(foffsets_);
1296 
1297  foffsets = rec_get_offsets(free_rec, index, foffsets,
1298  ULINT_UNDEFINED, &heap);
1299  if (rec_offs_size(foffsets) < rec_size) {
1300 too_small:
1301  if (UNIV_LIKELY_NULL(heap)) {
1302  mem_heap_free(heap);
1303  }
1304 
1305  goto use_heap;
1306  }
1307 
1308  insert_buf = free_rec - rec_offs_extra_size(foffsets);
1309 
1310  /* On compressed pages, do not relocate records from
1311  the free list. If extra_size would grow, use the heap. */
1312  extra_size_diff
1313  = rec_offs_extra_size(offsets)
1314  - rec_offs_extra_size(foffsets);
1315 
1316  if (UNIV_UNLIKELY(extra_size_diff < 0)) {
1317  /* Add an offset to the extra_size. */
1318  if (rec_offs_size(foffsets)
1319  < rec_size - extra_size_diff) {
1320 
1321  goto too_small;
1322  }
1323 
1324  insert_buf -= extra_size_diff;
1325  } else if (UNIV_UNLIKELY(extra_size_diff)) {
1326  /* Do not allow extra_size to grow */
1327 
1328  goto too_small;
1329  }
1330 
1331  heap_no = rec_get_heap_no_new(free_rec);
1332  page_mem_alloc_free(page, page_zip,
1333  rec_get_next_ptr(free_rec, TRUE),
1334  rec_size);
1335 
1336  if (!page_is_leaf(page)) {
1337  /* Zero out the node pointer of free_rec,
1338  in case it will not be overwritten by
1339  insert_rec. */
1340 
1341  ut_ad(rec_size > REC_NODE_PTR_SIZE);
1342 
1343  if (rec_offs_extra_size(foffsets)
1344  + rec_offs_data_size(foffsets) > rec_size) {
1345 
1346  memset(rec_get_end(free_rec, foffsets)
1347  - REC_NODE_PTR_SIZE, 0,
1348  REC_NODE_PTR_SIZE);
1349  }
1350  } else if (dict_index_is_clust(index)) {
1351  /* Zero out the DB_TRX_ID and DB_ROLL_PTR
1352  columns of free_rec, in case it will not be
1353  overwritten by insert_rec. */
1354 
1355  ulint trx_id_col;
1356  ulint trx_id_offs;
1357  ulint len;
1358 
1359  trx_id_col = dict_index_get_sys_col_pos(index,
1360  DATA_TRX_ID);
1361  ut_ad(trx_id_col > 0);
1362  ut_ad(trx_id_col != ULINT_UNDEFINED);
1363 
1364  trx_id_offs = rec_get_nth_field_offs(foffsets,
1365  trx_id_col, &len);
1366  ut_ad(len == DATA_TRX_ID_LEN);
1367 
1368  if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
1369  + rec_offs_extra_size(foffsets) > rec_size) {
1370  /* We will have to zero out the
1371  DB_TRX_ID and DB_ROLL_PTR, because
1372  they will not be fully overwritten by
1373  insert_rec. */
1374 
1375  memset(free_rec + trx_id_offs, 0,
1376  DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1377  }
1378 
1379  ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
1380  == rec_get_nth_field(free_rec, foffsets,
1381  trx_id_col + 1, &len));
1382  ut_ad(len == DATA_ROLL_PTR_LEN);
1383  }
1384 
1385  if (UNIV_LIKELY_NULL(heap)) {
1386  mem_heap_free(heap);
1387  }
1388  } else {
1389 use_heap:
1390  free_rec = NULL;
1391  insert_buf = page_mem_alloc_heap(page, page_zip,
1392  rec_size, &heap_no);
1393 
1394  if (UNIV_UNLIKELY(insert_buf == NULL)) {
1395  return(NULL);
1396  }
1397 
1398  page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
1399  }
1400 
1401  /* 3. Create the record */
1402  insert_rec = rec_copy(insert_buf, rec, offsets);
1403  rec_offs_make_valid(insert_rec, index, offsets);
1404 
1405  /* 4. Insert the record in the linked list of records */
1406  ut_ad(*current_rec != insert_rec);
1407 
1408  {
1409  /* next record after current before the insertion */
1410  rec_t* next_rec = page_rec_get_next(*current_rec);
1411  ut_ad(rec_get_status(*current_rec)
1412  <= REC_STATUS_INFIMUM);
1413  ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1414  ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1415 
1416  page_rec_set_next(insert_rec, next_rec);
1417  page_rec_set_next(*current_rec, insert_rec);
1418  }
1419 
1420  page_header_set_field(page, page_zip, PAGE_N_RECS,
1421  1 + page_get_n_recs(page));
1422 
1423  /* 5. Set the n_owned field in the inserted record to zero,
1424  and set the heap_no field */
1425  rec_set_n_owned_new(insert_rec, NULL, 0);
1426  rec_set_heap_no_new(insert_rec, heap_no);
1427 
1428  UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
1429  rec_offs_size(offsets));
1430 
1431  page_zip_dir_insert(page_zip, *current_rec, free_rec, insert_rec);
1432 
1433  /* 6. Update the last insertion info in page header */
1434 
1435  last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1436  ut_ad(!last_insert
1437  || rec_get_node_ptr_flag(last_insert)
1438  == rec_get_node_ptr_flag(insert_rec));
1439 
1440  if (UNIV_UNLIKELY(last_insert == NULL)) {
1441  page_header_set_field(page, page_zip, PAGE_DIRECTION,
1442  PAGE_NO_DIRECTION);
1443  page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
1444 
1445  } else if ((last_insert == *current_rec)
1446  && (page_header_get_field(page, PAGE_DIRECTION)
1447  != PAGE_LEFT)) {
1448 
1449  page_header_set_field(page, page_zip, PAGE_DIRECTION,
1450  PAGE_RIGHT);
1451  page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
1452  page_header_get_field(
1453  page, PAGE_N_DIRECTION) + 1);
1454 
1455  } else if ((page_rec_get_next(insert_rec) == last_insert)
1456  && (page_header_get_field(page, PAGE_DIRECTION)
1457  != PAGE_RIGHT)) {
1458 
1459  page_header_set_field(page, page_zip, PAGE_DIRECTION,
1460  PAGE_LEFT);
1461  page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
1462  page_header_get_field(
1463  page, PAGE_N_DIRECTION) + 1);
1464  } else {
1465  page_header_set_field(page, page_zip, PAGE_DIRECTION,
1466  PAGE_NO_DIRECTION);
1467  page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
1468  }
1469 
1470  page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
1471 
1472  /* 7. It remains to update the owner record. */
1473  {
1474  rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1475  ulint n_owned;
1476 
1477  n_owned = rec_get_n_owned_new(owner_rec);
1478  rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
1479 
1480  /* 8. Now we have incremented the n_owned field of the owner
1481  record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1482  we have to split the corresponding directory slot in two. */
1483 
1484  if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1485  page_dir_split_slot(
1486  page, page_zip,
1487  page_dir_find_owner_slot(owner_rec));
1488  }
1489  }
1490 
1491  page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
1492 
1493  /* 9. Write log record of the insert */
1494  if (UNIV_LIKELY(mtr != NULL)) {
1495  page_cur_insert_rec_write_log(insert_rec, rec_size,
1496  *current_rec, index, mtr);
1497  }
1498 
1499  return(insert_rec);
1500 }
1501 
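In page_cur_insert_rec_zip() above, a record from the PAGE_FREE list is reused only if the new record fits, its extra (header) size does not grow, and, when the extra size shrinks, the shifted record origin still fits; otherwise the insert falls back to the heap (and possibly to recompression). That decision, reduced to a standalone predicate over plain sizes (the real code derives these from rec_get_offsets()), is sketched below.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stdbool.h>
#include <stddef.h>

/* Decide whether a freed record slot can be reused in place for a new
   record on a compressed page, without relocating anything. */
static bool can_reuse_free_rec(size_t free_extra_size, size_t free_data_size,
                               size_t new_extra_size, size_t new_data_size)
{
        size_t free_total = free_extra_size + free_data_size;
        size_t new_total = new_extra_size + new_data_size;

        if (free_total < new_total) {
                return false;   /* does not fit at all */
        }
        if (new_extra_size > free_extra_size) {
                return false;   /* extra size must not grow */
        }
        /* If the new header is smaller, the record origin shifts back by the
           difference, so the total must still fit after that shift. */
        if (free_total < new_total + (free_extra_size - new_extra_size)) {
                return false;
        }
        return true;
}
/* --- end of sketch ------------------------------------------------------ */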
1502 #ifndef UNIV_HOTBACKUP
1503 /**********************************************************//**
1504 Writes a log record of copying a record list end to a new created page.
1505 @return 4-byte field where to write the log data length,
1506 or NULL if logging is disabled */
1507 UNIV_INLINE
1508 byte*
1509 page_copy_rec_list_to_created_page_write_log(
1510 /*=========================================*/
1511  page_t* page,
1512  dict_index_t* index,
1513  mtr_t* mtr)
1514 {
1515  byte* log_ptr;
1516 
1517  ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1518 
1519  log_ptr = mlog_open_and_write_index(mtr, page, index,
1520  page_is_comp(page)
1521  ? MLOG_COMP_LIST_END_COPY_CREATED
1522  : MLOG_LIST_END_COPY_CREATED, 4);
1523  if (UNIV_LIKELY(log_ptr != NULL)) {
1524  mlog_close(mtr, log_ptr + 4);
1525  }
1526 
1527  return(log_ptr);
1528 }
1529 #endif /* !UNIV_HOTBACKUP */
1530 
1531 /**********************************************************//**
1532 Parses a log record of copying a record list end to a new created page.
1533 @return end of log record or NULL */
1534 UNIV_INTERN
1535 byte*
1536 page_parse_copy_rec_list_to_created_page(
1537 /*=====================================*/
1538  byte* ptr,
1539  byte* end_ptr,
1540  buf_block_t* block,
1541  dict_index_t* index,
1542  mtr_t* mtr)
1543 {
1544  byte* rec_end;
1545  ulint log_data_len;
1546  page_t* page;
1547  page_zip_des_t* page_zip;
1548 
1549  if (ptr + 4 > end_ptr) {
1550 
1551  return(NULL);
1552  }
1553 
1554  log_data_len = mach_read_from_4(ptr);
1555  ptr += 4;
1556 
1557  rec_end = ptr + log_data_len;
1558 
1559  if (rec_end > end_ptr) {
1560 
1561  return(NULL);
1562  }
1563 
1564  if (!block) {
1565 
1566  return(rec_end);
1567  }
1568 
1569  while (ptr < rec_end) {
1570  ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
1571  block, index, mtr);
1572  }
1573 
1574  ut_a(ptr == rec_end);
1575 
1576  page = buf_block_get_frame(block);
1577  page_zip = buf_block_get_page_zip(block);
1578 
1579  page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
1580  page_header_set_field(page, page_zip, PAGE_DIRECTION,
1581  PAGE_NO_DIRECTION);
1582  page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
1583 
1584  return(rec_end);
1585 }
1586 
1587 #ifndef UNIV_HOTBACKUP
1588 /*************************************************************//**
1589 Copies records from page to a newly created page, from a given record
1590 onward, including that record. Infimum and supremum records are not copied. */
1591 UNIV_INTERN
1592 void
1593 page_copy_rec_list_end_to_created_page(
1594 /*===================================*/
1595  page_t* new_page,
1596  rec_t* rec,
1597  dict_index_t* index,
1598  mtr_t* mtr)
1599 {
1600  page_dir_slot_t* slot = 0; /* remove warning */
1601  byte* heap_top;
1602  rec_t* insert_rec = 0; /* remove warning */
1603  rec_t* prev_rec;
1604  ulint count;
1605  ulint n_recs;
1606  ulint slot_index;
1607  ulint rec_size;
1608  ulint log_mode;
1609  byte* log_ptr;
1610  ulint log_data_len;
1611  mem_heap_t* heap = NULL;
1612  ulint offsets_[REC_OFFS_NORMAL_SIZE];
1613  ulint* offsets = offsets_;
1614  rec_offs_init(offsets_);
1615 
1616  ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
1617  ut_ad(page_align(rec) != new_page);
1618  ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
1619 
1620  if (page_rec_is_infimum(rec)) {
1621 
1622  rec = page_rec_get_next(rec);
1623  }
1624 
1625  if (page_rec_is_supremum(rec)) {
1626 
1627  return;
1628  }
1629 
1630 #ifdef UNIV_DEBUG
1631  /* To pass the debug tests we have to set these dummy values
1632  in the debug version */
1633  page_dir_set_n_slots(new_page, NULL, UNIV_PAGE_SIZE / 2);
1634  page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
1635  new_page + UNIV_PAGE_SIZE - 1);
1636 #endif
1637 
1638  log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
1639  index, mtr);
1640 
1641  log_data_len = dyn_array_get_data_size(&(mtr->log));
1642 
1643  /* Individual inserts are logged in a shorter form */
1644 
1645  log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
1646 
1647  prev_rec = page_get_infimum_rec(new_page);
1648  if (page_is_comp(new_page)) {
1649  heap_top = new_page + PAGE_NEW_SUPREMUM_END;
1650  } else {
1651  heap_top = new_page + PAGE_OLD_SUPREMUM_END;
1652  }
1653  count = 0;
1654  slot_index = 0;
1655  n_recs = 0;
1656 
1657  do {
1658  offsets = rec_get_offsets(rec, index, offsets,
1659  ULINT_UNDEFINED, &heap);
1660  insert_rec = rec_copy(heap_top, rec, offsets);
1661 
1662  if (page_is_comp(new_page)) {
1663  rec_set_next_offs_new(prev_rec,
1664  page_offset(insert_rec));
1665 
1666  rec_set_n_owned_new(insert_rec, NULL, 0);
1667  rec_set_heap_no_new(insert_rec,
1668  PAGE_HEAP_NO_USER_LOW + n_recs);
1669  } else {
1670  rec_set_next_offs_old(prev_rec,
1671  page_offset(insert_rec));
1672 
1673  rec_set_n_owned_old(insert_rec, 0);
1674  rec_set_heap_no_old(insert_rec,
1675  PAGE_HEAP_NO_USER_LOW + n_recs);
1676  }
1677 
1678  count++;
1679  n_recs++;
1680 
1681  if (UNIV_UNLIKELY
1682  (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {
1683 
1684  slot_index++;
1685 
1686  slot = page_dir_get_nth_slot(new_page, slot_index);
1687 
1688  page_dir_slot_set_rec(slot, insert_rec);
1689  page_dir_slot_set_n_owned(slot, NULL, count);
1690 
1691  count = 0;
1692  }
1693 
1694  rec_size = rec_offs_size(offsets);
1695 
1696  ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);
1697 
1698  heap_top += rec_size;
1699 
1700  page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
1701  index, mtr);
1702  prev_rec = insert_rec;
1703  rec = page_rec_get_next(rec);
1704  } while (!page_rec_is_supremum(rec));
1705 
1706  if ((slot_index > 0) && (count + 1
1707  + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
1708  <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
1709  /* We can merge the two last dir slots. This operation is
1710  here to make this function imitate exactly the equivalent
1711  task made using page_cur_insert_rec, which we use in database
1712  recovery to reproduce the task performed by this function.
1713  To be able to check the correctness of recovery, it is good
1714  that it imitates exactly. */
1715 
1716  count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
1717 
1718  page_dir_slot_set_n_owned(slot, NULL, 0);
1719 
1720  slot_index--;
1721  }
1722 
1723  if (UNIV_LIKELY_NULL(heap)) {
1724  mem_heap_free(heap);
1725  }
1726 
1727  log_data_len = dyn_array_get_data_size(&(mtr->log)) - log_data_len;
1728 
1729  ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);
1730 
1731  if (UNIV_LIKELY(log_ptr != NULL)) {
1732  mach_write_to_4(log_ptr, log_data_len);
1733  }
1734 
1735  if (page_is_comp(new_page)) {
1736  rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
1737  } else {
1738  rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
1739  }
1740 
1741  slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
1742 
1743  page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
1744  page_dir_slot_set_n_owned(slot, NULL, count + 1);
1745 
1746  page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
1747  page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
1748  page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
1749  page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
1750 
1751  page_header_set_ptr(new_page, NULL, PAGE_LAST_INSERT, NULL);
1752  page_header_set_field(new_page, NULL, PAGE_DIRECTION,
1753  PAGE_NO_DIRECTION);
1754  page_header_set_field(new_page, NULL, PAGE_N_DIRECTION, 0);
1755 
1756  /* Restore the log mode */
1757 
1758  mtr_set_log_mode(mtr, log_mode);
1759 }
1760 
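While copying records, page_copy_rec_list_end_to_created_page() above closes a new directory slot after every (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 records and finally lets the supremum own the remainder (the real code may also merge the last two slots, which this sketch omits). The counting alone is shown below; the value 8 for the maximum owned count is assumed here for illustration.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stdio.h>

#define MAX_N_OWNED 8   /* stand-in for PAGE_DIR_SLOT_MAX_N_OWNED */

/* Print the n_owned value of each directory slot that a run of n_recs
   copied records would produce, mimicking the counting in
   page_copy_rec_list_end_to_created_page. */
static void plan_dir_slots(unsigned long n_recs)
{
        unsigned long count = 0;
        unsigned long slot_index = 0;

        for (unsigned long i = 0; i < n_recs; i++) {
                count++;
                if (count == (MAX_N_OWNED + 1) / 2) {
                        slot_index++;
                        printf("slot %lu owns %lu records\n",
                               slot_index, count);
                        count = 0;
                }
        }
        /* The supremum record owns whatever is left, plus itself. */
        printf("last slot (supremum) owns %lu records\n", count + 1);
}
/* --- end of sketch ------------------------------------------------------ */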
1761 /***********************************************************//**
1762 Writes log record of a record delete on a page. */
1763 UNIV_INLINE
1764 void
1765 page_cur_delete_rec_write_log(
1766 /*==========================*/
1767  rec_t* rec,
1768  dict_index_t* index,
1769  mtr_t* mtr)
1770 {
1771  byte* log_ptr;
1772 
1773  ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
1774 
1775  log_ptr = mlog_open_and_write_index(mtr, rec, index,
1776  page_rec_is_comp(rec)
1777  ? MLOG_COMP_REC_DELETE
1778  : MLOG_REC_DELETE, 2);
1779 
1780  if (!log_ptr) {
1781  /* Logging in mtr is switched off during crash recovery:
1782  in that case mlog_open returns NULL */
1783  return;
1784  }
1785 
1786  /* Write the cursor rec offset as a 2-byte ulint */
1787  mach_write_to_2(log_ptr, page_offset(rec));
1788 
1789  mlog_close(mtr, log_ptr + 2);
1790 }
1791 #else /* !UNIV_HOTBACKUP */
1792 # define page_cur_delete_rec_write_log(rec,index,mtr) ((void) 0)
1793 #endif /* !UNIV_HOTBACKUP */
1794 
1795 /***********************************************************//**
1796 Parses log record of a record delete on a page.
1797 @return pointer to record end or NULL */
1798 UNIV_INTERN
1799 byte*
1800 page_cur_parse_delete_rec(
1801 /*======================*/
1802  byte* ptr, /*!< in: buffer */
1803  byte* end_ptr, /*!< in: buffer end */
1804  buf_block_t* block, /*!< in: page or NULL */
1805  dict_index_t* index, /*!< in: record descriptor */
1806  mtr_t* mtr) /*!< in: mtr or NULL */
1807 {
1808  ulint offset;
1809  page_cur_t cursor;
1810 
1811  if (end_ptr < ptr + 2) {
1812 
1813  return(NULL);
1814  }
1815 
1816  /* Read the cursor rec offset as a 2-byte ulint */
1817  offset = mach_read_from_2(ptr);
1818  ptr += 2;
1819 
1820  ut_a(offset <= UNIV_PAGE_SIZE);
1821 
1822  if (block) {
1823  page_t* page = buf_block_get_frame(block);
1824  mem_heap_t* heap = NULL;
1825  ulint offsets_[REC_OFFS_NORMAL_SIZE];
1826  rec_t* rec = page + offset;
1827  rec_offs_init(offsets_);
1828 
1829  page_cur_position(rec, block, &cursor);
1830  ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
1831 
1832  page_cur_delete_rec(&cursor, index,
1833  rec_get_offsets(rec, index, offsets_,
1834  ULINT_UNDEFINED, &heap),
1835  mtr);
1836  if (UNIV_LIKELY_NULL(heap)) {
1837  mem_heap_free(heap);
1838  }
1839  }
1840 
1841  return(ptr);
1842 }
1843 
1844 /***********************************************************//**
1845 Deletes a record at the page cursor. The cursor is moved to the
1846 next record after the deleted one. */
1847 UNIV_INTERN
1848 void
1849 page_cur_delete_rec(
1850 /*================*/
1851  page_cur_t* cursor, /*!< in/out: a page cursor */
1852  dict_index_t* index, /*!< in: record descriptor */
1853  const ulint* offsets, /*!< in: rec_get_offsets(cursor->rec, index) */
1854  mtr_t* mtr) /*!< in: mini-transaction handle */
1855 {
1856  page_dir_slot_t* cur_dir_slot;
1857  page_dir_slot_t* prev_slot;
1858  page_t* page;
1859  page_zip_des_t* page_zip;
1860  rec_t* current_rec;
1861  rec_t* prev_rec = NULL;
1862  rec_t* next_rec;
1863  ulint cur_slot_no;
1864  ulint cur_n_owned;
1865  rec_t* rec;
1866 
1867  ut_ad(cursor && mtr);
1868 
1869  page = page_cur_get_page(cursor);
1870  page_zip = page_cur_get_page_zip(cursor);
1871 
1872  /* page_zip_validate() will fail here when
1873  btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
1874  Then, both "page_zip" and "page" would have the min-rec-mark
1875  set on the smallest user record, but "page" would additionally
1876  have it set on the smallest-but-one record. Because sloppy
1877  page_zip_validate_low() only ignores min-rec-flag differences
1878  in the smallest user record, it cannot be used here either. */
1879 
1880  current_rec = cursor->rec;
1881  ut_ad(rec_offs_validate(current_rec, index, offsets));
1882  ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1883 
1884  /* The record must not be the supremum or infimum record. */
1885  ut_ad(page_rec_is_user_rec(current_rec));
1886 
1887  /* Save to local variables some data associated with current_rec */
1888  cur_slot_no = page_dir_find_owner_slot(current_rec);
1889  cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
1890  cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
1891 
1892  /* 0. Write the log record */
1893  page_cur_delete_rec_write_log(current_rec, index, mtr);
1894 
1895  /* 1. Reset the last insert info in the page header and increment
1896  the modify clock for the frame */
1897 
1898  page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
1899 
1900  /* The page gets invalid for optimistic searches: increment the
1901  frame modify clock */
1902 
1903  buf_block_modify_clock_inc(page_cur_get_block(cursor));
1904 
1905  /* 2. Find the next and the previous record. Note that the cursor is
1906  left at the next record. */
1907 
1908  ut_ad(cur_slot_no > 0);
1909  prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
1910 
1911  rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
1912 
1913  /* rec now points to the record of the previous directory slot. Look
1914  for the immediate predecessor of current_rec in a loop. */
1915 
1916  while(current_rec != rec) {
1917  prev_rec = rec;
1918  rec = page_rec_get_next(rec);
1919  }
1920 
1921  page_cur_move_to_next(cursor);
1922  next_rec = cursor->rec;
1923 
1924  /* 3. Remove the record from the linked list of records */
1925 
1926  page_rec_set_next(prev_rec, next_rec);
1927 
1928  /* 4. If the deleted record is pointed to by a dir slot, update the
1929  record pointer in slot. In the following if-clause we assume that
1930  prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
1931  >= 2. */
1932 
1933 #if PAGE_DIR_SLOT_MIN_N_OWNED < 2
1934 # error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
1935 #endif
1936  ut_ad(cur_n_owned > 1);
1937 
1938  if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
1939  page_dir_slot_set_rec(cur_dir_slot, prev_rec);
1940  }
1941 
1942  /* 5. Update the number of owned records of the slot */
1943 
1944  page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
1945 
1946  /* 6. Free the memory occupied by the record */
1947  page_mem_free(page, page_zip, current_rec, index, offsets);
1948 
1949  /* 7. Now we have decremented the number of owned records of the slot.
1950  If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
1951  slots. */
1952 
1953  if (UNIV_UNLIKELY(cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED)) {
1954  page_dir_balance_slot(page, page_zip, cur_slot_no);
1955  }
1956 
1957 #ifdef UNIV_ZIP_DEBUG
1958  ut_a(!page_zip || page_zip_validate(page_zip, page));
1959 #endif /* UNIV_ZIP_DEBUG */
1960 }
1961 
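Because page records form a singly linked list, page_cur_delete_rec() above locates the predecessor of the record to delete by starting from the record owned by the previous directory slot and walking forward, then unlinks the record and decrements the owner slot's n_owned count. The list-manipulation part of that is sketched below with hypothetical types; it assumes, as on a real page, that the victim comes strictly after the previous slot's record.

/* --- illustrative sketch (not part of page0cur.cc) --------------------- */
#include <stddef.h>

struct rec_node {
        struct rec_node *next;
        unsigned         n_owned;   /* nonzero only for slot-owning records */
};

/* Unlink 'victim' from the list, starting the predecessor scan at
   'prev_slot_rec' (the record owned by the previous directory slot).
   Returns the predecessor, or NULL if the victim was not found. */
static struct rec_node *delete_after_slot(struct rec_node *prev_slot_rec,
                                          struct rec_node *victim)
{
        struct rec_node *prev = NULL;
        struct rec_node *rec = prev_slot_rec;

        /* Assumes victim != prev_slot_rec, so prev is set before use. */
        while (rec != victim) {
                if (rec == NULL) {
                        return NULL;    /* corrupt list */
                }
                prev = rec;
                rec = rec->next;
        }

        prev->next = victim->next;      /* 3. remove from the record list */
        return prev;
}
/* --- end of sketch ------------------------------------------------------ */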
1962 #ifdef UNIV_COMPILE_TEST_FUNCS
1963 
1964 /*******************************************************************//**
1965 Prints the first n numbers generated by page_cur_lcg_prng(), so that the
1966 generator can be checked visually. */
1967 void
1968 test_page_cur_lcg_prng(
1969 /*===================*/
1970  int n)
1971 {
1972  int i;
1973  unsigned long long rnd;
1974 
1975  for (i = 0; i < n; i++) {
1976  rnd = page_cur_lcg_prng();
1977  printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
1978  rnd,
1979  rnd % 2,
1980  rnd % 3,
1981  rnd % 5,
1982  rnd % 7,
1983  rnd % 11);
1984  }
1985 }
1986 
1987 #endif /* UNIV_COMPILE_TEST_FUNCS */