00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 #ifdef HAVE_CONFIG_H
00133 # include "config.h"
00134 #endif
00135
00136 #ifdef HAVE_GETOPT
00137 # include <unistd.h>
00138 extern char* optarg;
00139 extern int optind;
00140 #else
00141 # include "getopt.h"
00142 #endif
00143
00144 #ifdef HAVE_IOSTREAM
00145 # include <iostream>
00146 #else
00147 # include <iostream.h>
00148 #endif
00149
00150 #ifdef HAVE_STD_IOSTREAM
00151 using namespace std;
00152 #endif
00153
00154 #ifdef HAVE_STDLIB_H
00155 # include <stdlib.h>
00156 #endif
00157
00158 #ifdef HAVE_SIGNAL_H
00159 # include <signal.h>
00160 #endif
00161
00162 #include <cstdio>
00163
00164 #include "CosEventComm.hh"
00165 #include "CosEventChannelAdmin.hh"
00166 #include "naming.h"
00167
00168 static omni_mutex mutex;
00169 static omni_condition connect_cond(&mutex);
00170 static void usage(int argc, char **argv);
00171
00172 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
00173 public:
00174 Consumer_i(long disconnect=0): _disconnect(disconnect) {}
00175
00176 void push(const CORBA::Any& data);
00177 void disconnect_push_consumer ();
00178
00179 private:
00180 long _disconnect;
00181 };
00182
00183 void Consumer_i::push(const CORBA::Any& data) {
00184 CORBA::ULong l;
00185 static int i = 0;
00186
00187 i++;
00188 if( data>>=l )
00189 {
00190 cout<<"Push Consumer: push() called. Data : "<< l <<endl;
00191
00192
00193 if (i == _disconnect)
00194 {
00195 i = 0;
00196
00197
00198
00199
00200
00201 omni_mutex_lock condition_lock(mutex);
00202 connect_cond.signal();
00203 }
00204 }
00205 else
00206 {
00207 cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
00208 }
00209 }
00210
00211 void Consumer_i::disconnect_push_consumer () {
00212 cout << "Push Consumer: disconnected." << endl;
00213 }
00214
00215 int
00216 main(int argc, char **argv)
00217 {
00218
00219
00220 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00221
00222
00223 int discnum =0;
00224 int sleepInterval =0;
00225 const char* channelName ="EventChannel";
00226
00227 int c;
00228 while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
00229 {
00230 switch (c)
00231 {
00232 case 'd': discnum = atoi(optarg);
00233 break;
00234
00235 case 's': sleepInterval = atoi(optarg);
00236 break;
00237
00238 case 'n': channelName = optarg;
00239 break;
00240
00241 case 'h': usage(argc,argv);
00242 exit(0);
00243 default : usage(argc,argv);
00244 exit(-1);
00245 }
00246 }
00247
00248 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00249
00250 signal(SIGPIPE, SIG_IGN);
00251 #endif
00252
00253 Consumer_i* consumer = new Consumer_i (discnum);
00254 CosEventChannelAdmin::EventChannel_var channel;
00255
00256 const char* action="";
00257 try {
00258 CORBA::Object_var obj;
00259
00260 action="resolve initial reference 'RootPOA'";
00261 obj=orb->resolve_initial_references("RootPOA");
00262 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00263 if(CORBA::is_nil(rootPoa))
00264 throw CORBA::OBJECT_NOT_EXIST();
00265
00266 action="activate the RootPOA's POAManager";
00267 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00268 pman->activate();
00269
00270
00271
00272
00273 if(optind<argc)
00274 {
00275 action="convert URI from command line into object reference";
00276 obj=orb->string_to_object(argv[optind]);
00277 }
00278 else
00279 {
00280 action="resolve initial reference 'NameService'";
00281 obj=orb->resolve_initial_references("NameService");
00282 CosNaming::NamingContext_var rootContext=
00283 CosNaming::NamingContext::_narrow(obj);
00284 if(CORBA::is_nil(rootContext))
00285 throw CORBA::OBJECT_NOT_EXIST();
00286
00287 action="find EventChannel in NameService";
00288 cout << action << endl;
00289 obj=rootContext->resolve(str2name(channelName));
00290 }
00291
00292 action="narrow object reference to event channel";
00293 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00294 if(CORBA::is_nil(channel))
00295 {
00296 cerr << "Failed to narrow Event Channel reference." << endl;
00297 exit(1);
00298 }
00299
00300 }
00301 catch(CORBA::ORB::InvalidName& ex) {
00302 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00303 exit(1);
00304 }
00305 catch(CosNaming::NamingContext::InvalidName& ex) {
00306 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00307 exit(1);
00308 }
00309 catch(CosNaming::NamingContext::NotFound& ex) {
00310 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00311 exit(1);
00312 }
00313 catch(CosNaming::NamingContext::CannotProceed& ex) {
00314 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00315 exit(1);
00316 }
00317 catch(CORBA::TRANSIENT& ex) {
00318 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00319 exit(1);
00320 }
00321 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00322 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00323 exit(1);
00324 }
00325 catch(CORBA::SystemException& ex) {
00326 cerr<<"Failed to "<<action<<".";
00327 #if defined(HAVE_OMNIORB4)
00328 cerr<<" "<<ex._name();
00329 if(ex.NP_minorString())
00330 cerr<<" ("<<ex.NP_minorString()<<")";
00331 #endif
00332 cerr<<endl;
00333 exit(1);
00334 }
00335 catch(CORBA::Exception& ex) {
00336 cerr<<"Failed to "<<action<<"."
00337 #if defined(HAVE_OMNIORB4)
00338 " "<<ex._name()
00339 #endif
00340 <<endl;
00341 exit(1);
00342 }
00343
00344
00345
00346 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00347 while (1)
00348 {
00349 try {
00350 consumer_admin = channel->for_consumers ();
00351 if (CORBA::is_nil (consumer_admin))
00352 {
00353 cerr << "Event Channel returned nil Consumer Admin!" << endl;
00354 exit(1);
00355 }
00356 break;
00357 }
00358 catch (CORBA::COMM_FAILURE& ex) {
00359 cerr << "Caught COMM_FAILURE exception "
00360 << "obtaining Consumer Admin! Retrying..."
00361 << endl;
00362 continue;
00363 }
00364 }
00365 cout << "Obtained ConsumerAdmin." << endl;
00366
00367 omni_mutex_lock condition_lock(mutex);
00368 while (1) {
00369
00370
00371 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
00372 while (1)
00373 {
00374 try {
00375 proxy_supplier = consumer_admin->obtain_push_supplier ();
00376 if (CORBA::is_nil (proxy_supplier))
00377 {
00378 cerr << "Consumer Admin returned nil proxy_supplier!"
00379 << endl;
00380 exit (1);
00381 }
00382 break;
00383 }
00384 catch (CORBA::COMM_FAILURE& ex) {
00385 cerr << "Caught COMM_FAILURE Exception "
00386 << "obtaining Push Supplier! Retrying..."
00387 << endl;
00388 continue;
00389 }
00390 }
00391 cout << "Obtained ProxyPushSupplier." << endl;
00392
00393
00394
00395 while (1)
00396 {
00397 try {
00398 proxy_supplier->connect_push_consumer(consumer->_this());
00399 break;
00400 }
00401 catch (CORBA::BAD_PARAM& ex) {
00402 cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
00403 << endl;
00404 exit (1);
00405 }
00406 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00407 cerr << "Proxy Push Supplier already connected!"
00408 << endl;
00409 break;
00410 }
00411 catch (CORBA::COMM_FAILURE& ex) {
00412 cerr << "Caught COMM_FAILURE exception "
00413 << "connecting Push Consumer! Retrying..."
00414 << endl;
00415 continue;
00416 }
00417 }
00418 cout << "Connected Push Consumer." << endl;
00419
00420
00421 connect_cond.wait();
00422
00423
00424 while (1)
00425 {
00426 try {
00427 proxy_supplier->disconnect_push_supplier();
00428 break;
00429 }
00430 catch (CORBA::COMM_FAILURE& ex) {
00431 cerr << "Caught COMM_FAILURE Exception "
00432 << "disconnecting Push Consumer! Retrying..."
00433 << endl;
00434 continue;
00435 }
00436 }
00437 cout << "Disconnected Push Consumer." << endl;
00438
00439
00440 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00441 omni_thread::sleep(sleepInterval);
00442 }
00443
00444
00445 return 0;
00446 }
00447
00448 static void
00449 usage(int argc, char **argv)
00450 {
00451 cerr<<
00452 "\nCreate a PushConsumer to receive events from a channel.\n"
00453 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
00454 "\n"
00455 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00456 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00457 "\n"
00458 "OPTIONS: DEFAULT:\n"
00459 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
00460 " -s SECS sleep SECS seconds after disconnecting [0]\n"
00461 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
00462 " -h display this help text\n" << endl;
00463 }