Package cherrypy :: Package test :: Module webtest
[hide private]
[frames | no frames]

Source Code for Module cherrypy.test.webtest

  1  """Extensions to unittest for web frameworks. 
  2   
  3  Use the WebCase.getPage method to request a page from your HTTP server. 
  4   
  5  Framework Integration 
  6  ===================== 
  7   
  8  If you have control over your server process, you can handle errors 
  9  in the server-side of the HTTP conversation a bit better. You must run 
 10  both the client (your WebCase tests) and the server in the same process 
 11  (but in separate threads, obviously). 
 12   
 13  When an error occurs in the framework, call server_error. It will print 
 14  the traceback to stdout, and keep any assertions you have from running 
 15  (the assumption is that, if the server errors, the page output will not 
 16  be of further significance to your tests). 
 17  """ 
 18   
 19  import os 
 20  import pprint 
 21  import re 
 22  import socket 
 23  import sys 
 24  import time 
 25  import traceback 
 26  import types 
 27   
 28  from unittest import * 
 29  from unittest import _TextTestResult 
 30   
 31  from cherrypy._cpcompat import basestring, ntob, py3k, HTTPConnection, HTTPSConnection, unicodestr 
 32   
 33   
 34   
def interface(host):
    """Return an IP address for a client connection given the server host.

    If the server is listening on the IPv4 wildcard '0.0.0.0' (INADDR_ANY)
    or the IPv6 wildcard '::' (IN6ADDR_ANY), the matching loopback address
    ("127.0.0.1" or "::1") is returned, since a wildcard-bound server
    should respond on localhost.  Any other host is returned unchanged.
    """
    wildcard_to_loopback = (
        ('0.0.0.0', "127.0.0.1"),   # INADDR_ANY
        ('::', "::1"),              # IN6ADDR_ANY
    )
    for wildcard, loopback in wildcard_to_loopback:
        if host == wildcard:
            return loopback
    return host
47 48
class TerseTestResult(_TextTestResult):
    """Text test result that prints nothing at all when every test passed."""

    def printErrors(self):
        """Write the ERROR and FAIL lists, but only when there is one."""
        # Overridden to avoid unnecessary empty line
        if not (self.errors or self.failures):
            return
        if self.dots or self.showAll:
            # Terminate the line of progress dots / verbose output.
            self.stream.writeln()
        for flavour, problems in (('ERROR', self.errors),
                                  ('FAIL', self.failures)):
            self.printErrorList(flavour, problems)
58 59
class TerseTestRunner(TextTestRunner):
    """A test runner class that displays results in textual form."""

    def _makeResult(self):
        """Create the TerseTestResult that collects this run's outcomes."""
        return TerseTestResult(self.stream, self.descriptions, self.verbosity)

    def run(self, test):
        "Run the given test case or test suite."
        # Overridden to remove unnecessary empty lines and separators
        result = self._makeResult()
        test(result)
        result.printErrors()
        if result.wasSuccessful():
            return result

        # Summarise what went wrong as "FAILED (failures=N, errors=M)",
        # leaving out whichever count is zero.
        failure_count = len(result.failures)
        error_count = len(result.errors)
        counts = []
        if failure_count:
            counts.append("failures=%d" % failure_count)
        if error_count:
            counts.append("errors=%d" % error_count)

        self.stream.write("FAILED (")
        if counts:
            self.stream.write(", ".join(counts))
        self.stream.writeln(")")
        return result
82 83
def _reload_module(module):
    """Reload ``module`` and return the reloaded module object.

    ``reload`` is a builtin only on Python 2.  On Python 3 it lives in
    ``importlib`` (or in ``imp`` on the earliest 3.x releases), so look it
    up lazily instead of assuming the builtin exists.
    """
    try:
        reloader = reload
    except NameError:
        try:
            from importlib import reload as reloader
        except ImportError:
            from imp import reload as reloader
    return reloader(module)


