00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00037 #include <config.h>
00038 #include <drizzled/replication_services.h>
00039 #include <drizzled/plugin/transaction_replicator.h>
00040 #include <drizzled/plugin/transaction_applier.h>
00041 #include <drizzled/message/transaction.pb.h>
00042 #include <drizzled/gettext.h>
00043 #include <drizzled/session.h>
00044 #include <drizzled/error.h>
00045
00046 #include <string>
00047 #include <vector>
00048 #include <algorithm>
00049
00050 using namespace std;
00051
00052 namespace drizzled
00053 {
00054
00055 ReplicationServices::ReplicationServices() :
00056 is_active(false)
00057 {
00058 }
00059
00060 void ReplicationServices::normalizeReplicatorName(string &name)
00061 {
00062 transform(name.begin(),
00063 name.end(),
00064 name.begin(),
00065 ::tolower);
00066 if (name.find("replicator") == string::npos)
00067 name.append("replicator", 10);
00068 {
00069 size_t found_underscore= name.find('_');
00070 while (found_underscore != string::npos)
00071 {
00072 name.erase(found_underscore, 1);
00073 found_underscore= name.find('_');
00074 }
00075 }
00076 }
00077
00078 bool ReplicationServices::evaluateRegisteredPlugins()
00079 {
00080
00081
00082
00083
00084
00085
00086
00087
00088 if (appliers.empty())
00089 return true;
00090
00091 if (replicators.empty() && not appliers.empty())
00092 {
00093 errmsg_printf(error::ERROR,
00094 N_("You registered a TransactionApplier plugin but no "
00095 "TransactionReplicator plugins were registered.\n"));
00096 return false;
00097 }
00098
00099 for (Appliers::iterator appl_iter= appliers.begin();
00100 appl_iter != appliers.end();
00101 ++appl_iter)
00102 {
00103 plugin::TransactionApplier *applier= (*appl_iter).second;
00104 string requested_replicator_name= (*appl_iter).first;
00105 normalizeReplicatorName(requested_replicator_name);
00106
00107 bool found= false;
00108 Replicators::iterator repl_iter;
00109 for (repl_iter= replicators.begin();
00110 repl_iter != replicators.end();
00111 ++repl_iter)
00112 {
00113 string replicator_name= (*repl_iter)->getName();
00114 normalizeReplicatorName(replicator_name);
00115
00116 if (requested_replicator_name.compare(replicator_name) == 0)
00117 {
00118 found= true;
00119 break;
00120 }
00121 }
00122 if (not found)
00123 {
00124 errmsg_printf(error::ERROR,
00125 N_("You registered a TransactionApplier plugin but no "
00126 "TransactionReplicator plugins were registered that match the "
00127 "requested replicator name of '%s'.\n"
00128 "We have deactivated the TransactionApplier '%s'.\n"),
00129 requested_replicator_name.c_str(),
00130 applier->getName().c_str());
00131 applier->deactivate();
00132 return false;
00133 }
00134 else
00135 {
00136 replication_streams.push_back(make_pair(*repl_iter, applier));
00137 }
00138 }
00139 is_active= true;
00140 return true;
00141 }
00142
00143 void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
00144 {
00145 replicators.push_back(in_replicator);
00146 }
00147
00148 void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
00149 {
00150 replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
00151 }
00152
00153 void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
00154 {
00155 appliers.push_back(make_pair(requested_replicator_name, in_applier));
00156 }
00157
00158 void ReplicationServices::detachApplier(plugin::TransactionApplier *)
00159 {
00160 }
00161
00162 bool ReplicationServices::isActive() const
00163 {
00164 return is_active;
00165 }
00166
00167 plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
00168 message::Transaction &to_push)
00169 {
00170 plugin::ReplicationReturnCode result= plugin::SUCCESS;
00171
00172 for (ReplicationStreams::iterator iter= replication_streams.begin();
00173 iter != replication_streams.end();
00174 ++iter)
00175 {
00176 plugin::TransactionReplicator *cur_repl= iter->first;
00177 plugin::TransactionApplier *cur_appl= iter->second;
00178
00179 result= cur_repl->replicate(cur_appl, in_session, to_push);
00180
00181 if (result == plugin::SUCCESS)
00182 {
00183
00184
00185
00186
00187
00188
00189 last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
00190 }
00191 else
00192 return result;
00193 }
00194 return result;
00195 }
00196
00197 ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
00198 {
00199 return replication_streams;
00200 }
00201
00202 }