1
2 """Run tests.
3
4 This will find all modules whose names match a given prefix in the test
5 directory, and run them. Various command line options provide
6 additional facilities.
7
8 Command line options:
9
10 -v: verbose -- run tests in verbose mode with output to stdout
11 -q: quiet -- don't print anything except if a test fails
12 -t: testdir -- directory where the tests will be found
13 -x: exclude -- add a test to exclude
14 -p: profile -- profiled execution
15 -c: capture -- capture standard out/err during tests
16 -d: dbc -- enable design-by-contract
17 -m: match -- only run tests matching the tag pattern which follows
18
19 If no non-option arguments are present, prefixes used are 'test',
20 'regrtest', 'smoketest', 'unittest', 'func' and 'validation'.
21
22 :copyright: 2003-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
23 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
24 :license: General Public License version 2 - http://www.gnu.org/licenses
25 """
26 __docformat__ = "restructuredtext en"
27
28
29
30
31 import sys
32 import os, os.path as osp
33 import re
34 import time
35 import getopt
36 import traceback
37 import inspect
38 import unittest
39 import difflib
40 import types
41 import tempfile
42 import math
43 from shutil import rmtree
44 from operator import itemgetter
45 import warnings
46 from compiler.consts import CO_GENERATOR
47 from ConfigParser import ConfigParser
48 from itertools import dropwhile
49
50 try:
51 from test import test_support
52 except ImportError:
53
57 test_support = TestSupport()
58
59
60 from logilab.common.compat import set, enumerate, any, sorted
61
62 from logilab.common.modutils import load_module_from_name
63 from logilab.common.debugger import Debugger, colorize_source
64 from logilab.common.decorators import cached, classproperty
65 from logilab.common import textutils
66
67
68 __all__ = ['main', 'unittest_main', 'find_tests', 'run_test', 'spawn']
69
70 DEFAULT_PREFIXES = ('test', 'regrtest', 'smoketest', 'unittest',
71 'func', 'validation')
72
73 ENABLE_DBC = False
74
75 FILE_RESTART = ".pytest.restart"
76
77
78 __unittest = 1
79
80
82 """A decorator ensuring no temporary file left when the function return
83 Work only for temporary file create with the tempfile module"""
84 def proxy(*args, **kargs):
85
86 old_tmpdir = tempfile.gettempdir()
87 new_tmpdir = tempfile.mkdtemp("-logilab-common-testlib","temp-dir-")
88 tempfile.tempdir = new_tmpdir
89 try:
90 return callable(*args, **kargs)
91 finally:
92 try:
93 rmtree(new_tmpdir, ignore_errors=True)
94 finally:
95 tempfile.tempdir = old_tmpdir
96 return proxy
97
98
def run_tests(tests, quiet, verbose, runner=None, capture=0):
    """Execute a list of tests.

    :param tests: names of the test modules to run; each one is handed to
      `run_test`
    :param quiet: if true, do not print the banner announcing each test
    :param verbose: if true, print a summary line for each failing module
      (also forwarded to `run_test`)
    :param runner: optional test runner forwarded to `run_test`
    :param capture: capture level forwarded to `run_test`

    :rtype: tuple
    :return: 4-tuple (list of passed tests, list of failed tests,
      list of (test, message) pairs for tests that could not be run,
      merged result object or None when no test produced a result)
    """
    good = []
    bad = []
    skipped = []
    all_result = None
    for test in tests:
        if not quiet:
            # single-argument print calls produce the same output whether
            # print is a statement or a function
            print('')
            print('-' * 80)
            print("Executing %s" % (test,))
        result = run_test(test, verbose, runner, capture)
        if isinstance(result, str):
            # run_test returns a message string instead of a result object
            # when loading or running the module crashed
            skipped.append((test, result))
            continue
        if all_result is None:
            # the first real result becomes the accumulator
            all_result = result
        else:
            all_result.testsRun += result.testsRun
            all_result.failures += result.failures
            all_result.errors += result.errors
            all_result.skipped += result.skipped
        if result.errors or result.failures:
            bad.append(test)
            if verbose:
                print("test %s failed -- %s errors, %s failures" % (
                    test, len(result.errors), len(result.failures)))
        else:
            good.append(test)

    return good, bad, skipped, all_result
136
141 """
142 Return a list of all applicable test modules.
143 """
144 tests = []
145 for name in os.listdir(testdir):
146 if not suffix or name.endswith(suffix):
147 for prefix in prefixes:
148 if name.startswith(prefix):
149 if remove_suffix and name.endswith(suffix):
150 name = name[:-len(suffix)]
151 if name not in excludes:
152 tests.append(name)
153 tests.sort()
154 return tests
155
156
def run_test(test, verbose, runner=None, capture=0):
    """
    Load the module named `test` and run the tests it contains.

    test -- the name of the test
    verbose -- if true, print more messages
    runner -- runner used to execute the suite; a SkipAwareTextTestRunner
              is created when None
    capture -- capture level given to the runner created here

    Return whatever the runner's `run` returns, or a message string when
    loading or running the module raised (KeyboardInterrupt is propagated).
    """
    test_support.unload(test)
    try:
        module = load_module_from_name(test, path=sys.path)
        try:
            # prefer an explicit module-level `suite` (object or factory)
            suite = module.suite
            if callable(suite):
                suite = suite()
        except AttributeError:
            suite = unittest.TestLoader().loadTestsFromModule(module)
        if runner is None:
            runner = SkipAwareTextTestRunner(capture=capture)
        return runner.run(suite)
    except KeyboardInterrupt:
        # let the user interrupt the whole run
        raise
    except:
        # anything else is reported to the caller as a message string
        exc_type, exc_value = sys.exc_info()[:2]
        msg = "test %s crashed -- %s : %s" % (test, exc_type, exc_value)
        if verbose:
            traceback.print_exc()
        return msg
