1 """shell/term utilities, useful to write some python scripts instead of shell
2 scripts.
3
4 :author: Logilab
5 :copyright: 2000-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
6 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
7 :license: General Public License version 2 - http://www.gnu.org/licenses
8 """
9 __docformat__ = "restructuredtext en"
10
11 import os
12 import glob
13 import shutil
14 import stat
15 import sys
16 import tempfile
17 import time
18 import fnmatch
19 import errno
20 from os.path import exists, isdir, islink, basename, join, walk
21
22 from logilab.common import STD_BLACKLIST
23 try:
24 from logilab.common.proc import ProcInfo, NoSuchProcess
25 except ImportError:
28
31
32
def _resolve_id(value, lookup):
    """Turn a user/group designator into a numeric id usable by `os.chown`.

    :type value: None, int or str
    :param value:
      None, a numeric id (int or numeric string) or a name

    :type lookup: callable
    :param lookup:
      function mapping a name to its numeric id; only called when `value`
      is not None and `int(value)` raises ValueError

    :rtype: int
    :return:
      -1 when `value` is None, the numeric id otherwise
    """
    if value is None:
        # -1 is what os.chown expects for "leave this id unchanged"
        return -1
    try:
        return int(value)
    except ValueError:
        return lookup(value)


def chown(path, login=None, group=None):
    """Same as `os.chown` function but accepting user login or group name as
    argument. If login or group is omitted, it's left unchanged.

    Note: you must own the file to chown it (or be root). Otherwise OSError
    is raised.

    :type path: str
    :param path:
      path of the file whose ownership should change

    :type login: None, int or str
    :param login:
      numeric uid, numeric string or login name of the new owner

    :type group: None, int or str
    :param group:
      numeric gid, numeric string or name of the new group

    :raise KeyError:
      if a login or group name is not known to the `pwd` / `grp` database
    """
    def uid_of(name):
        # imported on demand: pwd is only needed when a login name is given
        import pwd
        return pwd.getpwnam(name).pw_uid

    def gid_of(name):
        # imported on demand: grp is only needed when a group name is given
        import grp
        return grp.getgrnam(name).gr_gid

    uid = _resolve_id(login, uid_of)
    gid = _resolve_id(group, gid_of)
    os.chown(path, uid, gid)
56
def mv(source, destination, _action=shutil.move):
    """A shell-like mv, supporting wildcards.

    :type source: str
    :param source:
      path or glob pattern of the file(s) to move

    :type destination: str
    :param destination:
      target path; must be an existing directory when `source` matches
      several files

    :type _action: callable
    :param _action:
      function called as ``_action(src, dst)`` for each file, `shutil.move`
      by default

    :raise OSError:
      if nothing matches `source`, or if `_action` raises OSError while
      processing a single match
    :raise AssertionError:
      if several files match and `destination` is not a directory
    """
    sources = glob.glob(source)
    if len(sources) > 1:
        assert isdir(destination)
        for filename in sources:
            _action(filename, join(destination, basename(filename)))
        return
    if not sources:
        raise OSError('No file matching %s' % source)
    source = sources[0]
    # isdir() is already False for a missing path, no separate exists() needed
    if isdir(destination):
        destination = join(destination, basename(source))
    try:
        _action(source, destination)
    except OSError as ex:
        raise OSError('Unable to move %r to %r (%s)' % (
            source, destination, ex))
77
79 """A shell-like rm, supporting wildcards.
80 """
81 for wfile in files:
82 for filename in glob.glob(wfile):
83 if islink(filename):
84 os.remove(filename)
85 elif isdir(filename):
86 shutil.rmtree(filename)
87 else:
88 os.remove(filename)
89
def cp(source, destination):
    """Copy file(s) the way a shell ``cp`` would, expanding wildcards.

    All the work is delegated to `mv`, with `shutil.copy` replacing the
    default per-file action, so both functions share the same wildcard and
    destination handling.

    :type source: str
    :param source:
      path or glob pattern of the file(s) to copy

    :type destination: str
    :param destination:
      target file or directory
    """
    copy_one = shutil.copy
    mv(source, destination, copy_one)
