Package logilab :: Package common :: Module shellutils
[frames | no frames]

Source Code for Module logilab.common.shellutils

  1  """shell/term utilities, useful to write some python scripts instead of shell 
  2  scripts. 
  3   
  4  :author:    Logilab 
  5  :copyright: 2000-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  6  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  7  :license: General Public License version 2 - http://www.gnu.org/licenses 
  8  """ 
  9  __docformat__ = "restructuredtext en" 
 10   
 11  import os 
 12  import glob 
 13  import shutil 
 14  import stat 
 15  import sys 
 16  import tempfile 
 17  import time 
 18  import fnmatch 
 19  import errno 
 20  from os.path import exists, isdir, islink, basename, join, walk 
 21   
 22  from logilab.common import STD_BLACKLIST 
 23  try: 
 24      from logilab.common.proc import ProcInfo, NoSuchProcess 
 25  except ImportError: 
26 # windows platform 27 - class NoSuchProcess(Exception): pass
28
29 - def ProcInfo(pid):
30 raise NoSuchProcess()
31
def chown(path, login=None, group=None):
    """Change the owner and/or group of `path`, like `os.chown`, but accept
    names as well as numeric ids.

    :type path: str
    :param path:
      file whose ownership is changed

    :type login: int or str or None
    :param login:
      user id, or login name resolved through `pwd.getpwnam`; when omitted
      the owner is left unchanged

    :type group: int or str or None
    :param group:
      group id, or group name resolved through `grp.getgrnam`; when omitted
      the group is left unchanged

    Note: you must own the file to chown it (or be root). Otherwise OSError
    is raised.
    """
    # -1 is what gets handed to os.chown for an id that was not supplied
    uid = gid = -1
    if login is not None:
        try:
            uid = int(login)
        except ValueError:
            # not a number: treat it as a login name
            import pwd # Platforms: Unix
            uid = pwd.getpwnam(login).pw_uid
    if group is not None:
        try:
            gid = int(group)
        except ValueError:
            # not a number: treat it as a group name
            import grp
            gid = grp.getgrnam(group).gr_gid
    os.chown(path, uid, gid)
56
def mv(source, destination, _action=shutil.move):
    """A shell-like mv, supporting wildcards.

    :type source: str
    :param source:
      path or glob pattern selecting the file(s) to move

    :type destination: str
    :param destination:
      target path; it must be an existing directory when `source` matches
      more than one file

    :type _action: callable
    :param _action:
      function called as ``_action(src, dst)`` for each file, `shutil.move`
      by default (`cp` passes `shutil.copy`)

    :raise OSError:
      if nothing matches `source`, or if `_action` fails with an OSError on
      a single-file transfer
    """
    sources = glob.glob(source)
    if len(sources) > 1:
        assert isdir(destination)
        for filename in sources:
            _action(filename, join(destination, basename(filename)))
        return
    if not sources:
        raise OSError('No file matching %s' % source)
    source = sources[0]
    # isdir() already implies the path exists
    if isdir(destination):
        destination = join(destination, basename(source))
    try:
        _action(source, destination)
    except OSError as ex:
        # name the action actually performed, so that cp() does not report
        # a failed "move"
        verb = getattr(_action, '__name__', 'move')
        raise OSError('Unable to %s %r to %r (%s)' % (
            verb, source, destination, ex))
77
def rm(*files):
    """Remove files and directories the way the shell `rm -r` would.

    :type files: str
    :param files:
      any number of paths or glob patterns; patterns matching nothing are
      silently skipped
    """
    for pattern in files:
        for path in glob.glob(pattern):
            # a symbolic link is unlinked itself, even when it points to a
            # directory; only real directories are removed recursively
            if isdir(path) and not islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
89
def cp(source, destination):
    """Copy files the way the shell `cp` does, wildcards included.

    The work is delegated to `mv`, with `shutil.copy` substituted for the
    move action.

    :type source: str
    :param source: path or glob pattern selecting the file(s) to copy

    :type destination: str
    :param destination: target path
    """
    mv(source, destination, shutil.copy)