class ReloadingTestLoader(TestLoader):
    """A TestLoader that reloads already-imported modules before using them."""

    def _resolve_module(self, parts):
        """Import (or reload) the longest importable dotted prefix of ``parts``.

        Returns a ``(module, remaining)`` tuple where ``module`` is the
        module named by that prefix itself (not its top-level package) and
        ``remaining`` is the list of trailing names that still have to be
        looked up as attributes.  Re-raises the last ImportError when not
        even the first name can be imported.
        """
        candidate = list(parts)
        remaining = []
        while candidate:
            target = ".".join(candidate)
            if target in sys.modules:
                return _reload_module(sys.modules[target]), remaining
            try:
                __import__(target)
            except ImportError:
                # The last name is not a module; treat it as an attribute
                # and retry with a shorter prefix.
                remaining.insert(0, candidate.pop())
                if not candidate:
                    raise
            else:
                # __import__ returns the top-level package for dotted
                # names, so fetch the leaf module from sys.modules.
                return sys.modules[target], remaining
        raise ValueError("incomplete test name: %s" % ".".join(parts))

    def loadTestsFromName(self, name, module=None):
        """Return a suite of all tests cases given a string specifier.

        The name may resolve either to a module, a test case class, a
        test method within a test case class, or a callable object which
        returns a TestCase or TestSuite instance.

        The method optionally resolves the names relative to a given module.
        When no module is given, the longest importable prefix of ``name``
        is imported, or reloaded if it was already imported.

        Raises ValueError if the name is empty or does not resolve to
        something a test can be made from, ImportError if no prefix of the
        name can be imported, and AttributeError if a trailing name is
        missing.
        """
        parts = name.split('.')
        if module is None:
            if not name:
                raise ValueError("incomplete test name: %s" % name)
            module, parts = self._resolve_module(parts)

        # Walk the remaining attribute names, remembering the owner of the
        # final object so that plain functions found on a TestCase class
        # (Python 3 "unbound methods") can be recognised.
        parent = None
        obj = module
        for part in parts:
            parent, obj = obj, getattr(obj, part)

        # types.ClassType (old-style classes) only exists on Python 2.
        class_types = (type, getattr(types, 'ClassType', type))

        if isinstance(obj, types.ModuleType):
            return self.loadTestsFromModule(obj)
        elif isinstance(obj, class_types) and issubclass(obj, TestCase):
            return self.loadTestsFromTestCase(obj)
        elif isinstance(obj, types.MethodType):
            # Python 2 methods carry im_class; Python 3 bound methods
            # only know their instance.
            owner = getattr(obj, 'im_class', None)
            if owner is None:
                owner = obj.__self__.__class__
            return owner(obj.__name__)
        elif (isinstance(obj, types.FunctionType)
              and isinstance(parent, class_types)
              and issubclass(parent, TestCase)):
            # Python 3: a test method looked up on its class is a function.
            return parent(parts[-1])
        elif hasattr(obj, '__call__'):
            test = obj()
            if not isinstance(test, (TestCase, TestSuite)):
                raise ValueError("calling %s returned %s, "
                                 "not a test" % (obj, test))
            return test
        else:
            raise ValueError("do not know how to make test from: %s" % (obj,))
# Define getchar(), which reads a single character from the console, using
# whichever mechanism the current platform offers.
try:
    if sys.platform.startswith('java'):
        # Jython support
        def getchar():
            """Read one character from standard input."""
            # Hopefully this is enough
            return sys.stdin.read(1)
    else:
        # On Windows, msvcrt.getch reads a single char without output.
        import msvcrt

        def getchar():
            """Read one keypress via msvcrt.getch."""
            return msvcrt.getch()
except ImportError:
    # Unix getchr
    import tty
    import termios

    def getchar():
        """Read one character from stdin with the terminal in raw mode."""
        stdin_fd = sys.stdin.fileno()
        saved_attrs = termios.tcgetattr(stdin_fd)
        try:
            tty.setraw(sys.stdin.fileno())
            key = sys.stdin.read(1)
        finally:
            # Always restore the terminal settings saved above.
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
        return key
