00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121 #ifdef HAVE_CONFIG_H
00122 # include "config.h"
00123 #endif
00124
00125 #ifdef HAVE_GETOPT
00126 # include <unistd.h>
00127 extern char* optarg;
00128 extern int optind;
00129 #else
00130 # include "getopt.h"
00131 #endif
00132
00133 #ifdef HAVE_IOSTREAM
00134 # include <iostream>
00135 #else
00136 # include <iostream.h>
00137 #endif
00138
00139 #ifdef HAVE_STD_IOSTREAM
00140 using namespace std;
00141 #endif
00142
00143 #ifdef HAVE_STDLIB_H
00144 # include <stdlib.h>
00145 #endif
00146
00147 #ifdef HAVE_SIGNAL_H
00148 # include <signal.h>
00149 #endif
00150
00151 #include <cstdio>
00152
00153 #include "CosEventComm.hh"
00154 #include "CosEventChannelAdmin.hh"
00155 #include "naming.h"
00156
00157 static void usage(int argc, char **argv);
00158
00159 class Consumer_i : virtual public POA_CosEventComm::PullConsumer {
00160 public:
00161 Consumer_i () {};
00162 void disconnect_pull_consumer ();
00163 };
00164
00165 void Consumer_i::disconnect_pull_consumer () {
00166 cout << "Pull Consumer: disconnected." << endl;
00167 }
00168
00169 int
00170 main(int argc, char **argv)
00171 {
00172
00173
00174 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00175
00176
00177 bool trymode =false;
00178 int discnum =0;
00179 bool refnil =false;
00180 int sleepInterval =0;
00181 const char* channelName ="EventChannel";
00182
00183 int c;
00184 while ((c = getopt(argc,argv,"td:rs:n:h")) != EOF)
00185 {
00186 switch (c)
00187 {
00188 case 't': trymode = true;
00189 break;
00190
00191 case 'd': discnum = atoi(optarg);
00192 break;
00193
00194 case 'r': refnil = true;
00195 break;
00196
00197 case 's': sleepInterval = atoi(optarg);
00198 break;
00199
00200 case 'n': channelName = optarg;
00201 break;
00202
00203 case 'h':
00204 default : usage(argc,argv);
00205 exit(-1);
00206 break;
00207 }
00208 }
00209
00210 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00211
00212 signal(SIGPIPE, SIG_IGN);
00213 #endif
00214
00215 Consumer_i* consumer =NULL;
00216 CosEventChannelAdmin::EventChannel_var channel;
00217
00218 const char* action="";
00219 try {
00220 CORBA::Object_var obj;
00221
00222
00223
00224
00225
00226 if (! refnil)
00227 {
00228 consumer=new Consumer_i();
00229
00230 action="resolve initial reference 'RootPOA'";
00231 obj=orb->resolve_initial_references("RootPOA");
00232 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00233 if(CORBA::is_nil(rootPoa))
00234 throw CORBA::OBJECT_NOT_EXIST();
00235
00236 action="activate the RootPOA's POAManager";
00237 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00238 pman->activate();
00239 }
00240
00241
00242
00243
00244 if(optind<argc)
00245 {
00246 action="convert URI from command line into object reference";
00247 obj=orb->string_to_object(argv[optind]);
00248 }
00249 else
00250 {
00251 action="resolve initial reference 'NameService'";
00252 obj=orb->resolve_initial_references("NameService");
00253 CosNaming::NamingContext_var rootContext=
00254 CosNaming::NamingContext::_narrow(obj);
00255 if(CORBA::is_nil(rootContext))
00256 throw CORBA::OBJECT_NOT_EXIST();
00257
00258 action="find EventChannel in NameService";
00259 cout << action << endl;
00260 obj=rootContext->resolve(str2name(channelName));
00261 }
00262
00263 action="narrow object reference to event channel";
00264 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00265 if(CORBA::is_nil(channel))
00266 {
00267 cerr << "Failed to narrow Event Channel reference." << endl;
00268 exit(1);
00269 }
00270
00271 }
00272 catch(CORBA::ORB::InvalidName& ex) {
00273 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00274 exit(1);
00275 }
00276 catch(CosNaming::NamingContext::InvalidName& ex) {
00277 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00278 exit(1);
00279 }
00280 catch(CosNaming::NamingContext::NotFound& ex) {
00281 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00282 exit(1);
00283 }
00284 catch(CosNaming::NamingContext::CannotProceed& ex) {
00285 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00286 exit(1);
00287 }
00288 catch(CORBA::TRANSIENT& ex) {
00289 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00290 exit(1);
00291 }
00292 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00293 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00294 exit(1);
00295 }
00296 catch(CORBA::SystemException& ex) {
00297 cerr<<"Failed to "<<action<<".";
00298 #if defined(HAVE_OMNIORB4)
00299 cerr<<" "<<ex._name();
00300 if(ex.NP_minorString())
00301 cerr<<" ("<<ex.NP_minorString()<<")";
00302 #endif
00303 cerr<<endl;
00304 exit(1);
00305 }
00306 catch(CORBA::Exception& ex) {
00307 cerr<<"Failed to "<<action<<"."
00308 #if defined(HAVE_OMNIORB4)
00309 " "<<ex._name()
00310 #endif
00311 <<endl;
00312 exit(1);
00313 }
00314
00315
00316
00317 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00318 while (1)
00319 {
00320 try {
00321 consumer_admin = channel->for_consumers ();
00322 if (CORBA::is_nil (consumer_admin))
00323 {
00324 cerr << "Event Channel returned nil Consumer Admin!" << endl;
00325 exit (1);
00326 }
00327 break;
00328 }
00329 catch (CORBA::COMM_FAILURE& ex) {
00330 cerr << "Caught COMM_FAILURE exception "
00331 << "obtaining Consumer Admin! Retrying..."
00332 << endl;
00333 continue;
00334 }
00335 }
00336 cout << "Obtained Consumer Admin." << endl;
00337
00338 while (1)
00339 {
00340
00341
00342 CosEventChannelAdmin::ProxyPullSupplier_var proxy_supplier;
00343 while (1)
00344 {
00345 try {
00346 proxy_supplier = consumer_admin->obtain_pull_supplier ();
00347 if (CORBA::is_nil (proxy_supplier))
00348 {
00349 cerr << "Consumer Admin returned nil Proxy Supplier!" << endl;
00350 exit (1);
00351 }
00352 break;
00353 }
00354 catch (CORBA::COMM_FAILURE& ex) {
00355 cerr << "Caught COMM_FAILURE Exception "
00356 << "obtaining Pull Supplier! Retrying..."
00357 << endl;
00358 continue;
00359 }
00360 }
00361 cout << "Obtained ProxyPullSupplier." << endl;
00362
00363
00364
00365 CosEventComm::PullConsumer_ptr cptr =CosEventComm::PullConsumer::_nil();
00366 if (! refnil) {
00367 cptr=consumer->_this();
00368 }
00369
00370 while (1)
00371 {
00372 try {
00373 proxy_supplier->connect_pull_consumer(cptr);
00374 break;
00375 }
00376 catch (CORBA::BAD_PARAM& ex) {
00377 cerr << "Caught BAD_PARAM exception connecting Pull Consumer!"<<endl;
00378 exit (1);
00379 }
00380 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00381 cerr << "Proxy Pull Supplier already connected!"
00382 << endl;
00383 break;
00384 }
00385 catch (CORBA::COMM_FAILURE& ex) {
00386 cerr << "Caught COMM_FAILURE Exception "
00387 << "connecting Pull Consumer! Retrying..."
00388 << endl;
00389 continue;
00390 }
00391 }
00392 cout << "Connected Pull Consumer." << endl;
00393
00394
00395 CORBA::Any *data;
00396 CORBA::ULong l = 0;
00397 for (int i=0; (discnum == 0) || (i < discnum); i++)
00398 {
00399 if(trymode)
00400 {
00401 try {
00402 CORBA::Boolean has_event;
00403 data = proxy_supplier->try_pull(has_event);
00404 cout << "Consumer: try_pull() called. Data : " << flush;
00405 if (has_event)
00406 {
00407 l = 0;
00408 *data >>= l;
00409 cout << l << endl;
00410 delete data;
00411 }
00412 else
00413 {
00414 cout << "None" << endl;
00415 }
00416 }
00417 catch (CosEventComm::Disconnected& ex) {
00418 cout << endl;
00419 cerr << "Failed. Caught Disconnected Exception !" << endl;
00420 }
00421 catch (CORBA::COMM_FAILURE& ex) {
00422 cout << endl;
00423 cerr << "Failed. Caught COMM_FAILURE Exception !" << endl;
00424 }
00425 catch (CORBA::Exception& ex) {
00426 cout << endl;
00427 cerr<<"CORBA exception, unable to try_pull()"
00428 #ifdef HAVE_OMNIORB4
00429 <<": "<<ex._name()
00430 #endif
00431 << endl;
00432 }
00433 }
00434 else
00435 {
00436 try {
00437 cout << "Pull Consumer: pull() called. ";
00438 cout.flush();
00439 data = proxy_supplier->pull();
00440 l = 0;
00441 *data >>= l;
00442 cout << "Data : " << l << endl;
00443 delete data;
00444 }
00445 catch(CORBA::TRANSIENT&) {
00446 cout << "caught TRANSIENT." << endl;
00447 omni_thread::sleep(1);
00448 }
00449 catch (CosEventComm::Disconnected& ex) {
00450 cout << endl;
00451 cerr << "Failed. Caught Disconnected exception!" << endl;
00452 exit(1);
00453 }
00454 catch (CORBA::COMM_FAILURE& ex) {
00455 cout << endl;
00456 cerr << "Failed. Caught COMM_FAILURE exception!" << endl;
00457 exit(1);
00458 }
00459 catch (CORBA::SystemException& ex) {
00460 cout << endl;
00461 cerr<<"System exception, unable to pull()";
00462 #ifdef HAVE_OMNIORB4
00463 cerr<<": "<<ex._name();
00464 if(ex.NP_minorString())
00465 cerr<<" ("<<ex.NP_minorString()<<")";
00466 #endif
00467 cerr<< endl;
00468 exit(1);
00469 }
00470 catch (CORBA::Exception& ex) {
00471 cout << endl;
00472 cerr<<"CORBA exception, unable to pull()"
00473 #ifdef HAVE_OMNIORB4
00474 <<": "<<ex._name()
00475 #endif
00476 << endl;
00477 exit(1);
00478 }
00479 }
00480 }
00481
00482
00483 while (1)
00484 {
00485 try {
00486 proxy_supplier->disconnect_pull_supplier();
00487 break;
00488 }
00489 catch (CORBA::COMM_FAILURE& ex) {
00490 cerr << "Caught COMM_FAILURE exception "
00491 << "disconnecting Pull Consumer! Retrying..."
00492 << endl;
00493 continue;
00494 }
00495 }
00496 cout << "Disconnected Pull Consumer." << endl;
00497
00498
00499 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00500 omni_thread::sleep(sleepInterval);
00501 }
00502
00503
00504 return 0;
00505 }
00506
// Writes the command line help text to standard error.
//
//   argc, argv : the program's argument vector. argv[0] is shown as the
//                program name when argc is non-zero; otherwise the fixed
//                name "pullcons" is used and argv is not touched.
static void
usage(int argc, char **argv)
{
  const char* progName = "pullcons";
  if (argc)
    progName = argv[0];

  // Summary and syntax line.
  cerr << "\nCreate a PullConsumer to receive events from a channel.\n";
  cerr << "syntax: " << progName << " OPTIONS [CHANNEL_URI]\n"
          "\n";

  // Positional argument.
  cerr << "CHANNEL_URI: The event channel may be specified as a URI.\n"
          " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
          "\n";

  // Option table; the final endl adds a blank line and flushes the stream.
  cerr << "OPTIONS: DEFAULT:\n"
          " -t enable try_pull mode\n"
          " -r connect using a nil reference\n"
          " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
          " -s SECS sleep SECS seconds after disconnecting [0]\n"
          " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
          " -h display this help text\n";
  cerr << endl;
}