Package logilab :: Package common :: Module testlib
[frames] | no frames]

Source Code for Module logilab.common.testlib

   1  # -*- coding: utf-8 -*- 
   2  """Run tests. 
   3   
   4  This will find all modules whose name match a given prefix in the test 
   5  directory, and run them. Various command line options provide 
   6  additional facilities. 
   7   
   8  Command line options: 
   9   
  10   -v: verbose -- run tests in verbose mode with output to stdout 
  11   -q: quiet   -- don't print anything except if a test fails 
  12   -t: testdir -- directory where the tests will be found 
  13   -x: exclude -- add a test to exclude 
  14   -p: profile -- profiled execution 
  15   -c: capture -- capture standard out/err during tests 
  16   -d: dbc     -- enable design-by-contract 
  17   -m: match   -- only run test matching the tag pattern which follow 
  18   
  19  If no non-option arguments are present, prefixes used are 'test', 
  20  'regrtest', 'smoketest' and 'unittest'. 
  21   
  22  :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  23  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  24  :license: General Public License version 2 - http://www.gnu.org/licenses 
  25  """ 
  26  __docformat__ = "restructuredtext en" 
  27  # modified copy of some functions from test/regrtest.py from PyXml 
  28  # disable camel case warning 
  29  # pylint: disable-msg=C0103 
  30   
  31  import sys 
  32  import os, os.path as osp 
  33  import re 
  34  import time 
  35  import getopt 
  36  import traceback 
  37  import inspect 
  38  import unittest 
  39  import difflib 
  40  import types 
  41  import tempfile 
  42  import math 
  43  from shutil import rmtree 
  44  from operator import itemgetter 
  45  import warnings 
  46  from compiler.consts import CO_GENERATOR 
  47  from ConfigParser import ConfigParser 
  48  from itertools import dropwhile 
  49   
  50  try: 
  51      from test import test_support 
  52  except ImportError: 
  53      # not always available 
