Package cherrypy :: Package process :: Module wspbus
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.process.wspbus

  1  """An implementation of the Web Site Process Bus. 
  2   
  3  This module is completely standalone, depending only on the stdlib. 
  4   
  5  Web Site Process Bus 
  6  -------------------- 
  7   
  8  A Bus object is used to contain and manage site-wide behavior: 
  9  daemonization, HTTP server start/stop, process reload, signal handling, 
 10  drop privileges, PID file management, logging for all of these, 
 11  and many more. 
 12   
 13  In addition, a Bus object provides a place for each web framework 
 14  to register code that runs in response to site-wide events (like 
 15  process start and stop), or which controls or otherwise interacts with 
 16  the site-wide components mentioned above. For example, a framework which 
 17  uses file-based templates would add known template filenames to an 
 18  autoreload component. 
 19   
 20  Ideally, a Bus object will be flexible enough to be useful in a variety 
 21  of invocation scenarios: 
 22   
 23   1. The deployer starts a site from the command line via a 
 24      framework-neutral deployment script; applications from multiple frameworks 
 25      are mixed in a single site. Command-line arguments and configuration 
 26      files are used to define site-wide components such as the HTTP server, 
 27      WSGI component graph, autoreload behavior, signal handling, etc. 
 28   2. The deployer starts a site via some other process, such as Apache; 
 29      applications from multiple frameworks are mixed in a single site. 
 30      Autoreload and signal handling (from Python at least) are disabled. 
 31   3. The deployer starts a site via a framework-specific mechanism; 
 32      for example, when running tests, exploring tutorials, or deploying 
 33      single applications from a single framework. The framework controls 
 34      which site-wide components are enabled as it sees fit. 
 35   
 36  The Bus object in this package uses topic-based publish-subscribe 
 37  messaging to accomplish all this. A few topic channels are built in 
 38  ('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and 
 39  site containers are free to define their own. If a message is sent to a 
 40  channel that has not been defined or has no listeners, there is no effect. 
 41   
 42  In general, there should only ever be a single Bus object per process. 
 43  Frameworks and site containers share a single Bus object by publishing 
 44  messages and subscribing listeners. 
 45   
 46  The Bus object works as a finite state machine which models the current 
 47  state of the process. Bus methods move it from one state to another; 
 48  those methods then publish to subscribed listeners on the channel for 
 49  the new state.:: 
 50   
 51                          O 
 52                          | 
 53                          V 
 54         STOPPING --> STOPPED --> EXITING -> X 
 55            A   A         | 
 56            |    \___     | 
 57            |        \    | 
 58            |         V   V 
 59          STARTED <-- STARTING 
 60   
 61  """ 
 62   
 63  import atexit 
 64  import os 
 65  import sys 
 66  import threading 
 67  import time 
 68  import traceback as _traceback 
 69  import warnings 
 70   
 71  from cherrypy._cpcompat import set 
 72   
 73  # Here I save the value of os.getcwd(), which, if I am imported early enough, 
 74  # will be the directory from which the startup script was run.  This is needed 
 75  # by _do_execv(), to change back to the original directory before execv()ing a 
 76  # new process.  This is a defense against the application having changed the 
 77  # current working directory (which could make sys.executable "not found" if 
 78  # sys.executable is a relative-path, and/or cause other problems). 
 79  _startup_cwd = os.getcwd() 
 80   