168 169
class WebCase(TestCase):
    """A TestCase that requests pages from an HTTP server and checks them.

    Call getPage() to perform a request.  The response is stored on
    self.status, self.headers and self.body, which the assert* helpers
    then inspect.  When an assertion fails and ``interactive`` is true,
    the user is prompted on the console instead of failing immediately.
    """

    # Address of the server under test.
    HOST = "127.0.0.1"
    PORT = 8000
    # Either a connection class (openURL then makes a fresh connection per
    # request) or a connection instance (one persistent connection).
    HTTP_CONN = HTTPConnection
    PROTOCOL = "HTTP/1.1"

    scheme = "http"
    url = None

    # Results of the most recent getPage() call.
    status = None
    headers = None
    body = None

    # Encoding applied to text passed to the body assertions.
    encoding = 'utf-8'

    # Seconds taken by the most recent getPage() call.
    time = None

    def get_conn(self, auto_open=False):
        """Return a connection to our HTTP server."""
        if self.scheme == "https":
            cls = HTTPSConnection
        else:
            cls = HTTPConnection
        conn = cls(self.interface(), self.PORT)
        # Automatically re-connect?
        conn.auto_open = auto_open
        conn.connect()
        return conn

    def set_persistent(self, on=True, auto_open=False):
        """Make our HTTP_CONN persistent (or not).

        If the 'on' argument is True (the default), then self.HTTP_CONN
        will be set to an instance of HTTPConnection (or HTTPS
        if self.scheme is "https"). This will then persist across requests.

        We only allow for a single open connection, so if you call this
        and we currently have an open connection, it will be closed.
        """
        try:
            self.HTTP_CONN.close()
        except (TypeError, AttributeError):
            # HTTP_CONN is still a class, so there is nothing to close.
            pass

        if on:
            self.HTTP_CONN = self.get_conn(auto_open=auto_open)
        else:
            if self.scheme == "https":
                self.HTTP_CONN = HTTPSConnection
            else:
                self.HTTP_CONN = HTTPConnection

    def _get_persistent(self):
        # Only connection *instances* have a "host" attribute; this is the
        # same test openURL uses to tell an instance from a class.  (Every
        # new-style class has "__class__", so that cannot be used.)
        return hasattr(self.HTTP_CONN, "host")

    def _set_persistent(self, on):
        self.set_persistent(on)

    # True when HTTP_CONN is a live connection instance rather than a class.
    persistent = property(_get_persistent, _set_persistent)

    def interface(self):
        """Return an IP address for a client connection.

        If the server is listening on '0.0.0.0' (INADDR_ANY)
        or '::' (IN6ADDR_ANY), this will return the proper localhost."""
        return interface(self.HOST)

    def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
        """Open the url with debugging support. Return status, headers, body.

        The result is also stored on self.status, self.headers and
        self.body, the elapsed time on self.time, and any Set-Cookie
        response headers are turned into request headers on self.cookies.
        Raises ServerError if server_error() flagged an error meanwhile.
        """
        ServerError.on = False

        if isinstance(url, unicodestr):
            url = url.encode('utf-8')
        if isinstance(body, unicodestr):
            body = body.encode('utf-8')

        self.url = url
        self.time = None
        start = time.time()
        result = openURL(url, headers, method, body, self.HOST, self.PORT,
                         self.HTTP_CONN, protocol or self.PROTOCOL)
        self.time = time.time() - start
        self.status, self.headers, self.body = result

        # Build a list of request cookies from the previous response cookies.
        self.cookies = [('Cookie', v) for k, v in self.headers
                        if k.lower() == 'set-cookie']

        if ServerError.on:
            raise ServerError()
        return result

    # When true, a failed assertion prompts on the console instead of raising.
    interactive = True
    # Number of body lines shown between "More" prompts.
    console_height = 30

    def _read_key(self):
        """Read one keypress and return it as a native string ('' at EOF)."""
        key = getchar()
        if not isinstance(key, type("")):
            # Some getchar implementations return bytes.
            key = key.decode('ascii', 'replace')
        return key

    def _handlewebError(self, msg):
        """Report a failed assertion, interactively if self.interactive.

        Non-interactive: raise self.failureException(msg).  Interactive:
        let the user inspect the response and then ignore the failure
        (return), raise it, or exit.
        """
        print("")
        print(" ERROR: %s" % msg)

        if not self.interactive:
            raise self.failureException(msg)

        p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> "
        sys.stdout.write(p)
        sys.stdout.flush()
        while True:
            i = self._read_key().upper()
            if not i:
                # End of input: nobody can answer the prompt, and '' would
                # pass the membership test below and loop forever.
                raise self.failureException(msg)
            if i not in "BHSUIRX":
                continue
            print(i.upper())  # Also prints new line
            if i == "B":
                more = "<-- More -->"
                for x, line in enumerate(self.body.splitlines()):
                    if (x + 1) % self.console_height == 0:
                        # The \r should make the next line overwrite
                        sys.stdout.write(more + "\r")
                        m = self._read_key().lower()
                        # Erase our "More" prompt
                        sys.stdout.write(" " * len(more) + "\r")
                        if m == "q":
                            break
                    print(line)
            elif i == "H":
                pprint.pprint(self.headers)
            elif i == "S":
                print(self.status)
            elif i == "U":
                print(self.url)
            elif i == "I":
                # return without raising the normal exception
                return
            elif i == "R":
                raise self.failureException(msg)
            elif i == "X":
                self.exit()
            sys.stdout.write(p)
            sys.stdout.flush()

    def exit(self):
        """Terminate the test process (the [X] choice of the prompt)."""
        sys.exit()

    def assertStatus(self, status, msg=None):
        """Fail if self.status != status.

        ``status`` may be a full status line (string), an integer code, or
        a tuple/list of either, in which case any one of them may match.
        """
        if isinstance(status, basestring):
            if not self.status == status:
                if msg is None:
                    msg = 'Status (%r) != %r' % (self.status, status)
                self._handlewebError(msg)
        elif isinstance(status, int):
            code = int(self.status[:3])
            if code != status:
                if msg is None:
                    msg = 'Status (%r) != %r' % (self.status, status)
                self._handlewebError(msg)
        else:
            # status is a tuple or list.
            match = False
            for s in status:
                if isinstance(s, basestring):
                    if self.status == s:
                        match = True
                        break
                elif int(self.status[:3]) == s:
                    match = True
                    break
            if not match:
                if msg is None:
                    msg = 'Status (%r) not in %r' % (self.status, status)
                self._handlewebError(msg)

    def assertHeader(self, key, value=None, msg=None):
        """Fail if (key, [value]) not in self.headers.

        Header names compare case-insensitively.  Returns the matching
        header value on success.
        """
        lowkey = key.lower()
        for k, v in self.headers:
            if k.lower() == lowkey:
                if value is None or str(value) == v:
                    return v

        if msg is None:
            if value is None:
                msg = '%r not in headers' % key
            else:
                msg = '%r:%r not in headers' % (key, value)
        self._handlewebError(msg)

    def assertHeaderItemValue(self, key, value, msg=None):
        """Fail if the header does not contain the specified value.

        The header is treated as a comma-separated list.  Returns
        ``value`` on success.
        """
        actual_value = self.assertHeader(key, msg=msg)
        if actual_value is None:
            # The header is missing and the user chose to ignore that
            # failure interactively; there is nothing left to inspect.
            return None
        # A real list (not a map iterator) so it can be both searched and
        # shown in the failure message.
        header_values = [item.strip() for item in actual_value.split(',')]
        if value in header_values:
            return value

        if msg is None:
            msg = "%r not in %r" % (value, header_values)
        self._handlewebError(msg)

    def assertNoHeader(self, key, msg=None):
        """Fail if key in self.headers."""
        lowkey = key.lower()
        matches = [k for k, v in self.headers if k.lower() == lowkey]
        if matches:
            if msg is None:
                msg = '%r in headers' % key
            self._handlewebError(msg)

    def assertBody(self, value, msg=None):
        """Fail if value != self.body."""
        if isinstance(value, unicodestr):
            value = value.encode(self.encoding)
        if value != self.body:
            if msg is None:
                msg = 'expected body:\n%r\n\nactual body:\n%r' % (value, self.body)
            self._handlewebError(msg)

    def assertInBody(self, value, msg=None):
        """Fail if value not in self.body."""
        if isinstance(value, unicodestr):
            value = value.encode(self.encoding)
        if value not in self.body:
            if msg is None:
                msg = '%r not in body: %s' % (value, self.body)
            self._handlewebError(msg)

    def assertNotInBody(self, value, msg=None):
        """Fail if value in self.body."""
        if isinstance(value, unicodestr):
            value = value.encode(self.encoding)
        if value in self.body:
            if msg is None:
                msg = '%r found in body' % value
            self._handlewebError(msg)

    def assertMatchesBody(self, pattern, msg=None, flags=0):
        """Fail if value (a regex pattern) is not in self.body."""
        if isinstance(pattern, unicodestr):
            pattern = pattern.encode(self.encoding)
        if re.search(pattern, self.body, flags) is None:
            if msg is None:
                msg = 'No match for %r in body' % pattern
            self._handlewebError(msg)