94
96 """Recursively find files ending with the given extensions from the directory.
97
98 :type directory: str
99 :param directory:
100 directory where the search should start
101
102 :type exts: basestring or list or tuple
103 :param exts:
104 extension or list of extensions to search
105
106 :type exclude: boolean
107 :param exclude:
108 if this argument is True, returning files NOT ending with the given
109 extensions
110
111 :type blacklist: list or tuple
112 :param blacklist:
113 optional list of files or directory to ignore, default to the value of
114 `logilab.common.STD_BLACKLIST`
115
116 :rtype: list
117 :return:
118 the list of all matching files
119 """
120 if isinstance(exts, basestring):
121 exts = (exts,)
122 if exclude:
123 def match(filename, exts):
124 for ext in exts:
125 if filename.endswith(ext):
126 return False
127 return True
128 else:
129 def match(filename, exts):
130 for ext in exts:
131 if filename.endswith(ext):
132 return True
133 return False
134 def func(files, directory, fnames):
135 """walk handler"""
136
137 for norecurs in blacklist:
138 try:
139 fnames.remove(norecurs)
140 except ValueError:
141 continue
142 for filename in fnames:
143 src = join(directory, filename)
144 if isdir(src):
145 continue
146 if match(filename, exts):
147 files.append(src)
148 files = []
149 walk(directory, func, files)
150 return files
151
154 """Recursively finds files matching glob `pattern` under `directory`.
155
156 This is an alternative to `logilab.common.shellutils.find`.
157
158 :type directory: str
159 :param directory:
160 directory where the search should start
161
162 :type pattern: basestring
163 :param pattern:
164 the glob pattern (e.g *.py, foo*.py, etc.)
165
166 :type blacklist: list or tuple
167 :param blacklist:
168 optional list of files or directory to ignore, default to the value of
169 `logilab.common.STD_BLACKLIST`
170
171 :rtype: iterator
172 :return:
173 iterator over the list of all matching files
174 """
175 for curdir, dirnames, filenames in os.walk(directory):
176 for fname in fnmatch.filter(filenames, pattern):
177 yield join(curdir, fname)
178 for skipped in blacklist:
179 if skipped in dirnames:
180 dirnames.remove(skipped)
181
def unzip(archive, destdir):
    """Extract every member of the zip file `archive` below `destdir`.

    :type archive: str
    :param archive:
      path of the zip file to extract

    :type destdir: str
    :param destdir:
      directory receiving the content, created when it does not exist yet

    :raise ValueError:
      if a member name (absolute path, ``..`` components) would be written
      outside of `destdir`
    """
    import zipfile
    if not exists(destdir):
        os.mkdir(destdir)
    root = os.path.abspath(destdir)
    prefix = root.rstrip(os.sep) + os.sep
    zfobj = zipfile.ZipFile(archive)
    try:
        for name in zfobj.namelist():
            target = os.path.abspath(join(root, name))
            # member names come from the archive and cannot be trusted
            if target != root and not target.startswith(prefix):
                raise ValueError('Refusing to extract %r outside of %r'
                                 % (name, destdir))
            if name.endswith('/'):
                # directory entry; it may already exist because a previous
                # member created it as its parent
                if not isdir(target):
                    os.makedirs(target)
                continue
            # archives are not required to list directories explicitly
            parent = os.path.dirname(target)
            if not isdir(parent):
                os.makedirs(parent)
            outfile = open(target, 'wb')
            try:
                outfile.write(zfobj.read(name))
            finally:
                outfile.close()
    finally:
        zfobj.close()
