Package logilab :: Package common :: Package test :: Module unittest_shellutils
[frames] | no frames]

Source Code for Module logilab.common.test.unittest_shellutils

  1  """unit tests for logilab.common.shellutils""" 
  2   
  3  import sys, os, tempfile, shutil 
  4  from os.path import join 
  5  import datetime, time 
  6   
  7  from logilab.common.testlib import TestCase, unittest_main 
  8   
  9  from logilab.common.shellutils import (globfind, find, ProgressBar, 
 10                                         acquire_lock, release_lock, 
 11                                         RawInput, confirm) 
 12   
 13  from logilab.common.proc import NoSuchProcess 
 14  from StringIO import StringIO 
 15   
 16  DATA_DIR = join('data','find_test') 
 17   
18 -class FindTC(TestCase):
19 - def test_include(self):
20 files = set(find(DATA_DIR, '.py')) 21 self.assertSetEqual(files, 22 set([join(DATA_DIR, f) for f in ['__init__.py', 'module.py', 23 'module2.py', 'noendingnewline.py', 24 'nonregr.py', join('sub', 'momo.py')]])) 25 files = set(find(DATA_DIR, ('.py',), blacklist=('sub',))) 26 self.assertSetEqual(files, 27 set([join(DATA_DIR, f) for f in ['__init__.py', 'module.py', 28 'module2.py', 'noendingnewline.py', 29 'nonregr.py']]))
30
31 - def test_exclude(self):
32 files = set(find(DATA_DIR, ('.py', '.pyc'), exclude=True)) 33 self.assertSetEqual(files, 34 set([join(DATA_DIR, f) for f in ['foo.txt', 35 'newlines.txt', 36 'normal_file.txt', 37 'test.ini', 38 'test1.msg', 39 'test2.msg', 40 'spam.txt', 41 join('sub', 'doc.txt'), 42 'write_protected_file.txt', 43 ]]))
44
45 - def test_globfind(self):
46 files = set(globfind(DATA_DIR, '*.py')) 47 self.assertSetEqual(files, 48 set([join(DATA_DIR, f) for f in ['__init__.py', 'module.py', 49 'module2.py', 'noendingnewline.py', 50 'nonregr.py', join('sub', 'momo.py')]])) 51 files = set(globfind(DATA_DIR, 'mo*.py')) 52 self.assertSetEqual(files, 53 set([join(DATA_DIR, f) for f in ['module.py', 'module2.py', 54 join('sub', 'momo.py')]])) 55 files = set(globfind(DATA_DIR, 'mo*.py', blacklist=('sub',))) 56 self.assertSetEqual(files, 57 set([join(DATA_DIR, f) for f in ['module.py', 'module2.py']]))
58 59
60 -class ProgressBarTC(TestCase):
61 - def test_refresh(self):
62 pgb_stream = StringIO() 63 expected_stream = StringIO() 64 pgb = ProgressBar(20,stream=pgb_stream) 65 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue()) # nothing print before refresh 66 pgb.refresh() 67 expected_stream.write("\r["+' '*20+"]") 68 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
69
70 - def test_refresh_g_size(self):
71 pgb_stream = StringIO() 72 expected_stream = StringIO() 73 pgb = ProgressBar(20,35,stream=pgb_stream) 74 pgb.refresh() 75 expected_stream.write("\r["+' '*35+"]") 76 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
77
78 - def test_refresh_l_size(self):
79 pgb_stream = StringIO() 80 expected_stream = StringIO() 81 pgb = ProgressBar(20,3,stream=pgb_stream) 82 pgb.refresh() 83 expected_stream.write("\r["+' '*3+"]") 84 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
85
86 - def _update_test(self, nbops, expected, size = None):
87 pgb_stream = StringIO() 88 expected_stream = StringIO() 89 if size is None: 90 pgb = ProgressBar(nbops, stream=pgb_stream) 91 size=20 92 else: 93 pgb = ProgressBar(nbops, size, stream=pgb_stream) 94 last = 0 95 for round in expected: 96 if not hasattr(round, '__int__'): 97 dots, update = round 98 else: 99 dots, update = round, None 100 pgb.update() 101 if update or (update is None and dots != last): 102 last = dots 103 expected_stream.write("\r["+('.'*dots)+(' '*(size-dots))+"]") 104 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
105
106 - def test_default(self):
107 self._update_test(20, xrange(1,21))
108
109 - def test_nbops_gt_size(self):
110 """Test the progress bar for nbops > size""" 111 def half(total): 112 for counter in range(1,total+1): 113 yield counter / 2
114 self._update_test(40, half(40))
115
116 - def test_nbops_lt_size(self):
117 """Test the progress bar for nbops < size""" 118 def double(total): 119 for counter in range(1,total+1): 120 yield counter * 2
121 self._update_test(10, double(10)) 122
123 - def test_nbops_nomul_size(self):
124 """Test the progress bar for size % nbops !=0 (non int number of dots per update)""" 125 self._update_test(3, (6,13,20))
126
127 - def test_overflow(self):
128 self._update_test(5, (8, 16, 25, 33, 42, (42, True)), size=42)
129 130
131 -class AcquireLockTC(TestCase):
132
133 - def setUp(self):
134 self.tmpdir = tempfile.mkdtemp() 135 self.lock = join(self.tmpdir, 'LOCK')
136
137 - def tearDown(self):
138 shutil.rmtree(self.tmpdir)
139
140 - def test_acquire_normal(self):
141 self.assertTrue(acquire_lock(self.lock, 1, 1)) 142 self.assertTrue(os.path.exists(self.lock)) 143 release_lock(self.lock) 144 self.assertFalse(os.path.exists(self.lock))
145
146 - def test_no_possible_acquire(self):
147 self.assertRaises(Exception, acquire_lock, self.lock, 0)
148
149 - def test_wrong_process(self):
150 fd = os.open(self.lock, os.O_EXCL | os.O_RDWR | os.O_CREAT) 151 os.write(fd, '1111111111') 152 os.close(fd) 153 self.assertTrue(os.path.exists(self.lock)) 154 self.assertRaises(Exception, acquire_lock, self.lock, 1, 1)
155
157 fd = os.open(self.lock, os.O_EXCL | os.O_RDWR | os.O_CREAT) 158 os.write(fd, '1111111111') 159 os.close(fd) 160 self.assertTrue(os.path.exists(self.lock)) 161 self.assertTrue(acquire_lock(self.lock))
162
163 - def test_locked_for_one_hour(self):
164 self.assertTrue(acquire_lock(self.lock)) 165 touch = datetime.datetime.fromtimestamp(time.time() - 3601).strftime("%m%d%H%M") 166 os.system("touch -t %s %s" % (touch, self.lock)) 167 self.assertRaises(UserWarning, acquire_lock, self.lock, max_try=2, delay=1)
168
169 -class RawInputTC(TestCase):
170
171 - def auto_input(self, *args):
172 self.input_args = args 173 return self.input_answer
174
175 - def setUp(self):
176 null_printer = lambda x: None 177 self.qa = RawInput(self.auto_input, null_printer)
178
179 - def test_ask_default(self):
180 self.input_answer = '' 181 answer = self.qa.ask('text', ('yes','no'), 'yes') 182 self.assertEquals(answer, 'yes') 183 self.input_answer = ' ' 184 answer = self.qa.ask('text', ('yes','no'), 'yes') 185 self.assertEquals(answer, 'yes')
186
187 - def test_ask_case(self):
188 self.input_answer = 'no' 189 answer = self.qa.ask('text', ('yes','no'), 'yes') 190 self.assertEquals(answer, 'no') 191 self.input_answer = 'No' 192 answer = self.qa.ask('text', ('yes','no'), 'yes') 193 self.assertEquals(answer, 'no') 194 self.input_answer = 'NO' 195 answer = self.qa.ask('text', ('yes','no'), 'yes') 196 self.assertEquals(answer, 'no') 197 self.input_answer = 'nO' 198 answer = self.qa.ask('text', ('yes','no'), 'yes') 199 self.assertEquals(answer, 'no') 200 self.input_answer = 'YES' 201 answer = self.qa.ask('text', ('yes','no'), 'yes') 202 self.assertEquals(answer, 'yes')
203
204 - def test_ask_prompt(self):
205 self.input_answer = '' 206 answer = self.qa.ask('text', ('yes','no'), 'yes') 207 self.assertEquals(self.input_args[0], 'text [Y(es)/n(o)]: ') 208 answer = self.qa.ask('text', ('y','n'), 'y') 209 self.assertEquals(self.input_args[0], 'text [Y/n]: ') 210 answer = self.qa.ask('text', ('n','y'), 'y') 211 self.assertEquals(self.input_args[0], 'text [n/Y]: ') 212 answer = self.qa.ask('text', ('yes','no','maybe','1'), 'yes') 213 self.assertEquals(self.input_args[0], 'text [Y(es)/n(o)/m(aybe)/1]: ')
214
215 - def test_ask_ambiguous(self):
216 self.input_answer = 'y' 217 self.assertRaises(Exception, self.qa.ask, 'text', ('yes','yep'), 'yes')
218
219 - def test_confirm(self):
220 self.input_answer = 'y' 221 self.assertEquals(self.qa.confirm('Say yes'), True) 222 self.assertEquals(self.qa.confirm('Say yes', default_is_yes=False), True) 223 self.input_answer = 'n' 224 self.assertEquals(self.qa.confirm('Say yes'), False) 225 self.assertEquals(self.qa.confirm('Say yes', default_is_yes=False), False) 226 self.input_answer = '' 227 self.assertEquals(self.qa.confirm('Say default'), True) 228 self.assertEquals(self.qa.confirm('Say default', default_is_yes=False), False)
229 230 if __name__ == '__main__': 231 unittest_main() 232