Package logilab :: Package common :: Module fileutils
[frames] | no frames]

Source Code for Module logilab.common.fileutils

  1  """File and file-path manipulation utilities. 
  2   
  3  :group path manipulation: first_level_directory, relative_path, is_binary,\ 
  4  get_by_ext, remove_dead_links 
  5  :group file manipulation: norm_read, norm_open, lines, stream_lines, lines,\ 
  6  write_open_mode, ensure_fs_mode, export 
  7  :sort: path manipulation, file manipulation 
  8   
  9  :copyright: 2000-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
 10  :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr 
 11  :license: General Public License version 2 - http://www.gnu.org/licenses 
 12  """ 
 13  __docformat__ = "restructuredtext en" 
 14   
 15  import sys 
 16  import shutil 
 17  import mimetypes 
 18  from os.path import isabs, isdir, islink, split, exists, walk, normpath, join 
 19  from os.path import abspath 
 20  from os import sep, mkdir, remove, listdir, stat, chmod 
 21  from stat import ST_MODE, S_IWRITE 
 22  from cStringIO import StringIO 
 23   
 24  from logilab.common import STD_BLACKLIST as BASE_BLACKLIST, IGNORED_EXTENSIONS 
 25  from logilab.common.shellutils import find 
 26   
27 -def first_level_directory(path):
28 """Return the first level directory of a path. 29 30 >>> first_level_directory('home/syt/work') 31 'home' 32 >>> first_level_directory('/home/syt/work') 33 '/' 34 >>> first_level_directory('work') 35 'work' 36 >>> 37 38 :type path: str 39 :param path: the path for which we want the first level directory 40 41 :rtype: str 42 :return: the first level directory appearing in `path` 43 """ 44 head, tail = split(path) 45 while head and tail: 46 head, tail = split(head) 47 if tail: 48 return tail 49 # path was absolute, head is the fs root 50 return head
51
52 -def abspath_listdir(path):
53 """Lists path's content using absolute paths. 54 55 >>> os.listdir('/home') 56 ['adim', 'alf', 'arthur', 'auc'] 57 >>> abspath_listdir('/home') 58 ['/home/adim', '/home/alf', '/home/arthur', '/home/auc'] 59 """ 60 path = abspath(path) 61 return [join(path, filename) for filename in listdir(path)]
62 63
64 -def is_binary(filename):
65 """Return true if filename may be a binary file, according to it's 66 extension. 67 68 :type filename: str 69 :param filename: the name of the file 70 71 :rtype: bool 72 :return: 73 true if the file is a binary file (actually if it's mime type 74 isn't beginning by text/) 75 """ 76 try: 77 return not mimetypes.guess_type(filename)[0].startswith('text') 78 except AttributeError: 79 return 1
80 81
82 -def write_open_mode(filename):
83 """Return the write mode that should used to open file. 84 85 :type filename: str 86 :param filename: the name of the file 87 88 :rtype: str 89 :return: the mode that should be use to open the file ('w' or 'wb') 90 """ 91 if is_binary(filename): 92 return 'wb' 93 return 'w'
94 95
96 -def ensure_fs_mode(filepath, desired_mode=S_IWRITE):
97 """Check that the given file has the given mode(s) set, else try to 98 set it. 99 100 :type filepath: str 101 :param filepath: path of the file 102 103 :type desired_mode: int 104 :param desired_mode: 105 ORed flags describing the desired mode. Use constants from the 106 `stat` module for file permission's modes 107 """ 108 mode = stat(filepath)[ST_MODE] 109 if not mode & desired_mode: 110 chmod(filepath, mode | desired_mode)
111 112
113 -class ProtectedFile(file):
114 """A special file-object class that automatically that automatically 115 does a 'chmod +w' when needed. 116 117 XXX: for now, the way it is done allows 'normal file-objects' to be 118 created during the ProtectedFile object lifetime. 119 One way to circumvent this would be to chmod / unchmod on each 120 write operation. 121 122 One other way would be to : 123 124 - catch the IOError in the __init__ 125 126 - if IOError, then create a StringIO object 127 128 - each write operation writes in this StringIO object 129 130 - on close()/del(), write/append the StringIO content to the file and 131 do the chmod only once 132 """
133 - def __init__(self, filepath, mode):
134 self.original_mode = stat(filepath)[ST_MODE] 135 self.mode_changed = False 136 if mode in ('w', 'a', 'wb', 'ab'): 137 if not self.original_mode & S_IWRITE: 138 chmod(filepath, self.original_mode | S_IWRITE) 139 self.mode_changed = True 140 file.__init__(self, filepath, mode)
141
142 - def _restore_mode(self):
143 """restores the original mode if needed""" 144 if self.mode_changed: 145 chmod(self.name, self.original_mode) 146 # Don't re-chmod in case of several restore 147 self.mode_changed = False
148
149 - def close(self):
150 """restore mode before closing""" 151 self._restore_mode() 152 file.close(self)
153
154 - def __del__(self):
155 if not self.closed: 156 self.close()
157 158
159 -class UnresolvableError(Exception):
160 """Exception raised by relative path when it's unable to compute relative 161 path between two paths. 162 """
163
164 -def relative_path(from_file, to_file):
165 """Try to get a relative path from `from_file` to `to_file` 166 (path will be absolute if to_file is an absolute file). This function 167 is useful to create link in `from_file` to `to_file`. This typical use 168 case is used in this function description. 169 170 If both files are relative, they're expected to be relative to the same 171 directory. 172 173 >>> relative_path( from_file='toto/index.html', to_file='index.html') 174 '../index.html' 175 >>> relative_path( from_file='index.html', to_file='toto/index.html') 176 'toto/index.html' 177 >>> relative_path( from_file='tutu/index.html', to_file='toto/index.html') 178 '../toto/index.html' 179 >>> relative_path( from_file='toto/index.html', to_file='/index.html') 180 '/index.html' 181 >>> relative_path( from_file='/toto/index.html', to_file='/index.html') 182 '../index.html' 183 >>> relative_path( from_file='/toto/index.html', to_file='/toto/summary.html') 184 'summary.html' 185 >>> relative_path( from_file='index.html', to_file='index.html') 186 '' 187 >>> relative_path( from_file='/index.html', to_file='toto/index.html') 188 Traceback (most recent call last): 189 File "<string>", line 1, in ? 190 File "<stdin>", line 37, in relative_path 191 UnresolvableError 192 >>> relative_path( from_file='/index.html', to_file='/index.html') 193 '' 194 >>> 195 196 :type from_file: str 197 :param from_file: source file (where links will be inserted) 198 199 :type to_file: str 200 :param to_file: target file (on which links point) 201 202 :raise UnresolvableError: if it has been unable to guess a correct path 203 204 :rtype: str 205 :return: the relative path of `to_file` from `from_file` 206 """ 207 from_file = normpath(from_file) 208 to_file = normpath(to_file) 209 if from_file == to_file: 210 return '' 211 if isabs(to_file): 212 if not isabs(from_file): 213 return to_file 214 elif isabs(from_file): 215 raise UnresolvableError() 216 from_parts = from_file.split(sep) 217 to_parts = to_file.split(sep) 218 idem = 1 219 result = [] 220 while len(from_parts) > 1: 221 dirname = from_parts.pop(0) 222 if idem and len(to_parts) > 1 and dirname == to_parts[0]: 223 to_parts.pop(0) 224 else: 225 idem = 0 226 result.append('..') 227 result += to_parts 228 return sep.join(result)
229 230 231 from logilab.common.textutils import _LINE_RGX 232 from sys import version_info 233 _HAS_UNIV_OPEN = version_info[:2] >= (2, 3) 234 del version_info 235
236 -def norm_read(path):
237 """Return the content of the file with normalized line feeds. 238 239 :type path: str 240 :param path: path to the file to read 241 242 :rtype: str 243 :return: the content of the file with normalized line feeds 244 """ 245 if _HAS_UNIV_OPEN: 246 return open(path, 'U').read() 247 return _LINE_RGX.sub('\n', open(path).read())
248 249
250 -def norm_open(path):
251 """Return a stream for a file with content with normalized line feeds. 252 253 :type path: str 254 :param path: path to the file to open 255 256 :rtype: file or StringIO 257 :return: the opened file with normalized line feeds 258 """ 259 if _HAS_UNIV_OPEN: 260 return open(path, 'U') 261 return StringIO(_LINE_RGX.sub('\n', open(path).read()))
262 263
264 -def lines(path, comments=None):
265 """Return a list of non empty lines in the file located at `path`. 266 267 :type path: str 268 :param path: path to the file 269 270 :type comments: str or None 271 :param comments: 272 optional string which can be used to comment a line in the file 273 (i.e. lines starting with this string won't be returned) 274 275 :rtype: list 276 :return: 277 a list of stripped line in the file, without empty and commented 278 lines 279 280 :warning: at some point this function will probably return an iterator 281 """ 282 stream = norm_open(path) 283 result = stream_lines(stream, comments) 284 stream.close() 285 return result
286 287
288 -def stream_lines(stream, comments=None):
289 """Return a list of non empty lines in the given `stream`. 290 291 :type stream: object implementing 'xreadlines' or 'readlines' 292 :param stream: file like object 293 294 :type comments: str or None 295 :param comments: 296 optional string which can be used to comment a line in the file 297 (i.e. lines starting with this string won't be returned) 298 299 :rtype: list 300 :return: 301 a list of stripped line in the file, without empty and commented 302 lines 303 304 :warning: at some point this function will probably return an iterator 305 """ 306 try: 307 readlines = stream.xreadlines 308 except AttributeError: 309 readlines = stream.readlines 310 result = [] 311 for line in readlines(): 312 line = line.strip() 313 if line and (comments is None or not line.startswith(comments)): 314 result.append(line) 315 return result
316 317
318 -def export(from_dir, to_dir, 319 blacklist=BASE_BLACKLIST, ignore_ext=IGNORED_EXTENSIONS, 320 verbose=0):
321 """Make a mirror of `from_dir` in `to_dir`, omitting directories and 322 files listed in the black list or ending with one of the given 323 extensions. 324 325 :type from_dir: str 326 :param from_dir: directory to export 327 328 :type to_dir: str 329 :param to_dir: destination directory 330 331 :type blacklist: list or tuple 332 :param blacklist: 333 list of files or directories to ignore, default to the content of 334 `BASE_BLACKLIST` 335 336 :type ignore_ext: list or tuple 337 :param ignore_ext: 338 list of extensions to ignore, default to the content of 339 `IGNORED_EXTENSIONS` 340 341 :type verbose: bool 342 :param verbose: 343 flag indicating whether information about exported files should be 344 printed to stderr, default to False 345 """ 346 def make_mirror(_, directory, fnames): 347 """walk handler""" 348 for norecurs in blacklist: 349 try: 350 fnames.remove(norecurs) 351 except ValueError: 352 continue 353 for filename in fnames: 354 # don't include binary files 355 for ext in ignore_ext: 356 if filename.endswith(ext): 357 break 358 else: 359 src = join(directory, filename) 360 dest = to_dir + src[len(from_dir):] 361 if verbose: 362 print >> sys.stderr, src, '->', dest 363 if isdir(src): 364 if not exists(dest): 365 mkdir(dest) 366 else: 367 if exists(dest): 368 remove(dest) 369 shutil.copy2(src, dest)
370 try: 371 mkdir(to_dir) 372 except OSError: 373 pass 374 walk(from_dir, make_mirror, None) 375 376 396 walk(directory, _remove_dead_link, None) 397