# Request methods for which default Content-Type/Content-Length headers
# are supplied by cleanHeaders.
methods_with_bodies = ("POST", "PUT")


def cleanHeaders(headers, method, body, host, port):
    """Return request headers, with required headers added (if missing).

    ``headers`` is an iterable of (name, value) pairs, or None.  It is
    never modified: a new list is returned, so a header list that is
    reused for several requests does not accumulate a stale Host or
    Content-Length from an earlier call.

    A Host header is added if absent (without the port when port is 80).
    For methods in ``methods_with_bodies``, a default Content-Type and a
    Content-Length computed from ``body`` are added when no Content-Type
    is present.
    """
    # Copy so that the caller's list (or tuple) is left untouched.
    if headers is None:
        headers = []
    else:
        headers = list(headers)

    present = [k.lower() for k, v in headers]

    # Add the required Host request header if not present.
    # [This specifies the host:port of the server, not the client.]
    if 'host' not in present:
        if port == 80:
            headers.append(("Host", host))
        else:
            headers.append(("Host", "%s:%s" % (host, port)))

    if method in methods_with_bodies and 'content-type' not in present:
        # Stick in default type and length headers if not present
        headers.append(("Content-Type", "application/x-www-form-urlencoded"))
        headers.append(("Content-Length", str(len(body or ""))))

    return headers