187
189 """format word according to n"""
190 if n == 1:
191 return "%d %s" % (n, word)
192 else:
193 return "%d %ss" % (n, word)
194
195
196
197
198
200 """starts an interactive shell so that the user can inspect errors
201 """
202 debuggers = result.debuggers
203 descrs = result.error_descrs + result.fail_descrs
204 if len(debuggers) == 1:
205
206 debuggers[0].start()
207 else:
208 while True:
209 testindex = 0
210 print "Choose a test to debug:"
211
212 print "\n".join(['\t%s : %s' % (i, descr) for i, (_, descr)
213 in enumerate(descrs)])
214 print "Type 'exit' (or ^D) to quit"
215 print
216 try:
217 todebug = raw_input('Enter a test name: ')
218 if todebug.strip().lower() == 'exit':
219 print
220 break
221 else:
222 try:
223 testindex = int(todebug)
224 debugger = debuggers[descrs[testindex][0]]
225 except (ValueError, IndexError):
226 print "ERROR: invalid test number %r" % (todebug, )
227 else:
228 debugger.start()
229 except (EOFError, KeyboardInterrupt):
230 print
231 break
232
233
234
235 from cStringIO import StringIO
236
238
239 - def __init__(self, stream, descriptions, verbosity,
240 exitfirst=False, capture=0, printonly=None,
241 pdbmode=False, cvg=None, colorize=False):
242 super(SkipAwareTestResult, self).__init__(stream,
243 descriptions, verbosity)
244 self.skipped = []
245 self.debuggers = []
246 self.fail_descrs = []
247 self.error_descrs = []
248 self.exitfirst = exitfirst
249 self.capture = capture
250 self.printonly = printonly
251 self.pdbmode = pdbmode
252 self.cvg = cvg
253 self.colorize = colorize
254 self.pdbclass = Debugger
255 self.verbose = verbosity > 1
256
258 return getattr(self, '%s_descrs' % flavour.lower())
259
261 self.descrs_for(flavour).append( (len(self.debuggers), test_descr) )
262 if self.pdbmode:
263 self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
264
265
267 """only consider non-testlib frames when formatting traceback"""
268 lgc_testlib = osp.abspath(__file__)
269 std_testlib = osp.abspath(unittest.__file__)
270 invalid = lambda fi: osp.abspath(fi[1]) in (lgc_testlib, std_testlib)
271 for frameinfo in dropwhile(invalid, frames):
272 yield frameinfo
273
275 """Converts a sys.exc_info()-style tuple of values into a string.
276
277 This method is overridden here because we want to colorize
278 lines if --color is passed, and display local variables if
279 --verbose is passed
280 """
281 exctype, exc, tb = err
282 output = ['Traceback (most recent call last)']
283 frames = inspect.getinnerframes(tb)
284 colorize = self.colorize
285 frames = enumerate(self._iter_valid_frames(frames))
286 for index, (frame, filename, lineno, funcname, ctx, ctxindex) in frames:
287 filename = osp.abspath(filename)
288 if ctx is None:
289 source = '<no source available>'
290 else:
291 source = ''.join(ctx)
292 if colorize:
293 filename = textutils.colorize_ansi(filename, 'magenta')
294 source = colorize_source(source)
295 output.append(' File "%s", line %s, in %s' % (filename, lineno, funcname))
296 output.append(' %s' % source.strip())
297 if self.verbose:
298 output.append('%r == %r' % (dir(frame), test.__module__))
299 output.append('')
300 output.append(' ' + ' local variables '.center(66, '-'))
301 for varname, value in sorted(frame.f_locals.items()):
302 output.append(' %s: %r' % (varname, value))
303 if varname == 'self':
304 for varname, value in sorted(vars(value).items()):
305 output.append(' self.%s: %r' % (varname, value))
306 output.append(' ' + '-' * 66)
307 output.append('')
308 output.append(''.join(traceback.format_exception_only(exctype, exc)))
309 return '\n'.join(output)
310
312 """err == (exc_type, exc, tcbk)"""
313 exc_type, exc, _ = err
314 if exc_type == TestSkipped:
315 self.addSkipped(test, exc)
316 else:
317 if self.exitfirst:
318 self.shouldStop = True
319 descr = self.getDescription(test)
320 super(SkipAwareTestResult, self).addError(test, err)
321 self._create_pdb(descr, 'error')
322
324 if self.exitfirst:
325 self.shouldStop = True
326 descr = self.getDescription(test)
327 super(SkipAwareTestResult, self).addFailure(test, err)
328 self._create_pdb(descr, 'fail')
329
331 self.skipped.append((test, self.getDescription(test), reason))
332 if self.showAll:
333 self.stream.writeln("SKIPPED")
334 elif self.dots:
335 self.stream.write('S')
336
338 super(SkipAwareTestResult, self).printErrors()
339 self.printSkippedList()
340
342 for _, descr, err in self.skipped:
343 self.stream.writeln(self.separator1)
344 self.stream.writeln("%s: %s" % ('SKIPPED', descr))
345 self.stream.writeln("\t%s" % err)
346
348 for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors):
349 self.stream.writeln(self.separator1)
350 if self.colorize:
351 self.stream.writeln("%s: %s" % (
352 textutils.colorize_ansi(flavour, color='red'), descr))
353 else:
354 self.stream.writeln("%s: %s" % (flavour, descr))
355
356 self.stream.writeln(self.separator2)
357 self.stream.writeln(err)
358 try:
359 output, errput = test.captured_output()
360 except AttributeError:
361 pass
362 else:
363 if output:
364 self.stream.writeln(self.separator2)
365 self.stream.writeln("captured stdout".center(
366 len(self.separator2)))
367 self.stream.writeln(self.separator2)
368 self.stream.writeln(output)
369 else:
370 self.stream.writeln('no stdout'.center(
371 len(self.separator2)))
372 if errput:
373 self.stream.writeln(self.separator2)
374 self.stream.writeln("captured stderr".center(
375 len(self.separator2)))
376 self.stream.writeln(self.separator2)
377 self.stream.writeln(errput)
378 else:
379 self.stream.writeln('no stderr'.center(
380 len(self.separator2)))
381
382
def run(self, result, runcondition=None, options=None):
    """Run every test of the suite, stopping once `result.shouldStop` is set.

    Each test is first called with `(result, runcondition, options)`; when
    that call raises TypeError, a warning is issued and the test is called
    again with `result` only.  Return `result`.
    """
    for case in self._tests:
        if result.shouldStop:
            break
        try:
            case(result, runcondition, options)
        except TypeError:
            # NOTE(review): a TypeError raised from inside the test itself
            # also ends up here and triggers the second call.
            warnings.warn("%s should extend lgc.testlib.TestCase instead of unittest.TestCase"
                          % case)
            case(result)
    return result
# install the extended signature on the standard TestSuite class
unittest.TestSuite.run = run
397
398
399 TestSuite = unittest.TestSuite
400
401
403 return self.run(*args, **kwds)
404 unittest.TestSuite.__call__ = __call__
405
406
407 -class SkipAwareTextTestRunner(unittest.TextTestRunner):
408
409 - def __init__(self, stream=sys.stderr, verbosity=1,
410 exitfirst=False, capture=False, printonly=None,
411 pdbmode=False, cvg=None, test_pattern=None,
412 skipped_patterns=(), colorize=False, batchmode=False,
413 options=None):
414 super(SkipAwareTextTestRunner, self).__init__(stream=stream,
415 verbosity=verbosity)
416 self.exitfirst = exitfirst
417 self.capture = capture
418 self.printonly = printonly
419 self.pdbmode = pdbmode
420 self.cvg = cvg
421 self.test_pattern = test_pattern
422 self.skipped_patterns = skipped_patterns
423 self.colorize = colorize
424 self.batchmode = batchmode
425 self.options = options
426
427 - def _this_is_skipped(self, testedname):
428 return any([(pat in testedname) for pat in self.skipped_patterns])
429
    def _runcondition(self, test, skipgenerator=True):
        """Tell whether `test` should be run.

        :param test: an InnerTest, a TestCase instance, a function or a
          bound/unbound method; any other object is always accepted
        :param skipgenerator: when true, a generator test function is only
          checked against the tags and the name filters are skipped

        :return: a false value if the test is excluded by the skip patterns,
          the test pattern or the tags; otherwise the result of
          `does_match_tags`
        """
        if isinstance(test, InnerTest):
            testname = test.name
        else:
            # work out the function object and a 'Class.method' (or bare
            # function) name used by the name filters below
            if isinstance(test, TestCase):
                meth = test._get_test_method()
                func = meth.im_func
                testname = '%s.%s' % (meth.im_class.__name__, func.__name__)
            elif isinstance(test, types.FunctionType):
                func = test
                testname = func.__name__
            elif isinstance(test, types.MethodType):
                func = test.im_func
                testname = '%s.%s' % (test.im_class.__name__, func.__name__)
            else:
                # unknown kind of object: do not filter it out
                return True

            if is_generator(func) and skipgenerator:
                # only the tags are checked here for a generator function
                return self.does_match_tags(func)


        if self._this_is_skipped(testname):
            # name contains one of the skip patterns
            return False
        if self.test_pattern is not None:
            try:
                # 'Class.method' pattern: both parts must be contained in the
                # corresponding part of the test name
                classpattern, testpattern = self.test_pattern.split('.')
                klass, name = testname.split('.')
                if classpattern not in klass or testpattern not in name:
                    return False
            except ValueError:
                # pattern or name is not of the 'a.b' form: fall back to a
                # plain substring check
                if self.test_pattern not in testname:
                    return False

        return self.does_match_tags(test)
