Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00030 #include <config.h>
00031
00032 #include <drizzled/sql_select.h>
00033 #include <drizzled/field/blob.h>
00034 #include <drizzled/drizzled.h>
00035 #include <drizzled/internal/my_sys.h>
00036 #include <drizzled/table.h>
00037 #include <drizzled/session.h>
00038
00039 #include <algorithm>
00040
00041 using namespace std;
00042
00043 namespace drizzled
00044 {
00045
00046 static uint32_t used_blob_length(CacheField **ptr);
00047
00048 static uint32_t used_blob_length(CacheField **ptr)
00049 {
00050 uint32_t length,blob_length;
00051 for (length=0 ; *ptr ; ptr++)
00052 {
00053 (*ptr)->blob_length=blob_length=(*ptr)->blob_field->get_length();
00054 length+=blob_length;
00055 (*ptr)->blob_field->get_ptr(&(*ptr)->str);
00056 }
00057 return length;
00058 }
00059
00060
00061
00062
00063
00064
00065
00066 int join_init_cache(Session *session, JoinTable *tables, uint32_t table_count)
00067 {
00068 unsigned int length, blobs;
00069 size_t size;
00070 CacheField *copy,**blob_ptr;
00071 JoinCache *cache;
00072 JoinTable *join_tab;
00073
00074 cache= &tables[table_count].cache;
00075 cache->fields=blobs=0;
00076
00077 join_tab= tables;
00078 for (unsigned int i= 0; i < table_count ; i++, join_tab++)
00079 {
00080 if (!join_tab->used_fieldlength)
00081 calc_used_field_length(session, join_tab);
00082 cache->fields+=join_tab->used_fields;
00083 blobs+=join_tab->used_blobs;
00084
00085
00086 if (join_tab->rowid_keep_flags & JoinTable::KEEP_ROWID)
00087 {
00088 cache->fields++;
00089 join_tab->used_fieldlength += join_tab->table->cursor->ref_length;
00090 }
00091 }
00092 if (!(cache->field=(CacheField*)
00093 memory::sql_alloc(sizeof(CacheField)*(cache->fields+table_count*2)+(blobs+1)* sizeof(CacheField*))))
00094 {
00095 size= cache->end - cache->buff;
00096 global_join_buffer.sub(size);
00097 free((unsigned char*) cache->buff);
00098 cache->buff=0;
00099 return(1);
00100 }
00101 copy=cache->field;
00102 blob_ptr=cache->blob_ptr=(CacheField**)
00103 (cache->field+cache->fields+table_count*2);
00104
00105 length=0;
00106 for (unsigned int i= 0 ; i < table_count ; i++)
00107 {
00108 uint32_t null_fields=0, used_fields;
00109 Field **f_ptr,*field;
00110 for (f_ptr= tables[i].table->getFields(), used_fields= tables[i].used_fields; used_fields; f_ptr++)
00111 {
00112 field= *f_ptr;
00113 if (field->isReadSet())
00114 {
00115 used_fields--;
00116 length+=field->fill_cache_field(copy);
00117 if (copy->blob_field)
00118 (*blob_ptr++)=copy;
00119 if (field->maybe_null())
00120 null_fields++;
00121 copy->get_rowid= NULL;
00122 copy++;
00123 }
00124 }
00125
00126 if (null_fields && tables[i].table->getNullFields())
00127 {
00128 copy->str= tables[i].table->null_flags;
00129 copy->length= tables[i].table->getShare()->null_bytes;
00130 copy->strip=0;
00131 copy->blob_field=0;
00132 copy->get_rowid= NULL;
00133 length+=copy->length;
00134 copy++;
00135 cache->fields++;
00136 }
00137
00138 if (tables[i].table->maybe_null)
00139 {
00140 copy->str= (unsigned char*) &tables[i].table->null_row;
00141 copy->length=sizeof(tables[i].table->null_row);
00142 copy->strip=0;
00143 copy->blob_field=0;
00144 copy->get_rowid= NULL;
00145 length+=copy->length;
00146 copy++;
00147 cache->fields++;
00148 }
00149
00150 if (tables[i].rowid_keep_flags & JoinTable::KEEP_ROWID)
00151 {
00152 copy->str= tables[i].table->cursor->ref;
00153 copy->length= tables[i].table->cursor->ref_length;
00154 copy->strip=0;
00155 copy->blob_field=0;
00156 copy->get_rowid= NULL;
00157 if (tables[i].rowid_keep_flags & JoinTable::CALL_POSITION)
00158 {
00159
00160 copy->get_rowid= tables[i].table;
00161
00162 tables[i].rowid_keep_flags&= ~((int)JoinTable::CALL_POSITION);
00163 }
00164 copy++;
00165 }
00166 }
00167
00168 cache->length= length+blobs*sizeof(char*);
00169 cache->blobs= blobs;
00170 *blob_ptr= NULL;
00171 size= max((size_t) session->variables.join_buff_size, (size_t)cache->length);
00172 if (not global_join_buffer.add(size))
00173 {
00174 my_error(ER_OUT_OF_GLOBAL_JOINMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00175 return 1;
00176 }
00177 if (!(cache->buff= (unsigned char*) malloc(size)))
00178 return 1;
00179 cache->end= cache->buff+size;
00180 cache->reset_cache_write();
00181
00182 return 0;
00183 }
00184
00185 bool JoinCache::store_record_in_cache()
00186 {
00187 JoinCache *cache= this;
00188 unsigned char *local_pos;
00189 CacheField *copy,*end_field;
00190 bool last_record;
00191
00192 local_pos= cache->pos;
00193 end_field= cache->field+cache->fields;
00194
00195 {
00196 uint32_t local_length;
00197
00198 local_length= cache->length;
00199 if (cache->blobs)
00200 {
00201 local_length+= used_blob_length(cache->blob_ptr);
00202 }
00203
00204 if ((last_record= (local_length + cache->length > (size_t) (cache->end - local_pos))))
00205 {
00206 cache->ptr_record= cache->records;
00207 }
00208 }
00209
00210
00211
00212
00213 cache->records++;
00214 for (copy= cache->field; copy < end_field; copy++)
00215 {
00216 if (copy->blob_field)
00217 {
00218 if (last_record)
00219 {
00220 copy->blob_field->get_image(local_pos, copy->length+sizeof(char*), copy->blob_field->charset());
00221 local_pos+= copy->length+sizeof(char*);
00222 }
00223 else
00224 {
00225 copy->blob_field->get_image(local_pos, copy->length,
00226 copy->blob_field->charset());
00227 memcpy(local_pos + copy->length,copy->str,copy->blob_length);
00228 local_pos+= copy->length+copy->blob_length;
00229 }
00230 }
00231 else
00232 {
00233
00234 if (copy->get_rowid)
00235 copy->get_rowid->cursor->position(copy->get_rowid->getInsertRecord());
00236
00237 if (copy->strip)
00238 {
00239 unsigned char *str, *local_end;
00240 for (str= copy->str,local_end= str+copy->length; local_end > str && local_end[-1] == ' '; local_end--) {}
00241
00242 uint32_t local_length= (uint32_t) (local_end - str);
00243 memcpy(local_pos+2, str, local_length);
00244 int2store(local_pos, local_length);
00245 local_pos+= local_length+2;
00246 }
00247 else
00248 {
00249 memcpy(local_pos, copy->str, copy->length);
00250 local_pos+= copy->length;
00251 }
00252 }
00253 }
00254 cache->pos= local_pos;
00255 return last_record || (size_t) (cache->end - local_pos) < cache->length;
00256 }
00257
00258 void JoinCache::reset_cache_read()
00259 {
00260 record_nr= 0;
00261 pos= buff;
00262 }
00263
00264 void JoinCache::reset_cache_write()
00265 {
00266 reset_cache_read();
00267 records= 0;
00268 ptr_record= UINT32_MAX;
00269 }
00270
00275 }