Drizzled Public API Documentation

transaction_reader.cc
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2009 Sun Microsystems, Inc.
00005  *
00006  *  Authors:
00007  *
00008  *    Jay Pipes <joinfu@sun.com>
00009  *
00010  *  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; version 2 of the License.
00013  *
00014  *  This program is distributed in the hope that it will be useful,
00015  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017  *  GNU General Public License for more details.
00018  *
00019  *  You should have received a copy of the GNU General Public License
00020  *  along with this program; if not, write to the Free Software
00021  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00022  */
00023 
00024 #include <config.h>
00025 #include <sys/types.h>
00026 #include <sys/stat.h>
00027 #include <limits.h>
00028 #include <iostream>
00029 #include <string>
00030 #include <algorithm>
00031 #include <vector>
00032 #include <unistd.h>
00033 #include <drizzled/gettext.h>
00034 #include <drizzled/message/transaction.pb.h>
00035 #include <drizzled/message/statement_transform.h>
00036 #include "transaction_manager.h"
00037 #include "transaction_file_reader.h"
00038 #include "transaction_log_connection.h"
00039 
00040 #include <google/protobuf/io/coded_stream.h>
00041 #include <google/protobuf/io/zero_copy_stream_impl.h>
00042 
00043 #include <boost/program_options.hpp>
00044 
00045 using namespace std;
00046 using namespace google;
00047 using namespace drizzled;
00048 
00049 namespace po= boost::program_options;
00050 
/*
 * Set of characters that printStatement() overwrites with a single space
 * so that every emitted SQL statement stays on one line. Both the
 * characters and the pointer itself are immutable.
 */