464
466 if self.options is not None:
467 tags_pattern = getattr(self.options, 'tags_pattern', None)
468 if tags_pattern is not None:
469 tags = getattr(test, 'tags', None)
470 if tags is not None:
471 return tags.match(tags_pattern)
472 if isinstance(test, types.MethodType):
473 tags = getattr(test.im_class, 'tags', Tags())
474 return tags.match(tags_pattern)
475 return False
476 return True
477
478 - def _makeResult(self):
479 return SkipAwareTestResult(self.stream, self.descriptions,
480 self.verbosity, self.exitfirst, self.capture,
481 self.printonly, self.pdbmode, self.cvg,
482 self.colorize)
483
484 - def run(self, test):
485 "Run the given test case or test suite."
486 result = self._makeResult()
487 startTime = time.time()
488 test(result, self._runcondition, self.options)
489 stopTime = time.time()
490 timeTaken = stopTime - startTime
491 result.printErrors()
492 if not self.batchmode:
493 self.stream.writeln(result.separator2)
494 run = result.testsRun
495 self.stream.writeln("Ran %d test%s in %.3fs" %
496 (run, run != 1 and "s" or "", timeTaken))
497 self.stream.writeln()
498 if not result.wasSuccessful():
499 if self.colorize:
500 self.stream.write(textutils.colorize_ansi("FAILED", color='red'))
501 else:
502 self.stream.write("FAILED")
503 else:
504 if self.colorize:
505 self.stream.write(textutils.colorize_ansi("OK", color='green'))
506 else:
507 self.stream.write("OK")
508 failed, errored, skipped = map(len, (result.failures, result.errors,
509 result.skipped))
510
511 det_results = []
512 for name, value in (("failures", result.failures),
513 ("errors",result.errors),
514 ("skipped", result.skipped)):
515 if value:
516 det_results.append("%s=%i" % (name, len(value)))
517 if det_results:
518 self.stream.write(" (")
519 self.stream.write(', '.join(det_results))
520 self.stream.write(")")
521 self.stream.writeln("")
522 return result
523
524
526 """Keyword args (**kwargs) support for generative tests."""
527
529 """Variable arguments (*args) for generative tests."""
531 return tuple.__new__(cls, args)
532
533
534
536 """
537 Overrides default testloader to be able to omit classname when
538 specifying tests to run on command line.
539
540 For example, if the file test_foo.py contains ::
541
542 class FooTC(TestCase):
543 def test_foo1(self): # ...
544 def test_foo2(self): # ...
545 def test_bar1(self): # ...
546
547 class BarTC(TestCase):
548 def test_bar2(self): # ...
549
550 'python test_foo.py' will run the 3 tests in FooTC
551 'python test_foo.py FooTC' will run the 3 tests in FooTC
552 'python test_foo.py test_foo' will run test_foo1 and test_foo2
553 'python test_foo.py test_foo1' will run test_foo1
554 'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2
555 """
556
558 self.skipped_patterns = []
559
561 suites = []
562 for name in names:
563 suites.extend(self.loadTestsFromName(name, module))
564 return self.suiteClass(suites)
565
567 tests = {}
568 for obj in vars(module).values():
569 if (issubclass(type(obj), (types.ClassType, type)) and
570 issubclass(obj, unittest.TestCase)):
571 classname = obj.__name__
572 if classname[0] == '_' or self._this_is_skipped(classname):
573 continue
574 methodnames = []
575
576 for attrname in dir(obj):
577 if attrname.startswith(self.testMethodPrefix):
578 attr = getattr(obj, attrname)
579 if callable(attr):
580 methodnames.append(attrname)
581
582 tests[classname] = (obj, methodnames)
583 return tests
584
586 try:
587 suite = getattr(module, suitename)()
588 except AttributeError:
589 return []
590 assert hasattr(suite, '_tests'), \
591 "%s.%s is not a valid TestSuite" % (module.__name__, suitename)
592
593
594 return suite._tests
595
597 parts = name.split('.')
598 if module is None or len(parts) > 2:
599
600 return [super(NonStrictTestLoader, self).loadTestsFromName(name)]
601 tests = self._collect_tests(module)
602
603
604 collected = []
605 if len(parts) == 1:
606 pattern = parts[0]
607 if callable(getattr(module, pattern, None)
608 ) and pattern not in tests:
609
610 return self.loadTestsFromSuite(module, pattern)
611 if pattern in tests:
612
613 klass, methodnames = tests[pattern]
614 for methodname in methodnames:
615 collected = [klass(methodname)
616 for methodname in methodnames]
617 else:
618
619 for klass, methodnames in tests.values():
620 collected += [klass(methodname)
621 for methodname in methodnames]
622 elif len(parts) == 2:
623
624 classname, pattern = parts
625 klass, methodnames = tests.get(classname, (None, []))
626 for methodname in methodnames:
627 collected = [klass(methodname) for methodname in methodnames]
628 return collected
629
631 return any([(pat in testedname) for pat in self.skipped_patterns])
632
634 """Return a sorted sequence of method names found within testCaseClass
635 """
636 is_skipped = self._this_is_skipped
637 classname = testCaseClass.__name__
638 if classname[0] == '_' or is_skipped(classname):
639 return []
640 testnames = super(NonStrictTestLoader, self).getTestCaseNames(
641 testCaseClass)
642 return [testname for testname in testnames if not is_skipped(testname)]
643
644
646
647 USAGE = """\
648 Usage: %(progName)s [options] [test] [...]
649
650 Options:
651 -h, --help Show this message
652 -v, --verbose Verbose output
653 -i, --pdb Enable test failure inspection
654 -x, --exitfirst Exit on first failure
655 -c, --capture Captures and prints standard out/err only on errors
656 -p, --printonly Only prints lines matching specified pattern
657 (implies capture)
658 -s, --skip skip test matching this pattern (no regexp for now)
659 -q, --quiet Minimal output
660 --color colorize tracebacks
661
662 -m, --match Run only test whose tag match this pattern
663
664 -P, --profile FILE: Run the tests using cProfile and saving results
665 in FILE
666
667 Examples:
668 %(progName)s - run default set of tests
669 %(progName)s MyTestSuite - run suite 'MyTestSuite'
670 %(progName)s MyTestCase.testSomething - run MyTestCase.testSomething
671 %(progName)s MyTestCase - run all 'test*' test methods
672 in MyTestCase
673 """
674 - def __init__(self, module='__main__', defaultTest=None, batchmode=False,
675 cvg=None, options=None, outstream=sys.stderr):
676 self.batchmode = batchmode
677 self.cvg = cvg
678 self.options = options
679 self.outstream = outstream
680 super(SkipAwareTestProgram, self).__init__(
681 module=module, defaultTest=defaultTest,
682 testLoader=NonStrictTestLoader())
683
685 self.pdbmode = False
686 self.exitfirst = False
687 self.capture = 0
688 self.printonly = None
689 self.skipped_patterns = []
690 self.test_pattern = None
691 self.tags_pattern = None
692 self.colorize = False
693 self.profile_name = None
694 import getopt
695 try:
696 options, args = getopt.getopt(argv[1:], 'hHvixrqcp:s:m:P:',
697 ['help', 'verbose', 'quiet', 'pdb',
698 'exitfirst', 'restart', 'capture', 'printonly=',
699 'skip=', 'color', 'match=', 'profile='])
700 for opt, value in options:
701 if opt in ('-h', '-H', '--help'):
702 self.usageExit()
703 if opt in ('-i', '--pdb'):
704 self.pdbmode = True
705 if opt in ('-x', '--exitfirst'):
706 self.exitfirst = True
707 if opt in ('-r', '--restart'):
708 self.restart = True
709 self.exitfirst = True
710 if opt in ('-q', '--quiet'):
711 self.verbosity = 0
712 if opt in ('-v', '--verbose'):
713 self.verbosity = 2
714 if opt in ('-c', '--capture'):
715 self.capture += 1
716 if opt in ('-p', '--printonly'):
717 self.printonly = re.compile(value)
718 if opt in ('-s', '--skip'):
719 self.skipped_patterns = [pat.strip() for pat in
720 value.split(', ')]
721 if opt == '--color':
722 self.colorize = True
723 if opt in ('-m', '--match'):
724
725 self.options["tag_pattern"] = value
726 if opt in ('-P', '--profile'):
727 self.profile_name = value
728 self.testLoader.skipped_patterns = self.skipped_patterns
729 if self.printonly is not None:
730 self.capture += 1
731 if len(args) == 0 and self.defaultTest is None:
732 suitefunc = getattr(self.module, 'suite', None)
733 if isinstance(suitefunc, (types.FunctionType,
734 types.MethodType)):
735 self.test = self.module.suite()
736 else:
737 self.test = self.testLoader.loadTestsFromModule(self.module)
738 return
739 if len(args) > 0:
740 self.test_pattern = args[0]
741 self.testNames = args
742 else:
743 self.testNames = (self.defaultTest, )
744 self.createTests()
745 except getopt.error, msg:
746 self.usageExit(msg)
747
748
750 if self.profile_name:
751 import cProfile
752 cProfile.runctx('self._runTests()', globals(), locals(), self.profile_name )
753 else:
754 return self._runTests()
755
757 if hasattr(self.module, 'setup_module'):
758 try:
759 self.module.setup_module(self.options)
760 except Exception, exc:
761 print 'setup_module error:', exc
762 sys.exit(1)
763 self.testRunner = SkipAwareTextTestRunner(verbosity=self.verbosity,
764 stream=self.outstream,
765 exitfirst=self.exitfirst,
766 capture=self.capture,
767 printonly=self.printonly,
768 pdbmode=self.pdbmode,
769 cvg=self.cvg,
770 test_pattern=self.test_pattern,
771 skipped_patterns=self.skipped_patterns,
772 colorize=self.colorize,
773 batchmode=self.batchmode,
774 options=self.options)
775
776 def removeSucceededTests(obj, succTests):
777 """ Recursive function that removes succTests from
778 a TestSuite or TestCase
779 """
780 if isinstance(obj, TestSuite):
781 removeSucceededTests(obj._tests, succTests)
782 if isinstance(obj, list):
783 for el in obj[:]:
784 if isinstance(el, TestSuite):
785 removeSucceededTests(el, succTests)
786 elif isinstance(el, TestCase):
787 descr = '.'.join((el.__class__.__module__,
788 el.__class__.__name__,
789 el._testMethodName))
790 if descr in succTests:
791 obj.remove(el)
792
793 if getattr(self.options, 'restart', False):
794
795 try:
796 restartfile = open(FILE_RESTART, 'r')
797 try:
798 try:
799 succeededtests = list(elem.rstrip('\n\r') for elem in
800 restartfile.readlines())
801 removeSucceededTests(self.test, succeededtests)
802 except Exception, e:
803 raise e
804 finally:
805 restartfile.close()
806 except Exception ,e:
807 raise "Error while reading \
808 succeeded tests into", osp.join(os.getcwd(),FILE_RESTART)
809
810 result = self.testRunner.run(self.test)
811 if hasattr(self.module, 'teardown_module'):
812 try:
813 self.module.teardown_module(self.options, result)
814 except Exception, exc:
815 print 'teardown_module error:', exc
816 sys.exit(1)
817 if result.debuggers and self.pdbmode:
818 start_interactive_mode(result)
819 if not self.batchmode:
820 sys.exit(not result.wasSuccessful())
821 self.result = result
822
823
824
825
827 """adapted from py lib (http://codespeak.net/py)
828 Capture IO to/from a given os-level filedescriptor.
829 """
    def __init__(self, fd, attr='stdout', printonly=None):
        """Start capturing: redirect descriptor `fd` to a temporary file and
        install this object as `sys.<attr>`.

        :param fd: OS-level file descriptor to redirect
        :param attr: name of the `sys` attribute replaced by this object
        :param printonly: stored as `self.printonly`
        """
        self.targetfd = fd
        self.tmpfile = os.tmpfile()
        self.printonly = printonly
        # keep a duplicate of the original descriptor so it can be put back
        self._savefd = os.dup(fd)
        # make `fd` refer to the temporary file from now on
        os.dup2(self.tmpfile.fileno(), fd)
        # also replace the Python-level stream, remembering the previous one
        self.oldval = getattr(sys, attr)
        setattr(sys, attr, self)
        self.attr = attr