94
def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
    """Recursively find files ending with the given extensions from the directory.

    :type directory: str
    :param directory:
      directory where the search should start

    :type exts: basestring or list or tuple
    :param exts:
      extension or list of extensions to search

    :type exclude: boolean
    :param exclude:
      if this argument is True, return the files NOT ending with the given
      extensions

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directories to ignore, defaults to the value
      of `logilab.common.STD_BLACKLIST`

    :rtype: list
    :return:
      the list of all matching files
    """
    try:
        string_types = basestring
    except NameError:
        # Python 3 has no basestring
        string_types = str
    if isinstance(exts, string_types):
        exts = (exts,)
    # str.endswith accepts a tuple of suffixes (and only a tuple)
    exts = tuple(exts)
    exclude = bool(exclude)
    files = []
    # os.walk replaces os.path.walk, which no longer exists in Python 3
    for curdir, dirnames, filenames in os.walk(directory):
        # pruning in place keeps os.walk out of blacklisted directories
        dirnames[:] = [name for name in dirnames if name not in blacklist]
        for filename in filenames:
            if filename in blacklist:
                continue
            if filename.endswith(exts) != exclude:
                files.append(join(curdir, filename))
    return files
def globfind(directory, pattern, blacklist=STD_BLACKLIST):
    """Recursively finds files matching glob `pattern` under `directory`.

    This is an alternative to `logilab.common.shellutils.find`.

    :type directory: str
    :param directory:
      directory where the search should start

    :type pattern: basestring
    :param pattern:
      the glob pattern (e.g *.py, foo*.py, etc.)

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directories to ignore, defaults to the value
      of `logilab.common.STD_BLACKLIST`

    :rtype: iterator
    :return:
      iterator over the list of all matching files
    """
    for curdir, dirnames, filenames in os.walk(directory):
        # pruning in place keeps os.walk out of blacklisted directories
        dirnames[:] = [name for name in dirnames if name not in blacklist]
        for fname in fnmatch.filter(filenames, pattern):
            # blacklisted file names are ignored too, as documented
            if fname not in blacklist:
                yield join(curdir, fname)
181
def unzip(archive, destdir):
    """Extract every entry of the zip file `archive` below `destdir`.

    :type archive: str
    :param archive: path of the zip file to read

    :type destdir: str
    :param destdir:
      directory receiving the content; it is created when missing (its
      parent must already exist)

    :raise ValueError:
      if an entry name would be written outside of `destdir`
    """
    import zipfile
    if not exists(destdir):
        os.mkdir(destdir)
    root = os.path.abspath(destdir)
    prefix = root.rstrip(os.sep) + os.sep
    zfobj = zipfile.ZipFile(archive)
    try:
        for name in zfobj.namelist():
            target = os.path.abspath(join(root, name))
            # refuse absolute names and '..' components leading out of destdir
            if target != root and not target.startswith(prefix):
                raise ValueError('Unsafe path in archive: %r' % name)
            if name.endswith('/'):
                if not isdir(target):
                    os.makedirs(target)
                continue
            # archives are not required to list directories before the files
            # they contain
            parent = os.path.dirname(target)
            if not isdir(parent):
                os.makedirs(parent)
            outfile = open(target, 'wb')
            try:
                outfile.write(zfobj.read(name))
            finally:
                outfile.close()
    finally:
        zfobj.close()
194
class Execute:
    """This is a deadlock safe version of popen2 (no stdin), that returns
    an object with errorlevel, out and err.

    The command is run through `os.system` with its standard output and
    standard error redirected to temporary files, which are read back and
    removed afterwards.

    Attributes set by the constructor:

    * `status`: value returned by `os.system`, shifted right by 8 bits
    * `out`: what the command wrote on its standard output
    * `err`: what the command wrote on its standard error
    """

    def __init__(self, command):
        """Run `command` (a shell command line) and collect its results."""
        paths = []
        try:
            for _ in range(2):
                # mkstemp creates the file itself, unlike the race-prone
                # mktemp which only returns a free name
                fd, path = tempfile.mkstemp()
                os.close(fd)
                paths.append(path)
            outfile, errfile = paths
            self.status = os.system("( %s ) >%s 2>%s" %
                                    (command, outfile, errfile)) >> 8
            self.out = self._read(outfile)
            self.err = self._read(errfile)
        finally:
            # never leave the temporary files behind, even on error
            for path in paths:
                if os.path.exists(path):
                    os.remove(path)

    def _read(self, path):
        """Return the whole content of the text file `path`."""
        stream = open(path, "r")
        try:
            return stream.read()
        finally:
            stream.close()