static const char * const replace_with_spaces= "\n\r";
00052 
00053 static void printErrorMessage(enum message::TransformSqlError err)
00054 {
00055   switch (err)
00056   {
00057     case message::MISSING_HEADER:
00058     {
00059       cerr << "Data segment without a header\n";
00060       break;
00061     }
00062     case message::MISSING_DATA:
00063     {
00064       cerr << "Header segment without a data segment\n";
00065       break;
00066     }
00067     case message::UUID_MISMATCH:
00068     {
00069       cerr << "UUID on objects did not match\n";
00070       break;
00071     }
00072     default:
00073     {
00074       cerr << "Unhandled error\n";
00075       break;
00076     }
00077   }
00078 }
00079 
00089 static bool printStatement(const message::Statement &statement, string &output)
00090 {
00091   vector<string> sql_strings;
00092   enum message::TransformSqlError err;
00093 
00094   err= message::transformStatementToSql(statement,
00095                                         sql_strings,
00096                                         message::DRIZZLE,
00097                                         true /* already in transaction */);
00098 
00099   if (err != message::NONE)
00100   {
00101     printErrorMessage(err);
00102     return false;
00103   }
00104 
00105   for (vector<string>::iterator sql_string_iter= sql_strings.begin();
00106        sql_string_iter != sql_strings.end();
00107        ++sql_string_iter)
00108   {
00109     string &sql= *sql_string_iter;
00110 
00111     /*
00112      * Replace \n and \r with spaces so that SQL statements
00113      * are always on a single line
00114      */
00115     {
00116       string::size_type found= sql.find_first_of(replace_with_spaces);
00117       while (found != string::npos)
00118       {
00119         sql[found]= ' ';
00120         found= sql.find_first_of(replace_with_spaces, found);
00121       }
00122     }
00123 
00124     /*
00125      * Embedded NUL characters are a pain in the ass.
00126      */
00127     {
00128       string::size_type found= sql.find_first_of('\0');
00129       while (found != string::npos)
00130       {
00131         sql[found]= '\\';
00132         sql.insert(found + 1, 1, '0');
00133         found= sql.find_first_of('\0', found);
00134       }
00135     }
00136 
00137     output.append(sql + ";\n");
00138   }
00139   
00140   return true;
00141 }
00142 
00143 static bool isDDLStatement(const message::Statement &statement)
00144 {
00145   bool isDDL;
00146 
00147   switch (statement.type())
00148   {
00149     case (message::Statement::TRUNCATE_TABLE):
00150     case (message::Statement::CREATE_SCHEMA):
00151     case (message::Statement::ALTER_SCHEMA):
00152     case (message::Statement::DROP_SCHEMA):
00153     case (message::Statement::CREATE_TABLE):
00154     case (message::Statement::ALTER_TABLE):
00155     case (message::Statement::DROP_TABLE):
00156     case (message::Statement::RAW_SQL):
00157     {
00158       isDDL= true;
00159       break;
00160     }
00161     default:
00162     {
00163       isDDL= false;
00164       break;
00165     }
00166   }
00167   
00168   return isDDL;
00169 }
00170 
00171 static bool isEndStatement(const message::Statement &statement)
00172 {
00173   switch (statement.type())
00174   {
00175     case (message::Statement::INSERT):
00176     {
00177       const message::InsertData &data= statement.insert_data();
00178       if (not data.end_segment())
00179         return false;
00180       break;
00181     }
00182     case (message::Statement::UPDATE):
00183     {
00184       const message::UpdateData &data= statement.update_data();
00185       if (not data.end_segment())
00186         return false;
00187       break;
00188     }
00189     case (message::Statement::DELETE):
00190     {
00191       const message::DeleteData &data= statement.delete_data();
00192       if (not data.end_segment())
00193         return false;
00194       break;
00195     }
00196     default:
00197       return true;
00198   }
00199   return true;
00200 }
00201 
00202 static bool isEndTransaction(const message::Transaction &transaction)
00203 {
00204   const message::TransactionContext trx= transaction.transaction_context();
00205 
00206   size_t num_statements= transaction.statement_size();
00207 
00208   /*
00209    * If any Statement is partial, then we can expect another Transaction
00210    * message.
00211    */
00212   for (size_t x= 0; x < num_statements; ++x)
00213   {
00214     const message::Statement &statement= transaction.statement(x);
00215 
00216     if (not isEndStatement(statement))
00217       return false;
00218   }
00219 
00220   return true;
00221 }
00222 
00223 static void printEvent(const message::Event &event)
00224 {
00225   switch (event.type())
00226   {
00227     case message::Event::STARTUP:
00228     {
00229       cout << "-- EVENT: Server startup\n";
00230       break;
00231     }
00232     case message::Event::SHUTDOWN:
00233     {
00234       cout << "-- EVENT: Server shutdown\n";
00235       break;
00236     }
00237     default:
00238     {
00239       cout << "-- EVENT: Unknown event\n";
00240       break;
00241     }
00242   }
00243 }
00244 
00245 static void printTransactionSummary(const message::Transaction &transaction,
00246                                     bool ignore_events)
00247 {
00248   static uint64_t last_trx_id= 0;
00249   const message::TransactionContext trx= transaction.transaction_context();
00250 
00251   if (last_trx_id != trx.transaction_id())
00252     cout << "\ntransaction_id = " << trx.transaction_id() << endl;
00253 
00254   last_trx_id= trx.transaction_id();
00255 
00256   if (transaction.has_event() && (not ignore_events))
00257   {
00258     cout << "\t";
00259     printEvent(transaction.event());
00260   }
00261 
00262   size_t num_statements= transaction.statement_size();
00263   size_t x;
00264 
00265   for (x= 0; x < num_statements; ++x)
00266   {
00267     const message::Statement &statement= transaction.statement(x);
00268 
00269     switch (statement.type())
00270     {
00271       case (message::Statement::ROLLBACK):
00272       {
00273         cout << "\tROLLBACK\n";
00274         break;
00275       }
00276       case (message::Statement::INSERT):
00277       {
00278         const message::InsertHeader &header= statement.insert_header();
00279         const message::TableMetadata &meta= header.table_metadata();
00280         cout << "\tINSERT INTO `" << meta.table_name() << "`\n";
00281         break;
00282       }
00283       case (message::Statement::DELETE):
00284       {
00285         const message::DeleteHeader &header= statement.delete_header();
00286         const message::TableMetadata &meta= header.table_metadata();
00287         cout << "\tDELETE FROM `" << meta.table_name() << "`\n";
00288         break;
00289       }
00290       case (message::Statement::UPDATE):
00291       {
00292         const message::UpdateHeader &header= statement.update_header();
00293         const message::TableMetadata &meta= header.table_metadata();
00294         cout << "\tUPDATE `" << meta.table_name() << "`\n";
00295         break;
00296       }
00297       case (message::Statement::TRUNCATE_TABLE):
00298       {
00299         const message::TableMetadata &meta= statement.truncate_table_statement().table_metadata();
00300         cout << "\tTRUNCATE TABLE `" << meta.table_name() << "`\n";
00301         break;
00302       }
00303       case (message::Statement::CREATE_SCHEMA):
00304       {
00305         const message::Schema &schema= statement.create_schema_statement().schema();
00306         cout << "\tCREATE SCHEMA `" << schema.name() << "`\n";
00307         break;
00308       }
00309       case (message::Statement::ALTER_SCHEMA):
00310       {
00311         const message::Schema &schema= statement.alter_schema_statement().before();
00312         cout << "\tALTER SCHEMA `" << schema.name() << "`\n";
00313         break;
00314       }
00315       case (message::Statement::DROP_SCHEMA):
00316       {
00317         cout << "\tDROP SCHEMA `" << statement.drop_schema_statement().schema_name() << "`\n";
00318         break;
00319       }
00320       case (message::Statement::CREATE_TABLE):
00321       {
00322         const message::Table &table= statement.create_table_statement().table();
00323         cout << "\tCREATE TABLE `" << table.name() << "`\n";
00324         break;
00325       }
00326       case (message::Statement::ALTER_TABLE):
00327       {
00328         const message::Table &table= statement.alter_table_statement().before();
00329         cout << "\tALTER TABLE `" << table.name() << "`\n";
00330         break;
00331       }
00332       case (message::Statement::DROP_TABLE):
00333       {
00334         const message::TableMetadata &meta= statement.drop_table_statement().table_metadata();
00335         cout << "\tDROP TABLE `" << meta.table_name() << "`\n";
00336         break;
00337       }
00338       case (message::Statement::SET_VARIABLE):
00339       {
00340         const message::FieldMetadata &meta= statement.set_variable_statement().variable_metadata();
00341         cout << "\tSET VARIABLE " << meta.name() << "\n";
00342         break;
00343       }
00344       case (message::Statement::RAW_SQL):
00345       {
00346         cout << "\tRAW SQL\n";
00347         break;
00348       }
00349       case (message::Statement::ROLLBACK_STATEMENT):
00350       {
00351         cout << "\tROLLBACK STATEMENT\n";
00352         break;
00353       }
00354       default:
00355         cout << "\tUnhandled Statement Type\n";
00356     }
00357   }
00358 }
00359 
00370 static bool printTransaction(const message::Transaction &transaction,
00371                              bool ignore_events,
00372                              bool print_as_raw)
00373 {
00374   static uint64_t last_trx_id= 0;
00375   bool should_commit= true;
00376   const message::TransactionContext trx= transaction.transaction_context();
00377 
00378   /*
00379    * First check to see if this is an event message.
00380    */
00381   if (transaction.has_event())
00382   {
00383     last_trx_id= trx.transaction_id();
00384     if (not ignore_events)
00385     {
00386       if (print_as_raw)
00387         transaction.PrintDebugString();
00388       else
00389         printEvent(transaction.event());
00390     }
00391     return true;
00392   }
00393 
00394   if (print_as_raw)
00395   {
00396     transaction.PrintDebugString();
00397     return true;
00398   }
00399 
00400   size_t num_statements= transaction.statement_size();
00401   vector<string> cached_statement_sql;
00402 
00403   for (size_t x= 0; x < num_statements; ++x)
00404   {
00405     const message::Statement &statement= transaction.statement(x);
00406 
00407     /* Transactional DDL not supported yet. Use AUTOCOMMIT for DDL. */
00408     if (x == 0)
00409     {
00410       /* Transaction ID change means new transaction to start */
00411       if (trx.transaction_id() != last_trx_id)
00412       {
00413         if (isDDLStatement(statement))
00414         {
00415           cout << "SET AUTOCOMMIT=0;" << endl;
00416         }
00417         else
00418         {
00419           cout << "START TRANSACTION;" << endl;
00420         }
00421       }
00422   
00423       last_trx_id= trx.transaction_id();
00424     }
00425 
00426     if (should_commit)
00427       should_commit= isEndStatement(statement);
00428 
00429     /* A ROLLBACK would be the only Statement within the Transaction
00430      * since all other Statements will have been deleted from the
00431      * Transaction message, so we should fall out of this loop immediately.
00432      * We don't want to issue an unnecessary COMMIT, so we change
00433      * should_commit to false here.
00434      */
00435     if (statement.type() == message::Statement::ROLLBACK)
00436       should_commit= false;
00437 
00438     string output;
00439 
00440     if (not printStatement(statement, output))
00441     {
00442       return false;
00443     }
00444 
00445     if (isEndStatement(statement))
00446     {
00447       /* A complete, non-segmented statement */
00448       if (cached_statement_sql.empty() &&
00449           (statement.type() != message::Statement::ROLLBACK_STATEMENT))
00450       {
00451         cout << output;
00452       }
00453       
00454       /* A segmented statement that was rolled back */
00455       else if (statement.type() == message::Statement::ROLLBACK_STATEMENT)
00456       {
00457         cached_statement_sql.clear();
00458         cout << "-- Rollback statement\n";
00459       }
00460       
00461       /* A segmented statement that was successfully executed */
00462       else
00463       {
00464         for (size_t y= 0; y < cached_statement_sql.size(); y++)
00465         {
00466           cout << cached_statement_sql[y];
00467         }
00468         cached_statement_sql.clear();
00469       }
00470     }
00471     
00472     /*
00473      * We cache segmented statements so we can support rolling back a
00474      * statement that fails mid-execution.
00475      */
00476     else
00477     {
00478       cached_statement_sql.push_back(output);
00479     }
00480   }
00481 
00482   /*
00483    * If ALL Statements are end segments, we can commit this Transaction.
00484    * We can also check to see if the transaction_id changed, but this
00485    * wouldn't work for the last Transaction in the transaction log since
00486    * we don't have another Transaction to compare to. Checking for all
00487    * end segments (like we do above) covers this case.
00488    */
00489   if (should_commit)
00490     cout << "COMMIT;" << endl;
00491   
00492   return true;
00493 }
00494 
00495 static bool processTransactionMessage(TransactionManager &trx_mgr, 
00496                                       const message::Transaction &transaction, 
00497                                       bool summarize,
00498                                       bool ignore_events,
00499                                       bool print_as_raw)
00500 {
00501   if (not isEndTransaction(transaction))
00502   {
00503     trx_mgr.store(transaction);
00504   }
00505   else
00506   {
00507     const message::TransactionContext trx= transaction.transaction_context();
00508     uint64_t transaction_id= trx.transaction_id();
00509 
00510     /*
00511      * If there are any previous Transaction messages for this transaction,
00512      * store this one, then output all of them together.
00513      */
00514     if (trx_mgr.contains(transaction_id))
00515     {
00516       trx_mgr.store(transaction);
00517 
00518       uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
00519       uint32_t idx= 0;
00520 
00521       while (idx != size)
00522       {
00523         message::Transaction new_trx;
00524         trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
00525         if (summarize)
00526         {
00527           printTransactionSummary(new_trx, ignore_events);
00528         }
00529         else
00530         {
00531           if (not printTransaction(new_trx, ignore_events, print_as_raw))
00532           {
00533             return false;
00534           }
00535         }
00536         idx++;
00537       }
00538 
00539       /* No longer need this transaction */
00540       trx_mgr.remove(transaction_id);
00541     }
00542     else
00543     {
00544       if (summarize)
00545       {
00546         printTransactionSummary(transaction, ignore_events);
00547       }
00548       else
00549       {
00550         if (not printTransaction(transaction, ignore_events, print_as_raw))
00551         {
00552           return false;
00553         }
00554       }
00555     }
00556   }
00557   
00558   return true;
00559 }
00560 
00561 static void getTrxIdList(TransactionLogConnection *connection,
00562                          const string &query_string,
00563                          vector<uint64_t> &ordered_trx_id_list)
00564 {
00565   drizzle_result_st result;
00566   
00567   connection->query(query_string, &result);
00568   
00569   drizzle_row_t row;
00570   while ((row= drizzle_row_next(&result)))
00571   {
00572     if (row[0])
00573     {
00574       ordered_trx_id_list.push_back(boost::lexical_cast<uint64_t>(row[0]));
00575     }
00576   }
00577 
00578   drizzle_result_free(&result);
00579 }
00580 
00581 static int extractRowsForTrxIds(TransactionLogConnection *connection,
00582                                 const vector<uint64_t> &ordered_trx_id_list,
00583                                 bool summarize,
00584                                 bool ignore_events,
00585                                 bool print_as_raw)
00586 {
00587   for (size_t idx= 0; idx < ordered_trx_id_list.size(); idx++)
00588   {
00589     uint64_t trx_id= ordered_trx_id_list[idx];
00590 
00591     string sql("SELECT MESSAGE, MESSAGE_LEN"
00592                " FROM DATA_DICTIONARY.SYS_REPLICATION_LOG"
00593                " WHERE ID = ");
00594     sql.append(boost::lexical_cast<string>(trx_id));
00595     sql.append(" ORDER BY SEGID ASC");
00596 
00597     drizzle_result_st result;
00598     connection->query(sql, &result);
00599     
00600     drizzle_row_t row;
00601     while ((row= drizzle_row_next(&result)))
00602     {
00603       char* data= (char*)row[0];
00604       uint64_t length= (row[1]) ? boost::lexical_cast<uint64_t>(row[1]) : 0;
00605       
00606       message::Transaction transaction;
00607       TransactionManager trx_mgr;
00608       
00609       transaction.ParseFromArray(data, length);
00610       
00611       if (not processTransactionMessage(trx_mgr, transaction, 
00612                                         summarize, ignore_events,
00613                                         print_as_raw))
00614       {
00615         drizzle_result_free(&result);
00616         return -1;
00617       }
00618     }
00619     
00620     drizzle_result_free(&result);
00621   }
00622   
00623   return 0;
00624 }
00625 
00630 int main(int argc, char* argv[])
00631 {
00632   GOOGLE_PROTOBUF_VERIFY_VERSION;
00633   int opt_start_pos= 0;
00634   uint64_t opt_transaction_id= 0;
00635   uint64_t opt_start_transaction_id= 0;
00636   uint32_t opt_drizzle_port= 0; 
00637   string current_user, opt_password, opt_protocol, current_host;
00638   bool use_drizzle_protocol= false; 
00639 
00640   /*
00641    * Setup program options
00642    */
00643   po::options_description desc("Program options");
00644   desc.add_options()
00645     ("help", _("Display help and exit"))
00646     ("use-innodb-replication-log", _("Read from the innodb transaction log"))
00647     ("user,u", po::value<string>(&current_user)->default_value(""), 
00648       _("User for login if not current user."))
00649     ("port,p", po::value<uint32_t>(&opt_drizzle_port)->default_value(0), 
00650       _("Port number to use for connection."))
00651     ("password,P", po::value<string>(&opt_password)->default_value(""), 
00652       _("Password to use when connecting to server"))
00653     ("protocol",po::value<string>(&opt_protocol)->default_value("mysql"),
00654       _("The protocol of connection (mysql or drizzle)."))
00655     ("checksum", _("Perform checksum"))
00656     ("ignore-events", _("Ignore event messages"))
00657     ("input-file", po::value< vector<string> >(), _("Transaction log file"))
00658     ("raw", _("Print raw Protobuf messages instead of SQL"))
00659     ("start-pos",
00660       po::value<int>(&opt_start_pos),
00661       _("Start reading from the given file position"))
00662     ("start-transaction-id",
00663       po::value<uint64_t>(&opt_start_transaction_id),
00664       _("Only output for the given transaction ID and later"))
00665     ("transaction-id",
00666       po::value<uint64_t>(&opt_transaction_id),
00667       _("Only output for the given transaction ID"))
00668     ("summarize", _("Summarize message contents"));
00669 
00670   /*
00671    * We allow one positional argument that will be transaction file name
00672    */
00673   po::positional_options_description pos;
00674   pos.add("input-file", 1);
00675 
00676   /*
00677    * Parse the program options
00678    */
00679   po::variables_map vm;
00680   po::store(po::command_line_parser(argc, argv).
00681             options(desc).positional(pos).run(), vm);
00682   po::notify(vm);
00683 
00684   if (vm.count("help"))
00685   {
00686     cerr << desc << endl;
00687     return -1;
00688   }
00689 
00690   if (not vm.count("input-file") && not vm.count("use-innodb-replication-log"))
00691   {
00692     cerr << desc << endl;
00693     return -1;
00694   }
00695 
00696   /*
00697    * Specifying both a transaction ID and a start position
00698    * is not logical.
00699    */
00700   if (vm.count("start-pos") && vm.count("transaction-id"))
00701   {
00702     cerr << _("Cannot use --start-pos and --transaction-id together\n");
00703     return -1;
00704   }
00705 
00706   if (vm.count("summarize") && (vm.count("raw") || vm.count("transaction-id")))
00707   {
00708     cerr << _("Cannot use --summarize with either --raw or --transaction-id\n");
00709     return -1;
00710   }
00711 
00712   bool do_checksum= vm.count("checksum") ? true : false;
00713   bool use_innodb_replication_log= vm.count("use-innodb-replication-log") ? true : false;
00714   bool ignore_events= vm.count("ignore-events") ? true : false;
00715   bool print_as_raw= vm.count("raw") ? true : false;
00716   bool summarize= vm.count("summarize") ? true : false;
00717 
00718   /*
00719    * InnoDB transaction log processing
00720    */
00721 
00722   if (use_innodb_replication_log)
00723   {
00724     TransactionLogConnection *connection;
00725     connection= new TransactionLogConnection(current_host, opt_drizzle_port,
00726                                              current_user, opt_password,
00727                                              use_drizzle_protocol);
00728 
00729     /* List of transaction IDs (in proper commit order) to extract */
00730     vector<uint64_t> ordered_trx_id_list;
00731     string query_string;
00732 
00733     if (vm.count("transaction-id"))
00734     {
00735       ordered_trx_id_list.push_back(opt_transaction_id);
00736     }
00737     else if (vm.count("start-transaction-id"))
00738     {
00739       query_string.append("SELECT ID"
00740                           " FROM DATA_DICTIONARY.SYS_REPLICATION_LOG"
00741                           " WHERE COMMIT_ID > 0"
00742                           " AND ID > ");
00743       query_string.append(boost::lexical_cast<string>(opt_start_transaction_id));
00744       query_string.append(" ORDER BY COMMIT_ID ASC");
00745       
00746       getTrxIdList(connection, query_string, ordered_trx_id_list);
00747     }
00748     else
00749     {
00750       query_string.append("SELECT ID"
00751                           " FROM DATA_DICTIONARY.SYS_REPLICATION_LOG"
00752                           " WHERE COMMIT_ID > 0"
00753                           " ORDER BY COMMIT_ID ASC");
00754       
00755       getTrxIdList(connection, query_string, ordered_trx_id_list);
00756     }
00757 
00758     if (extractRowsForTrxIds(connection, ordered_trx_id_list,
00759                              summarize, ignore_events, print_as_raw))
00760     {
00761       return -1;
00762     }
00763   }
00764 
00765   /*
00766    * File based transaction log processing
00767    */
00768 
00769   else 
00770   {
00771     string filename= vm["input-file"].as< vector<string> >()[0];
00772 
00773     TransactionFileReader fileReader;
00774 
00775     if (not fileReader.openFile(filename, opt_start_pos))
00776     {
00777       cerr << fileReader.getErrorString() << endl;
00778       return -1;
00779     }
00780 
00781     message::Transaction transaction;
00782     TransactionManager trx_mgr;
00783     uint32_t checksum= 0;
00784 
00785     while (fileReader.getNextTransaction(transaction, &checksum))
00786     {
00787       const message::TransactionContext trx= transaction.transaction_context();
00788       uint64_t transaction_id= trx.transaction_id();
00789     
00790       /*
00791        * If we are given a transaction ID, we only look for that one and
00792        * print it out.
00793        */
00794       if (vm.count("transaction-id"))
00795       {
00796         if (opt_transaction_id == transaction_id)
00797         {
00798           if (not processTransactionMessage(trx_mgr, transaction, summarize, 
00799                                             ignore_events, print_as_raw))
00800           {
00801             return -1;
00802           }
00803         }
00804         else
00805         {
00806           continue;
00807         }
00808       }
00809       else 
00810       {
00811         /*
00812          * No transaction ID given, so process all messages.
00813          */
00814         if (not processTransactionMessage(trx_mgr, transaction, summarize,
00815                                           ignore_events, print_as_raw))
00816         {
00817           return -1;
00818         }
00819       }  
00820 
00821       if (do_checksum)
00822       {
00823         uint32_t calculated= fileReader.checksumLastReadTransaction();
00824         if (checksum != calculated)
00825         {
00826           cerr << _("Checksum failed. Wanted ")
00827                << checksum
00828                << _(" got ")
00829                << calculated
00830                << endl;
00831         }
00832       }
00833     } // end while
00834 
00835     string error= fileReader.getErrorString();
00836 
00837     if (error != "EOF")
00838     {
00839       cerr << error << endl;
00840       return -1;
00841     }
00842   }
00843 
00844   return 0;
00845 }