842
844
845 for line in msg.splitlines():
846 line += '\n'
847 if self.printonly is None or self.printonly.search(line) is None:
848 self.tmpfile.write(line)
849 else:
850 os.write(self._savefd, line)
851
852
853
854
855
856
857
858
860 """restore original fd and returns captured output"""
861
862 self.tmpfile.flush()
863 try:
864 ref_file = getattr(sys, '__%s__' % self.attr)
865 ref_file.flush()
866 except AttributeError:
867 pass
868 if hasattr(self.oldval, 'flush'):
869 self.oldval.flush()
870
871 os.dup2(self._savefd, self.targetfd)
872
873 setattr(sys, self.attr, self.oldval)
874
875 os.close(self._savefd)
876
877 self.tmpfile.seek(0)
878 return self.tmpfile.read()
879
880
881 -def _capture(which='stdout', printonly=None):
882 """private method, should not be called directly
883 (cf. capture_stdout() and capture_stderr())
884 """
885 assert which in ('stdout', 'stderr'
886 ), "Can only capture stdout or stderr, not %s" % which
887 if which == 'stdout':
888 fd = 1
889 else:
890 fd = 2
891 return FDCapture(fd, which, printonly)
892
894 """captures the standard output
895
896 returns a handle object which has a `restore()` method.
897 The restore() method returns the captured stdout and restores it
898 """
899 return _capture('stdout', printonly)
900
902 """captures the standard error output
903
904 returns a handle object which has a `restore()` method.
905 The restore() method returns the captured stderr and restores it
906 """
907 return _capture('stderr', printonly)
908
909
def unittest_main(module='__main__', defaultTest=None,
                  batchmode=False, cvg=None, options=None,
                  outstream=sys.stderr):
    """use this function if you want to have the same functionality
    as unittest.main

    All arguments are handed to SkipAwareTestProgram, whose instance is
    returned.
    """
    program = SkipAwareTestProgram(module=module, defaultTest=defaultTest,
                                   batchmode=batchmode, cvg=cvg,
                                   options=options, outstream=outstream)
    return program