209
def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
    """Acquire a lock represented by a file on the file system

    If the process written in lock file doesn't exist anymore, we remove the
    lock file immediately
    If age of the lock_file is greater than max_delay, then we raise a UserWarning

    :type lock_file: str
    :param lock_file: path of the file materializing the lock

    :type max_try: int
    :param max_try: number of attempts (its absolute value is used)

    :type delay: int or float
    :param delay: seconds slept between two attempts

    :type max_delay: int or float
    :param max_delay:
      age in seconds above which an existing lock is reported

    :rtype: bool
    :return: True once the lock file has been created

    :raise UserWarning: if the existing lock is older than `max_delay`
    :raise Exception: if the lock could not be acquired after `max_try` attempts
    """
    count = abs(max_try)
    while count:
        try:
            # O_EXCL makes the creation fail if the file already exists
            fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
        except OSError as ex:
            if ex.errno != errno.EEXIST:
                raise
        else:
            try:
                # os.write requires bytes on Python 3; encoding leaves the
                # digits unchanged on Python 2
                os.write(fd, str(os.getpid()).encode('ascii'))
            finally:
                os.close(fd)
            return True
        # the lock is held by somebody else: look at its owner and its age
        try:
            stream = open(lock_file, "r")
            try:
                pid = int(stream.readline())
            finally:
                stream.close()
            pi = ProcInfo(pid)
            age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
            if age / max_delay > 1 :
                raise UserWarning("Command '%s' (pid %s) has locked the "
                                  "file '%s' for %s minutes"
                                  % (pi.name(), pid, lock_file, age/60))
        except UserWarning:
            raise
        except NoSuchProcess:
            # the owner is gone: drop the stale lock, tolerating another
            # process having removed it first
            try:
                os.remove(lock_file)
            except OSError as ex:
                if ex.errno != errno.ENOENT:
                    raise
        except Exception:
            # The try block is not essential. can be skipped.
            # Note: ProcInfo object is only available for linux
            # process information are not accessible...
            # or lock_file is no more present...
            pass
        count -= 1
        if count:
            # no point in sleeping when there is no attempt left
            time.sleep(delay)
    raise Exception('Unable to acquire %s' % lock_file)
251
def release_lock(lock_file):
    """Give back a lock previously taken on the file system, by deleting
    the file that materializes it.

    :type lock_file: str
    :param lock_file: path of the lock file to delete
    """
    os.unlink(lock_file)
255
class ProgressBar(object):
    """A simple text progression bar.

    The bar is drawn as ``title[....    ]`` and rewritten in place (each
    display starts with a carriage return) whenever it gains a dot.
    """

    def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
        """
        :type nbops: int
        :param nbops: number of `update` calls needed to fill the bar

        :type size: int
        :param size: width of the bar, in characters

        :type stream: file
        :param stream: object with `write` and `flush` receiving the display

        :type title: str
        :param title: text displayed before the bar
        """
        # the title ends up inside a format string: double its '%' signs so
        # they are displayed literally instead of being read as directives
        safe_title = ('%s' % title).replace('%', '%%')
        self._fstr = '\r%s[%%-%ss]' % (safe_title, int(size))
        self._stream = stream
        self._total = nbops
        self._size = size
        self._current = 0
        self._progress = 0

    def update(self):
        """Update the progression bar."""
        self._current += 1
        if self._total:
            ratio = float(self._current) / float(self._total)
            progress = int(ratio * self._size)
        else:
            # nothing to wait for: show a full bar rather than dividing by 0
            progress = int(self._size)
        # only redraw when the bar actually grows
        if progress > self._progress:
            self._progress = progress
            self.refresh()

    def refresh(self):
        """Refresh the progression bar display."""
        # more updates than announced must not overflow the bar
        dots = '.' * min(self._progress, self._size)
        self._stream.write(self._fstr % dots)
        self._stream.flush()
