Drizzled Public API Documentation

sort.cc
00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 /*
00017   Creates a index for a database by reading keys, sorting them and outputing
00018   them in sorted order through SORT_INFO functions.
00019 */
00020 
00021 #include "myisam_priv.h"
00022 #include <stddef.h>
00023 #include <queue>
00024 #include <algorithm>
00025 
00026 /* static variables */
00027 
00028 #undef MIN_SORT_MEMORY
00029 #undef MYF_RW
00030 #undef DISK_BUFFER_SIZE
00031 
00032 #define MERGEBUFF 15
00033 #define MERGEBUFF2 31
00034 #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
00035 #define MYF_RW  MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
00036 #define DISK_BUFFER_SIZE (IO_SIZE*16)
00037 
00038 using namespace std;
00039 using namespace drizzled;
00040 
00041 
00042 /*
00043  Pointers of functions for store and read keys from temp file
00044 */
00045 
00046 extern void print_error(const char *fmt,...);
00047 
00048 /* Functions defined in this file */
00049 
00050 static ha_rows  find_all_keys(MI_SORT_PARAM *info,uint32_t keys,
00051             unsigned char **sort_keys,
00052             DYNAMIC_ARRAY *buffpek,
00053             size_t *maxbuffer,
00054             internal::IO_CACHE *tempfile,
00055             internal::IO_CACHE *tempfile_for_exceptions);
00056 static int  write_keys(MI_SORT_PARAM *info,unsigned char **sort_keys,
00057                              uint32_t count, BUFFPEK *buffpek,internal::IO_CACHE *tempfile);
00058 static int  write_key(MI_SORT_PARAM *info, unsigned char *key,
00059           internal::IO_CACHE *tempfile);
00060 static int  write_index(MI_SORT_PARAM *info,unsigned char * *sort_keys,
00061                               uint32_t count);
00062 static int  merge_many_buff(MI_SORT_PARAM *info,uint32_t keys,
00063           unsigned char * *sort_keys,
00064           BUFFPEK *buffpek,size_t *maxbuffer,
00065           internal::IO_CACHE *t_file);
00066 static uint32_t  read_to_buffer(internal::IO_CACHE *fromfile,BUFFPEK *buffpek,
00067                                   uint32_t sort_length);
00068 static int  merge_buffers(MI_SORT_PARAM *info,uint32_t keys,
00069                                 internal::IO_CACHE *from_file, internal::IO_CACHE *to_file,
00070                                 unsigned char * *sort_keys, BUFFPEK *lastbuff,
00071                                 BUFFPEK *Fb, BUFFPEK *Tb);
00072 static int  merge_index(MI_SORT_PARAM *,uint,unsigned char **,BUFFPEK *, int,
00073                               internal::IO_CACHE *);
00074 static int  write_keys_varlen(MI_SORT_PARAM *info,unsigned char **sort_keys,
00075                        uint32_t count, BUFFPEK *buffpek,
00076                        internal::IO_CACHE *tempfile);
00077 static uint32_t  read_to_buffer_varlen(internal::IO_CACHE *fromfile,BUFFPEK *buffpek,
00078                                 uint32_t sort_length);
00079 static int  write_merge_key(MI_SORT_PARAM *info, internal::IO_CACHE *to_file,
00080                      unsigned char *key, uint32_t sort_length, uint32_t count);
00081 static int  write_merge_key_varlen(MI_SORT_PARAM *info,
00082                             internal::IO_CACHE *to_file,
00083                             unsigned char* key, uint32_t sort_length,
00084                             uint32_t count);
00085 
00086 inline int
00087 my_var_write(MI_SORT_PARAM *info, internal::IO_CACHE *to_file, unsigned char *bufs);
00088 
00089 /*
00090   Creates a index of sorted keys
00091 
00092   SYNOPSIS
00093     _create_index_by_sort()
00094     info    Sort parameters
00095     no_messages   Set to 1 if no output
00096     sortbuff_size Size if sortbuffer to allocate
00097 
00098   RESULT
00099     0 ok
00100    <> 0 Error
00101 */
00102 
00103 int _create_index_by_sort(MI_SORT_PARAM *info,bool no_messages,
00104         size_t sortbuff_size)
00105 {
00106   int error;
00107   size_t maxbuffer, skr;
00108   uint32_t memavl,old_memavl,keys,sort_length;
00109   DYNAMIC_ARRAY buffpek;
00110   ha_rows records;
00111   unsigned char **sort_keys;
00112   internal::IO_CACHE tempfile, tempfile_for_exceptions;
00113 
00114   if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
00115   {
00116     info->write_keys=write_keys_varlen;
00117     info->read_to_buffer=read_to_buffer_varlen;
00118     info->write_key= write_merge_key_varlen;
00119   }
00120   else
00121   {
00122     info->write_keys=write_keys;
00123     info->read_to_buffer=read_to_buffer;
00124     info->write_key=write_merge_key;
00125   }
00126 
00127   my_b_clear(&tempfile);
00128   my_b_clear(&tempfile_for_exceptions);
00129   memset(&buffpek, 0, sizeof(buffpek));
00130   sort_keys= (unsigned char **) NULL; error= 1;
00131   maxbuffer=1;
00132 
00133   memavl=max(sortbuff_size,(size_t)MIN_SORT_MEMORY);
00134   records=  info->sort_info->max_records;
00135   sort_length=  info->key_length;
00136 
00137   while (memavl >= MIN_SORT_MEMORY)
00138   {
00139     if ((records < UINT32_MAX) &&
00140        ((internal::my_off_t) (records + 1) *
00141         (sort_length + sizeof(char*)) <= (internal::my_off_t) memavl))
00142       keys= (uint)records+1;
00143     else
00144       do
00145       {
00146   skr=maxbuffer;
00147   if (memavl < sizeof(BUFFPEK)* maxbuffer ||
00148       (keys=(memavl-sizeof(BUFFPEK)* maxbuffer)/
00149              (sort_length+sizeof(char*))) <= 1 ||
00150             keys < maxbuffer)
00151   {
00152     mi_check_print_error(info->sort_info->param,
00153              "myisam_sort_buffer_size is too small");
00154     goto err;
00155   }
00156       }
00157       while ((maxbuffer= (size_t)(records/(keys-1)+1)) != skr);
00158 
00159     if ((sort_keys=(unsigned char **)malloc(keys*(sort_length+sizeof(char*)))))
00160     {
00161       if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
00162            maxbuffer/2))
00163       {
00164   free((unsigned char*) sort_keys);
00165         sort_keys= 0;
00166       }
00167       else
00168   break;
00169     }
00170     old_memavl=memavl;
00171     if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
00172       memavl=MIN_SORT_MEMORY;
00173   }
00174   if (memavl < MIN_SORT_MEMORY)
00175   {
00176     mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small");
00177     goto err;
00178   }
00179   (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
00180 
00181   if (!no_messages)
00182     printf("  - Searching for keys, allocating buffer for %d keys\n",keys);
00183 
00184   if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
00185            &tempfile,&tempfile_for_exceptions))
00186       == HA_POS_ERROR)
00187     goto err;
00188   if (maxbuffer == 0)
00189   {
00190     if (!no_messages)
00191       printf("  - Dumping %u keys\n", (uint32_t) records);
00192     if (write_index(info,sort_keys, (uint) records))
00193       goto err;
00194   }
00195   else
00196   {
00197     keys=(keys*(sort_length+sizeof(char*)))/sort_length;
00198     if (maxbuffer >= MERGEBUFF2)
00199     {
00200       if (!no_messages)
00201   printf("  - Merging %u keys\n", (uint32_t) records);
00202       if (merge_many_buff(info,keys,sort_keys, (BUFFPEK*)buffpek.buffer, &maxbuffer, &tempfile))
00203   goto err;
00204     }
00205     if (internal::flush_io_cache(&tempfile) ||
00206   tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00207       goto err;
00208     if (!no_messages)
00209       printf("  - Last merge and dumping keys\n");
00210     if (merge_index(info,keys,sort_keys, (BUFFPEK*)buffpek.buffer, maxbuffer, &tempfile))
00211       goto err;
00212   }
00213 
00214   if (flush_pending_blocks(info))
00215     goto err;
00216 
00217   if (my_b_inited(&tempfile_for_exceptions))
00218   {
00219     MI_INFO *idx=info->sort_info->info;
00220     uint32_t     keyno=info->key;
00221     uint32_t     key_length, ref_length=idx->s->rec_reflength;
00222 
00223     if (not no_messages)
00224       printf("  - Adding exceptions\n");
00225 
00226     if (flush_io_cache(&tempfile_for_exceptions) || tempfile_for_exceptions.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00227     {
00228       goto err;
00229     }
00230 
00231     while (!my_b_read(&tempfile_for_exceptions,(unsigned char*)&key_length,
00232           sizeof(key_length))
00233         && !my_b_read(&tempfile_for_exceptions,(unsigned char*)sort_keys,
00234           (uint) key_length))
00235     {
00236   if (_mi_ck_write(idx,keyno,(unsigned char*) sort_keys,key_length-ref_length))
00237     goto err;
00238     }
00239   }
00240 
00241   error =0;
00242 
00243 err:
00244   if (sort_keys)
00245     free((unsigned char*) sort_keys);
00246   delete_dynamic(&buffpek);
00247   tempfile.close_cached_file();
00248   tempfile_for_exceptions.close_cached_file();
00249 
00250   return(error ? -1 : 0);
00251 } /* _create_index_by_sort */
00252 
00253 
00254 /* Search after all keys and place them in a temp. file */
00255 
00256 static ha_rows  find_all_keys(MI_SORT_PARAM *info, uint32_t keys,
00257             unsigned char **sort_keys,
00258             DYNAMIC_ARRAY *buffpek,
00259             size_t *maxbuffer, internal::IO_CACHE *tempfile,
00260             internal::IO_CACHE *tempfile_for_exceptions)
00261 {
00262   int error;
00263   uint32_t idx;
00264 
00265   idx=error=0;
00266   sort_keys[0]=(unsigned char*) (sort_keys+keys);
00267 
00268   while (!(error=(*info->key_read)(info,sort_keys[idx])))
00269   {
00270     if (info->real_key_length > info->key_length)
00271     {
00272       if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
00273         return(HA_POS_ERROR);
00274       continue;
00275     }
00276 
00277     if (++idx == keys)
00278     {
00279       if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
00280          tempfile))
00281       return(HA_POS_ERROR);
00282 
00283       sort_keys[0]=(unsigned char*) (sort_keys+keys);
00284       memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
00285       idx=1;
00286     }
00287     sort_keys[idx]=sort_keys[idx-1]+info->key_length;
00288   }
00289   if (error > 0)
00290     return(HA_POS_ERROR);
00291   if (buffpek->size())
00292   {
00293     if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
00294        tempfile))
00295       return(HA_POS_ERROR);
00296     *maxbuffer=buffpek->size() - 1;
00297   }
00298   else
00299     *maxbuffer=0;
00300 
00301   return((*maxbuffer)*(keys-1)+idx);
00302 } /* find_all_keys */
00303 
00304 
00305 int thr_write_keys(MI_SORT_PARAM *sort_param)
00306 {
00307   SORT_INFO *sort_info= sort_param->sort_info;
00308   MI_CHECK *param= sort_info->param;
00309   uint32_t length= 0, keys;
00310   ulong *rec_per_key_part= param->rec_per_key_part;
00311   int got_error=sort_info->got_error;
00312   uint32_t i;
00313   MI_INFO *info=sort_info->info;
00314   MYISAM_SHARE *share=info->s;
00315   MI_SORT_PARAM *sinfo;
00316   unsigned char *mergebuf=0;
00317 
00318   for (i= 0, sinfo= sort_param ;
00319        i < sort_info->total_keys ;
00320        i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
00321   {
00322     if (!sinfo->sort_keys)
00323     {
00324       got_error=1;
00325       void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
00326       if (rec_buff_ptr != NULL)
00327         free(rec_buff_ptr);
00328       continue;
00329     }
00330     if (!got_error)
00331     {
00332       mi_set_key_active(share->state.key_map, sinfo->key);
00333       if (!sinfo->buffpek.size())
00334       {
00335         if (param->testflag & T_VERBOSE)
00336         {
00337           printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
00338           fflush(stdout);
00339         }
00340         if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || flush_pending_blocks(sinfo))
00341           got_error=1;
00342       }
00343       if (!got_error && param->testflag & T_STATISTICS)
00344         update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
00345                          param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
00346                          sinfo->notnull: NULL,
00347                          (uint64_t) info->state->records);
00348     }
00349     free((unsigned char*) sinfo->sort_keys);
00350     void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
00351     if (rec_buff_ptr != NULL)
00352       free(rec_buff_ptr);
00353     sinfo->sort_keys=0;
00354   }
00355 
00356   for (i= 0, sinfo= sort_param ;
00357        i < sort_info->total_keys ;
00358        i++,
00359    delete_dynamic(&sinfo->buffpek),
00360    sinfo->tempfile.close_cached_file(),
00361    sinfo->tempfile_for_exceptions.close_cached_file(),
00362    sinfo++)
00363   {
00364     if (got_error)
00365       continue;
00366     if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
00367     {
00368       sinfo->write_keys=write_keys_varlen;
00369       sinfo->read_to_buffer=read_to_buffer_varlen;
00370       sinfo->write_key=write_merge_key_varlen;
00371     }
00372     else
00373     {
00374       sinfo->write_keys=write_keys;
00375       sinfo->read_to_buffer=read_to_buffer;
00376       sinfo->write_key=write_merge_key;
00377     }
00378     if (sinfo->buffpek.size())
00379     {
00380       size_t maxbuffer=sinfo->buffpek.size() - 1;
00381       if (!mergebuf)
00382       {
00383         length=param->sort_buffer_length;
00384         while (length >= MIN_SORT_MEMORY)
00385         {
00386           if ((mergebuf= (unsigned char *)malloc(length)))
00387               break;
00388           length=length*3/4;
00389         }
00390         if (!mergebuf)
00391         {
00392           got_error=1;
00393           continue;
00394         }
00395       }
00396       keys=length/sinfo->key_length;
00397       if (maxbuffer >= MERGEBUFF2)
00398       {
00399         if (param->testflag & T_VERBOSE)
00400           printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
00401         if (merge_many_buff(sinfo, keys, (unsigned char **)mergebuf, (BUFFPEK*)sinfo->buffpek.buffer,
00402           &maxbuffer, &sinfo->tempfile))
00403         {
00404           got_error=1;
00405           continue;
00406         }
00407       }
00408       if (flush_io_cache(&sinfo->tempfile) || sinfo->tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00409       {
00410         got_error=1;
00411         continue;
00412       }
00413       if (param->testflag & T_VERBOSE)
00414         printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
00415       if (merge_index(sinfo, keys, (unsigned char **)mergebuf, (BUFFPEK*)sinfo->buffpek.buffer,
00416                       maxbuffer,&sinfo->tempfile) ||
00417     flush_pending_blocks(sinfo))
00418       {
00419         got_error=1;
00420         continue;
00421       }
00422     }
00423     if (my_b_inited(&sinfo->tempfile_for_exceptions))
00424     {
00425       uint32_t key_length;
00426 
00427       if (param->testflag & T_VERBOSE)
00428         printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);
00429 
00430       if (flush_io_cache(&sinfo->tempfile_for_exceptions) || sinfo->tempfile_for_exceptions.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00431       {
00432         got_error=1;
00433         continue;
00434       }
00435 
00436       while (!got_error &&
00437        !my_b_read(&sinfo->tempfile_for_exceptions,(unsigned char*)&key_length,
00438       sizeof(key_length)))
00439       {
00440         unsigned char ft_buf[10];
00441         if (key_length > sizeof(ft_buf) ||
00442             my_b_read(&sinfo->tempfile_for_exceptions, (unsigned char*)ft_buf,
00443                       (uint)key_length) ||
00444             _mi_ck_write(info, sinfo->key, (unsigned char*)ft_buf,
00445                          key_length - info->s->rec_reflength))
00446           got_error=1;
00447       }
00448     }
00449   }
00450   free((unsigned char*) mergebuf);
00451   return(got_error);
00452 }
00453 
00454         /* Write all keys in memory to file for later merge */
00455 
00456 static int  write_keys(MI_SORT_PARAM *info, register unsigned char **sort_keys,
00457                              uint32_t count, BUFFPEK *buffpek, internal::IO_CACHE *tempfile)
00458 {
00459   unsigned char **end;
00460   uint32_t sort_length=info->key_length;
00461 
00462   internal::my_qsort2((unsigned char*) sort_keys,count,sizeof(unsigned char*),(qsort2_cmp) info->key_cmp,
00463             info);
00464   if (!my_b_inited(tempfile) && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00465     return(1);
00466 
00467   buffpek->file_pos=my_b_tell(tempfile);
00468   buffpek->count=count;
00469 
00470   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
00471   {
00472     if (my_b_write(tempfile,(unsigned char*) *sort_keys,(uint) sort_length))
00473       return(1);
00474   }
00475   return(0);
00476 } /* write_keys */
00477 
00478 
00479 inline int
00480 my_var_write(MI_SORT_PARAM *info, internal::IO_CACHE *to_file, unsigned char *bufs)
00481 {
00482   int err;
00483   uint16_t len = _mi_keylength(info->keyinfo, (unsigned char*) bufs);
00484 
00485   /* The following is safe as this is a local file */
00486   if ((err= my_b_write(to_file, (unsigned char*)&len, sizeof(len))))
00487     return (err);
00488   if ((err= my_b_write(to_file,bufs, (uint) len)))
00489     return (err);
00490   return (0);
00491 }
00492 
00493 
00494 static int  write_keys_varlen(MI_SORT_PARAM *info,
00495             register unsigned char **sort_keys,
00496                                     uint32_t count, BUFFPEK *buffpek,
00497             internal::IO_CACHE *tempfile)
00498 {
00499   unsigned char **end;
00500   int err;
00501 
00502   internal::my_qsort2((unsigned char*) sort_keys,count,sizeof(unsigned char*),(qsort2_cmp) info->key_cmp,
00503             info);
00504   if (!my_b_inited(tempfile) && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00505     return(1);
00506 
00507   buffpek->file_pos=my_b_tell(tempfile);
00508   buffpek->count=count;
00509   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
00510   {
00511     if ((err= my_var_write(info,tempfile, (unsigned char*) *sort_keys)))
00512       return(err);
00513   }
00514   return(0);
00515 } /* write_keys_varlen */
00516 
00517 
00518 static int  write_key(MI_SORT_PARAM *info, unsigned char *key,
00519           internal::IO_CACHE *tempfile)
00520 {
00521   uint32_t key_length=info->real_key_length;
00522 
00523   if (!my_b_inited(tempfile) && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00524     return(1);
00525 
00526   if (my_b_write(tempfile,(unsigned char*)&key_length,sizeof(key_length)) ||
00527       my_b_write(tempfile,(unsigned char*)key,(uint) key_length))
00528     return(1);
00529   return(0);
00530 } /* write_key */
00531 
00532 
00533 /* Write index */
00534 
00535 static int  write_index(MI_SORT_PARAM *info, register unsigned char **sort_keys,
00536                               register uint32_t count)
00537 {
00538   internal::my_qsort2((unsigned char*) sort_keys,(size_t) count,sizeof(unsigned char*),
00539            (qsort2_cmp) info->key_cmp,info);
00540   while (count--)
00541   {
00542     if ((*info->key_write)(info,*sort_keys++))
00543       return(-1);
00544   }
00545   return(0);
00546 } /* write_index */
00547 
00548 
00549         /* Merge buffers to make < MERGEBUFF2 buffers */
00550 
00551 static int  merge_many_buff(MI_SORT_PARAM *info, uint32_t keys,
00552           unsigned char **sort_keys, BUFFPEK *buffpek,
00553           size_t *maxbuffer, internal::IO_CACHE *t_file)
00554 {
00555   uint32_t i;
00556   internal::IO_CACHE t_file2, *from_file, *to_file, *temp;
00557   BUFFPEK *lastbuff;
00558 
00559   if (*maxbuffer < MERGEBUFF2)
00560     return(0);
00561   if (flush_io_cache(t_file) || t_file2.open_cached_file(P_tmpdir, "ST",
00562                        DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00563     return(1);
00564 
00565   from_file= t_file ; to_file= &t_file2;
00566   while (*maxbuffer >= MERGEBUFF2)
00567   {
00568     from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0);
00569     to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0);
00570     lastbuff=buffpek;
00571     for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
00572     {
00573       if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
00574                         buffpek+i,buffpek+i+MERGEBUFF-1))
00575         goto cleanup;
00576     }
00577     if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
00578                       buffpek+i,buffpek+ *maxbuffer))
00579       break;
00580     if (flush_io_cache(to_file))
00581       break;
00582     temp=from_file; from_file=to_file; to_file=temp;
00583     *maxbuffer= (int) (lastbuff-buffpek)-1;
00584   }
00585 cleanup:
00586   to_file->close_cached_file();                   /* This holds old result */
00587   if (to_file == t_file)
00588     *t_file=t_file2;                            /* Copy result file */
00589 
00590   return(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
00591 } /* merge_many_buff */
00592 
00593 
00594 /*
00595    Read data to buffer
00596 
00597   SYNOPSIS
00598     read_to_buffer()
00599     fromfile    File to read from
00600     buffpek   Where to read from
00601     sort_length   max length to read
00602   RESULT
00603     > 0 Ammount of bytes read
00604     -1  Error
00605 */
00606 
00607 static uint32_t  read_to_buffer(internal::IO_CACHE *fromfile, BUFFPEK *buffpek,
00608                                   uint32_t sort_length)
00609 {
00610   register uint32_t count;
00611   uint32_t length;
00612 
00613   if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
00614   {
00615     if (my_pread(fromfile->file,(unsigned char*) buffpek->base,
00616                  (length= sort_length*count),buffpek->file_pos,MYF_RW))
00617       return((uint) -1);
00618     buffpek->key=buffpek->base;
00619     buffpek->file_pos+= length;                 /* New filepos */
00620     buffpek->count-=    count;
00621     buffpek->mem_count= count;
00622   }
00623   return (count*sort_length);
00624 } /* read_to_buffer */
00625 
00626 static uint32_t  read_to_buffer_varlen(internal::IO_CACHE *fromfile, BUFFPEK *buffpek,
00627                                          uint32_t sort_length)
00628 {
00629   register uint32_t count;
00630   uint16_t length_of_key = 0;
00631   uint32_t idx;
00632   unsigned char *buffp;
00633 
00634   if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
00635   {
00636     buffp = buffpek->base;
00637 
00638     for (idx=1;idx<=count;idx++)
00639     {
00640       if (my_pread(fromfile->file,(unsigned char*)&length_of_key,sizeof(length_of_key),
00641                    buffpek->file_pos,MYF_RW))
00642         return((uint) -1);
00643       buffpek->file_pos+=sizeof(length_of_key);
00644       if (my_pread(fromfile->file,(unsigned char*) buffp,length_of_key,
00645                    buffpek->file_pos,MYF_RW))
00646         return((uint) -1);
00647       buffpek->file_pos+=length_of_key;
00648       buffp = buffp + sort_length;
00649     }
00650     buffpek->key=buffpek->base;
00651     buffpek->count-=    count;
00652     buffpek->mem_count= count;
00653   }
00654   return (count*sort_length);
00655 } /* read_to_buffer_varlen */
00656 
00657 
00658 static int  write_merge_key_varlen(MI_SORT_PARAM *info,
00659            internal::IO_CACHE *to_file, unsigned char* key,
00660                                          uint32_t sort_length, uint32_t count)
00661 {
00662   uint32_t idx;
00663   unsigned char *bufs = key;
00664 
00665   for (idx=1;idx<=count;idx++)
00666   {
00667     int err;
00668     if ((err= my_var_write(info, to_file, bufs)))
00669       return (err);
00670     bufs=bufs+sort_length;
00671   }
00672   return(0);
00673 }
00674 
00675 
00676 static int  write_merge_key(MI_SORT_PARAM *info,
00677           internal::IO_CACHE *to_file, unsigned char *key,
00678           uint32_t sort_length, uint32_t count)
00679 {
00680   (void)info;
00681   return my_b_write(to_file, key, (size_t) sort_length*count);
00682 }
00683 
00684 /*
00685  * Function object to be used as the comparison function
00686  * for the priority queue in the merge_buffers method.
00687  */
00688 class compare_functor
00689 {
00690   qsort2_cmp key_compare;
00691   void *key_compare_arg;
00692   public:
00693   compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
00694     : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
00695   inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
00696   {
00697     int val= key_compare(key_compare_arg,
00698                       &i->key, &j->key);
00699     return (val >= 0);
00700   }
00701 };
00702 
00703 /*
00704   Merge buffers to one buffer
00705   If to_file == 0 then use info->key_write
00706 */
00707 
00708 static int
00709 merge_buffers(MI_SORT_PARAM *info, uint32_t keys, internal::IO_CACHE *from_file,
00710               internal::IO_CACHE *to_file, unsigned char **sort_keys, BUFFPEK *lastbuff,
00711               BUFFPEK *Fb, BUFFPEK *Tb)
00712 {
00713   int error;
00714   uint32_t sort_length,maxcount;
00715   ha_rows count;
00716   internal::my_off_t to_start_filepos= 0;
00717   unsigned char *strpos;
00718   BUFFPEK *buffpek;
00719   priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor > 
00720     queue(compare_functor((qsort2_cmp) info->key_cmp, static_cast<void *>(info)));
00721   volatile int *killed= killed_ptr(info->sort_info->param);
00722 
00723   count=error=0;
00724   maxcount=keys/((uint) (Tb-Fb) +1);
00725   assert(maxcount > 0);
00726   if (to_file)
00727     to_start_filepos=my_b_tell(to_file);
00728   strpos=(unsigned char*) sort_keys;
00729   sort_length=info->key_length;
00730 
00731   for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
00732   {
00733     count+= buffpek->count;
00734     buffpek->base= strpos;
00735     buffpek->max_keys=maxcount;
00736     strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
00737                                                       sort_length));
00738     if (error == -1)
00739       goto err;
00740     queue.push(buffpek);
00741   }
00742 
00743   while (queue.size() > 1)
00744   {
00745     for (;;)
00746     {
00747       if (*killed)
00748       {
00749         error=1; goto err;
00750       }
00751       buffpek= queue.top();
00752       if (to_file)
00753       {
00754         if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
00755                             (uint) sort_length,1))
00756         {
00757           error=1; goto err;
00758         }
00759       }
00760       else
00761       {
00762         if ((*info->key_write)(info,(void*) buffpek->key))
00763         {
00764           error=1; goto err;
00765         }
00766       }
00767       buffpek->key+=sort_length;
00768       if (! --buffpek->mem_count)
00769       {
00770         if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
00771         {
00772           queue.pop();
00773           break;                /* One buffer have been removed */
00774         }
00775       }
00776       else if (error == -1)
00777         goto err;
00778       /* Top element has been replaced */
00779       queue.pop();
00780       queue.push(buffpek);
00781     }
00782   }
00783   buffpek= queue.top();
00784   buffpek->base=(unsigned char *) sort_keys;
00785   buffpek->max_keys=keys;
00786   do
00787   {
00788     if (to_file)
00789     {
00790       if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
00791                          sort_length,buffpek->mem_count))
00792       {
00793         error=1; goto err;
00794       }
00795     }
00796     else
00797     {
00798       register unsigned char *end;
00799       strpos= buffpek->key;
00800       for (end=strpos+buffpek->mem_count*sort_length;
00801            strpos != end ;
00802            strpos+=sort_length)
00803       {
00804         if ((*info->key_write)(info,(void*) strpos))
00805         {
00806           error=1; goto err;
00807         }
00808       }
00809     }
00810   }
00811   while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
00812          error != 0);
00813 
00814   lastbuff->count=count;
00815   if (to_file)
00816     lastbuff->file_pos=to_start_filepos;
00817 err:
00818   return(error);
00819 } /* merge_buffers */
00820 
00821 
00822         /* Do a merge to output-file (save only positions) */
00823 
00824 static int
00825 merge_index(MI_SORT_PARAM *info, uint32_t keys, unsigned char **sort_keys,
00826             BUFFPEK *buffpek, int maxbuffer, internal::IO_CACHE *tempfile)
00827 {
00828   if (merge_buffers(info,keys,tempfile,(internal::IO_CACHE*) 0,sort_keys,buffpek,buffpek,
00829                     buffpek+maxbuffer))
00830     return(1);
00831   return(0);
00832 } /* merge_index */
00833