917
919 """raised when a test is skipped"""
920
922 flags = function.func_code.co_flags
923 return flags & CO_GENERATOR
924
925
927 args = []
928 varargs = ()
929 kwargs = {}
930 flags = 0
931 for param in params:
932 if isinstance(param, starargs):
933 varargs = param
934 if flags:
935 raise TypeError('found starargs after keywords !')
936 flags |= 2
937 args += list(varargs)
938 elif isinstance(param, keywords):
939 kwargs = param
940 if flags & 4:
941 raise TypeError('got multiple keywords parameters')
942 flags |= 4
943 elif flags & 2 or flags & 4:
944 raise TypeError('found parameters after kwargs or args')
945 else:
946 args.append(param)
947
948 return args, kwargs
949
955
956
958 """unittest.TestCase with some additional methods"""
959
960 capture = False
961 pdbclass = Debugger
962
    def __init__(self, methodName='runTest'):
        """Initialise the test case and its capture / description state.

        :param methodName: name of the test method, handed to the parent
          constructor
        """
        super(TestCase, self).__init__(methodName)
        # Inside this class `self.__name` is mangled to `self._TestCase__name`.
        if sys.version_info >= (2, 5):
            # alias the single-underscore attributes under the mangled names
            # used by the other methods of this class
            self.__exc_info = self._exc_info
            self.__testMethodName = self._testMethodName
        else:
            # NOTE(review): presumably unittest before 2.5 only provided the
            # mangled attributes, hence the reverse alias -- confirm
            self._testMethodName = self.__testMethodName
        # output collected once captures are restored
        self._captured_stdout = ""
        self._captured_stderr = ""
        # stacks of active stdout / stderr capture handles
        self._out = []
        self._err = []
        # description override set through set_description()
        self._current_test_descr = None
        # options object, assigned when the test is called
        self._options_ = None
978
980 """helper attribute holding the standard test's data directory
981
982 NOTE: this is a logilab's standard
983 """
984 mod = __import__(cls.__module__)
985 return osp.join(osp.dirname(osp.abspath(mod.__file__)), 'data')
986
987
988 datadir = classproperty(cached(datadir))
989
991 """joins the object's datadir and `fname`"""
992 return osp.join(cls.datadir, fname)
993 datapath = classmethod(datapath)
994
996 """sets the current test's description.
997 This can be useful for generative tests because it allows to specify
998 a description per yield
999 """
1000 self._current_test_descr = descr
1001
1002
1004 """override default unitest shortDescription to handle correctly
1005 generative tests
1006 """
1007 if self._current_test_descr is not None:
1008 return self._current_test_descr
1009 return super(TestCase, self).shortDescription()
1010
1011
1013 """return a two tuple with standard output and error stripped"""
1014 return self._captured_stdout.strip(), self._captured_stderr.strip()
1015
1017 """start_capture if enable"""
1018 if self.capture:
1019 warnings.simplefilter('ignore', DeprecationWarning)
1020 self.start_capture()
1021
1023 """stop_capture and restore previous output"""
1024 self._force_output_restore()
1025
1027 """start_capture"""
1028 self._out.append(capture_stdout(printonly or self._printonly))
1029 self._err.append(capture_stderr(printonly or self._printonly))
1030
1032 """set the pattern of line to print"""
1033 rgx = re.compile(pattern, flags)
1034 if self._out:
1035 self._out[-1].printonly = rgx
1036 self._err[-1].printonly = rgx
1037 else:
1038 self.start_capture(printonly=rgx)
1039
1041 """stop output and error capture"""
1042 if self._out:
1043 _out = self._out.pop()
1044 _err = self._err.pop()
1045 return _out.restore(), _err.restore()
1046 return '', ''
1047
1049 """remove all capture set"""
1050 while self._out:
1051 self._captured_stdout += self._out.pop().restore()
1052 self._captured_stderr += self._err.pop().restore()
1053
1054 - def quiet_run(self, result, func, *args, **kwargs):
1055 self._start_capture()
1056 try:
1057 func(*args, **kwargs)
1058 except (KeyboardInterrupt, SystemExit):
1059 self._stop_capture()
1060 raise
1061 except:
1062 self._stop_capture()
1063 result.addError(self, self.__exc_info())
1064 return False
1065 self._stop_capture()
1066 return True
1067
1069 """return the test method"""
1070 return getattr(self, self.__testMethodName)
1071
1072
1073 - def optval(self, option, default=None):
1074 """return the option value or default if the option is not define"""
1075 return getattr(self._options_, option, default)
1076
1077 - def __call__(self, result=None, runcondition=None, options=None):
1078 """rewrite TestCase.__call__ to support generative tests
1079 This is mostly a copy/paste from unittest.py (i.e same
1080 variable names, same logic, except for the generative tests part)
1081 """
1082 if result is None:
1083 result = self.defaultTestResult()
1084 result.pdbclass = self.pdbclass
1085
1086
1087 self.capture = self.capture or getattr(result, 'capture', False)
1088 self._options_ = options
1089 self._printonly = getattr(result, 'printonly', None)
1090
1091
1092 testMethod = self._get_test_method()
1093 if runcondition and not runcondition(testMethod):
1094 return
1095 result.startTest(self)
1096 try:
1097 if not self.quiet_run(result, self.setUp):
1098 return
1099 generative = is_generator(testMethod.im_func)
1100
1101 if generative:
1102 self._proceed_generative(result, testMethod,
1103 runcondition)
1104 else:
1105 status = self._proceed(result, testMethod)
1106 success = (status == 0)
1107 if not self.quiet_run(result, self.tearDown):
1108 return
1109 if not generative and success:
1110 if hasattr(options, "exitfirst") and options.exitfirst:
1111
1112 try:
1113 restartfile = open(FILE_RESTART, 'a')
1114 try:
1115 try:
1116 descr = '.'.join((self.__class__.__module__,
1117 self.__class__.__name__,
1118 self._testMethodName))
1119 restartfile.write(descr+os.linesep)
1120 except Exception, e:
1121 raise e
1122 finally:
1123 restartfile.close()
1124 except Exception, e:
1125 print >> sys.__stderr__, "Error while saving \
1126 succeeded test into", osp.join(os.getcwd(),FILE_RESTART)
1127 raise e
1128 result.addSuccess(self)
1129 finally:
1130
1131
1132 result.stopTest(self)
1133
1134
1135
1137
1138 result.testsRun -= 1
1139 self._start_capture()
1140 success = True
1141 try:
1142 for params in testfunc():
1143 if runcondition and not runcondition(testfunc,
1144 skipgenerator=False):
1145 if not (isinstance(params, InnerTest)
1146 and runcondition(params)):
1147 continue
1148 if not isinstance(params, (tuple, list)):
1149 params = (params, )
1150 func = params[0]
1151 args, kwargs = parse_generative_args(params[1:])
1152
1153 result.testsRun += 1
1154 status = self._proceed(result, func, args, kwargs)
1155 if status == 0:
1156 result.addSuccess(self)
1157 success = True
1158 else:
1159 success = False
1160 if status == 2:
1161 result.shouldStop = True
1162 if result.shouldStop:
1163 break
1164 except:
1165
1166 result.addError(self, self.__exc_info())
1167 success = False
1168 self._stop_capture()
1169 return success
1170
    def _proceed(self, result, testfunc, args=(), kwargs=None):
        """proceed the actual test
        returns 0 on success, 1 on failure, 2 on error

        Note: addSuccess can't be called here because we have to wait
        for tearDown to be successfully executed to declare the test as
        successful

        :param result: result object on which failures / errors are recorded
        :param testfunc: callable to run
        :param args: positional arguments for `testfunc`
        :param kwargs: keyword arguments for `testfunc` (None means none)
        """
        self._start_capture()
        kwargs = kwargs or {}
        try:
            testfunc(*args, **kwargs)
            self._stop_capture()
        except self.failureException:
            # capture is stopped before the outcome is reported
            self._stop_capture()
            result.addFailure(self, self.__exc_info())
            return 1
        except KeyboardInterrupt:
            # user interruption is propagated, not recorded
            self._stop_capture()
            raise
        except:
            # every other exception counts as an error
            self._stop_capture()
            result.addError(self, self.__exc_info())
            return 2
        return 0
