Source code for railgun.common.fileutil

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# @file: railgun/common/fileutil.py
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# This file is released under BSD 2-clause license.

import os
import zipfile
import rarfile
import tarfile

# set the global parameters of external modules
rarfile.PATH_SEP = '/'


def file_get_contents(path):
    """Read the file contents of `path`, decoded as UTF-8.

    :param path: File system path of the file to read.
    :type path: :class:`str`
    :returns: File content as a unicode string, or None if any error
        occurred (file missing, not readable, not valid UTF-8, ...).
    """
    try:
        with open(path, 'rb') as f:
            # `bytes.decode` is equivalent to `unicode(data, 'utf-8')` on
            # Python 2, and also exists on Python 3 where the `unicode`
            # builtin is gone (the resulting NameError used to be swallowed
            # below, so every read returned None).
            return f.read().decode('utf-8')
    except Exception:
        # Best-effort by contract: callers test the result against None
        # instead of handling exceptions.
        return None
def remove_firstdir(path):
    """Strip the leading directory component from `path`.

    Everything up to and including the first '/' is dropped.  A `path`
    without any '/' is taken to have no directory part and is returned
    unchanged.  For examples::

        >>> remove_firstdir('/entity')
        'entity'
        >>> remove_firstdir('top/entity')
        'entity'
        >>> remove_firstdir('entity')
        'entity'

    :param path: The input path string.
    :type path: :class:`str`
    :return: Path of which the first directory is removed.
    """
    _, separator, remainder = path.partition('/')
    # An empty separator means no '/' was present at all.
    return remainder if separator else path
def dirtree(parent):
    """Iterate over the relative paths of every entity below `parent`.

    Both files and directories are reported, using '/' as the separator.
    The contents of a directory are yielded before the directory itself.
    The listing is produced lazily, so errors from :func:`os.listdir` surface
    while iterating rather than when this function is called.

    :param parent: The directory to scan.
    :type parent: :class:`str`
    :returns: iterable object over all relative paths.
    :raises: :class:`Exception` from the system libraries.
    """
    def walk(abs_dir, rel_prefix):
        for entry in os.listdir(abs_dir):
            rel_path = rel_prefix + entry
            abs_path = os.path.join(abs_dir, entry)
            if os.path.isdir(abs_path):
                # Descend first: children are reported ahead of their parent.
                for nested in walk(abs_path, rel_path + '/'):
                    yield nested
            yield rel_path

    return walk(os.path.realpath(parent), '')
def packzip(base_path, files, target, path_prefix=''):
    """Add every entity of `files`, located under `base_path`, to `target`.

    Each entity is read from ``os.path.join(base_path, name)`` and stored in
    the archive under ``path_prefix + name``.

    :param base_path: the base path of all entities.
    :type base_path: :class:`str`
    :param files: iterable object over relative paths of file entities.
    :type files: :class:`object`
    :param target: Target zipfile object.
    :type target: :class:`zipfile.ZipFile`
    :param path_prefix: prefix of paths to be added into archive file.
    :type path_prefix: :class:`str`
    :return: `target`
    """
    for rel_path in files:
        target.write(os.path.join(base_path, rel_path), path_prefix + rel_path)
    return target
def makezip(filename):
    """Open a fresh, deflate-compressed zip archive for writing.

    :param filename: the file system path of created zip file.
    :type filename: :class:`str`
    :return: :class:`zipfile.ZipFile` object.
    """
    archive = zipfile.ZipFile(
        filename,
        mode='w',
        compression=zipfile.ZIP_DEFLATED,
    )
    return archive