443 444
def shb(response):
    """Return status, headers, body the way we like from a response.

    The status is the "<code> <reason>" string, the headers are a list of
    (name, value) pairs, and the body is whatever response.read() returns.
    """
    if py3k:
        collected = response.getheaders()
    else:
        # Rebuild (name, value) pairs from the raw header lines, folding
        # continuation lines into the value of the preceding header.
        collected = []
        name, content = None, None
        for raw in response.msg.headers:
            if not raw:
                continue
            if raw[0] in " \t":
                # Continuation of the previous header's value.
                content += raw.strip()
                continue
            # A new header starts; flush the one gathered so far.
            if name and content:
                collected.append((name, content))
            name, content = raw.split(":", 1)
            name = name.strip()
            content = content.strip()
        if name and content:
            collected.append((name, content))

    return "%s %s" % (response.status, response.reason), collected, response.read()
466 467
def openURL(url, headers=None, method="GET", body=None,
            host="127.0.0.1", port=8000, http_conn=HTTPConnection,
            protocol="HTTP/1.1"):
    """Open the given HTTP resource and return status, headers, and body.

    ``http_conn`` may be a connection class, in which case a connection is
    created for this request and always closed again, or an existing
    connection instance, which is used and left open.  The request is
    attempted up to 10 times when socket errors occur; the last socket
    error is re-raised.
    """

    headers = cleanHeaders(headers, method, body, host, port)

    # Allow http_conn to be a class or an instance: only instances have
    # a "host" attribute.
    own_conn = not hasattr(http_conn, "host")

    # Trying 10 times is simply in case of socket errors.
    # Normal case--it should run once.
    max_trials = 10
    for trial in range(max_trials):
        try:
            if own_conn:
                conn = http_conn(interface(host), port)
            else:
                conn = http_conn

            try:
                conn._http_vsn_str = protocol
                conn._http_vsn = int("".join([x for x in protocol if x.isdigit()]))

                # skip_accept_encoding argument added in python version 2.4
                if sys.version_info < (2, 4):
                    def putheader(self, header, value):
                        if header == 'Accept-Encoding' and value == 'identity':
                            return
                        self.__class__.putheader(self, header, value)
                    import new
                    conn.putheader = new.instancemethod(putheader, conn, conn.__class__)
                    conn.putrequest(method.upper(), url, skip_host=True)
                elif not py3k:
                    conn.putrequest(method.upper(), url, skip_host=True,
                                    skip_accept_encoding=True)
                else:
                    import http.client
                    # Replace the stdlib method, which only accepts ASCII url's
                    def putrequest(self, method, url):
                        if self._HTTPConnection__response and self._HTTPConnection__response.isclosed():
                            self._HTTPConnection__response = None

                        if self._HTTPConnection__state == http.client._CS_IDLE:
                            self._HTTPConnection__state = http.client._CS_REQ_STARTED
                        else:
                            raise http.client.CannotSendRequest()

                        self._method = method
                        if not url:
                            url = ntob('/')
                        elif not isinstance(url, bytes):
                            # The request line is assembled from bytes, so
                            # a text URL has to be encoded first.
                            url = url.encode('utf-8')
                        request = ntob(' ').join((method.encode("ASCII"), url,
                                                  self._http_vsn_str.encode("ASCII")))
                        self._output(request)
                    import types
                    conn.putrequest = types.MethodType(putrequest, conn)

                    conn.putrequest(method.upper(), url)

                for key, value in headers:
                    conn.putheader(key, ntob(value, "Latin-1"))
                conn.endheaders()

                if body is not None:
                    conn.send(body)

                # Handle response
                response = conn.getresponse()

                s, h, b = shb(response)
                return s, h, b
            finally:
                if own_conn:
                    # We made our own conn instance. Close it, whether the
                    # request succeeded or not, so retries do not leak it.
                    conn.close()
        except socket.error:
            if trial == max_trials - 1:
                # Out of attempts; no point in sleeping first.
                raise
            time.sleep(0.5)


# Add any exceptions which your web framework handles
# normally (that you don't want server_error to trap).
ignored_exceptions = []

# You'll want to set this to True when you can't guarantee
# that each response will immediately follow each request;
# for example, when handling requests via multiple threads.
ignore_all = False

class ServerError(Exception):
    """Raised by WebCase.getPage when server_error() flagged a server-side error."""

    # Set to True by server_error() when it handles an exception;
    # reset to False at the start of every WebCase.getPage call.
    on = False
557 558
def server_error(exc=None):
    """Server debug hook. Return True if exception handled, False if ignored.

    ``exc`` is an exception triple as returned by sys.exc_info(); when
    omitted, the exception currently being handled is used.  A handled
    exception sets ServerError.on and has its traceback printed to stdout.

    You probably want to wrap this, so you can still handle an error using
    your framework when it's ignored.
    """
    exc_info = sys.exc_info() if exc is None else exc

    should_ignore = ignore_all or exc_info[0] in ignored_exceptions
    if should_ignore:
        return False

    ServerError.on = True
    print("")
    print("".join(traceback.format_exception(*exc_info)))
    return True
575