54 - class TestSupport:
55 - def unload(self, test):
56 pass
57 test_support = TestSupport() 58 59 # pylint: disable-msg=W0622 60 from logilab.common.compat import set, enumerate, any, sorted 61 # pylint: enable-msg=W0622 62 from logilab.common.modutils import load_module_from_name 63 from logilab.common.debugger import Debugger, colorize_source 64 from logilab.common.decorators import cached, classproperty 65 from logilab.common import textutils 66 67 68 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn'] 69 70 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest', 71 'func', 'validation') 72 73 ENABLE_DBC = False 74 75 FILE_RESTART = ".pytest.restart" 76 77 # used by unittest to count the number of relevant levels in the traceback 78 __unittest = 1 79 80
81 -def with_tempdir(callable):
82 """A decorator ensuring no temporary file left when the function return 83 Work only for temporary file create with the tempfile module""" 84 def proxy(*args, **kargs): 85 86 old_tmpdir = tempfile.gettempdir() 87 new_tmpdir = tempfile.mkdtemp("-logilab-common-testlib","temp-dir-") 88 tempfile.tempdir = new_tmpdir 89 try: 90 return callable(*args, **kargs) 91 finally: 92 try: 93 rmtree(new_tmpdir, ignore_errors=True) 94 finally: 95 tempfile.tempdir = old_tmpdir
96 return proxy 97 98
99 -def run_tests(tests, quiet, verbose, runner=None, capture=0):
100 """Execute a list of tests. 101 102 :rtype: tuple 103 :return: tuple (list of passed tests, list of failed tests, list of skipped tests) 104 """ 105 good = [] 106 bad = [] 107 skipped = [] 108 all_result = None 109 for test in tests: 110 if not quiet: 111 print 112 print '-'*80 113 print "Executing", test 114 result = run_test(test, verbose, runner, capture) 115 if type(result) is type(''): 116 # an unexpected error occurred 117 skipped.append( (test, result)) 118 else: 119 if all_result is None: 120 all_result = result 121 else: 122 all_result.testsRun += result.testsRun 123 all_result.failures += result.failures 124 all_result.errors += result.errors 125 all_result.skipped += result.skipped 126 if result.errors or result.failures: 127 bad.append(test) 128 if verbose: 129 print "test", test, \ 130 "failed -- %s errors, %s failures" % ( 131 len(result.errors), len(result.failures)) 132 else: 133 good.append(test) 134 135 return good, bad, skipped, all_result
136
137 -def find_tests(testdir, 138 prefixes=DEFAULT_PREFIXES, suffix=".py", 139 excludes=(), 140 remove_suffix=True):
141 """ 142 Return a list of all applicable test modules. 143 """ 144 tests = [] 145 for name in os.listdir(testdir): 146 if not suffix or name.endswith(suffix): 147 for prefix in prefixes: 148 if name.startswith(prefix): 149 if remove_suffix and name.endswith(suffix): 150 name = name[:-len(suffix)] 151 if name not in excludes: 152 tests.append(name) 153 tests.sort() 154 return tests
155 156
157 -def run_test(test, verbose, runner=None, capture=0):
158 """ 159 Run a single test. 160 161 test -- the name of the test 162 verbose -- if true, print more messages 163 """ 164 test_support.unload(test) 165 try: 166 m = load_module_from_name(test, path=sys.path) 167 # m = __import__(test, globals(), locals(), sys.path) 168 try: 169 suite = m.suite 170 if callable(suite): 171 suite = suite() 172 except AttributeError: 173 loader = unittest.TestLoader() 174 suite = loader.loadTestsFromModule(m) 175 if runner is None: 176 runner = SkipAwareTextTestRunner(capture=capture) # verbosity=0) 177 return runner.run(suite) 178 except KeyboardInterrupt, v: 179 raise KeyboardInterrupt, v, sys.exc_info()[2] 180 except: 181 # raise 182 type, value = sys.exc_info()[:2] 183 msg = "test %s crashed -- %s : %s" % (test, type, value) 184 if verbose: 185 traceback.print_exc() 186 return msg
187
188 -def _count(n, word):
189 """format word according to n""" 190 if n == 1: 191 return "%d %s" % (n, word) 192 else: 193 return "%d %ss" % (n, word)
194 195 196 197 198 ## PostMortem Debug facilities #####
199 -def start_interactive_mode(result):
200 """starts an interactive shell so that the user can inspect errors 201 """ 202 debuggers = result.debuggers 203 descrs = result.error_descrs + result.fail_descrs 204 if len(debuggers) == 1: 205 # don't ask for test name if there's only one failure 206 debuggers[0].start() 207 else: 208 while True: 209 testindex = 0 210 print "Choose a test to debug:" 211 # order debuggers in the same way than errors were printed 212 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr) 213 in enumerate(descrs)]) 214 print "Type 'exit' (or ^D) to quit" 215 print 216 try: 217 todebug = raw_input('Enter a test name: ') 218 if todebug.strip().lower() == 'exit': 219 print 220 break 221 else: 222 try: 223 testindex = int(todebug) 224 debugger = debuggers[descrs[testindex][0]] 225 except (ValueError, IndexError): 226 print "ERROR: invalid test number %r" % (todebug, ) 227 else: 228 debugger.start() 229 except (EOFError, KeyboardInterrupt): 230 print 231 break
232 233 234 # test utils ################################################################## 235 from cStringIO import StringIO 236
237 -class SkipAwareTestResult(unittest._TextTestResult):
238
239 - def __init__(self, stream, descriptions, verbosity, 240 exitfirst=False, capture=0, printonly=None, 241 pdbmode=False, cvg=None, colorize=False):
242 super(SkipAwareTestResult, self).__init__(stream, 243 descriptions, verbosity) 244 self.skipped = [] 245 self.debuggers = [] 246 self.fail_descrs = [] 247 self.error_descrs = [] 248 self.exitfirst = exitfirst 249 self.capture = capture 250 self.printonly = printonly 251 self.pdbmode = pdbmode 252 self.cvg = cvg 253 self.colorize = colorize 254 self.pdbclass = Debugger 255 self.verbose = verbosity > 1
256
257 - def descrs_for(self, flavour):
258 return getattr(self, '%s_descrs' % flavour.lower())
259
260 - def _create_pdb(self, test_descr, flavour):
261 self.descrs_for(flavour).append( (len(self.debuggers), test_descr) ) 262 if self.pdbmode: 263 self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
264 265
266 - def _iter_valid_frames(self, frames):
267 """only consider non-testlib frames when formatting traceback""" 268 lgc_testlib = osp.abspath(__file__) 269 std_testlib = osp.abspath(unittest.__file__) 270 invalid = lambda fi: osp.abspath(fi[1]) in (lgc_testlib, std_testlib) 271 for frameinfo in dropwhile(invalid, frames): 272 yield frameinfo
273
274 - def _exc_info_to_string(self, err, test):
275 """Converts a sys.exc_info()-style tuple of values into a string. 276 277 This method is overridden here because we want to colorize 278 lines if --color is passed, and display local variables if 279 --verbose is passed 280 """ 281 exctype, exc, tb = err 282 output = ['Traceback (most recent call last)'] 283 frames = inspect.getinnerframes(tb) 284 colorize = self.colorize 285 frames = enumerate(self._iter_valid_frames(frames)) 286 for index, (frame, filename, lineno, funcname, ctx, ctxindex) in frames: 287 filename = osp.abspath(filename) 288 if ctx is None: # pyc files or C extensions for instance 289 source = '<no source available>' 290 else: 291 source = ''.join(ctx) 292 if colorize: 293 filename = textutils.colorize_ansi(filename, 'magenta') 294 source = colorize_source(source) 295 output.append(' File "%s", line %s, in %s' % (filename, lineno, funcname)) 296 output.append(' %s' % source.strip()) 297 if self.verbose: 298 output.append('%r == %r' % (dir(frame), test.__module__)) 299 output.append('') 300 output.append(' ' + ' local variables '.center(66, '-')) 301 for varname, value in sorted(frame.f_locals.items()): 302 output.append(' %s: %r' % (varname, value)) 303 if varname == 'self': # special handy processing for self 304 for varname, value in sorted(vars(value).items()): 305 output.append(' self.%s: %r' % (varname, value)) 306 output.append(' ' + '-' * 66) 307 output.append('') 308 output.append(''.join(traceback.format_exception_only(exctype, exc))) 309 return '\n'.join(output)
310
311 - def addError(self, test, err):
312 """err == (exc_type, exc, tcbk)""" 313 exc_type, exc, _ = err # 314 if exc_type == TestSkipped: 315 self.addSkipped(test, exc) 316 else: 317 if self.exitfirst: 318 self.shouldStop = True 319 descr = self.getDescription(test) 320 super(SkipAwareTestResult, self).addError(test, err) 321 self._create_pdb(descr, 'error')
322
323 - def addFailure(self, test, err):
324 if self.exitfirst: 325 self.shouldStop = True 326 descr = self.getDescription(test) 327 super(SkipAwareTestResult, self).addFailure(test, err) 328 self._create_pdb(descr, 'fail')
329
330 - def addSkipped(self, test, reason):
331 self.skipped.append((test, self.getDescription(test), reason)) 332 if self.showAll: 333 self.stream.writeln("SKIPPED") 334 elif self.dots: 335 self.stream.write('S')
336
337 - def printErrors(self):
338 super(SkipAwareTestResult, self).printErrors() 339 self.printSkippedList()
340
341 - def printSkippedList(self):
342 for _, descr, err in self.skipped: # test, descr, err 343 self.stream.writeln(self.separator1) 344 self.stream.writeln("%s: %s" % ('SKIPPED', descr)) 345 self.stream.writeln("\t%s" % err)
346
347 - def printErrorList(self, flavour, errors):
348 for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors): 349 self.stream.writeln(self.separator1) 350 if self.colorize: 351 self.stream.writeln("%s: %s" % ( 352 textutils.colorize_ansi(flavour, color='red'), descr)) 353 else: 354 self.stream.writeln("%s: %s" % (flavour, descr)) 355 356 self.stream.writeln(self.separator2) 357 self.stream.writeln(err) 358 try: 359 output, errput = test.captured_output() 360 except AttributeError: 361 pass # original unittest 362 else: 363 if output: 364 self.stream.writeln(self.separator2) 365 self.stream.writeln("captured stdout".center( 366 len(self.separator2))) 367 self.stream.writeln(self.separator2) 368 self.stream.writeln(output) 369 else: 370 self.stream.writeln('no stdout'.center( 371 len(self.separator2))) 372 if errput: 373 self.stream.writeln(self.separator2) 374 self.stream.writeln("captured stderr".center( 375 len(self.separator2))) 376 self.stream.writeln(self.separator2) 377 self.stream.writeln(errput) 378 else: 379 self.stream.writeln('no stderr'.center( 380 len(self.separator2)))
381 382
383 -def run(self, result, runcondition=None, options=None):
384 for test in self._tests: 385 if result.shouldStop: 386 break 387 try: 388 test(result, runcondition, options) 389 except TypeError: 390 # this might happen if a raw unittest.TestCase is defined 391 # and used with python (and not pytest) 392 warnings.warn("%s should extend lgc.testlib.TestCase instead of unittest.TestCase" 393 % test) 394 test(result) 395 return result
396 unittest.TestSuite.run = run 397 398 # backward compatibility: TestSuite might be imported from lgc.testlib 399 TestSuite = unittest.TestSuite 400 401 # python2.3 compat
402 -def __call__(self, *args, **kwds):
403 return self.run(*args, **kwds)
404 unittest.TestSuite.__call__ = __call__ 405 406
407 -class SkipAwareTextTestRunner(unittest.TextTestRunner):
408
409 - def __init__(self, stream=sys.stderr, verbosity=1, 410 exitfirst=False, capture=False, printonly=None, 411 pdbmode=False, cvg=None, test_pattern=None, 412 skipped_patterns=(), colorize=False, batchmode=False, 413 options=None):
414 super(SkipAwareTextTestRunner, self).__init__(stream=stream, 415 verbosity=verbosity) 416 self.exitfirst = exitfirst 417 self.capture = capture 418 self.printonly = printonly 419 self.pdbmode = pdbmode 420 self.cvg = cvg 421 self.test_pattern = test_pattern 422 self.skipped_patterns = skipped_patterns 423 self.colorize = colorize 424 self.batchmode = batchmode 425 self.options = options
426
427 - def _this_is_skipped(self, testedname):
428 return any([(pat in testedname) for pat in self.skipped_patterns])
429
430 - def _runcondition(self, test, skipgenerator=True):
431 if isinstance(test, InnerTest): 432 testname = test.name 433 else: 434 if isinstance(test, TestCase): 435 meth = test._get_test_method() 436 func = meth.im_func 437 testname = '%s.%s' % (meth.im_class.__name__, func.__name__) 438 elif isinstance(test, types.FunctionType): 439 func = test 440 testname = func.__name__ 441 elif isinstance(test, types.MethodType): 442 func = test.im_func 443 testname = '%s.%s' % (test.im_class.__name__, func.__name__) 444 else: 445 return True # Not sure when this happens 446 447 if is_generator(func) and skipgenerator: 448 return self.does_match_tags(func) # Let inner tests decide at run time 449 450 # print 'testname', testname, self.test_pattern 451 if self._this_is_skipped(testname): 452 return False # this was explicitly skipped 453 if self.test_pattern is not None: 454 try: 455 classpattern, testpattern = self.test_pattern.split('.') 456 klass, name = testname.split('.') 457 if classpattern not in klass or testpattern not in name: 458 return False 459 except ValueError: 460 if self.test_pattern not in testname: 461 return False 462 463 return self.does_match_tags(test)
464
465 - def does_match_tags(self, test):
466 if self.options is not None: 467 tags_pattern = getattr(self.options, 'tags_pattern', None) 468 if tags_pattern is not None: 469 tags = getattr(test, 'tags', None) 470 if tags is not None: 471 return tags.match(tags_pattern) 472 if isinstance(test, types.MethodType): 473 tags = getattr(test.im_class, 'tags', Tags()) 474 return tags.match(tags_pattern) 475 return False 476 return True # no pattern
477
478 - def _makeResult(self):
479 return SkipAwareTestResult(self.stream, self.descriptions, 480 self.verbosity, self.exitfirst, self.capture, 481 self.printonly, self.pdbmode, self.cvg, 482 self.colorize)
483
484 - def run(self, test):
485 "Run the given test case or test suite." 486 result = self._makeResult() 487 startTime = time.time() 488 test(result, self._runcondition, self.options) 489 stopTime = time.time() 490 timeTaken = stopTime - startTime 491 result.printErrors() 492 if not self.batchmode: 493 self.stream.writeln(result.separator2) 494 run = result.testsRun 495 self.stream.writeln("Ran %d test%s in %.3fs" % 496 (run, run != 1 and "s" or "", timeTaken)) 497 self.stream.writeln() 498 if not result.wasSuccessful(): 499 if self.colorize: 500 self.stream.write(textutils.colorize_ansi("FAILED", color='red')) 501 else: 502 self.stream.write("FAILED") 503 else: 504 if self.colorize: 505 self.stream.write(textutils.colorize_ansi("OK", color='green')) 506 else: 507 self.stream.write("OK") 508 failed, errored, skipped = map(len, (result.failures, result.errors, 509 result.skipped)) 510 511 det_results = [] 512 for name, value in (("failures", result.failures), 513 ("errors",result.errors), 514 ("skipped", result.skipped)): 515 if value: 516 det_results.append("%s=%i" % (name, len(value))) 517 if det_results: 518 self.stream.write(" (") 519 self.stream.write(', '.join(det_results)) 520 self.stream.write(")") 521 self.stream.writeln("") 522 return result
523 524
525 -class keywords(dict):
526 """Keyword args (**kwargs) support for generative tests."""
527
528 -class starargs(tuple):
529 """Variable arguments (*args) for generative tests."""
530 - def __new__(cls, *args):
531 return tuple.__new__(cls, args)
532 533 534
535 -class NonStrictTestLoader(unittest.TestLoader):
536 """ 537 Overrides default testloader to be able to omit classname when 538 specifying tests to run on command line. 539 540 For example, if the file test_foo.py contains :: 541 542 class FooTC(TestCase): 543 def test_foo1(self): # ... 544 def test_foo2(self): # ... 545 def test_bar1(self): # ... 546 547 class BarTC(TestCase): 548 def test_bar2(self): # ... 549 550 'python test_foo.py' will run the 3 tests in FooTC 551 'python test_foo.py FooTC' will run the 3 tests in FooTC 552 'python test_foo.py test_foo' will run test_foo1 and test_foo2 553 'python test_foo.py test_foo1' will run test_foo1 554 'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2 555 """ 556
557 - def __init__(self):
558 self.skipped_patterns = []
559
560 - def loadTestsFromNames(self, names, module=None):
561 suites = [] 562 for name in names: 563 suites.extend(self.loadTestsFromName(name, module)) 564 return self.suiteClass(suites)
565
566 - def _collect_tests(self, module):
567 tests = {} 568 for obj in vars(module).values(): 569 if (issubclass(type(obj), (types.ClassType, type)) and 570 issubclass(obj, unittest.TestCase)): 571 classname = obj.__name__ 572 if classname[0] == '_' or self._this_is_skipped(classname): 573 continue 574 methodnames = [] 575 # obj is a TestCase class 576 for attrname in dir(obj): 577 if attrname.startswith(self.testMethodPrefix): 578 attr = getattr(obj, attrname) 579 if callable(attr): 580 methodnames.append(attrname) 581 # keep track of class (obj) for convenience 582 tests[classname] = (obj, methodnames) 583 return tests
584
585 - def loadTestsFromSuite(self, module, suitename):
586 try: 587 suite = getattr(module, suitename)() 588 except AttributeError: 589 return [] 590 assert hasattr(suite, '_tests'), \ 591 "%s.%s is not a valid TestSuite" % (module.__name__, suitename) 592 # python2.3 does not implement __iter__ on suites, we need to return 593 # _tests explicitly 594 return suite._tests
595
596 - def loadTestsFromName(self, name, module=None):
597 parts = name.split('.') 598 if module is None or len(parts) > 2: 599 # let the base class do its job here 600 return [super(NonStrictTestLoader, self).loadTestsFromName(name)] 601 tests = self._collect_tests(module) 602 # import pprint 603 # pprint.pprint(tests) 604 collected = [] 605 if len(parts) == 1: 606 pattern = parts[0] 607 if callable(getattr(module, pattern, None) 608 ) and pattern not in tests: 609 # consider it as a suite 610 return self.loadTestsFromSuite(module, pattern) 611 if pattern in tests: 612 # case python unittest_foo.py MyTestTC 613 klass, methodnames = tests[pattern] 614 for methodname in methodnames: 615 collected = [klass(methodname) 616 for methodname in methodnames] 617 else: 618 # case python unittest_foo.py something 619 for klass, methodnames in tests.values(): 620 collected += [klass(methodname) 621 for methodname in methodnames] 622 elif len(parts) == 2: 623 # case "MyClass.test_1" 624 classname, pattern = parts 625 klass, methodnames = tests.get(classname, (None, [])) 626 for methodname in methodnames: 627 collected = [klass(methodname) for methodname in methodnames] 628 return collected
629
630 - def _this_is_skipped(self, testedname):
631 return any([(pat in testedname) for pat in self.skipped_patterns])
632
633 - def getTestCaseNames(self, testCaseClass):
634 """Return a sorted sequence of method names found within testCaseClass 635 """ 636 is_skipped = self._this_is_skipped 637 classname = testCaseClass.__name__ 638 if classname[0] == '_' or is_skipped(classname): 639 return [] 640 testnames = super(NonStrictTestLoader, self).getTestCaseNames( 641 testCaseClass) 642 return [testname for testname in testnames if not is_skipped(testname)]
643 644
645 -class SkipAwareTestProgram(unittest.TestProgram):
646 # XXX: don't try to stay close to unittest.py, use optparse 647 USAGE = """\ 648 Usage: %(progName)s [options] [test] [...] 649 650 Options: 651 -h, --help Show this message 652 -v, --verbose Verbose output 653 -i, --pdb Enable test failure inspection 654 -x, --exitfirst Exit on first failure 655 -c, --capture Captures and prints standard out/err only on errors 656 -p, --printonly Only prints lines matching specified pattern 657 (implies capture) 658 -s, --skip skip test matching this pattern (no regexp for now) 659 -q, --quiet Minimal output 660 --color colorize tracebacks 661 662 -m, --match Run only test whose tag match this pattern 663 664 -P, --profile FILE: Run the tests using cProfile and saving results 665 in FILE 666 667 Examples: 668 %(progName)s - run default set of tests 669 %(progName)s MyTestSuite - run suite 'MyTestSuite' 670 %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething 671 %(progName)s MyTestCase - run all 'test*' test methods 672 in MyTestCase 673 """
674 - def __init__(self, module='__main__', defaultTest=None, batchmode=False, 675 cvg=None, options=None, outstream=sys.stderr):
676 self.batchmode = batchmode 677 self.cvg = cvg 678 self.options = options 679 self.outstream = outstream 680 super(SkipAwareTestProgram, self).__init__( 681 module=module, defaultTest=defaultTest, 682 testLoader=NonStrictTestLoader())
683
684 - def parseArgs(self, argv):
685 self.pdbmode = False 686 self.exitfirst = False 687 self.capture = 0 688 self.printonly = None 689 self.skipped_patterns = [] 690 self.test_pattern = None 691 self.tags_pattern = None 692 self.colorize = False 693 self.profile_name = None 694 import getopt 695 try: 696 options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:P:', 697 ['help', 'verbose', 'quiet', 'pdb', 698 'exitfirst', 'restart', 'capture', 'printonly=', 699 'skip=', 'color', 'match=', 'profile=']) 700 for opt, value in options: 701 if opt in ('-h', '-H', '--help'): 702 self.usageExit() 703 if opt in ('-i', '--pdb'): 704 self.pdbmode = True 705 if opt in ('-x', '--exitfirst'): 706 self.exitfirst = True 707 if opt in ('-r', '--restart'): 708 self.restart = True 709 self.exitfirst = True 710 if opt in ('-q', '--quiet'): 711 self.verbosity = 0 712 if opt in ('-v', '--verbose'): 713 self.verbosity = 2 714 if opt in ('-c', '--capture'): 715 self.capture += 1 716 if opt in ('-p', '--printonly'): 717 self.printonly = re.compile(value) 718 if opt in ('-s', '--skip'): 719 self.skipped_patterns = [pat.strip() for pat in 720 value.split(', ')] 721 if opt == '--color': 722 self.colorize = True 723 if opt in ('-m', '--match'): 724 #self.tags_pattern = value 725 self.options["tag_pattern"] = value 726 if opt in ('-P', '--profile'): 727 self.profile_name = value 728 self.testLoader.skipped_patterns = self.skipped_patterns 729 if self.printonly is not None: 730 self.capture += 1 731 if len(args) == 0 and self.defaultTest is None: 732 suitefunc = getattr(self.module, 'suite', None) 733 if isinstance(suitefunc, (types.FunctionType, 734 types.MethodType)): 735 self.test = self.module.suite() 736 else: 737 self.test = self.testLoader.loadTestsFromModule(self.module) 738 return 739 if len(args) > 0: 740 self.test_pattern = args[0] 741 self.testNames = args 742 else: 743 self.testNames = (self.defaultTest, ) 744 self.createTests() 745 except getopt.error, msg: 746 self.usageExit(msg)
747 748
749 - def runTests(self):
750 if self.profile_name: 751 import cProfile 752 cProfile.runctx('self._runTests()', globals(), locals(), self.profile_name ) 753 else: 754 return self._runTests()
755
756 - def _runTests(self):
757 if hasattr(self.module, 'setup_module'): 758 try: 759 self.module.setup_module(self.options) 760 except Exception, exc: 761 print 'setup_module error:', exc 762 sys.exit(1) 763 self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity, 764 stream=self.outstream, 765 exitfirst=self.exitfirst, 766 capture=self.capture, 767 printonly=self.printonly, 768 pdbmode=self.pdbmode, 769 cvg=self.cvg, 770 test_pattern=self.test_pattern, 771 skipped_patterns=self.skipped_patterns, 772 colorize=self.colorize, 773 batchmode=self.batchmode, 774 options=self.options) 775 776 def removeSucceededTests(obj, succTests): 777 """ Recursive function that removes succTests from 778 a TestSuite or TestCase 779 """ 780 if isinstance(obj, TestSuite): 781 removeSucceededTests(obj._tests, succTests) 782 if isinstance(obj, list): 783 for el in obj[:]: 784 if isinstance(el, TestSuite): 785 removeSucceededTests(el, succTests) 786 elif isinstance(el, TestCase): 787 descr = '.'.join((el.__class__.__module__, 788 el.__class__.__name__, 789 el._testMethodName)) 790 if descr in succTests: 791 obj.remove(el)
792 # take care, self.options may be None 793 if getattr(self.options, 'restart', False): 794 # retrieve succeeded tests from FILE_RESTART 795 try: 796 restartfile = open(FILE_RESTART, 'r') 797 try: 798 try: 799 succeededtests = list(elem.rstrip('\n\r') for elem in 800 restartfile.readlines()) 801 removeSucceededTests(self.test, succeededtests) 802 except Exception, e: 803 raise e 804 finally: 805 restartfile.close() 806 except Exception ,e: 807 raise "Error while reading \ 808 succeeded tests into", osp.join(os.getcwd(),FILE_RESTART) 809 810 result = self.testRunner.run(self.test) 811 if hasattr(self.module, 'teardown_module'): 812 try: 813 self.module.teardown_module(self.options, result) 814 except Exception, exc: 815 print 'teardown_module error:', exc 816 sys.exit(1) 817 if result.debuggers and self.pdbmode: 818 start_interactive_mode(result) 819 if not self.batchmode: 820 sys.exit(not result.wasSuccessful()) 821 self.result = result
822 823 824 825
826 -class FDCapture:
827 """adapted from py lib (http://codespeak.net/py) 828 Capture IO to/from a given os-level filedescriptor. 829 """
830 - def __init__(self, fd, attr='stdout', printonly=None):
831 self.targetfd = fd 832 self.tmpfile = os.tmpfile() # self.maketempfile() 833 self.printonly = printonly 834 # save original file descriptor 835 self._savefd = os.dup(fd) 836 # override original file descriptor 837 os.dup2(self.tmpfile.fileno(), fd) 838 # also modify sys module directly 839 self.oldval = getattr(sys, attr) 840 setattr(sys, attr, self) # self.tmpfile) 841 self.attr = attr
842
843 - def write(self, msg):
844 # msg might be composed of several lines 845 for line in msg.splitlines(): 846 line += '\n' # keepdend=True is not enough 847 if self.printonly is None or self.printonly.search(line) is None: 848 self.tmpfile.write(line) 849 else: 850 os.write(self._savefd, line)
851 852 ## def maketempfile(self): 853 ## tmpf = os.tmpfile() 854 ## fd = os.dup(tmpf.fileno()) 855 ## newf = os.fdopen(fd, tmpf.mode, 0) # No buffering 856 ## tmpf.close() 857 ## return newf 858
859 - def restore(self):
860 """restore original fd and returns captured output""" 861 #XXX: hack hack hack 862 self.tmpfile.flush() 863 try: 864 ref_file = getattr(sys, '__%s__' % self.attr) 865 ref_file.flush() 866 except AttributeError: 867 pass 868 if hasattr(self.oldval, 'flush'): 869 self.oldval.flush() 870 # restore original file descriptor 871 os.dup2(self._savefd, self.targetfd) 872 # restore sys module 873 setattr(sys, self.attr, self.oldval) 874 # close backup descriptor 875 os.close(self._savefd) 876 # go to beginning of file and read it 877 self.tmpfile.seek(0) 878 return self.tmpfile.read()
879 880
881 -def _capture(which='stdout', printonly=None):
882 """private method, should not be called directly 883 (cf. capture_stdout() and capture_stderr()) 884 """ 885 assert which in ('stdout', 'stderr' 886 ), "Can only capture stdout or stderr, not %s" % which 887 if which == 'stdout': 888 fd = 1 889 else: 890 fd = 2 891 return FDCapture(fd, which, printonly)
892
893 -def capture_stdout(printonly=None):
894 """captures the standard output 895 896 returns a handle object which has a `restore()` method. 897 The restore() method returns the captured stdout and restores it 898 """ 899 return _capture('stdout', printonly)
900
901 -def capture_stderr(printonly=None):
902 """captures the standard error output 903 904 returns a handle object which has a `restore()` method. 905 The restore() method returns the captured stderr and restores it 906 """ 907 return _capture('stderr', printonly)
908 909
910 -def unittest_main(module='__main__', defaultTest=None, 911 batchmode=False, cvg=None, options=None, 912 outstream=sys.stderr):
913 """use this function if you want to have the same functionality 914 as unittest.main""" 915 return SkipAwareTestProgram(module, defaultTest, batchmode, 916 cvg, options, outstream)
917
918 -class TestSkipped(Exception):
919 """raised when a test is skipped"""
920
921 -def is_generator(function):
922 flags = function.func_code.co_flags 923 return flags & CO_GENERATOR
924 925
926 -def parse_generative_args(params):
927 args = [] 928 varargs = () 929 kwargs = {} 930 flags = 0 # 2 <=> starargs, 4 <=> kwargs 931 for param in params: 932 if isinstance(param, starargs): 933 varargs = param 934 if flags: 935 raise TypeError('found starargs after keywords !') 936 flags |= 2 937 args += list(varargs) 938 elif isinstance(param, keywords): 939 kwargs = param 940 if flags & 4: 941 raise TypeError('got multiple keywords parameters') 942 flags |= 4 943 elif flags & 2 or flags & 4: 944 raise TypeError('found parameters after kwargs or args') 945 else: 946 args.append(param) 947 948 return args, kwargs
949
950 -class InnerTest(tuple):
951 - def __new__(cls, name, *data):
952 instance = tuple.__new__(cls, data) 953 instance.name = name 954 return instance
955 956
957 -class TestCase(unittest.TestCase):
958 """unittest.TestCase with some additional methods""" 959 960 capture = False 961 pdbclass = Debugger 962
963 - def __init__(self, methodName='runTest'):
964 super(TestCase, self).__init__(methodName) 965 # internal API changed in python2.5 966 if sys.version_info >= (2, 5): 967 self.__exc_info = self._exc_info 968 self.__testMethodName = self._testMethodName 969 else: 970 # let's give easier access to _testMethodName to every subclasses 971 self._testMethodName = self.__testMethodName 972 self._captured_stdout = "" 973 self._captured_stderr = "" 974 self._out = [] 975 self._err = [] 976 self._current_test_descr = None 977 self._options_ = None
978
979 - def datadir(cls): # pylint: disable-msg=E0213
980 """helper attribute holding the standard test's data directory 981 982 NOTE: this is a logilab's standard 983 """ 984 mod = __import__(cls.__module__) 985 return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data')
986 # cache it (use a class method to cache on class since TestCase is 987 # instantiated for each test run) 988 datadir = classproperty(cached(datadir)) 989
990 - def datapath(cls, fname):
991 """joins the object's datadir and `fname`""" 992 return osp.join(cls.datadir, fname)
993 datapath = classmethod(datapath) 994
995 - def set_description(self, descr):
996 """sets the current test's description. 997 This can be useful for generative tests because it allows to specify 998 a description per yield 999 """ 1000 self._current_test_descr = descr
1001 1002 # override default's unittest.py feature
1003 - def shortDescription(self):
1004 """override default unitest shortDescription to handle correctly 1005 generative tests 1006 """ 1007 if self._current_test_descr is not None: 1008 return self._current_test_descr 1009 return super(TestCase, self).shortDescription()
1010 1011
1012 - def captured_output(self):
1013 """return a two tuple with standard output and error stripped""" 1014 return self._captured_stdout.strip(), self._captured_stderr.strip()
1015
1016 - def _start_capture(self):
1017 """start_capture if enable""" 1018 if self.capture: 1019 warnings.simplefilter('ignore', DeprecationWarning) 1020 self.start_capture()
1021
1022 - def _stop_capture(self):
1023 """stop_capture and restore previous output""" 1024 self._force_output_restore()
1025
1026 - def start_capture(self, printonly=None):
1027 """start_capture""" 1028 self._out.append(capture_stdout(printonly or self._printonly)) 1029 self._err.append(capture_stderr(printonly or self._printonly))
1030
1031 - def printonly(self, pattern, flags=0):
1032 """set the pattern of line to print""" 1033 rgx = re.compile(pattern, flags) 1034 if self._out: 1035 self._out[-1].printonly = rgx 1036 self._err[-1].printonly = rgx 1037 else: 1038 self.start_capture(printonly=rgx)
1039
1040 - def stop_capture(self):
1041 """stop output and error capture""" 1042 if self._out: 1043 _out = self._out.pop() 1044 _err = self._err.pop() 1045 return _out.restore(), _err.restore() 1046 return '', ''
1047
1048 - def _force_output_restore(self):
1049 """remove all capture set""" 1050 while self._out: 1051 self._captured_stdout += self._out.pop().restore() 1052 self._captured_stderr += self._err.pop().restore()
1053
1054 - def quiet_run(self, result, func, *args, **kwargs):
1055 self._start_capture() 1056 try: 1057 func(*args, **kwargs) 1058 except (KeyboardInterrupt, SystemExit): 1059 self._stop_capture() 1060 raise 1061 except: 1062 self._stop_capture() 1063 result.addError(self, self.__exc_info()) 1064 return False 1065 self._stop_capture() 1066 return True
1067
1068 - def _get_test_method(self):
1069 """return the test method""" 1070 return getattr(self, self.__testMethodName)
1071 1072
1073 - def optval(self, option, default=None):
1074 """return the option value or default if the option is not define""" 1075 return getattr(self._options_, option, default)
1076
1077 - def __call__(self, result=None, runcondition=None, options=None):
1078 """rewrite TestCase.__call__ to support generative tests 1079 This is mostly a copy/paste from unittest.py (i.e same 1080 variable names, same logic, except for the generative tests part) 1081 """ 1082 if result is None: 1083 result = self.defaultTestResult() 1084 result.pdbclass = self.pdbclass 1085 # if self.capture is True here, it means it was explicitly specified 1086 # in the user's TestCase class. If not, do what was asked on cmd line 1087 self.capture = self.capture or getattr(result, 'capture', False) 1088 self._options_ = options 1089 self._printonly = getattr(result, 'printonly', None) 1090 # if result.cvg: 1091 # result.cvg.start() 1092 testMethod = self._get_test_method() 1093 if runcondition and not runcondition(testMethod): 1094 return # test is skipped 1095 result.startTest(self) 1096 try: 1097 if not self.quiet_run(result, self.setUp): 1098 return 1099 generative = is_generator(testMethod.im_func) 1100 # generative tests 1101 if generative: 1102 self._proceed_generative(result, testMethod, 1103 runcondition) 1104 else: 1105 status = self._proceed(result, testMethod) 1106 success = (status == 0) 1107 if not self.quiet_run(result, self.tearDown): 1108 return 1109 if not generative and success: 1110 if hasattr(options, "exitfirst") and options.exitfirst: 1111 # add this test to restart file 1112 try: 1113 restartfile = open(FILE_RESTART, 'a') 1114 try: 1115 try: 1116 descr = '.'.join((self.__class__.__module__, 1117 self.__class__.__name__, 1118 self._testMethodName)) 1119 restartfile.write(descr+os.linesep) 1120 except Exception, e: 1121 raise e 1122 finally: 1123 restartfile.close() 1124 except Exception, e: 1125 print >> sys.__stderr__, "Error while saving \ 1126 succeeded test into", osp.join(os.getcwd(),FILE_RESTART) 1127 raise e 1128 result.addSuccess(self) 1129 finally: 1130 # if result.cvg: 1131 # result.cvg.stop() 1132 result.stopTest(self)
1133 1134 1135
1136 - def _proceed_generative(self, result, testfunc, runcondition=None):
1137 # cancel startTest()'s increment 1138 result.testsRun -= 1 1139 self._start_capture() 1140 success = True 1141 try: 1142 for params in testfunc(): 1143 if runcondition and not runcondition(testfunc, 1144 skipgenerator=False): 1145 if not (isinstance(params, InnerTest) 1146 and runcondition(params)): 1147 continue 1148 if not isinstance(params, (tuple, list)): 1149 params = (params, ) 1150 func = params[0] 1151 args, kwargs = parse_generative_args(params[1:]) 1152 # increment test counter manually 1153 result.testsRun += 1 1154 status = self._proceed(result, func, args, kwargs) 1155 if status == 0: 1156 result.addSuccess(self) 1157 success = True 1158 else: 1159 success = False 1160 if status == 2: 1161 result.shouldStop = True 1162 if result.shouldStop: # either on error or on exitfirst + error 1163 break 1164 except: 1165 # if an error occurs between two yield 1166 result.addError(self, self.__exc_info()) 1167 success = False 1168 self._stop_capture() 1169 return success
1170
1171 - def _proceed(self, result, testfunc, args=(), kwargs=None):
1172 """proceed the actual test 1173 returns 0 on success, 1 on failure, 2 on error 1174 1175 Note: addSuccess can't be called here because we have to wait 1176 for tearDown to be successfully executed to declare the test as 1177 successful 1178 """ 1179 self._start_capture() 1180 kwargs = kwargs or {} 1181 try: 1182 testfunc(*args, **kwargs) 1183 self._stop_capture() 1184 except self.failureException: 1185 self._stop_capture() 1186 result.addFailure(self, self.__exc_info()) 1187 return 1 1188 except KeyboardInterrupt: 1189 self._stop_capture() 1190 raise 1191 except: 1192 self._stop_capture() 1193 result.addError(self, self.__exc_info()) 1194 return 2 1195 return 0
1196
1197 - def defaultTestResult(self):
1198 """return a new instance of the defaultTestResult""" 1199 return SkipAwareTestResult()
1200
1201 - def skip(self, msg=None):
1202 """mark a test as skipped for the <msg> reason""" 1203 msg = msg or 'test was skipped' 1204 raise TestSkipped(msg)
1205
1206 - def assertIn(self, object, set):
1207 """assert <object> are in <set>""" 1208 self.assert_(object in set, "%s not in %s" % (object, set))
1209
1210 - def assertNotIn(self, object, set):
1211 """assert <object> are not in <set>""" 1212 self.assert_(object not in set, "%s in %s" % (object, set))
1213
1214 - def assertDictEquals(self, dict1, dict2):
1215 """compares two dicts 1216 1217 If the two dict differ, the first difference is shown in the error 1218 message 1219 """ 1220 dict1 = dict(dict1) 1221 msgs = [] 1222 for key, value in dict2.items(): 1223 try: 1224 if dict1[key] != value: 1225 msgs.append('%r != %r for key %r' % (dict1[key], value, 1226 key)) 1227 del dict1[key] 1228 except KeyError: 1229 msgs.append('missing %r key' % key) 1230 if dict1: 1231 msgs.append('dict2 is lacking %r' % dict1) 1232 if msgs: 1233 self.fail('\n'.join(msgs))
1234 assertDictEqual = assertDictEquals 1235 1236 1237
1238 - def assertUnorderedIterableEquals(self, got, expected, msg=None):
1239 """compares two iterable and shows difference between both""" 1240 got, expected = list(got), list(expected) 1241 self.assertSetEqual(set(got), set(expected), msg) 1242 if len(got) != len(expected): 1243 if msg is None: 1244 msg = ['Iterable have the same elements but not the same number', 1245 '\t<element>\t<expected>i\t<got>'] 1246 got_count = {} 1247 expected_count = {} 1248 for element in got: 1249 got_count[element] = got_count.get(element,0) + 1 1250 for element in expected: 1251 expected_count[element] = expected_count.get(element,0) + 1 1252 # we know that got_count.key() == expected_count.key() 1253 # because of assertSetEquals 1254 for element, count in got_count.iteritems(): 1255 other_count = expected_count[element] 1256 if other_count != count: 1257 msg.append('\t%s\t%s\t%s' % (element, other_count, count)) 1258 1259 self.fail(msg)
1260 1261 assertUnorderedIterableEqual = assertUnorderedIterableEquals 1262 assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual 1263
1264 - def assertSetEquals(self,got,expected, msg=None):
1265 if not(isinstance(got, set) and isinstance(expected, set)): 1266 warnings.warn("the assertSetEquals function if now intended for set only."\ 1267 "use assertUnorderedIterableEquals instead.", 1268 DeprecationWarning, 2) 1269 return self.assertUnorderedIterableEquals(got,expected, msg) 1270 1271 items={} 1272 items['missing'] = expected - got 1273 items['unexpected'] = got - expected 1274 if any(items.itervalues()): 1275 if msg is None: 1276 msg = '\n'.join('%s:\n\t%s' % (key,"\n\t".join(str(value) for value in values)) 1277 for key, values in items.iteritems() if values) 1278 self.fail(msg)
1279 1280 1281 assertSetEqual = assertSetEquals 1282
1283 - def assertListEquals(self, list_1, list_2, msg=None):
1284 """compares two lists 1285 1286 If the two list differ, the first difference is shown in the error 1287 message 1288 """ 1289 _l1 = list_1[:] 1290 for i, value in enumerate(list_2): 1291 try: 1292 if _l1[0] != value: 1293 from pprint import pprint 1294 pprint(list_1) 1295 pprint(list_2) 1296 self.fail('%r != %r for index %d' % (_l1[0], value, i)) 1297 del _l1[0] 1298 except IndexError: 1299 if msg is None: 1300 msg = 'list_1 has only %d elements, not %s '\ 1301 '(at least %r missing)'% (i, len(list_2), value) 1302 self.fail(msg) 1303 if _l1: 1304 if msg is None: 1305 msg = 'list_2 is lacking %r' % _l1 1306 self.fail(msg)
1307 assertListEqual = assertListEquals 1308
1309 - def assertLinesEquals(self, list_1, list_2, msg=None, striplines=False):
1310 """assert list of lines are equal""" 1311 lines1 = list_1.splitlines() 1312 if striplines: 1313 lines1 = [l.strip() for l in lines1] 1314 lines2 = list_2.splitlines() 1315 if striplines: 1316 lines2 = [l.strip() for l in lines2] 1317 self.assertListEquals(lines1, lines2, msg)
1318 assertLineEqual = assertLinesEquals 1319
1320 - def assertXMLWellFormed(self, stream, msg=None):
1321 """asserts the XML stream is well-formed (no DTD conformance check)""" 1322 from xml.sax import make_parser, SAXParseException 1323 parser = make_parser() 1324 try: 1325 parser.parse(stream) 1326 except SAXParseException: 1327 if msg is None: 1328 msg = 'XML stream not well formed' 1329 self.fail(msg)
1330
1331 - def assertXMLStringWellFormed(self, xml_string, msg=None):
1332 """asserts the XML string is well-formed (no DTD conformance check)""" 1333 stream = StringIO(xml_string) 1334 self.assertXMLWellFormed(stream, msg)
1335
1336 - def assertXMLEqualsTuple(self, element, tup):
1337 """compare an ElementTree Element to a tuple formatted as follow: 1338 (tagname, [attrib[, children[, text[, tail]]]])""" 1339 # check tag 1340 self.assertTextEquals(element.tag, tup[0]) 1341 # check attrib 1342 if len(element.attrib) or len(tup)>1: 1343 if len(tup)<=1: 1344 self.fail( "tuple %s has no attributes (%s expected)"%(tup, 1345 dict(element.attrib))) 1346 self.assertDictEquals(element.attrib, tup[1]) 1347 # check children 1348 if len(element) or len(tup)>2: 1349 if len(tup)<=2: 1350 self.fail( "tuple %s has no children (%i expected)"%(tup, 1351 len(element))) 1352 if len(element) != len(tup[2]): 1353 self.fail( "tuple %s has %i children%s (%i expected)"%(tup, 1354 len(tup[2]), 1355 ('', 's')[len(tup[2])>1], len(element))) 1356 for index in xrange(len(tup[2])): 1357 self.assertXMLEqualsTuple(element[index], tup[2][index]) 1358 #check text 1359 if element.text or len(tup)>3: 1360 if len(tup)<=3: 1361 self.fail( "tuple %s has no text value (%r expected)"%(tup, 1362 element.text)) 1363 self.assertTextEquals(element.text, tup[3]) 1364 #check tail 1365 if element.tail or len(tup)>4: 1366 if len(tup)<=4: 1367 self.fail( "tuple %s has no tail value (%r expected)"%(tup, 1368 element.tail)) 1369 self.assertTextEquals(element.tail, tup[4])
1370
1371 - def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'):
1372 junk = junk or (' ', '\t') 1373 # result is a generator 1374 result = difflib.ndiff(lines1, lines2, charjunk=lambda x: x in junk) 1375 read = [] 1376 for line in result: 1377 read.append(line) 1378 # lines that don't start with a ' ' are diff ones 1379 if not line.startswith(' '): 1380 self.fail('\n'.join(['%s\n'%msg_prefix]+read + list(result)))
1381
1382 - def assertTextEquals(self, text1, text2, junk=None, 1383 msg_prefix='Text differ'):
1384 """compare two multiline strings (using difflib and splitlines())""" 1385 msg = [] 1386 if not isinstance(text1, basestring): 1387 msg.append('text1 is not a string (%s)'%(type(text1))) 1388 if not isinstance(text2, basestring): 1389 msg.append('text2 is not a string (%s)'%(type(text2))) 1390 if msg: 1391 self.fail('\n'.join(msg)) 1392 self._difftext(text1.strip().splitlines(True), text2.strip().splitlines(True), 1393 junk, msg_prefix)
1394 assertTextEqual = assertTextEquals 1395
1396 - def assertStreamEquals(self, stream1, stream2, junk=None, 1397 msg_prefix='Stream differ'):
1398 """compare two streams (using difflib and readlines())""" 1399 # if stream2 is stream2, readlines() on stream1 will also read lines 1400 # in stream2, so they'll appear different, although they're not 1401 if stream1 is stream2: 1402 return 1403 # make sure we compare from the beginning of the stream 1404 stream1.seek(0) 1405 stream2.seek(0) 1406 # compare 1407 self._difftext(stream1.readlines(), stream2.readlines(), junk, 1408 msg_prefix)
1409 1410 assertStreamEqual = assertStreamEquals
1411 - def assertFileEquals(self, fname1, fname2, junk=(' ', '\t')):
1412 """compares two files using difflib""" 1413 self.assertStreamEqual(file(fname1), file(fname2), junk, 1414 msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2))
1415 assertFileEqual = assertFileEquals 1416 1417
1418 - def assertDirEquals(self, path_a, path_b):
1419 """compares two files using difflib""" 1420 assert osp.exists(path_a), "%s doesn't exists" % path_a 1421 assert osp.exists(path_b), "%s doesn't exists" % path_b 1422 1423 all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles) 1424 for ipath, idirs, ifiles in os.walk(path_a)] 1425 all_a.sort(key=itemgetter(0)) 1426 1427 all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles) 1428 for ipath, idirs, ifiles in os.walk(path_b)] 1429 all_b.sort(key=itemgetter(0)) 1430 1431 iter_a, iter_b = iter(all_a), iter(all_b) 1432 partial_iter = True 1433 ipath_a, idirs_a, ifiles_a = data_a = None, None, None 1434 while True: 1435 try: 1436 ipath_a, idirs_a, ifiles_a = datas_a = iter_a.next() 1437 partial_iter = False 1438 ipath_b, idirs_b, ifiles_b = datas_b = iter_b.next() 1439 partial_iter = True 1440 1441 1442 self.assert_(ipath_a == ipath_b, 1443 "unexpected %s in %s while looking %s from %s" % 1444 (ipath_a, path_a, ipath_b, path_b)) 1445 1446 1447 errors = {} 1448 sdirs_a = set(idirs_a) 1449 sdirs_b = set(idirs_b) 1450 errors["unexpected directories"] = sdirs_a - sdirs_b 1451 errors["missing directories"] = sdirs_b - sdirs_a 1452 1453 sfiles_a = set(ifiles_a) 1454 sfiles_b = set(ifiles_b) 1455 errors["unexpected files"] = sfiles_a - sfiles_b 1456 errors["missing files"] = sfiles_b - sfiles_a 1457 1458 1459 msgs = [ "%s: %s"% (name, items) 1460 for name, items in errors.iteritems() if items] 1461 1462 if msgs: 1463 msgs.insert(0,"%s and %s differ :" % ( 1464 osp.join(path_a, ipath_a), 1465 osp.join(path_b, ipath_b), 1466 )) 1467 self.fail("\n".join(msgs)) 1468 1469 for files in (ifiles_a, ifiles_b): 1470 files.sort() 1471 1472 for index, path in enumerate(ifiles_a): 1473 self.assertFileEquals(osp.join(path_a, ipath_a, path), 1474 osp.join(path_b, ipath_b, ifiles_b[index])) 1475 1476 except StopIteration: 1477 break
1478 1479 1480 assertDirEqual = assertDirEquals 1481 1482
1483 - def assertIsInstance(self, obj, klass, msg=None, strict=False):
1484 """compares two files using difflib""" 1485 if msg is None: 1486 if strict: 1487 msg = '%r is not of class %s but of %s' 1488 else: 1489 msg = '%r is not an instance of %s but of %s' 1490 msg = msg % (obj, klass, type(obj)) 1491 if strict: 1492 self.assert_(obj.__class__ is klass, msg) 1493 else: 1494 self.assert_(isinstance(obj, klass), msg)
1495
1496 - def assertIs(self, obj, other, msg=None):
1497 """compares identity of two reference""" 1498 if msg is None: 1499 msg = "%r is not %r"%(obj, other) 1500 self.assert_(obj is other, msg)
1501 1502
1503 - def assertIsNot(self, obj, other, msg=None):
1504 """compares identity of two reference""" 1505 if msg is None: 1506 msg = "%r is %r"%(obj, other) 1507 self.assert_(obj is not other, msg )
1508
1509 - def assertNone(self, obj, msg=None):
1510 """assert obj is None""" 1511 if msg is None: 1512 msg = "reference to %r when None expected"%(obj,) 1513 self.assert_( obj is None, msg )
1514
1515 - def assertNotNone(self, obj, msg=None):
1516 """assert obj is not None""" 1517 if msg is None: 1518 msg = "unexpected reference to None" 1519 self.assert_( obj is not None, msg )
1520
1521 - def assertFloatAlmostEquals(self, obj, other, prec=1e-5, msg=None):
1522 """compares two floats""" 1523 if msg is None: 1524 msg = "%r != %r" % (obj, other) 1525 self.assert_(math.fabs(obj - other) < prec, msg)
1526
1527 - def failUnlessRaises(self, excClass, callableObj, *args, **kwargs):
1528 """override default failUnlessRaise method to return the raised 1529 exception instance. 1530 1531 Fail unless an exception of class excClass is thrown 1532 by callableObj when invoked with arguments args and keyword 1533 arguments kwargs. If a different type of exception is 1534 thrown, it will not be caught, and the test case will be 1535 deemed to have suffered an error, exactly as for an 1536 unexpected exception. 1537 """ 1538 try: 1539 callableObj(*args, **kwargs) 1540 except excClass, exc: 1541 return exc 1542 else: 1543 if hasattr(excClass, '__name__'): 1544 excName = excClass.__name__ 1545 else: 1546 excName = str(excClass) 1547 raise self.failureException, "%s not raised" % excName
1548 1549 assertRaises = failUnlessRaises 1550 1551 import doctest 1552
1553 -class SkippedSuite(unittest.TestSuite):
1554 - def test(self):
1555 """just there to trigger test execution""" 1556 self.skipped_test('doctest module has no DocTestSuite class')
1557 1558 1559 # DocTestFinder was introduced in python2.4 1560 if sys.version_info >= (2, 4):
1561 - class DocTestFinder(doctest.DocTestFinder):
1562
1563 - def __init__(self, *args, **kwargs):
1564 self.skipped = kwargs.pop('skipped', ()) 1565 doctest.DocTestFinder.__init__(self, *args, **kwargs)
1566
1567 - def _get_test(self, obj, name, module, globs, source_lines):
1568 """override default _get_test method to be able to skip tests 1569 according to skipped attribute's value 1570 1571 Note: Python (<=2.4) use a _name_filter which could be used for that 1572 purpose but it's no longer available in 2.5 1573 Python 2.5 seems to have a [SKIP] flag 1574 """ 1575 if getattr(obj, '__name__', '') in self.skipped: 1576 return None 1577 return doctest.DocTestFinder._get_test(self, obj, name, module, 1578 globs, source_lines)
1579 else: 1580 # this is a hack to make skipped work with python <= 2.3
1581 - class DocTestFinder(object):
1582 - def __init__(self, skipped):
1583 self.skipped = skipped 1584 self.original_find_tests = doctest._find_tests 1585 doctest._find_tests = self._find_tests
1586
1587 - def _find_tests(self, module, prefix=None):
1588 tests = [] 1589 for testinfo in self.original_find_tests(module, prefix): 1590 testname, _, _, _ = testinfo 1591 # testname looks like A.B.C.function_name 1592 testname = testname.split('.')[-1] 1593 if testname not in self.skipped: 1594 tests.append(testinfo) 1595 return tests
1596 1597
1598 -class DocTest(TestCase):
1599 """trigger module doctest 1600 I don't know how to make unittest.main consider the DocTestSuite instance 1601 without this hack 1602 """ 1603 skipped = ()
1604 - def __call__(self, result=None, runcondition=None, options=None):\ 1605 # pylint: disable-msg=W0613
1606 try: 1607 finder = DocTestFinder(skipped=self.skipped) 1608 if sys.version_info >= (2, 4): 1609 suite = doctest.DocTestSuite(self.module, test_finder=finder) 1610 else: 1611 suite = doctest.DocTestSuite(self.module) 1612 except AttributeError: 1613 suite = SkippedSuite() 1614 return suite.run(result)
1615 run = __call__ 1616
1617 - def test(self):
1618 """just there to trigger test execution"""
1619 1620 MAILBOX = None 1621
1622 -class MockSMTP:
1623 """fake smtplib.SMTP""" 1624
1625 - def __init__(self, host, port):
1626 self.host = host 1627 self.port = port 1628 global MAILBOX 1629 self.reveived = MAILBOX = []
1630
1631 - def set_debuglevel(self, debuglevel):
1632 """ignore debug level"""
1633
1634 - def sendmail(self, fromaddr, toaddres, body):
1635 """push sent mail in the mailbox""" 1636 self.reveived.append((fromaddr, toaddres, body))
1637
1638 - def quit(self):
1639 """ignore quit"""
1640 1641
1642 -class MockConfigParser(ConfigParser):
1643 """fake ConfigParser.ConfigParser""" 1644
1645 - def __init__(self, options):
1646 ConfigParser.__init__(self) 1647 for section, pairs in options.iteritems(): 1648 self.add_section(section) 1649 for key, value in pairs.iteritems(): 1650 self.set(section,key,value)
1651 - def write(self, _):
1652 raise NotImplementedError()
1653 1654
1655 -class MockConnection:
1656 """fake DB-API 2.0 connexion AND cursor (i.e. cursor() return self)""" 1657
1658 - def __init__(self, results):
1659 self.received = [] 1660 self.states = [] 1661 self.results = results
1662
1663 - def cursor(self):
1664 """Mock cursor method""" 1665 return self
1666 - def execute(self, query, args=None):
1667 """Mock execute method""" 1668 self.received.append( (query, args) )
1669 - def fetchone(self):
1670 """Mock fetchone method""" 1671 return self.results[0]
1672 - def fetchall(self):
1673 """Mock fetchall method""" 1674 return self.results
1675 - def commit(self):
1676 """Mock commiy method""" 1677 self.states.append( ('commit', len(self.received)) )
1678 - def rollback(self):
1679 """Mock rollback method""" 1680 self.states.append( ('rollback', len(self.received)) )
1681 - def close(self):
1682 """Mock close method""" 1683 pass
1684 1685
1686 -def mock_object(**params):
1687 """creates an object using params to set attributes 1688 >>> option = mock_object(verbose=False, index=range(5)) 1689 >>> option.verbose 1690 False 1691 >>> option.index 1692 [0, 1, 2, 3, 4] 1693 """ 1694 return type('Mock', (), params)()
1695 1696
1697 -def create_files(paths, chroot):
1698 """Creates directories and files found in <path>. 1699 1700 :param paths: list of relative paths to files or directories 1701 :param chroot: the root directory in which paths will be created 1702 1703 >>> from os.path import isdir, isfile 1704 >>> isdir('/tmp/a') 1705 False 1706 >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp') 1707 >>> isdir('/tmp/a') 1708 True 1709 >>> isdir('/tmp/a/b/c') 1710 True 1711 >>> isfile('/tmp/a/b/c/d/e.py') 1712 True 1713 >>> isfile('/tmp/a/b/foo.py') 1714 True 1715 """ 1716 dirs, files = set(), set() 1717 for path in paths: 1718 path = osp.join(chroot, path) 1719 filename = osp.basename(path) 1720 # path is a directory path 1721 if filename == '': 1722 dirs.add(path) 1723 # path is a filename path 1724 else: 1725 dirs.add(osp.dirname(path)) 1726 files.add(path) 1727 for dirpath in dirs: 1728 if not osp.isdir(dirpath): 1729 os.makedirs(dirpath) 1730 for filepath in files: 1731 file(filepath, 'w').close()
1732
1733 -def enable_dbc(*args):
1734 """ 1735 Without arguments, return True if contracts can be enabled and should be 1736 enabled (see option -d), return False otherwise. 1737 1738 With arguments, return False if contracts can't or shouldn't be enabled, 1739 otherwise weave ContractAspect with items passed as arguments. 1740 """ 1741 if not ENABLE_DBC: 1742 return False 1743 try: 1744 from logilab.aspects.weaver import weaver 1745 from logilab.aspects.lib.contracts import ContractAspect 1746 except ImportError: 1747 sys.stderr.write( 1748 'Warning: logilab.aspects is not available. Contracts disabled.') 1749 return False 1750 for arg in args: 1751 weaver.weave_module(arg, ContractAspect) 1752 return True
1753 1754
1755 -class AttrObject: # XXX cf mock_object
1756 - def __init__(self, **kwargs):
1757 self.__dict__.update(kwargs)
1758
1759 -def tag(*args):
1760 """descriptor adding tag to a function""" 1761 def desc(func): 1762 assert not hasattr(func, 'tags') 1763 func.tags = Tags(args) 1764 return func
1765 return desc 1766
1767 -class Tags(set):
1768 """A set of tag able validate an expression"""
1769 - def __getitem__(self, key):
1770 return key in self
1771
1772 - def match(self, exp):
1773 return eval(exp, {}, self)
1774
1775 -def require_version(version):
1776 """ Compare version of python interpreter to the given one. Skip the test 1777 if older. 1778 """ 1779 def check_require_version(f): 1780 version_elements = version.split('.') 1781 try: 1782 compare = tuple([int(v) for v in version_elements]) 1783 except ValueError: 1784 raise ValueError('%s is not a correct version : should be X.Y[.Z].' % version) 1785 current = sys.version_info[:3] 1786 #print 'comp', current, compare 1787 if current < compare: 1788 #print 'version too old' 1789 def new_f(self, *args, **kwargs): 1790 self.skip('Need at least %s version of python. Current version is %s.' % (version, '.'.join([str(element) for element in current])))
1791 new_f.__name__ = f.__name__ 1792 return new_f 1793 else: 1794 #print 'version young enough' 1795 return f 1796 return check_require_version 1797
1798 -def require_module(module):
1799 """ Check if the given module is loaded. Skip the test if not. 1800 """ 1801 def check_require_module(f): 1802 try: 1803 __import__(module) 1804 #print module, 'imported' 1805 return f 1806 except ImportError: 1807 #print module, 'can not be imported' 1808 def new_f(self, *args, **kwargs): 1809 self.skip('%s can not be imported.' % module)
1810 new_f.__name__ = f.__name__ 1811 return new_f 1812 return check_require_module 1813