Drizzled Public API Documentation

records.cc
Go to the documentation of this file.
1 /* Copyright (C) 2000-2006 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
22 #include <config.h>
23 
24 #include <drizzled/drizzled.h>
25 #include <drizzled/error.h>
26 #include <drizzled/internal/iocache.h>
27 #include <drizzled/internal/my_sys.h>
28 #include <drizzled/optimizer/range.h>
29 #include <drizzled/plugin/storage_engine.h>
30 #include <drizzled/records.h>
31 #include <drizzled/session.h>
32 #include <drizzled/table.h>
33 #include <drizzled/system_variables.h>
34 
35 namespace drizzled {
36 
37 static int rr_sequential(ReadRecord *info);
38 static int rr_quick(ReadRecord *info);
39 static int rr_from_tempfile(ReadRecord *info);
40 static int rr_unpack_from_tempfile(ReadRecord *info);
41 static int rr_unpack_from_buffer(ReadRecord *info);
42 static int rr_from_pointers(ReadRecord *info);
43 static int rr_from_cache(ReadRecord *info);
44 static int rr_cmp(unsigned char *a,unsigned char *b);
45 static int rr_index_first(ReadRecord *info);
46 static int rr_index(ReadRecord *info);
47 
48 void ReadRecord::init_reard_record_sequential()
49 {
50  read_record= rr_sequential;
51 }
52 
54  Table *table_arg,
55  bool print_error_arg,
56  uint32_t idx)
57 {
58  table_arg->emptyRecord();
59  table= table_arg;
60  cursor= table->cursor;
61  record= table->getInsertRecord();
62  print_error= print_error_arg;
63 
64  table->status=0; /* And it's always found */
65  if (not table->cursor->inited)
66  {
67  int error= table->cursor->startIndexScan(idx, 1);
68  if (error != 0)
69  return error;
70  }
71  /* read_record will be changed to rr_index in rr_index_first */
72  read_record= rr_index_first;
73 
74  return 0;
75 }
76 
77 
78 int ReadRecord::init_read_record(Session *session_arg,
79  Table *table_arg,
80  optimizer::SqlSelect *select_arg,
81  int use_record_cache,
82  bool print_error_arg)
83 {
84  internal::io_cache_st *tempfile;
85  int error= 0;
86 
87  session= session_arg;
88  table= table_arg;
89  cursor= table->cursor;
90  forms= &table; /* Only one table */
91 
92  if (table->sort.addon_field)
93  {
94  rec_buf= table->sort.addon_buf;
95  ref_length= table->sort.addon_length;
96  }
97  else
98  {
99  table->emptyRecord();
100  record= table->getInsertRecord();
101  ref_length= table->cursor->ref_length;
102  }
103  select= select_arg;
104  print_error= print_error_arg;
105  ignore_not_found_rows= 0;
106  table->status=0; /* And it's always found */
107 
108  if (select && select->file->inited())
109  {
110  tempfile= select->file;
111  }
112  else
113  {
114  tempfile= table->sort.io_cache;
115  }
116 
117  if (tempfile && tempfile->inited()) // Test if ref-records was used
118  {
119  read_record= table->sort.addon_field ? rr_unpack_from_tempfile : rr_from_tempfile;
120 
121  io_cache=tempfile;
122  io_cache->reinit_io_cache(internal::READ_CACHE,0L,0,0);
123  ref_pos=table->cursor->ref;
124  if (!table->cursor->inited)
125  {
126  error= table->cursor->startTableScan(0);
127  if (error != 0)
128  return error;
129  }
130 
131  /*
132  table->sort.addon_field is checked because if we use addon fields,
133  it doesn't make sense to use cache - we don't read from the table
134  and table->sort.io_cache is read sequentially
135  */
136  if (!table->sort.addon_field &&
137  session->variables.read_rnd_buff_size &&
138  !(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) &&
139  (table->db_stat & HA_READ_ONLY ||
140  table->reginfo.lock_type <= TL_READ_NO_INSERT) &&
141  (uint64_t) table->getShare()->getRecordLength() * (table->cursor->stats.records+
142  table->cursor->stats.deleted) >
143  (uint64_t) MIN_FILE_LENGTH_TO_USE_ROW_CACHE &&
144  io_cache->end_of_file/ref_length * table->getShare()->getRecordLength() >
145  (internal::my_off_t) MIN_ROWS_TO_USE_TABLE_CACHE &&
146  !table->getShare()->blob_fields &&
147  ref_length <= MAX_REFLENGTH)
148  {
149  if (init_rr_cache())
150  {
151  read_record= rr_from_cache;
152  }
153  }
154  }
155  else if (select && select->quick)
156  {
157  read_record= rr_quick;
158  }
159  else if (table->sort.record_pointers)
160  {
161  error= table->cursor->startTableScan(0);
162  if (error != 0)
163  return error;
164 
165  cache_pos=table->sort.record_pointers;
166  cache_end= cache_pos+ table->sort.found_records * ref_length;
167  read_record= (table->sort.addon_field ? rr_unpack_from_buffer : rr_from_pointers);
168  }
169  else
170  {
171  read_record= rr_sequential;
172  error= table->cursor->startTableScan(1);
173  if (error != 0)
174  return error;
175 
176  /* We can use record cache if we don't update dynamic length tables */
177  if (!table->no_cache &&
178  (use_record_cache > 0 ||
179  (int) table->reginfo.lock_type <= (int) TL_READ_WITH_SHARED_LOCKS ||
180  !(table->getShare()->db_options_in_use & HA_OPTION_PACK_RECORD)))
181  {
182  table->cursor->extra_opt(HA_EXTRA_CACHE, session->variables.read_buff_size);
183  }
184  }
185 
186  return 0;
187 } /* init_read_record */
188 
189 
190 void ReadRecord::end_read_record()
191 { /* free cache if used */
192  if (cache)
193  {
194  global_read_rnd_buffer.sub(session->variables.read_rnd_buff_size);
195  free((char*) cache);
196  cache= NULL;
197  }
198  if (table)
199  {
200  table->filesort_free_buffers();
201  (void) cursor->extra(HA_EXTRA_NO_CACHE);
202  if (read_record != rr_quick) // otherwise quick_range does it
203  (void) cursor->ha_index_or_rnd_end();
204 
205  table= NULL;
206  }
207 }
208 
209 static int rr_handle_error(ReadRecord *info, int error)
210 {
211  if (error == HA_ERR_END_OF_FILE)
212  error= -1;
213  else
214  {
215  if (info->print_error)
216  info->table->print_error(error, MYF(0));
217  if (error < 0) // Fix negative BDB errno
218  error= 1;
219  }
220  return error;
221 }
222 
224 static int rr_quick(ReadRecord *info)
225 {
226  int tmp;
227  while ((tmp= info->select->quick->get_next()))
228  {
229  if (info->session->getKilled())
230  {
231  my_error(ER_SERVER_SHUTDOWN, MYF(0));
232  return 1;
233  }
234  if (tmp != HA_ERR_RECORD_DELETED)
235  {
236  tmp= rr_handle_error(info, tmp);
237  break;
238  }
239  }
240 
241  return tmp;
242 }
243 
256 static int rr_index_first(ReadRecord *info)
257 {
258  int tmp= info->cursor->index_first(info->record);
259  info->read_record= rr_index;
260  if (tmp)
261  tmp= rr_handle_error(info, tmp);
262  return tmp;
263 }
264 
280 static int rr_index(ReadRecord *info)
281 {
282  int tmp= info->cursor->index_next(info->record);
283  if (tmp)
284  tmp= rr_handle_error(info, tmp);
285  return tmp;
286 }
287 
288 int rr_sequential(ReadRecord *info)
289 {
290  int tmp;
291  while ((tmp= info->cursor->rnd_next(info->record)))
292  {
293  if (info->session->getKilled())
294  {
295  info->session->send_kill_message();
296  return 1;
297  }
298  /*
299  TODO> Fix this so that engine knows how to behave on its own.
300  rnd_next can return RECORD_DELETED for MyISAM when one thread is
301  reading and another deleting without locks.
302  */
303  if (tmp != HA_ERR_RECORD_DELETED)
304  {
305  tmp= rr_handle_error(info, tmp);
306  break;
307  }
308  }
309 
310  return tmp;
311 }
312 
313 static int rr_from_tempfile(ReadRecord *info)
314 {
315  int tmp;
316  for (;;)
317  {
318  if (info->io_cache->read(info->ref_pos, info->ref_length))
319  return -1; /* End of cursor */
320  if (!(tmp=info->cursor->rnd_pos(info->record,info->ref_pos)))
321  break;
322  /* The following is extremely unlikely to happen */
323  if (tmp == HA_ERR_RECORD_DELETED ||
324  (tmp == HA_ERR_KEY_NOT_FOUND && info->ignore_not_found_rows))
325  continue;
326  tmp= rr_handle_error(info, tmp);
327  break;
328  }
329  return tmp;
330 } /* rr_from_tempfile */
331 
348 {
349  if (info->io_cache->read(info->rec_buf, info->ref_length))
350  return -1;
351  Table *table= info->table;
352  (*table->sort.unpack)(table->sort.addon_field, info->rec_buf);
353 
354  return 0;
355 }
356 
357 static int rr_from_pointers(ReadRecord *info)
358 {
359  int tmp;
360  unsigned char *cache_pos;
361 
362 
363  for (;;)
364  {
365  if (info->cache_pos == info->cache_end)
366  return -1; /* End of cursor */
367  cache_pos= info->cache_pos;
368  info->cache_pos+= info->ref_length;
369 
370  if (!(tmp=info->cursor->rnd_pos(info->record,cache_pos)))
371  break;
372 
373  /* The following is extremely unlikely to happen */
374  if (tmp == HA_ERR_RECORD_DELETED ||
375  (tmp == HA_ERR_KEY_NOT_FOUND && info->ignore_not_found_rows))
376  continue;
377  tmp= rr_handle_error(info, tmp);
378  break;
379  }
380  return tmp;
381 }
382 
399 {
400  if (info->cache_pos == info->cache_end)
401  return -1; /* End of buffer */
402  Table *table= info->table;
403  (*table->sort.unpack)(table->sort.addon_field, info->cache_pos);
404  info->cache_pos+= info->ref_length;
405 
406  return 0;
407 }
408 
409 /* cacheing of records from a database */
410 bool ReadRecord::init_rr_cache()
411 {
412  uint32_t local_rec_cache_size;
413 
414  struct_length= 3 + MAX_REFLENGTH;
415  reclength= ALIGN_SIZE(table->getShare()->getRecordLength() + 1);
416  if (reclength < struct_length)
417  reclength= ALIGN_SIZE(struct_length);
418 
419  error_offset= table->getShare()->getRecordLength();
420  cache_records= (session->variables.read_rnd_buff_size /
421  (reclength + struct_length));
422  local_rec_cache_size= cache_records * reclength;
423  rec_cache_size= cache_records * ref_length;
424 
425  if (not global_read_rnd_buffer.add(session->variables.read_rnd_buff_size))
426  {
427  my_error(ER_OUT_OF_GLOBAL_READRNDMEMORY, MYF(ME_ERROR+ME_WAITTANG));
428  return false;
429  }
430 
431  // We have to allocate one more byte to use uint3korr (see comments for it)
432  if (cache_records <= 2)
433  return false;
434  cache= (unsigned char*) malloc(local_rec_cache_size + cache_records * struct_length + 1);
435 #ifdef HAVE_VALGRIND
436  // Avoid warnings in qsort
437  memset(cache, 0, local_rec_cache_size + cache_records * struct_length + 1);
438 #endif
439  read_positions= cache + local_rec_cache_size;
440  cache_pos= cache_end= cache;
441 
442  return true;
443 } /* init_rr_cache */
444 
445 static int rr_from_cache(ReadRecord *info)
446 {
447  uint32_t length;
448  internal::my_off_t rest_of_file;
449  int16_t error;
450  unsigned char *position,*ref_position,*record_pos;
451  uint32_t record;
452 
453  for (;;)
454  {
455  if (info->cache_pos != info->cache_end)
456  {
457  if (info->cache_pos[info->error_offset])
458  {
459  shortget(error,info->cache_pos);
460  if (info->print_error)
461  info->table->print_error(error,MYF(0));
462  }
463  else
464  {
465  error=0;
466  memcpy(info->record,info->cache_pos, (size_t) info->table->getShare()->getRecordLength());
467  }
468  info->cache_pos+= info->reclength;
469  return ((int) error);
470  }
471  length=info->rec_cache_size;
472  rest_of_file= info->io_cache->end_of_file - info->io_cache->tell();
473  if ((internal::my_off_t) length > rest_of_file)
474  {
475  length= (uint32_t) rest_of_file;
476  }
477 
478  if (!length || info->io_cache->read(info->getCache(), length))
479  {
480  return -1; /* End of cursor */
481  }
482 
483  length/=info->ref_length;
484  position=info->getCache();
485  ref_position=info->read_positions;
486  for (uint32_t i= 0 ; i < length ; i++,position+=info->ref_length)
487  {
488  memcpy(ref_position,position,(size_t) info->ref_length);
489  ref_position+=MAX_REFLENGTH;
490  int3store(ref_position,(long) i);
491  ref_position+=3;
492  }
493  internal::my_qsort(info->read_positions, length, info->struct_length,
494  (qsort_cmp) rr_cmp);
495 
496  position=info->read_positions;
497  for (uint32_t i= 0 ; i < length ; i++)
498  {
499  memcpy(info->ref_pos, position, (size_t)info->ref_length);
500  position+=MAX_REFLENGTH;
501  record=uint3korr(position);
502  position+=3;
503  record_pos= info->getCache() + record * info->reclength;
504  if ((error=(int16_t) info->cursor->rnd_pos(record_pos,info->ref_pos)))
505  {
506  record_pos[info->error_offset]=1;
507  shortstore(record_pos,error);
508  }
509  else
510  record_pos[info->error_offset]=0;
511  }
512  info->cache_end= (info->cache_pos= info->getCache())+length*info->reclength;
513  }
514 } /* rr_from_cache */
515 
516 static int rr_cmp(unsigned char *a,unsigned char *b)
517 {
518  if (a[0] != b[0])
519  return (int) a[0] - (int) b[0];
520  if (a[1] != b[1])
521  return (int) a[1] - (int) b[1];
522  if (a[2] != b[2])
523  return (int) a[2] - (int) b[2];
524 #if MAX_REFLENGTH == 4
525  return (int) a[3] - (int) b[3];
526 #else
527  if (a[3] != b[3])
528  return (int) a[3] - (int) b[3];
529  if (a[4] != b[4])
530  return (int) a[4] - (int) b[4];
531  if (a[5] != b[5])
532  return (int) a[1] - (int) b[5];
533  if (a[6] != b[6])
534  return (int) a[6] - (int) b[6];
535  return (int) a[7] - (int) b[7];
536 #endif
537 }
538 
539 } /* namespace drizzled */