194
196 """This is a deadlock safe version of popen2 (no stdin), that returns
197 an object with errorlevel, out and err.
198 """
199
201 outfile = tempfile.mktemp()
202 errfile = tempfile.mktemp()
203 self.status = os.system("( %s ) >%s 2>%s" %
204 (command, outfile, errfile)) >> 8
205 self.out = open(outfile,"r").read()
206 self.err = open(errfile,"r").read()
207 os.remove(outfile)
208 os.remove(errfile)
209
def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
    """Acquire a lock represented by a file on the file system

    If the process written in lock file doesn't exist anymore, we remove the
    lock file immediately
    If age of the lock_file is greater than max_delay, then we raise a
    UserWarning

    :type lock_file: str
    :param lock_file:
      path of the lock file; the pid of the current process is written into
      it once acquired

    :type max_try: int
    :param max_try:
      number of attempts (its absolute value is used)

    :type delay: int or float
    :param delay:
      seconds to wait between two attempts

    :type max_delay: int or float
    :param max_delay:
      age of the lock file, in seconds, above which UserWarning is raised

    :rtype: bool
    :return:
      True once the lock has been acquired

    :raise UserWarning:
      if the lock file is held and older than `max_delay`
    :raise OSError:
      if creating the lock file fails for another reason than "already exists"
    :raise Exception:
      if the lock could not be acquired after `max_try` attempts
    """
    count = abs(max_try)
    while count:
        try:
            # O_EXCL makes the creation fail with EEXIST when the file exists
            fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
        except OSError as ex:
            if ex.errno != errno.EEXIST:
                raise
            try:
                stream = open(lock_file, "r")
                try:
                    pid = int(stream.readline())
                finally:
                    stream.close()
                pi = ProcInfo(pid)
                age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
                if age / max_delay > 1:
                    raise UserWarning("Command '%s' (pid %s) has locked the "
                                      "file '%s' for %s minutes"
                                      % (pi.name(), pid, lock_file, age/60))
            except UserWarning:
                raise
            except NoSuchProcess:
                os.remove(lock_file)
            except Exception:
                # deliberate best effort: any failure while inspecting the
                # existing lock simply leads to another attempt
                pass
        else:
            try:
                # os.write needs bytes on Python 3; a pid is plain ASCII
                os.write(fd, str(os.getpid()).encode('ascii'))
            finally:
                os.close(fd)
            return True
        count -= 1
        if count:
            # no point in waiting once the last attempt has failed
            time.sleep(delay)
    raise Exception('Unable to acquire %s' % lock_file)
251
253 """Release a lock represented by a file on the file system."""
254 os.remove(lock_file)
255
258 """A simple text progression bar."""
259
def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
    """Set up the bar state and its display template.

    :type nbops: int
    :param nbops:
      value stored as the total the bar progresses towards

    :type size: int
    :param size:
      width of the bar, in characters

    :type stream: file-like object
    :param stream:
      stream stored for the bar output, `sys.stdout` by default

    :type title: str
    :param title:
      text displayed in front of the bar
    """
    # the template is applied with `%` again later on, so a literal '%' in
    # the title has to be doubled to survive that second formatting
    safe_title = title.replace('%', '%%')
    self._fstr = '\r%s[%%-%ss]' % (safe_title, int(size))
    self._stream = stream
    self._total = nbops
    self._size = size
    self._current = 0
    self._progress = 0
267
269 """Update the progression bar."""
270 self._current += 1
271 progress = int((float(self._current)/float(self._total))*self._size)
272 if progress > self._progress:
273 self._progress = progress
274 self.refresh()
275
277 """Refresh the progression bar display."""
278 self._stream.write(self._fstr % ('.' * min(self._progress, self._size)) )
279 self._stream.flush()
280
281 from logilab.common.deprecation import deprecated
282
@deprecated('confirm() is deprecated, use RawInput.confirm() instead')
def confirm(question, default_is_yes=True):
    """Ask `question` and return the result of `RawInput.confirm`.

    Deprecated shortcut kept for backward compatibility: a fresh `RawInput`
    is created on each call and the answer is delegated to its `confirm`
    method.

    :param question:
      passed through to `RawInput.confirm`

    :type default_is_yes: bool
    :param default_is_yes:
      passed through to `RawInput.confirm`
    """
    prompt = RawInput()
    return prompt.confirm(question, default_is_yes)
287
333
334 ASK = RawInput()
338 """avoid using os.getlogin() because of strange tty / stdin problems
339 (man 3 getlogin)
340 Another solution would be to use $LOGNAME, $USER or $USERNAME
341 """
342 import pwd
343 return pwd.getpwuid(os.getuid())[0]
344