1 """unit tests for logilab.common.shellutils"""
2
3 import sys, os, tempfile, shutil
4 from os.path import join
5 import datetime, time
6
7 from logilab.common.testlib import TestCase, unittest_main
8
9 from logilab.common.shellutils import (globfind, find, ProgressBar,
10 acquire_lock, release_lock,
11 RawInput, confirm)
12
13 from logilab.common.proc import NoSuchProcess
14 from StringIO import StringIO
15
# Fixture tree searched by the find()/globfind() tests. The path is relative,
# so it is resolved against the current working directory at run time.
16 DATA_DIR = join('data','find_test')
17
# find() called with a single extension string: every '.py' file under
# DATA_DIR is expected, including the one located in the 'sub' directory.
20 files = set(find(DATA_DIR, '.py'))
21 self.assertSetEqual(files,
22 set([join(DATA_DIR, f) for f in ['__init__.py', 'module.py',
23 'module2.py', 'noendingnewline.py',
24 'nonregr.py', join('sub', 'momo.py')]]))
# Same search with the extension passed as a tuple and 'sub' blacklisted:
# sub/momo.py must no longer be part of the result.
25 files = set(find(DATA_DIR, ('.py',), blacklist=('sub',)))
26 self.assertSetEqual(files,
27 set([join(DATA_DIR, f) for f in ['__init__.py', 'module.py',
28 'module2.py', 'noendingnewline.py',
29 'nonregr.py']]))
30
# find() called with exclude=True: the expected result contains only files
# whose extension is neither '.py' nor '.pyc', sub/doc.txt included.
# Results are compared as sets, so the order returned by find() is irrelevant.
32 files = set(find(DATA_DIR, ('.py', '.pyc'), exclude=True))
33 self.assertSetEqual(files,
34 set([join(DATA_DIR, f) for f in ['foo.txt',
35 'newlines.txt',
36 'normal_file.txt',
37 'test.ini',
38 'test1.msg',
39 'test2.msg',
40 'spam.txt',
41 join('sub', 'doc.txt'),
42 'write_protected_file.txt',
43 ]]))
44
# globfind() with the '*.py' pattern: same expected files as the plain
# '.py' extension search, sub/momo.py included.
46 files = set(globfind(DATA_DIR, '*.py'))
47 self.assertSetEqual(files,
48 set([join(DATA_DIR, f) for f in ['__init__.py', 'module.py',
49 'module2.py', 'noendingnewline.py',
50 'nonregr.py', join('sub', 'momo.py')]]))
# A narrower pattern keeps only the file names starting with 'mo'.
51 files = set(globfind(DATA_DIR, 'mo*.py'))
52 self.assertSetEqual(files,
53 set([join(DATA_DIR, f) for f in ['module.py', 'module2.py',
54 join('sub', 'momo.py')]]))
# Blacklisting 'sub' removes sub/momo.py from the matches.
55 files = set(globfind(DATA_DIR, 'mo*.py', blacklist=('sub',)))
56 self.assertSetEqual(files,
57 set([join(DATA_DIR, f) for f in ['module.py', 'module2.py']]))
58
59
# Both the bar output and the expected output go to in-memory streams so
# they can be compared as plain strings.
62 pgb_stream = StringIO()
63 expected_stream = StringIO()
64 pgb = ProgressBar(20,stream=pgb_stream)
# Building the bar alone must not write anything to its stream.
65 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
66 pgb.refresh()
# With no explicit size the expected bar is 20 characters wide; refresh()
# on an untouched bar must draw it empty, preceded by a carriage return.
67 expected_stream.write("\r["+' '*20+"]")
68 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
69
# Second positional argument 35 is larger than the first (20): the empty
# bar drawn by refresh() is expected to be 35 characters wide.
71 pgb_stream = StringIO()
72 expected_stream = StringIO()
73 pgb = ProgressBar(20,35,stream=pgb_stream)
74 pgb.refresh()
75 expected_stream.write("\r["+' '*35+"]")
76 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
77
# Second positional argument 3 is smaller than the first (20): the empty
# bar drawn by refresh() is expected to be only 3 characters wide.
79 pgb_stream = StringIO()
80 expected_stream = StringIO()
81 pgb = ProgressBar(20,3,stream=pgb_stream)
82 pgb.refresh()
83 expected_stream.write("\r["+' '*3+"]")
84 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
85
# Shared driver for the update() tests: builds a ProgressBar for `nbops`
# operations, calls update() once per item of `expected`, and after every
# call compares the bar's stream with the output rebuilt here.
87 pgb_stream = StringIO()
88 expected_stream = StringIO()
# When no size is given the bar is built without one and the expected
# drawing below assumes a width of 20.
89 if size is None:
90 pgb = ProgressBar(nbops, stream=pgb_stream)
91 size=20
92 else:
93 pgb = ProgressBar(nbops, size, stream=pgb_stream)
# Number of dots in the last bar written to expected_stream.
94 last = 0
# NOTE: the loop variable `round` shadows the builtin of the same name.
95 for round in expected:
# An item is either an int-like dot count, or a (dots, update) pair where
# `update` states explicitly whether a redraw is expected for this step.
96 if not hasattr(round, '__int__'):
97 dots, update = round
98 else:
99 dots, update = round, None
100 pgb.update()
# A redraw is expected when explicitly requested, or, for plain dot counts,
# only when the number of dots differs from the last one drawn.
101 if update or (update is None and dots != last):
102 last = dots
103 expected_stream.write("\r["+('.'*dots)+(' '*(size-dots))+"]")
# Checked after every single update(), not only once at the end.
104 self.assertEquals(pgb_stream.getvalue(), expected_stream.getvalue())
105
# 20 operations on a default-size bar: the expected dot counts 1..20 mean
# one additional dot after each update().
107 self._update_test(20, xrange(1,21))
108
110 """Test the progress bar for nbops > size"""
# 40 operations on the default 20-wide bar. half() yields the expected dot
# count after each update: counter / 2 is integer division under Python 2,
# which this file targets (xrange, StringIO module), giving 0, 1, 1, 2, ...
111 def half(total):
112 for counter in range(1,total+1):
113 yield counter / 2
114 self._update_test(40, half(40))
115
117 """Test the progress bar for nbops < size"""
# 10 operations on the default 20-wide bar. double() yields the expected
# dot count after each update: 2, 4, ..., 20 (two dots per operation).
118 def double(total):
119 for counter in range(1,total+1):
120 yield counter * 2
121 self._update_test(10, double(10))
122
124 """Test the progress bar for size % nbops !=0 (non int number of dots per update)"""
# 3 operations on the default 20-wide bar: the expected dot counts after
# the successive updates are 6, 13 and 20.
125 self._update_test(3, (6,13,20))
126
# 5 operations on a 42-wide bar; the first five entries are the expected dot
# counts. The sixth entry, (42, True), drives one update() beyond nbops and
# its True flag makes _update_test expect the full bar to be written again
# even though the dot count did not change.
128 self._update_test(5, (8, 16, 25, 33, 42, (42, True)), size=42)
129
130
132
# Each lock test works on a 'LOCK' path inside a fresh temporary directory;
# only the path is computed here, the lock file itself is not created.
134 self.tmpdir = tempfile.mkdtemp()
135 self.lock = join(self.tmpdir, 'LOCK')
136
# Remove the temporary directory together with any lock file left in it.
138 shutil.rmtree(self.tmpdir)
139
# Nominal case: acquiring must return a true value and create the lock file,
# releasing must delete it again.
141 self.assertTrue(acquire_lock(self.lock, 1, 1))
142 self.assertTrue(os.path.exists(self.lock))
143 release_lock(self.lock)
144 self.assertFalse(os.path.exists(self.lock))
145
# acquire_lock must raise when its second positional argument is 0
# (presumably max_try, i.e. no attempt allowed -- confirm against shellutils).
147 self.assertRaises(Exception, acquire_lock, self.lock, 0)
148
# Create the lock file by hand; O_EXCL makes os.open fail if it already
# exists. The content '1111111111' is presumably read back by acquire_lock
# as the PID of the lock owner -- TODO confirm against shellutils.
150 fd = os.open(self.lock, os.O_EXCL | os.O_RDWR | os.O_CREAT)
151 os.write(fd, '1111111111')
152 os.close(fd)
153 self.assertTrue(os.path.exists(self.lock))
# With the lock file already present, acquire_lock(lock, 1, 1) must raise.
154 self.assertRaises(Exception, acquire_lock, self.lock, 1, 1)
155
# Create the lock file by hand with the content '1111111111' (presumably a
# PID that acquire_lock reads back -- TODO confirm against shellutils).
157 fd = os.open(self.lock, os.O_EXCL | os.O_RDWR | os.O_CREAT)
158 os.write(fd, '1111111111')
159 os.close(fd)
160 self.assertTrue(os.path.exists(self.lock))
# Here acquire_lock with its default arguments is expected to succeed.
# NOTE(review): the fragment just before this one builds the same lock file
# but expects an exception from acquire_lock(lock, 1, 1); confirm which
# outcome matches the current acquire_lock behaviour.
161 self.assertTrue(acquire_lock(self.lock))
162
# Take the lock, then backdate the lock file by a little over one hour
# (3601 seconds) before trying to acquire it a second time.
164 self.assertTrue(acquire_lock(self.lock))
# The timestamp is formatted as MMDDhhmm for `touch -t`, so it is truncated
# to the minute.
165 touch = datetime.datetime.fromtimestamp(time.time() - 3601).strftime("%m%d%H%M")
# NOTE(review): this shells out, so it needs a `touch` command on PATH; the
# return code of os.system is not checked. os.utime() on self.lock could set
# the same timestamp without a shell.
166 os.system("touch -t %s %s" % (touch, self.lock))
# A second acquisition attempt on the backdated lock must raise UserWarning.
167 self.assertRaises(UserWarning, acquire_lock, self.lock, max_try=2, delay=1)
168
229
# Run the tests through logilab's unittest_main when executed as a script.
230 if __name__ == '__main__':
231 unittest_main()
232