class Extractor(object):
    """The unique interface for archive file extractors.

    `Railgun` system can extract various types of archive files.
    This class provides a unique interface to create an extractor
    on given archive file.

    The format of archive will be recognized according to file extension,
    For examples::

        >>> Extractor.open('a.zip')
        <ZipExtractor instance>
        >>> Extractor.open('a.rar')
        <RarExtractor instance>

    Also, the :class:`Extractor` objects implements context manager,
    for example:

    .. code-block:: python

        with Extractor.open('a.zip') as f:
            for fname, fobj in f:
                print 'the content of %s is:' % fname
                print fobj.read()
    """

    def __init__(self, fobj):
        # The underlying archive object; closed and reset to None on exit.
        self.fobj = fobj

    # support with statement
    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        if self.fobj:
            self.fobj.close()
            self.fobj = None

    # support iteration over the extractor
    def __iter__(self):
        return self.extract()

    # canonical path: replace '\\' to '/'
    def _canonical_path(self, p):
        return p.replace('\\', '/')

    # basic method to get next file from archive
    def extract(self):
        """Get iterable (fname, fobj) from the archive.

        You may simply iterate over a :class:`Extractor` object, which is
        same as calling to this method.

        :return: list of tuple (fname, fobj), where `fname` is a
            :class:`str`, and `fobj` is a file-like object.
        :raises: :class:`NotImplementedError` unless overridden by a
            derived class.
        """
        raise NotImplementedError()

    # basic method to get names of all files
    def filelist(self):
        """Get iterable name lists in this archive file.

        :raises: :class:`NotImplementedError` unless overridden by a
            derived class.
        """
        raise NotImplementedError()

    def countfiles(self, maxcount=1048576):
        """Count all files in the archive.

        :param maxcount: maximum files to count.  If exceeds this limit,
            the method will be interrupted, and ``maxcount + 1`` will be
            returned.
        :type maxcount: :class:`int`
        :return: the number of files in this archive.
        """
        counter = 0
        for fname in self.filelist():
            counter += 1
            # Stop early so a huge archive cannot keep us iterating.
            if counter > maxcount:
                break
        return counter

    def onedir(self):
        """Check whether this archive contains only one top-level directory?

        Some students may compress a whole directory.  We want to detect
        such situations.

        .. note::

            OS X may add a hidden directory named `__MACOSX` to zip
            archives.  This method will ignore such directory.

        The check compares the first path component of every name given by
        :meth:`filelist`; a name without '/' counts as its own component.
        An archive with no (non-ignored) entries yields True.

        :return: True if the archive file indeed contains only one
            top-level directory, while False otherwise.
        """
        last_dname = None
        for fname in self.filelist():
            # get the first directory name
            slash_pos = fname.find('/')
            if slash_pos >= 0:
                dname = fname[:slash_pos]
            else:
                dname = fname
            # ignore some meta data directories
            if dname == '__MACOSX':
                # OS X will add a hidden directory named "__MACOSX" to archive
                # even the user just wants to compress a single directory.
                # So ignore this directory.
                continue
            # check whether one dir.
            if last_dname is None:
                last_dname = dname
            if last_dname != dname:
                return False
        return True

    @staticmethod
    def open(fpath):
        """Open an extractor for given archive file.

        The archive type is chosen from the final extension of `fpath`,
        compared case-insensitively (so ``a.tar.gz`` is matched by its
        ``.gz`` suffix).

        :param fpath: the path of archive file.
        :type fpath: :class:`str`
        :return: instance derived from :class:`Extractor`.
        :raises: :class:`ValueError` if the extension of given file is not
            supported.
        """
        fext = os.path.splitext(fpath)[1].lower()
        # Compare for equality: `fext in ('.rar')` is a substring test on a
        # plain string, which also matched '', '.r', '.ra', ... and sent
        # extension-less files to the rar extractor.
        if fext == '.rar':
            return RarExtractor(fpath)
        if fext == '.zip':
            return ZipExtractor(fpath)
        if fext in ('.tar', '.tgz', '.gz', '.bz2', '.tbz'):
            return TarExtractor(fpath)
        raise ValueError('Archive file "%s" not recognized.' % fpath)
class ZipExtractor(Extractor):
    """:class:`Extractor` for zip archives, backed by `zipfile.ZipFile`."""

    def __init__(self, fpath):
        super(ZipExtractor, self).__init__(zipfile.ZipFile(fpath, 'r'))

    def _members(self):
        """Yield the info object of every non-directory entry."""
        for mi in self.fobj.infolist():
            # Directory entries end with '/'.  `endswith` is used instead of
            # `filename[-1]` so an entry with an empty name cannot raise
            # IndexError.
            if mi.filename.endswith('/'):
                continue
            yield mi

    def extract(self):
        """Yield ``(canonical name, file-like object)`` for each file."""
        for mi in self._members():
            f = self.fobj.open(mi)
            yield self._canonical_path(mi.filename), f

    def filelist(self):
        """Yield the canonical name of each file."""
        for mi in self._members():
            yield self._canonical_path(mi.filename)


class RarExtractor(Extractor):
    """:class:`Extractor` for rar archives, backed by `rarfile.RarFile`."""

    def __init__(self, fpath):
        super(RarExtractor, self).__init__(rarfile.RarFile(fpath, 'r'))

    def _members(self):
        """Yield the info object of every non-directory entry."""
        for mi in self.fobj.infolist():
            if mi.isdir():
                continue
            yield mi

    def extract(self):
        """Yield ``(canonical name, file-like object)`` for each file."""
        for mi in self._members():
            f = self.fobj.open(mi)
            yield self._canonical_path(mi.filename), f

    def filelist(self):
        """Yield the canonical name of each file."""
        for mi in self._members():
            yield self._canonical_path(mi.filename)


class TarExtractor(Extractor):
    """:class:`Extractor` for tar archives, backed by `tarfile.open`."""

    def __init__(self, fpath):
        super(TarExtractor, self).__init__(tarfile.open(fpath, 'r'))

    def _members(self):
        """Yield every member that is not a directory."""
        for mi in self.fobj:
            if not mi.isdir():
                yield mi

    def extract(self):
        """Yield ``(canonical name, extracted object)`` for each member.

        The second item is whatever ``extractfile`` returns for the member.
        """
        for mi in self._members():
            yield self._canonical_path(mi.name), self.fobj.extractfile(mi)

    def filelist(self):
        """Yield the canonical name of each non-directory member."""
        for mi in self._members():
            yield self._canonical_path(mi.name)