Drizzled Public API Documentation

sql_load.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2  Copyright (C) 2011 Stewart Smith
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16 
17 
18 /* Copy data from a textfile to table */
19 
20 #include <config.h>
21 
22 #include <drizzled/sql_load.h>
23 #include <drizzled/error.h>
24 #include <drizzled/catalog/local.h>
25 #include <drizzled/session.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/field/epoch.h>
28 #include <drizzled/internal/my_sys.h>
29 #include <drizzled/internal/iocache.h>
30 #include <drizzled/plugin/storage_engine.h>
31 #include <drizzled/sql_lex.h>
32 #include <drizzled/copy_info.h>
33 #include <drizzled/file_exchange.h>
34 #include <drizzled/util/test.h>
35 #include <drizzled/session/transactions.h>
36 
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <algorithm>
40 #include <climits>
41 #include <boost/filesystem.hpp>
42 
43 namespace fs=boost::filesystem;
44 using namespace std;
45 namespace drizzled
46 {
47 
48 class READ_INFO {
49  int cursor;
50  unsigned char *buffer; /* Buffer for read text */
51  unsigned char *end_of_buff; /* Data in bufferts ends here */
52  size_t buff_length; /* Length of buffert */
53  size_t max_length; /* Max length of row */
54  char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
55  uint field_term_length,line_term_length,enclosed_length;
56  int field_term_char,line_term_char,enclosed_char,escape_char;
57  int *stack,*stack_pos;
58  bool found_end_of_line,start_of_line,eof;
59  bool need_end_io_cache;
61 
62 public:
63  bool error,line_cuted,found_null,enclosed;
64  unsigned char *row_start, /* Found row starts here */
65  *row_end; /* Found row ends here */
66  const charset_info_st *read_charset;
67 
68  READ_INFO(int cursor, size_t tot_length, const charset_info_st * const cs,
69  String &field_term,String &line_start,String &line_term,
70  String &enclosed,int escape, bool is_fifo);
71  ~READ_INFO();
72  int read_field();
73  int read_fixed_length(void);
74  int next_line(void);
75  char unescape(char chr);
76  int terminator(char *ptr,uint32_t length);
77  bool find_start_of_fields();
78 
79  /*
80  We need to force cache close before destructor is invoked to log
81  the last read block
82  */
83  void end_io_cache()
84  {
85  cache.end_io_cache();
86  need_end_io_cache = 0;
87  }
88 
89  /*
90  Either this method, or we need to make cache public
91  Arg must be set from load() since constructor does not see
92  either the table or Session value
93  */
94  void set_io_cache_arg(void* arg) { cache.arg = arg; }
95 };
96 
97 static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
98  List<Item> &fields_vars, List<Item> &set_fields,
99  List<Item> &set_values, READ_INFO &read_info,
100  uint32_t skip_lines,
101  bool ignore_check_option_errors);
102 static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
103  List<Item> &fields_vars, List<Item> &set_fields,
104  List<Item> &set_values, READ_INFO &read_info,
105  String &enclosed, uint32_t skip_lines,
106  bool ignore_check_option_errors);
107 
108 
109 /*
110  Execute LOAD DATA query
111 
112  SYNOPSYS
113  load()
114  session - current thread
115  ex - file_exchange object representing source cursor and its parsing rules
116  table_list - list of tables to which we are loading data
117  fields_vars - list of fields and variables to which we read
118  data from cursor
119  set_fields - list of fields mentioned in set clause
120  set_values - expressions to assign to fields in previous list
121  handle_duplicates - indicates whenever we should emit error or
122  replace row if we will meet duplicates.
123  ignore - - indicates whenever we should ignore duplicates
124 
125  RETURN VALUES
126  true - error / false - success
127 */
128 
129 int load(Session *session,file_exchange *ex,TableList *table_list,
130  List<Item> &fields_vars, List<Item> &set_fields,
131  List<Item> &set_values,
132  enum enum_duplicates handle_duplicates, bool ignore)
133 {
134  int file;
135  Table *table= NULL;
136  int error;
137  String *field_term=ex->field_term,*escaped=ex->escaped;
138  String *enclosed=ex->enclosed;
139  bool is_fifo=0;
140 
141  assert(table_list->getSchemaName()); // This should never be null
142 
143  /*
144  If path for cursor is not defined, we will use the current database.
145  If this is not set, we will use the directory where the table to be
146  loaded is located
147  */
148  util::string::ptr schema(session->schema());
149  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
150  assert(tdb);
151  uint32_t skip_lines= ex->skip_lines;
152  bool transactional_table;
153 
154  /* Escape and enclosed character may be a utf8 4-byte character */
155  if (escaped->length() > 4 || enclosed->length() > 4)
156  {
157  my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
158  return true;
159  }
160 
161  if (session->openTablesLock(table_list))
162  return true;
163 
164  if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
165  &session->lex().select_lex.top_join_list,
166  table_list,
167  &session->lex().select_lex.leaf_tables, true))
168  return(-1);
169 
170  /*
171  Let us emit an error if we are loading data to table which is used
172  in subselect in SET clause like we do it for INSERT.
173 
174  The main thing to fix to remove this restriction is to ensure that the
175  table is marked to be 'used for insert' in which case we should never
176  mark this table as 'const table' (ie, one that has only one row).
177  */
178  if (unique_table(table_list, table_list->next_global))
179  {
180  my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
181  return true;
182  }
183 
184  table= table_list->table;
185  transactional_table= table->cursor->has_transactions();
186 
187  if (!fields_vars.size())
188  {
189  Field **field;
190  for (field= table->getFields(); *field ; field++)
191  fields_vars.push_back(new Item_field(*field));
192  table->setWriteSet();
193  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
194  /*
195  Let us also prepare SET clause, altough it is probably empty
196  in this case.
197  */
198  if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
199  setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
200  return true;
201  }
202  else
203  { // Part field list
204  /* TODO: use this conds for 'WITH CHECK OPTIONS' */
205  if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
206  setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
207  check_that_all_fields_are_given_values(session, table, table_list))
208  return true;
209  /*
210  Check whenever TIMESTAMP field with auto-set feature specified
211  explicitly.
212  */
213  if (table->timestamp_field)
214  {
215  if (table->isWriteSet(table->timestamp_field->position()))
216  {
217  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
218  }
219  else
220  {
221  table->setWriteSet(table->timestamp_field->position());
222  }
223  }
224  /* Fix the expressions in SET clause */
225  if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
226  return true;
227  }
228 
229  table->mark_columns_needed_for_insert();
230 
231  size_t tot_length=0;
232  bool use_blobs= 0, use_vars= 0;
233  List<Item>::iterator it(fields_vars.begin());
234  Item *item;
235 
236  while ((item= it++))
237  {
238  Item *real_item= item->real_item();
239 
240  if (real_item->type() == Item::FIELD_ITEM)
241  {
242  Field *field= ((Item_field*)real_item)->field;
243  if (field->flags & BLOB_FLAG)
244  {
245  use_blobs= 1;
246  tot_length+= 256; // Will be extended if needed
247  }
248  else
249  tot_length+= field->field_length;
250  }
251  else if (item->type() == Item::STRING_ITEM)
252  use_vars= 1;
253  }
254  if (use_blobs && !ex->line_term->length() && !field_term->length())
255  {
256  my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
257  MYF(0));
258  return true;
259  }
260  if (use_vars && !field_term->length() && !enclosed->length())
261  {
262  my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
263  return true;
264  }
265 
266  fs::path to_file(ex->file_name);
267  fs::path target_path(fs::system_complete(catalog::local_identifier().getPath()));
268  if (not to_file.has_root_directory())
269  {
270  int count_elements= 0;
271  for (fs::path::iterator iter= to_file.begin();
272  iter != to_file.end();
273  ++iter, ++count_elements)
274  { }
275 
276  if (count_elements == 1)
277  {
278  target_path /= tdb;
279  }
280  target_path /= to_file;
281  }
282  else
283  {
284  target_path= to_file;
285  }
286 
287  if (not secure_file_priv.string().empty())
288  {
289  if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
290  {
291  /* Read only allowed from within dir specified by secure_file_priv */
292  my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
293  return true;
294  }
295  }
296 
297  struct stat stat_info;
298  if (stat(target_path.file_string().c_str(), &stat_info))
299  {
300  my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
301  return true;
302  }
303 
304  // if we are not in slave thread, the cursor must be:
305  if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
306  (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
307  ((stat_info.st_mode & S_IFREG) == S_IFREG ||
308  (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
309  {
310  my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
311  return true;
312  }
313  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
314  is_fifo = 1;
315 
316 
317  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
318  {
319  my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
320  return true;
321  }
322  CopyInfo info;
323  memset(&info, 0, sizeof(info));
324  info.ignore= ignore;
325  info.handle_duplicates=handle_duplicates;
326  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
327 
328  identifier::Schema identifier(session->catalog().identifier(),
329  *schema);
330  READ_INFO read_info(file, tot_length,
331  ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
332  *field_term, *ex->line_start, *ex->line_term, *enclosed,
333  info.escape_char, is_fifo);
334  if (read_info.error)
335  {
336  if (file >= 0)
337  internal::my_close(file,MYF(0)); // no files in net reading
338  return true; // Can't allocate buffers
339  }
340 
341  /*
342  * Per the SQL standard, inserting NULL into a NOT NULL
343  * field requires an error to be thrown.
344  *
345  * @NOTE
346  *
347  * NULL check and handling occurs in field_conv.cc
348  */
349  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
350  session->cuted_fields=0L;
351  /* Skip lines if there is a line terminator */
352  if (ex->line_term->length())
353  {
354  /* ex->skip_lines needs to be preserved for logging */
355  while (skip_lines > 0)
356  {
357  skip_lines--;
358  if (read_info.next_line())
359  break;
360  }
361  }
362 
363  if (!(error=test(read_info.error)))
364  {
365 
367  if (ignore ||
368  handle_duplicates == DUP_REPLACE)
369  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
370  if (handle_duplicates == DUP_REPLACE)
371  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
372  table->cursor->ha_start_bulk_insert((ha_rows) 0);
373  table->copy_blobs=1;
374 
375  session->setAbortOnWarning(true);
376 
377  if (!field_term->length() && !enclosed->length())
378  error= read_fixed_length(session, info, table_list, fields_vars,
379  set_fields, set_values, read_info,
380  skip_lines, ignore);
381  else
382  error= read_sep_field(session, info, table_list, fields_vars,
383  set_fields, set_values, read_info,
384  *enclosed, skip_lines, ignore);
385  if (table->cursor->ha_end_bulk_insert() && !error)
386  {
387  table->print_error(errno, MYF(0));
388  error= 1;
389  }
390  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
391  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
392  table->next_number_field=0;
393  }
394  if (file >= 0)
395  internal::my_close(file,MYF(0));
396  free_blobs(table); /* if pack_blob was used */
397  table->copy_blobs=0;
398  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
399 
400  if (error)
401  {
402  error= -1; // Error on read
403  goto err;
404  }
405 
406  char msg[FN_REFLEN];
407  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
408  (info.records - info.copied), session->cuted_fields);
409 
410  if (session->transaction.stmt.hasModifiedNonTransData())
411  session->transaction.all.markModifiedNonTransData();
412 
413  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
414 err:
415  assert(transactional_table || !(info.copied || info.deleted) ||
416  session->transaction.stmt.hasModifiedNonTransData());
418  table->auto_increment_field_not_null= false;
419  session->setAbortOnWarning(false);
420 
421  return(error);
422 }
423 
424 
425 /****************************************************************************
426 ** Read of rows of fixed size + optional garage + optonal newline
427 ****************************************************************************/
428 
429 static int
430 read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
431  List<Item> &fields_vars, List<Item> &set_fields,
432  List<Item> &set_values, READ_INFO &read_info,
433  uint32_t skip_lines, bool ignore_check_option_errors)
434 {
435  List<Item>::iterator it(fields_vars.begin());
436  Item_field *sql_field;
437  Table *table= table_list->table;
438  bool err;
439 
440  while (!read_info.read_fixed_length())
441  {
442  if (session->getKilled())
443  {
444  session->send_kill_message();
445  return 1;
446  }
447  if (skip_lines)
448  {
449  /*
450  We could implement this with a simple seek if:
451  - We are not using DATA INFILE LOCAL
452  - escape character is ""
453  - line starting prefix is ""
454  */
455  skip_lines--;
456  continue;
457  }
458  it= fields_vars.begin();
459  unsigned char *pos=read_info.row_start;
460 #ifdef HAVE_VALGRIND
461  read_info.row_end[0]=0;
462 #endif
463 
464  table->restoreRecordAsDefault();
465  /*
466  There is no variables in fields_vars list in this format so
467  this conversion is safe.
468  */
469  while ((sql_field= (Item_field*) it++))
470  {
471  Field *field= sql_field->field;
472  if (field == table->next_number_field)
473  table->auto_increment_field_not_null= true;
474  /*
475  No fields specified in fields_vars list can be null in this format.
476  Mark field as not null, we should do this for each row because of
477  restore_record...
478  */
479  field->set_notnull();
480 
481  if (pos == read_info.row_end)
482  {
483  session->cuted_fields++; /* Not enough fields */
484  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
485  ER_WARN_TOO_FEW_RECORDS,
486  ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
487 
488  if (not field->maybe_null() and field->is_timestamp())
489  ((field::Epoch::pointer) field)->set_time();
490  }
491  else
492  {
493  uint32_t length;
494  unsigned char save_chr;
495  if ((length=(uint32_t) (read_info.row_end-pos)) >
496  field->field_length)
497  {
498  length=field->field_length;
499  }
500  save_chr=pos[length];
501  pos[length]='\0'; // Add temp null terminator for store()
502  field->store((char*) pos,length,read_info.read_charset);
503  pos[length]=save_chr;
504  if ((pos+=length) > read_info.row_end)
505  pos= read_info.row_end; /* Fills rest with space */
506  }
507  }
508  if (pos != read_info.row_end)
509  {
510  session->cuted_fields++; /* To long row */
511  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
512  ER_WARN_TOO_MANY_RECORDS,
513  ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
514  }
515 
516  if (session->getKilled() ||
517  fill_record(session, set_fields, set_values,
518  ignore_check_option_errors))
519  return 1;
520 
521  err= write_record(session, table, &info);
522  table->auto_increment_field_not_null= false;
523  if (err)
524  return 1;
525 
526  /*
527  We don't need to reset auto-increment field since we are restoring
528  its default value at the beginning of each loop iteration.
529  */
530  if (read_info.next_line()) // Skip to next line
531  break;
532  if (read_info.line_cuted)
533  {
534  session->cuted_fields++; /* To long row */
535  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
536  ER_WARN_TOO_MANY_RECORDS,
537  ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
538  }
539  session->row_count++;
540  }
541  return(test(read_info.error));
542 }
543 
544 
545 
546 static int
547 read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
548  List<Item> &fields_vars, List<Item> &set_fields,
549  List<Item> &set_values, READ_INFO &read_info,
550  String &enclosed, uint32_t skip_lines,
551  bool ignore_check_option_errors)
552 {
553  List<Item>::iterator it(fields_vars.begin());
554  Item *item;
555  Table *table= table_list->table;
556  uint32_t enclosed_length;
557  bool err;
558 
559  enclosed_length=enclosed.length();
560 
561  for (;;it= fields_vars.begin())
562  {
563  if (session->getKilled())
564  {
565  session->send_kill_message();
566  return 1;
567  }
568 
569  table->restoreRecordAsDefault();
570 
571  while ((item= it++))
572  {
573  uint32_t length;
574  unsigned char *pos;
575  Item *real_item;
576 
577  if (read_info.read_field())
578  break;
579 
580  /* If this line is to be skipped we don't want to fill field or var */
581  if (skip_lines)
582  continue;
583 
584  pos=read_info.row_start;
585  length=(uint32_t) (read_info.row_end-pos);
586 
587  real_item= item->real_item();
588 
589  if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
590  (length == 1 && read_info.found_null))
591  {
592 
593  if (real_item->type() == Item::FIELD_ITEM)
594  {
595  Field *field= ((Item_field *)real_item)->field;
596  if (field->reset())
597  {
598  my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
599  session->row_count);
600  return 1;
601  }
602  field->set_null();
603  if (not field->maybe_null())
604  {
605  if (field->is_timestamp())
606  {
607  ((field::Epoch::pointer) field)->set_time();
608  }
609  else if (field != table->next_number_field)
610  {
611  field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
612  }
613  }
614  }
615  else if (item->type() == Item::STRING_ITEM)
616  {
617  ((Item_user_var_as_out_param *)item)->set_null_value(
618  read_info.read_charset);
619  }
620  else
621  {
622  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
623  return 1;
624  }
625 
626  continue;
627  }
628 
629  if (real_item->type() == Item::FIELD_ITEM)
630  {
631  Field *field= ((Item_field *)real_item)->field;
632  field->set_notnull();
633  read_info.row_end[0]=0; // Safe to change end marker
634  if (field == table->next_number_field)
635  table->auto_increment_field_not_null= true;
636  field->store((char*) pos, length, read_info.read_charset);
637  }
638  else if (item->type() == Item::STRING_ITEM)
639  {
640  ((Item_user_var_as_out_param *)item)->set_value(str_ref((char*) pos, length), read_info.read_charset);
641  }
642  else
643  {
644  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
645  return 1;
646  }
647  }
648  if (read_info.error)
649  break;
650  if (skip_lines)
651  {
652  skip_lines--;
653  continue;
654  }
655  if (item)
656  {
657  /* Have not read any field, thus input cursor is simply ended */
658  if (item == &fields_vars.front())
659  break;
660  for (; item ; item= it++)
661  {
662  Item *real_item= item->real_item();
663  if (real_item->type() == Item::FIELD_ITEM)
664  {
665  Field *field= ((Item_field *)real_item)->field;
666  if (field->reset())
667  {
668  my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
669  session->row_count);
670  return 1;
671  }
672  if (not field->maybe_null() and field->is_timestamp())
673  ((field::Epoch::pointer) field)->set_time();
674  /*
675  QQ: We probably should not throw warning for each field.
676  But how about intention to always have the same number
677  of warnings in Session::cuted_fields (and get rid of cuted_fields
678  in the end ?)
679  */
680  session->cuted_fields++;
681  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
682  ER_WARN_TOO_FEW_RECORDS,
683  ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
684  }
685  else if (item->type() == Item::STRING_ITEM)
686  {
687  ((Item_user_var_as_out_param *)item)->set_null_value(
688  read_info.read_charset);
689  }
690  else
691  {
692  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
693  return 1;
694  }
695  }
696  }
697 
698  if (session->getKilled() ||
699  fill_record(session, set_fields, set_values,
700  ignore_check_option_errors))
701  return 1;
702 
703  err= write_record(session, table, &info);
704  table->auto_increment_field_not_null= false;
705  if (err)
706  return 1;
707  /*
708  We don't need to reset auto-increment field since we are restoring
709  its default value at the beginning of each loop iteration.
710  */
711  if (read_info.next_line()) // Skip to next line
712  break;
713  if (read_info.line_cuted)
714  {
715  session->cuted_fields++; /* To long row */
716  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
717  ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
718  session->row_count);
719  if (session->getKilled())
720  return 1;
721  }
722  session->row_count++;
723  }
724  return(test(read_info.error));
725 }
726 
727 
728 /* Unescape all escape characters, mark \N as null */
729 
730 char
731 READ_INFO::unescape(char chr)
732 {
733  /* keep this switch synchornous with the ESCAPE_CHARS macro */
734  switch(chr) {
735  case 'n': return '\n';
736  case 't': return '\t';
737  case 'r': return '\r';
738  case 'b': return '\b';
739  case '0': return 0; // Ascii null
740  case 'Z': return '\032'; // Win32 end of cursor
741  case 'N': found_null=1;
742 
743  /* fall through */
744  default: return chr;
745  }
746 }
747 
748 
749 /*
750  Read a line using buffering
751  If last line is empty (in line mode) then it isn't outputed
752 */
753 
754 
755 READ_INFO::READ_INFO(int file_par, size_t tot_length,
756  const charset_info_st * const cs,
757  String &field_term, String &line_start, String &line_term,
758  String &enclosed_par, int escape, bool is_fifo)
759  :cursor(file_par),escape_char(escape)
760 {
761  read_charset= cs;
762  field_term_ptr=(char*) field_term.ptr();
763  field_term_length= field_term.length();
764  line_term_ptr=(char*) line_term.ptr();
765  line_term_length= line_term.length();
766  if (line_start.length() == 0)
767  {
768  line_start_ptr=0;
769  start_of_line= 0;
770  }
771  else
772  {
773  line_start_ptr=(char*) line_start.ptr();
774  line_start_end=line_start_ptr+line_start.length();
775  start_of_line= 1;
776  }
777  /* If field_terminator == line_terminator, don't use line_terminator */
778  if (field_term_length == line_term_length &&
779  !memcmp(field_term_ptr,line_term_ptr,field_term_length))
780  {
781  line_term_length=0;
782  line_term_ptr=(char*) "";
783  }
784  enclosed_char= (enclosed_length=enclosed_par.length()) ?
785  (unsigned char) enclosed_par[0] : INT_MAX;
786  field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
787  line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
788  error=eof=found_end_of_line=found_null=line_cuted=0;
789  buff_length=tot_length;
790 
791 
792  /* Set of a stack for unget if long terminators */
793  size_t length= max(field_term_length,line_term_length)+1;
794  set_if_bigger(length, line_start.length());
795  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
796 
797  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
798  error=1;
799  else
800  {
801  end_of_buff=buffer+buff_length;
802  if (cache.init_io_cache((false) ? -1 : cursor, 0,
803  (false) ? internal::READ_NET :
804  (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
805  MYF(MY_WME)))
806  {
807  free((unsigned char*) buffer);
808  error=1;
809  }
810  else
811  {
812  /*
813  init_io_cache() will not initialize read_function member
814  if the cache is READ_NET. So we work around the problem with a
815  manual assignment
816  */
817  need_end_io_cache = 1;
818  }
819  }
820 }
821 
822 
823 READ_INFO::~READ_INFO()
824 {
825  if (!error)
826  {
827  if (need_end_io_cache)
828  cache.end_io_cache();
829  free(buffer);
830  error=1;
831  }
832 }
833 
834 
835 #define GET (stack_pos != stack ? *--stack_pos : cache.get())
836 #define PUSH(A) *(stack_pos++)=(A)
837 
838 
839 inline int READ_INFO::terminator(char *ptr,uint32_t length)
840 {
841  int chr=0; // Keep gcc happy
842  uint32_t i;
843  for (i=1 ; i < length ; i++)
844  {
845  if ((chr=GET) != *++ptr)
846  {
847  break;
848  }
849  }
850  if (i == length)
851  return 1;
852  PUSH(chr);
853  while (i-- > 1)
854  PUSH((unsigned char) *--ptr);
855  return 0;
856 }
857 
858 
859 int READ_INFO::read_field()
860 {
861  int chr,found_enclosed_char;
862  unsigned char *to,*new_buffer;
863 
864  found_null=0;
865  if (found_end_of_line)
866  return 1; // One have to call next_line
867 
868  /* Skip until we find 'line_start' */
869 
870  if (start_of_line)
871  { // Skip until line_start
872  start_of_line=0;
873  if (find_start_of_fields())
874  return 1;
875  }
876  if ((chr=GET) == my_b_EOF)
877  {
878  found_end_of_line=eof=1;
879  return 1;
880  }
881  to=buffer;
882  if (chr == enclosed_char)
883  {
884  found_enclosed_char=enclosed_char;
885  *to++=(unsigned char) chr; // If error
886  }
887  else
888  {
889  found_enclosed_char= INT_MAX;
890  PUSH(chr);
891  }
892 
893  for (;;)
894  {
895  while ( to < end_of_buff)
896  {
897  chr = GET;
898  if ((my_mbcharlen(read_charset, chr) > 1) &&
899  to+my_mbcharlen(read_charset, chr) <= end_of_buff)
900  {
901  unsigned char* p = (unsigned char*)to;
902  *to++ = chr;
903  int ml = my_mbcharlen(read_charset, chr);
904  int i;
905  for (i=1; i<ml; i++) {
906  chr = GET;
907  if (chr == my_b_EOF)
908  goto found_eof;
909  *to++ = chr;
910  }
911  if (my_ismbchar(read_charset,
912  (const char *)p,
913  (const char *)to))
914  continue;
915  for (i=0; i<ml; i++)
916  PUSH((unsigned char) *--to);
917  chr = GET;
918  }
919  if (chr == my_b_EOF)
920  goto found_eof;
921  if (chr == escape_char)
922  {
923  if ((chr=GET) == my_b_EOF)
924  {
925  *to++= (unsigned char) escape_char;
926  goto found_eof;
927  }
928  /*
929  When escape_char == enclosed_char, we treat it like we do for
930  handling quotes in SQL parsing -- you can double-up the
931  escape_char to include it literally, but it doesn't do escapes
932  like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
933  with data like: "fie""ld1", "field2"
934  */
935  if (escape_char != enclosed_char || chr == escape_char)
936  {
937  *to++ = (unsigned char) unescape((char) chr);
938  continue;
939  }
940  PUSH(chr);
941  chr= escape_char;
942  }
943 #ifdef ALLOW_LINESEPARATOR_IN_STRINGS
944  if (chr == line_term_char)
945 #else
946  if (chr == line_term_char && found_enclosed_char == INT_MAX)
947 #endif
948  {
949  if (terminator(line_term_ptr,line_term_length))
950  { // Maybe unexpected linefeed
951  enclosed=0;
952  found_end_of_line=1;
953  row_start=buffer;
954  row_end= to;
955  return 0;
956  }
957  }
958  if (chr == found_enclosed_char)
959  {
960  if ((chr=GET) == found_enclosed_char)
961  { // Remove dupplicated
962  *to++ = (unsigned char) chr;
963  continue;
964  }
965  // End of enclosed field if followed by field_term or line_term
966  if (chr == my_b_EOF ||
967  (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
968  { // Maybe unexpected linefeed
969  enclosed=1;
970  found_end_of_line=1;
971  row_start=buffer+1;
972  row_end= to;
973  return 0;
974  }
975  if (chr == field_term_char &&
976  terminator(field_term_ptr,field_term_length))
977  {
978  enclosed=1;
979  row_start=buffer+1;
980  row_end= to;
981  return 0;
982  }
983  /*
984  The string didn't terminate yet.
985  Store back next character for the loop
986  */
987  PUSH(chr);
988  /* copy the found term character to 'to' */
989  chr= found_enclosed_char;
990  }
991  else if (chr == field_term_char && found_enclosed_char == INT_MAX)
992  {
993  if (terminator(field_term_ptr,field_term_length))
994  {
995  enclosed=0;
996  row_start=buffer;
997  row_end= to;
998  return 0;
999  }
1000  }
1001  *to++ = (unsigned char) chr;
1002  }
1003  /*
1004  ** We come here if buffer is too small. Enlarge it and continue
1005  */
1006  new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE);
1007  to=new_buffer + (to-buffer);
1008  buffer=new_buffer;
1009  buff_length+=IO_SIZE;
1010  end_of_buff=buffer+buff_length;
1011  }
1012 
1013 found_eof:
1014  enclosed=0;
1015  found_end_of_line=eof=1;
1016  row_start=buffer;
1017  row_end=to;
1018  return 0;
1019 }
1020 
1021 /*
1022  Read a row with fixed length.
1023 
1024  NOTES
1025  The row may not be fixed size on disk if there are escape
1026  characters in the cursor.
1027 
1028  IMPLEMENTATION NOTE
1029  One can't use fixed length with multi-byte charset **
1030 
1031  RETURN
1032  0 ok
1033  1 error
1034 */
1035 
1036 int READ_INFO::read_fixed_length()
1037 {
1038  int chr;
1039  unsigned char *to;
1040  if (found_end_of_line)
1041  return 1; // One have to call next_line
1042 
1043  if (start_of_line)
1044  { // Skip until line_start
1045  start_of_line=0;
1046  if (find_start_of_fields())
1047  return 1;
1048  }
1049 
1050  to=row_start=buffer;
1051  while (to < end_of_buff)
1052  {
1053  if ((chr=GET) == my_b_EOF)
1054  goto found_eof;
1055  if (chr == escape_char)
1056  {
1057  if ((chr=GET) == my_b_EOF)
1058  {
1059  *to++= (unsigned char) escape_char;
1060  goto found_eof;
1061  }
1062  *to++ =(unsigned char) unescape((char) chr);
1063  continue;
1064  }
1065  if (chr == line_term_char)
1066  {
1067  if (terminator(line_term_ptr,line_term_length))
1068  { // Maybe unexpected linefeed
1069  found_end_of_line=1;
1070  row_end= to;
1071  return 0;
1072  }
1073  }
1074  *to++ = (unsigned char) chr;
1075  }
1076  row_end=to; // Found full line
1077  return 0;
1078 
1079 found_eof:
1080  found_end_of_line=eof=1;
1081  row_start=buffer;
1082  row_end=to;
1083  return to == buffer ? 1 : 0;
1084 }
1085 
1086 
1087 int READ_INFO::next_line()
1088 {
1089  line_cuted=0;
1090  start_of_line= line_start_ptr != 0;
1091  if (found_end_of_line || eof)
1092  {
1093  found_end_of_line=0;
1094  return eof;
1095  }
1096  found_end_of_line=0;
1097  if (!line_term_length)
1098  return 0; // No lines
1099  for (;;)
1100  {
1101  int chr = GET;
1102  if (my_mbcharlen(read_charset, chr) > 1)
1103  {
1104  for (uint32_t i=1;
1105  chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1106  i++)
1107  chr = GET;
1108  if (chr == escape_char)
1109  continue;
1110  }
1111  if (chr == my_b_EOF)
1112  {
1113  eof=1;
1114  return 1;
1115  }
1116  if (chr == escape_char)
1117  {
1118  line_cuted=1;
1119  if (GET == my_b_EOF)
1120  return 1;
1121  continue;
1122  }
1123  if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
1124  return 0;
1125  line_cuted=1;
1126  }
1127 }
1128 
1129 
1130 bool READ_INFO::find_start_of_fields()
1131 {
1132  int chr;
1133  try_again:
1134  do
1135  {
1136  if ((chr=GET) == my_b_EOF)
1137  {
1138  found_end_of_line=eof=1;
1139  return 1;
1140  }
1141  } while ((char) chr != line_start_ptr[0]);
1142  for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
1143  {
1144  chr=GET; // Eof will be checked later
1145  if ((char) chr != *ptr)
1146  { // Can't be line_start
1147  PUSH(chr);
1148  while (--ptr != line_start_ptr)
1149  { // Restart with next char
1150  PUSH((unsigned char) *ptr);
1151  }
1152  goto try_again;
1153  }
1154  }
1155  return 0;
1156 }
1157 
1158 
1159 } /* namespace drizzled */