1196
1198 """return a new instance of the defaultTestResult"""
1199 return SkipAwareTestResult()
1200
def skip(self, msg=None):
    """interrupt the running test by raising TestSkipped

    <msg> is the reason given to the exception; when it is None or empty,
    'test was skipped' is used instead.
    """
    raise TestSkipped(msg or 'test was skipped')
1205
1207 """assert <object> are in <set>"""
1208 self.assert_(object in set, "%s not in %s" % (object, set))
1209
1211 """assert <object> are not in <set>"""
1212 self.assert_(object not in set, "%s in %s" % (object, set))
1213
1215 """compares two dicts
1216
1217 If the two dict differ, the first difference is shown in the error
1218 message
1219 """
1220 dict1 = dict(dict1)
1221 msgs = []
1222 for key, value in dict2.items():
1223 try:
1224 if dict1[key] != value:
1225 msgs.append('%r != %r for key %r' % (dict1[key], value,
1226 key))
1227 del dict1[key]
1228 except KeyError:
1229 msgs.append('missing %r key' % key)
1230 if dict1:
1231 msgs.append('dict2 is lacking %r' % dict1)
1232 if msgs:
1233 self.fail('\n'.join(msgs))
1234 assertDictEqual = assertDictEquals
1235
1236
1237
1239 """compares two iterable and shows difference between both"""
1240 got, expected = list(got), list(expected)
1241 self.assertSetEqual(set(got), set(expected), msg)
1242 if len(got) != len(expected):
1243 if msg is None:
1244 msg = ['Iterable have the same elements but not the same number',
1245 '\t<element>\t<expected>i\t<got>']
1246 got_count = {}
1247 expected_count = {}
1248 for element in got:
1249 got_count[element] = got_count.get(element,0) + 1
1250 for element in expected:
1251 expected_count[element] = expected_count.get(element,0) + 1
1252
1253
1254 for element, count in got_count.iteritems():
1255 other_count = expected_count[element]
1256 if other_count != count:
1257 msg.append('\t%s\t%s\t%s' % (element, other_count, count))
1258
1259 self.fail(msg)
1260
1261 assertUnorderedIterableEqual = assertUnorderedIterableEquals
1262 assertUnordIterEquals = assertUnordIterEqual = assertUnorderedIterableEqual
1263
1265 if not(isinstance(got, set) and isinstance(expected, set)):
1266 warnings.warn("the assertSetEquals function if now intended for set only."\
1267 "use assertUnorderedIterableEquals instead.",
1268 DeprecationWarning, 2)
1269 return self.assertUnorderedIterableEquals(got,expected, msg)
1270
1271 items={}
1272 items['missing'] = expected - got
1273 items['unexpected'] = got - expected
1274 if any(items.itervalues()):
1275 if msg is None:
1276 msg = '\n'.join('%s:\n\t%s' % (key,"\n\t".join(str(value) for value in values))
1277 for key, values in items.iteritems() if values)
1278 self.fail(msg)
1279
1280
1281 assertSetEqual = assertSetEquals
1282
1284 """compares two lists
1285
1286 If the two list differ, the first difference is shown in the error
1287 message
1288 """
1289 _l1 = list_1[:]
1290 for i, value in enumerate(list_2):
1291 try:
1292 if _l1[0] != value:
1293 from pprint import pprint
1294 pprint(list_1)
1295 pprint(list_2)
1296 self.fail('%r != %r for index %d' % (_l1[0], value, i))
1297 del _l1[0]
1298 except IndexError:
1299 if msg is None:
1300 msg = 'list_1 has only %d elements, not %s '\
1301 '(at least %r missing)'% (i, len(list_2), value)
1302 self.fail(msg)
1303 if _l1:
1304 if msg is None:
1305 msg = 'list_2 is lacking %r' % _l1
1306 self.fail(msg)
1307 assertListEqual = assertListEquals
1308
1310 """assert list of lines are equal"""
1311 lines1 = list_1.splitlines()
1312 if striplines:
1313 lines1 = [l.strip() for l in lines1]
1314 lines2 = list_2.splitlines()
1315 if striplines:
1316 lines2 = [l.strip() for l in lines2]
1317 self.assertListEquals(lines1, lines2, msg)
1318 assertLineEqual = assertLinesEquals
1319
1330
1335
1337 """compare an ElementTree Element to a tuple formatted as follow:
1338 (tagname, [attrib[, children[, text[, tail]]]])"""
1339
1340 self.assertTextEquals(element.tag, tup[0])
1341
1342 if len(element.attrib) or len(tup)>1:
1343 if len(tup)<=1:
1344 self.fail( "tuple %s has no attributes (%s expected)"%(tup,
1345 dict(element.attrib)))
1346 self.assertDictEquals(element.attrib, tup[1])
1347
1348 if len(element) or len(tup)>2:
1349 if len(tup)<=2:
1350 self.fail( "tuple %s has no children (%i expected)"%(tup,
1351 len(element)))
1352 if len(element) != len(tup[2]):
1353 self.fail( "tuple %s has %i children%s (%i expected)"%(tup,
1354 len(tup[2]),
1355 ('', 's')[len(tup[2])>1], len(element)))
1356 for index in xrange(len(tup[2])):
1357 self.assertXMLEqualsTuple(element[index], tup[2][index])
1358
1359 if element.text or len(tup)>3:
1360 if len(tup)<=3:
1361 self.fail( "tuple %s has no text value (%r expected)"%(tup,
1362 element.text))
1363 self.assertTextEquals(element.text, tup[3])
1364
1365 if element.tail or len(tup)>4:
1366 if len(tup)<=4:
1367 self.fail( "tuple %s has no tail value (%r expected)"%(tup,
1368 element.tail))
1369 self.assertTextEquals(element.tail, tup[4])
1370
def _difftext(self, lines1, lines2, junk=None, msg_prefix='Texts differ'):
    """fail with a readable diff if <lines1> and <lines2> differ

    Both arguments are sequences of lines compared with difflib.ndiff.
    <junk> is the collection of characters ndiff may treat as junk
    (space and tab when not given). On the first differing line, self.fail
    is called with <msg_prefix> followed by the whole ndiff output.
    """
    if not junk:
        junk = (' ', '\t')

    def is_junk(char):
        return char in junk

    # ndiff returns a generator: lines already consumed are kept in `seen`
    delta = difflib.ndiff(lines1, lines2, charjunk=is_junk)
    seen = []
    for entry in delta:
        seen.append(entry)
        # only lines starting with a space are common to both inputs
        if entry.startswith(' '):
            continue
        # drain the remaining diff so the message shows it entirely
        report = ['%s\n' % msg_prefix] + seen + list(delta)
        self.fail('\n'.join(report))