280 281 from logilab.common.deprecation import deprecated
@deprecated('confirm() is deprecated, use RawInput.confirm() instead')
def confirm(question, default_is_yes=True):
    """Ask a yes/no `question` through a fresh `RawInput` and return its
    verdict.

    :type question: str
    :param question: text shown to the user

    :type default_is_yes: bool
    :param default_is_yes: forwarded as is to `RawInput.confirm`

    :return: whatever `RawInput.confirm` returns
    """
    prompter = RawInput()
    return prompter.confirm(question, default_is_yes)
287
class RawInput(object):
    """Ask questions on a terminal and interpret the answers."""

    def __init__(self, input=None, printer=None):
        """
        :type input: callable or None
        :param input:
          function called with the prompt and returning the answer typed by
          the user; defaults to `raw_input` (`input` on Python 3)

        :type printer: callable or None
        :param printer:
          function called with each error message; messages are printed on
          the standard output when omitted
        """
        if not input:
            # the parameter shadows the builtin, hence the explicit lookup
            try:
                import __builtin__ as builtins
            except ImportError:
                import builtins
            input = getattr(builtins, 'raw_input', None) or builtins.input
        self._input = input
        self._print = printer

    def ask(self, question, options, default):
        """Ask `question` until the answer designates one of `options`.

        An answer is matched case-insensitively and may be any prefix of an
        option; an empty answer selects `default`.

        :type question: str
        :param question: text shown to the user

        :type options: list or tuple
        :param options: the accepted answers (non-empty strings)

        :type default: str
        :param default: the option returned on empty answer, must be in `options`

        :rtype: str
        :return: the selected option
        :raise Exception: if no usable answer is given after 3 tries
        """
        assert default in options
        choices = []
        for option in options:
            # the default option is advertised by an upper-case first letter
            if option == default:
                label = option[0].upper()
            else:
                label = option[0].lower()
            if len(option) > 1:
                label += '(%s)' % option[1:].lower()
            choices.append((option, label))
        prompt = "%s [%s]: " % (question,
                                '/'.join(label for _, label in choices))
        for _ in range(3):
            answer = self._input(prompt).strip().lower()
            if not answer:
                return default
            possible = [option for option, _ in choices
                        if option.lower().startswith(answer)]
            if len(possible) == 1:
                return possible[0]
            # an option typed in full wins over longer options it is a
            # prefix of, otherwise it could never be selected
            exact = [option for option in possible
                     if option.lower() == answer]
            if len(exact) == 1:
                return exact[0]
            if not possible:
                msg = '%s is not an option.' % answer
            else:
                msg = ('%s is an ambiguous answer, do you mean %s ?' % (
                    answer, ' or '.join(possible)))
            if self._print:
                self._print(msg)
            else:
                print(msg)
        raise Exception('unable to get a sensible answer')

    def confirm(self, question, default_is_yes=True):
        """Ask the yes/no `question` and return True on a positive answer.

        :type default_is_yes: bool
        :param default_is_yes: whether an empty answer means yes
        """
        default = 'y' if default_is_yes else 'n'
        answer = self.ask(question, ('y', 'n'), default)
        return answer == 'y'


# ready-to-use instance built with the default input function and printer
ASK = RawInput()
def getlogin():
    """Return the login name recorded in the password database for the
    user id of the current process.

    `os.getlogin()` is deliberately not used, because of strange tty / stdin
    problems (man 3 getlogin). Another solution would be to use $LOGNAME,
    $USER or $USERNAME.

    :rtype: str
    :return: the login name
    """
    import pwd # Platforms: Unix
    entry = pwd.getpwuid(os.getuid())
    return entry.pw_name
344