81 -class ChannelFailures(Exception):
82 """Exception raised when errors occur in a listener during Bus.publish().""" 83 delimiter = '\n' 84
85 - def __init__(self, *args, **kwargs):
86 # Don't use 'super' here; Exceptions are old-style in Py2.4 87 # See http://www.cherrypy.org/ticket/959 88 Exception.__init__(self, *args, **kwargs) 89 self._exceptions = list()
90
91 - def handle_exception(self):
92 """Append the current exception to self.""" 93 self._exceptions.append(sys.exc_info()[1])
94
95 - def get_instances(self):
96 """Return a list of seen exception instances.""" 97 return self._exceptions[:]
98
99 - def __str__(self):
100 exception_strings = map(repr, self.get_instances()) 101 return self.delimiter.join(exception_strings)
102 103 __repr__ = __str__ 104
105 - def __bool__(self):
106 return bool(self._exceptions)
107 __nonzero__ = __bool__
108 109 # Use a flag to indicate the state of the bus.
110 -class _StateEnum(object):
111 - class State(object):
112 name = None
113 - def __repr__(self):
114 return "states.%s" % self.name
115
116 - def __setattr__(self, key, value):
117 if isinstance(value, self.State): 118 value.name = key 119 object.__setattr__(self, key, value)
120 states = _StateEnum() 121 states.STOPPED = states.State() 122 states.STARTING = states.State() 123 states.STARTED = states.State() 124 states.STOPPING = states.State() 125 states.EXITING = states.State() 126 127 128 try: 129 import fcntl 130 except ImportError: 131 max_files = 0 132 else: 133 try: 134 max_files = os.sysconf('SC_OPEN_MAX') 135 except AttributeError: 136 max_files = 1024 137 138
139 -class Bus(object):
140 """Process state-machine and messenger for HTTP site deployment. 141 142 All listeners for a given channel are guaranteed to be called even 143 if others at the same channel fail. Each failure is logged, but 144 execution proceeds on to the next listener. The only way to stop all 145 processing from inside a listener is to raise SystemExit and stop the 146 whole server. 147 """ 148 149 states = states 150 state = states.STOPPED 151 execv = False 152 max_cloexec_files = max_files 153
154 - def __init__(self):
155 self.execv = False 156 self.state = states.STOPPED 157 self.listeners = dict( 158 [(channel, set()) for channel 159 in ('start', 'stop', 'exit', 'graceful', 'log', 'main')]) 160 self._priorities = {}
161
162 - def subscribe(self, channel, callback, priority=None):
163 """Add the given callback at the given channel (if not present).""" 164 if channel not in self.listeners: 165 self.listeners[channel] = set() 166 self.listeners[channel].add(callback) 167 168 if priority is None: 169 priority = getattr(callback, 'priority', 50) 170 self._priorities[(channel, callback)] = priority
171
172 - def unsubscribe(self, channel, callback):
173 """Discard the given callback (if present).""" 174 listeners = self.listeners.get(channel) 175 if listeners and callback in listeners: 176 listeners.discard(callback) 177 del self._priorities[(channel, callback)]
178
179 - def publish(self, channel, *args, **kwargs):
180 """Return output of all subscribers for the given channel.""" 181 if channel not in self.listeners: 182 return [] 183 184 exc = ChannelFailures() 185 output = [] 186 187 items = [(self._priorities[(channel, listener)], listener) 188 for listener in self.listeners[channel]] 189 try: 190 items.sort(key=lambda item: item[0]) 191 except TypeError: 192 # Python 2.3 had no 'key' arg, but that doesn't matter 193 # since it could sort dissimilar types just fine. 194 items.sort() 195 for priority, listener in items: 196 try: 197 output.append(listener(*args, **kwargs)) 198 except KeyboardInterrupt: 199 raise 200 except SystemExit: 201 e = sys.exc_info()[1] 202 # If we have previous errors ensure the exit code is non-zero 203 if exc and e.code == 0: 204 e.code = 1 205 raise 206 except: 207 exc.handle_exception() 208 if channel == 'log': 209 # Assume any further messages to 'log' will fail. 210 pass 211 else: 212 self.log("Error in %r listener %r" % (channel, listener), 213 level=40, traceback=True) 214 if exc: 215 raise exc 216 return output
217
218 - def _clean_exit(self):
219 """An atexit handler which asserts the Bus is not running.""" 220 if self.state != states.EXITING: 221 warnings.warn( 222 "The main thread is exiting, but the Bus is in the %r state; " 223 "shutting it down automatically now. You must either call " 224 "bus.block() after start(), or call bus.exit() before the " 225 "main thread exits." % self.state, RuntimeWarning) 226 self.exit()
227
228 - def start(self):
229 """Start all services.""" 230 atexit.register(self._clean_exit) 231 232 self.state = states.STARTING 233 self.log('Bus STARTING') 234 try: 235 self.publish('start') 236 self.state = states.STARTED 237 self.log('Bus STARTED') 238 except (KeyboardInterrupt, SystemExit): 239 raise 240 except: 241 self.log("Shutting down due to error in start listener:", 242 level=40, traceback=True) 243 e_info = sys.exc_info()[1] 244 try: 245 self.exit() 246 except: 247 # Any stop/exit errors will be logged inside publish(). 248 pass 249 # Re-raise the original error 250 raise e_info
251
252 - def exit(self):
253 """Stop all services and prepare to exit the process.""" 254 exitstate = self.state 255 try: 256 self.stop() 257 258 self.state = states.EXITING 259 self.log('Bus EXITING') 260 self.publish('exit') 261 # This isn't strictly necessary, but it's better than seeing 262 # "Waiting for child threads to terminate..." and then nothing. 263 self.log('Bus EXITED') 264 except: 265 # This method is often called asynchronously (whether thread, 266 # signal handler, console handler, or atexit handler), so we 267 # can't just let exceptions propagate out unhandled. 268 # Assume it's been logged and just die. 269 os._exit(70) # EX_SOFTWARE 270 271 if exitstate == states.STARTING: 272 # exit() was called before start() finished, possibly due to 273 # Ctrl-C because a start listener got stuck. In this case, 274 # we could get stuck in a loop where Ctrl-C never exits the 275 # process, so we just call os.exit here. 276 os._exit(70) # EX_SOFTWARE
277
278 - def restart(self):
279 """Restart the process (may close connections). 280 281 This method does not restart the process from the calling thread; 282 instead, it stops the bus and asks the main thread to call execv. 283 """ 284 self.execv = True 285 self.exit()
286
287 - def graceful(self):
288 """Advise all services to reload.""" 289 self.log('Bus graceful') 290 self.publish('graceful')
291
292 - def block(self, interval=0.1):
293 """Wait for the EXITING state, KeyboardInterrupt or SystemExit. 294 295 This function is intended to be called only by the main thread. 296 After waiting for the EXITING state, it also waits for all threads 297 to terminate, and then calls os.execv if self.execv is True. This 298 design allows another thread to call bus.restart, yet have the main 299 thread perform the actual execv call (required on some platforms). 300 """ 301 try: 302 self.wait(states.EXITING, interval=interval, channel='main') 303 except (KeyboardInterrupt, IOError): 304 # The time.sleep call might raise 305 # "IOError: [Errno 4] Interrupted function call" on KBInt. 306 self.log('Keyboard Interrupt: shutting down bus') 307 self.exit() 308 except SystemExit: 309 self.log('SystemExit raised: shutting down bus') 310 self.exit() 311 raise 312 313 # Waiting for ALL child threads to finish is necessary on OS X. 314 # See http://www.cherrypy.org/ticket/581. 315 # It's also good to let them all shut down before allowing 316 # the main thread to call atexit handlers. 317 # See http://www.cherrypy.org/ticket/751. 318 self.log("Waiting for child threads to terminate...") 319 for t in threading.enumerate(): 320 if t != threading.currentThread() and t.isAlive(): 321 # Note that any dummy (external) threads are always daemonic. 322 if hasattr(threading.Thread, "daemon"): 323 # Python 2.6+ 324 d = t.daemon 325 else: 326 d = t.isDaemon() 327 if not d: 328 self.log("Waiting for thread %s." % t.getName()) 329 t.join() 330 331 if self.execv: 332 self._do_execv()
333
334 - def wait(self, state, interval=0.1, channel=None):
335 """Poll for the given state(s) at intervals; publish to channel.""" 336 if isinstance(state, (tuple, list)): 337 states = state 338 else: 339 states = [state] 340 341 def _wait(): 342 while self.state not in states: 343 time.sleep(interval) 344 self.publish(channel)
345 346 # From http://psyco.sourceforge.net/psycoguide/bugs.html: 347 # "The compiled machine code does not include the regular polling 348 # done by Python, meaning that a KeyboardInterrupt will not be 349 # detected before execution comes back to the regular Python 350 # interpreter. Your program cannot be interrupted if caught 351 # into an infinite Psyco-compiled loop." 352 try: 353 sys.modules['psyco'].cannotcompile(_wait) 354 except (KeyError, AttributeError): 355 pass 356 357 _wait()
358
359 - def _do_execv(self):
360 """Re-execute the current process. 361 362 This must be called from the main thread, because certain platforms 363 (OS X) don't allow execv to be called in a child thread very well. 364 """ 365 args = sys.argv[:] 366 self.log('Re-spawning %s' % ' '.join(args)) 367 368 if sys.platform[:4] == 'java': 369 from _systemrestart import SystemRestart 370 raise SystemRestart 371 else: 372 args.insert(0, sys.executable) 373 if sys.platform == 'win32': 374 args = ['"%s"' % arg for arg in args] 375 376 os.chdir(_startup_cwd) 377 if self.max_cloexec_files: 378 self._set_cloexec() 379 os.execv(sys.executable, args)
380
381 - def _set_cloexec(self):
382 """Set the CLOEXEC flag on all open files (except stdin/out/err). 383 384 If self.max_cloexec_files is an integer (the default), then on 385 platforms which support it, it represents the max open files setting 386 for the operating system. This function will be called just before 387 the process is restarted via os.execv() to prevent open files 388 from persisting into the new process. 389 390 Set self.max_cloexec_files to 0 to disable this behavior. 391 """ 392 for fd in range(3, self.max_cloexec_files): # skip stdin/out/err 393 try: 394 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 395 except IOError: 396 continue 397 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
398
399 - def stop(self):
400 """Stop all services.""" 401 self.state = states.STOPPING 402 self.log('Bus STOPPING') 403 self.publish('stop') 404 self.state = states.STOPPED 405 self.log('Bus STOPPED')
406
407 - def start_with_callback(self, func, args=None, kwargs=None):
408 """Start 'func' in a new thread T, then start self (and return T).""" 409 if args is None: 410 args = () 411 if kwargs is None: 412 kwargs = {} 413 args = (func,) + args 414 415 def _callback(func, *a, **kw): 416 self.wait(states.STARTED) 417 func(*a, **kw)
418 t = threading.Thread(target=_callback, args=args, kwargs=kwargs) 419 t.setName('Bus Callback ' + t.getName()) 420 t.start() 421 422 self.start() 423 424 return t 425
426 - def log(self, msg="", level=20, traceback=False):
427 """Log the given message. Append the last traceback if requested.""" 428 if traceback: 429 msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info())) 430 self.publish('log', msg, level)
431 432 bus = Bus() 433