1381
def assertTextEquals(self, text1, text2, junk=None,
                     msg_prefix='Text differ'):
    """assert two multiline strings are equal, showing a diff otherwise

    Fails immediately if one of the arguments is not a string. Otherwise
    both texts are stripped, split with splitlines(True) and handed to
    self._difftext together with <junk> and <msg_prefix>.
    """
    checks = ((text1, 'text1 is not a string (%s)'),
              (text2, 'text2 is not a string (%s)'))
    problems = [template % type(value)
                for value, template in checks
                if not isinstance(value, basestring)]
    if problems:
        self.fail('\n'.join(problems))
    first = text1.strip().splitlines(True)
    second = text2.strip().splitlines(True)
    self._difftext(first, second, junk, msg_prefix)
assertTextEqual = assertTextEquals
1395
def assertStreamEquals(self, stream1, stream2, junk=None,
                       msg_prefix='Stream differ'):
    """assert two streams have the same content (difflib on readlines())

    Nothing is compared when both arguments are the very same object:
    reading the first one would also consume the second and make them look
    different. Otherwise both streams are rewound with seek(0), read with
    readlines() and compared by self._difftext.
    """
    if stream1 is not stream2:
        # always compare from the beginning of each stream
        for stream in (stream1, stream2):
            stream.seek(0)
        content1 = stream1.readlines()
        content2 = stream2.readlines()
        self._difftext(content1, content2, junk, msg_prefix)

assertStreamEqual = assertStreamEquals
1412 """compares two files using difflib"""
1413 self.assertStreamEqual(file(fname1), file(fname2), junk,
1414 msg_prefix='Files differs\n-:%s\n+:%s\n'%(fname1, fname2))
1415 assertFileEqual = assertFileEquals
1416
1417
1419 """compares two files using difflib"""
1420 assert osp.exists(path_a), "%s doesn't exists" % path_a
1421 assert osp.exists(path_b), "%s doesn't exists" % path_b
1422
1423 all_a = [ (ipath[len(path_a):].lstrip('/'), idirs, ifiles)
1424 for ipath, idirs, ifiles in os.walk(path_a)]
1425 all_a.sort(key=itemgetter(0))
1426
1427 all_b = [ (ipath[len(path_b):].lstrip('/'), idirs, ifiles)
1428 for ipath, idirs, ifiles in os.walk(path_b)]
1429 all_b.sort(key=itemgetter(0))
1430
1431 iter_a, iter_b = iter(all_a), iter(all_b)
1432 partial_iter = True
1433 ipath_a, idirs_a, ifiles_a = data_a = None, None, None
1434 while True:
1435 try:
1436 ipath_a, idirs_a, ifiles_a = datas_a = iter_a.next()
1437 partial_iter = False
1438 ipath_b, idirs_b, ifiles_b = datas_b = iter_b.next()
1439 partial_iter = True
1440
1441
1442 self.assert_(ipath_a == ipath_b,
1443 "unexpected %s in %s while looking %s from %s" %
1444 (ipath_a, path_a, ipath_b, path_b))
1445
1446
1447 errors = {}
1448 sdirs_a = set(idirs_a)
1449 sdirs_b = set(idirs_b)
1450 errors["unexpected directories"] = sdirs_a - sdirs_b
1451 errors["missing directories"] = sdirs_b - sdirs_a
1452
1453 sfiles_a = set(ifiles_a)
1454 sfiles_b = set(ifiles_b)
1455 errors["unexpected files"] = sfiles_a - sfiles_b
1456 errors["missing files"] = sfiles_b - sfiles_a
1457
1458
1459 msgs = [ "%s: %s"% (name, items)
1460 for name, items in errors.iteritems() if items]
1461
1462 if msgs:
1463 msgs.insert(0,"%s and %s differ :" % (
1464 osp.join(path_a, ipath_a),
1465 osp.join(path_b, ipath_b),
1466 ))
1467 self.fail("\n".join(msgs))
1468
1469 for files in (ifiles_a, ifiles_b):
1470 files.sort()
1471
1472 for index, path in enumerate(ifiles_a):
1473 self.assertFileEquals(osp.join(path_a, ipath_a, path),
1474 osp.join(path_b, ipath_b, ifiles_b[index]))
1475
1476 except StopIteration:
1477 break
1478
1479
1480 assertDirEqual = assertDirEquals
1481
1482
1484 """compares two files using difflib"""
1485 if msg is None:
1486 if strict:
1487 msg = '%r is not of class %s but of %s'
1488 else:
1489 msg = '%r is not an instance of %s but of %s'
1490 msg = msg % (obj, klass, type(obj))
1491 if strict:
1492 self.assert_(obj.__class__ is klass, msg)
1493 else:
1494 self.assert_(isinstance(obj, klass), msg)
1495
def assertIs(self, obj, other, msg=None):
    """assert <obj> and <other> are the very same object

    When <msg> is None, a default message showing the repr of both objects
    is built before the check is delegated to self.assert_.
    """
    identical = obj is other
    if msg is None:
        msg = "%r is not %r" % (obj, other)
    self.assert_(identical, msg)
1501
1502
1504 """compares identity of two reference"""
1505 if msg is None:
1506 msg = "%r is %r"%(obj, other)
1507 self.assert_(obj is not other, msg )
1508
1510 """assert obj is None"""
1511 if msg is None:
1512 msg = "reference to %r when None expected"%(obj,)
1513 self.assert_( obj is None, msg )
1514
1516 """assert obj is not None"""
1517 if msg is None:
1518 msg = "unexpected reference to None"
1519 self.assert_( obj is not None, msg )
1520
1522 """compares two floats"""
1523 if msg is None:
1524 msg = "%r != %r" % (obj, other)
1525 self.assert_(math.fabs(obj - other) < prec, msg)
1526
1528 """override default failUnlessRaise method to return the raised
1529 exception instance.
1530
1531 Fail unless an exception of class excClass is thrown
1532 by callableObj when invoked with arguments args and keyword
1533 arguments kwargs. If a different type of exception is
1534 thrown, it will not be caught, and the test case will be
1535 deemed to have suffered an error, exactly as for an
1536 unexpected exception.
1537 """
1538 try:
1539 callableObj(*args, **kwargs)
1540 except excClass, exc:
1541 return exc
1542 else:
1543 if hasattr(excClass, '__name__'):
1544 excName = excClass.__name__
1545 else:
1546 excName = str(excClass)
1547 raise self.failureException, "%s not raised" % excName
1548
1549 assertRaises = failUnlessRaises
1550
1551 import doctest
1552
1555 """just there to trigger test execution"""
1556 self.skipped_test('doctest module has no DocTestSuite class')
1557
1558
1559
1560 if sys.version_info >= (2, 4):
1562
1564 self.skipped = kwargs.pop('skipped', ())
1565 doctest.DocTestFinder.__init__(self, *args, **kwargs)
1566
def _get_test(self, obj, name, module, globs, source_lines):
    """override the default _get_test to drop objects listed in self.skipped

    Returns None when the __name__ of <obj> is found in self.skipped (an
    object without __name__ is looked up as ''); otherwise the result of
    doctest.DocTestFinder._get_test is returned unchanged.

    Note: Python (<=2.4) use a _name_filter which could be used for that
    purpose but it's no longer available in 2.5
    Python 2.5 seems to have a [SKIP] flag
    """
    obj_name = getattr(obj, '__name__', '')
    if obj_name not in self.skipped:
        return doctest.DocTestFinder._get_test(self, obj, name, module,
                                               globs, source_lines)
    return None
1579 else:
1580
1583 self.skipped = skipped
1584 self.original_find_tests = doctest._find_tests
1585 doctest._find_tests = self._find_tests
1586
1588 tests = []
1589 for testinfo in self.original_find_tests(module, prefix):
1590 testname, _, _, _ = testinfo
1591
1592 testname = testname.split('.')[-1]
1593 if testname not in self.skipped:
1594 tests.append(testinfo)
1595 return tests
1596
1597
1599 """trigger module doctest
1600 I don't know how to make unittest.main consider the DocTestSuite instance
1601 without this hack
1602 """
1603 skipped = ()
def __call__(self, result=None, runcondition=None, options=None):
    """build the doctest suite of self.module and run it against <result>

    A DocTestFinder configured with self.skipped is handed to
    doctest.DocTestSuite when running on Python >= 2.4. If building the
    suite raises AttributeError, a SkippedSuite is run instead.
    <runcondition> and <options> are accepted but not used here.
    """
    try:
        finder = DocTestFinder(skipped=self.skipped)
        suite_kwargs = {}
        if sys.version_info >= (2, 4):
            # only newer doctest versions accept a custom finder
            suite_kwargs['test_finder'] = finder
        suite = doctest.DocTestSuite(self.module, **suite_kwargs)
    except AttributeError:
        suite = SkippedSuite()
    return suite.run(result)
run = __call__
1616
1618 """just there to trigger test execution"""
1619
1620 MAILBOX = None
1621
1623 """fake smtplib.SMTP"""
1624
1626 self.host = host
1627 self.port = port
1628 global MAILBOX
1629 self.reveived = MAILBOX = []
1630
1632 """ignore debug level"""
1633
def sendmail(self, fromaddr, toaddres, body):
    """record the mail as a (fromaddr, toaddres, body) tuple in the mailbox"""
    mail = (fromaddr, toaddres, body)
    self.reveived.append(mail)
1637
1640
1641
1643 """fake ConfigParser.ConfigParser"""
1644
1646 ConfigParser.__init__(self)
1647 for section, pairs in options.iteritems():
1648 self.add_section(section)
1649 for key, value in pairs.iteritems():
1650 self.set(section,key,value)
1652 raise NotImplementedError()
1653
1654
1656 """fake DB-API 2.0 connexion AND cursor (i.e. cursor() return self)"""
1657
1659 self.received = []
1660 self.states = []
1661 self.results = results
1662
1664 """Mock cursor method"""
1665 return self
def execute(self, query, args=None):
    """Mock execute method: record the (query, args) pair in self.received"""
    call = (query, args)
    self.received.append(call)
1670 """Mock fetchone method"""
1671 return self.results[0]
1673 """Mock fetchall method"""
1674 return self.results
1676 """Mock commiy method"""
1677 self.states.append( ('commit', len(self.received)) )
1679 """Mock rollback method"""
1680 self.states.append( ('rollback', len(self.received)) )
1682 """Mock close method"""
1683 pass
1684
1685
1687 """creates an object using params to set attributes
1688 >>> option = mock_object(verbose=False, index=range(5))
1689 >>> option.verbose
1690 False
1691 >>> option.index
1692 [0, 1, 2, 3, 4]
1693 """
1694 return type('Mock', (), params)()
1695
1696
1698 """Creates directories and files found in <path>.
1699
1700 :param paths: list of relative paths to files or directories
1701 :param chroot: the root directory in which paths will be created
1702
1703 >>> from os.path import isdir, isfile
1704 >>> isdir('/tmp/a')
1705 False
1706 >>> create_files(['a/b/foo.py', 'a/b/c/', 'a/b/c/d/e.py'], '/tmp')
1707 >>> isdir('/tmp/a')
1708 True
1709 >>> isdir('/tmp/a/b/c')
1710 True
1711 >>> isfile('/tmp/a/b/c/d/e.py')
1712 True
1713 >>> isfile('/tmp/a/b/foo.py')
1714 True
1715 """
1716 dirs, files = set(), set()
1717 for path in paths:
1718 path = osp.join(chroot, path)
1719 filename = osp.basename(path)
1720
1721 if filename == '':
1722 dirs.add(path)
1723
1724 else:
1725 dirs.add(osp.dirname(path))
1726 files.add(path)
1727 for dirpath in dirs:
1728 if not osp.isdir(dirpath):
1729 os.makedirs(dirpath)
1730 for filepath in files:
1731 file(filepath, 'w').close()
1732
1734 """
1735 Without arguments, return True if contracts can be enabled and should be
1736 enabled (see option -d), return False otherwise.
1737
1738 With arguments, return False if contracts can't or shouldn't be enabled,
1739 otherwise weave ContractAspect with items passed as arguments.
1740 """
1741 if not ENABLE_DBC:
1742 return False
1743 try:
1744 from logilab.aspects.weaver import weaver
1745 from logilab.aspects.lib.contracts import ContractAspect
1746 except ImportError:
1747 sys.stderr.write(
1748 'Warning: logilab.aspects is not available. Contracts disabled.')
1749 return False
1750 for arg in args:
1751 weaver.weave_module(arg, ContractAspect)
1752 return True
1753
1754
1757 self.__dict__.update(kwargs)
1758
1760 """descriptor adding tag to a function"""
1761 def desc(func):
1762 assert not hasattr(func, 'tags')
1763 func.tags = Tags(args)
1764 return func
1765 return desc
1766
1774
1776 """ Compare version of python interpreter to the given one. Skip the test
1777 if older.
1778 """
1779 def check_require_version(f):
1780 version_elements = version.split('.')
1781 try:
1782 compare = tuple([int(v) for v in version_elements])
1783 except ValueError:
1784 raise ValueError('%s is not a correct version : should be X.Y[.Z].' % version)
1785 current = sys.version_info[:3]
1786
1787 if current < compare:
1788
1789 def new_f(self, *args, **kwargs):
1790 self.skip('Need at least %s version of python. Current version is %s.' % (version, '.'.join([str(element) for element in current])))
1791 new_f.__name__ = f.__name__
1792 return new_f
1793 else:
1794
1795 return f
1796 return check_require_version
1797
1799 """ Check if the given module is loaded. Skip the test if not.
1800 """
1801 def check_require_module(f):
1802 try:
1803 __import__(module)
1804
1805 return f
1806 except ImportError:
1807
1808 def new_f(self, *args, **kwargs):
1809 self.skip('%s can not be imported.' % module)
1810 new_f.__name__ = f.__name__
1811 return new_f
1812 return check_require_module
1813