Source code for libacbf.archivereader

import os
import shutil
from io import UnsupportedOperation
from pathlib import Path
from typing import Set, Optional, Union, Literal, BinaryIO
from tempfile import TemporaryDirectory
from zipfile import ZipFile, is_zipfile
from py7zr import SevenZipFile, is_7zfile
from rarfile import RarFile, is_rarfile
import tarfile as tar

from libacbf.constants import ArchiveTypes
from libacbf.exceptions import EditRARArchiveError, UnsupportedArchive


def get_archive_type(file: Union[str, Path, BinaryIO]) -> ArchiveTypes:
    """Get the type of archive.

    Parameters
    ----------
    file : str | pathlib.Path | BinaryIO
        File to check. Already-open ``ZipFile``, ``SevenZipFile``, ``TarFile`` and ``RarFile``
        objects are recognised as well.

    Returns
    -------
    ArchiveTypes(Enum)
        Returns :class:`ArchiveTypes <libacbf.constants.ArchiveTypes>` enum.

    Raises
    ------
    UnsupportedArchive
        Raised if file is not of a supported archive type.
    """
    # Archive objects that are already open identify themselves by class.
    opened_archives = (
        (ZipFile, ArchiveTypes.Zip),
        (SevenZipFile, ArchiveTypes.SevenZip),
        (tar.TarFile, ArchiveTypes.Tar),
        (RarFile, ArchiveTypes.Rar),
    )
    for archive_class, archive_type in opened_archives:
        if isinstance(file, archive_class):
            return archive_type

    # Probe order is significant and matches the original implementation.
    probes = (
        (is_7zfile, ArchiveTypes.SevenZip),
        (is_zipfile, ArchiveTypes.Zip),
        (is_rarfile, ArchiveTypes.Rar),
        (tar.is_tarfile, ArchiveTypes.Tar),
    )

    seek = getattr(file, "seek", None)
    seekable = getattr(file, "seekable", None)
    can_rewind = seek is not None and (seekable is None or seekable())

    for probe, archive_type in probes:
        # A probe may leave a stream at an arbitrary offset (``is_zipfile`` scans from the end
        # and does not restore the position). Later probes such as ``is_tarfile`` read from the
        # current position, so every probe has to start from the beginning of the stream.
        if can_rewind:
            seek(0)
        if probe(file):
            return archive_type

    raise UnsupportedArchive
class ArchiveReader:
    """This can read and write Zip, 7Zip and Tar archives. Rar archives are read-only.

    Notes
    -----
    Writing and creating archives uses the default options for each type. You cannot use this
    module to change compression levels or other options.

    Parameters
    ----------
    file : str | pathlib.Path | BinaryIO
        Archive file to be used.

    mode : 'r' | 'w'
        Mode to open file in. Can be ``'r'`` for read-only or ``'w'`` for read-write. Nothing is
        overwritten.

    Attributes
    ----------
    archive : zipfile.ZipFile | tarfile.TarFile | py7zr.SevenZipFile | rarfile.RarFile
        The archive being used.

    type : ArchiveTypes
        The type of archive. See enum for possible types.

    mode : 'r' | 'w'
        Mode to open file in. Can be ``'r'`` for read-only or ``'w'`` for read-write. Nothing is
        overwritten.

    _extract : tempfile.TemporaryDirectory | None
        The contents of the archive are extracted to a temporary directory in write mode only and
        this is used for listing, reading and writing. It is created in the same directory as the
        archive or, if the archive has no path or that directory cannot be used, in the system
        temp directory. Reset to ``None`` once the archive is closed.

    _arc_path : pathlib.Path | None
        The path to the temporary directory the archive is extracted to in write mode. Reset to
        ``None`` once the archive is closed.

    _source : str | Path | BinaryIO
        The file passed in.
    """

    def __init__(self, file: Union[str, Path, BinaryIO], mode: Literal['r', 'w'] = 'r'):
        self._extract = None
        self._arc_path = None
        self._source = file

        self.mode: Literal['r', 'w'] = mode
        self.type: ArchiveTypes = get_archive_type(file)

        if isinstance(file, str):
            file = Path(file).resolve(True)

        # Type detection may have moved the position of a stream.
        if hasattr(file, "seek"):
            file.seek(0)

        if mode == 'w' and self.type == ArchiveTypes.Rar:
            raise EditRARArchiveError

        arc = None
        if self.type == ArchiveTypes.Zip:
            arc = ZipFile(file, 'r')
        elif self.type == ArchiveTypes.SevenZip:
            arc = SevenZipFile(file, 'r')
        elif self.type == ArchiveTypes.Tar:
            if isinstance(file, (str, Path)):
                arc = tar.open(file, mode='r')
            else:
                arc = tar.open(fileobj=file, mode='r')
        elif self.type == ArchiveTypes.Rar:
            arc = RarFile(file)

        self.archive: Union[ZipFile, SevenZipFile, tar.TarFile, RarFile] = arc

        if mode == 'w':
            try:
                self._extract = self._make_workdir()
                self._arc_path = Path(self._extract.name)
                # NOTE(review): member names are not sanitised here; extracting an untrusted
                # archive relies on the safety checks of the underlying library.
                self.archive.extractall(self._arc_path)
            except BaseException:
                # Do not leak the temporary directory or the open archive on failure.
                self._discard_workdir()
                self.archive.close()
                raise

    def _make_workdir(self):
        """Create the temporary directory that write mode extracts the archive into."""
        location = self.filepath
        if location is not None:
            try:
                return TemporaryDirectory(dir=location.parent)
            except OSError:
                # Directory next to the archive is missing or not writeable: use system temp.
                pass
        return TemporaryDirectory()

    def _discard_workdir(self):
        """Delete the extraction directory (if any) and forget about it."""
        if self._extract is not None:
            self._extract.cleanup()
        self._extract = None
        self._arc_path = None

    @property
    def filepath(self) -> Optional[Path]:
        """Path to the archive file. Returns ``None`` if it does not have a path.
        """
        name = None
        if self.type in (ArchiveTypes.Zip, ArchiveTypes.SevenZip, ArchiveTypes.Rar):
            name = self.archive.filename
        elif self.type == ArchiveTypes.Tar:
            name = self.archive.name

        if name is not None:
            name = Path(name)
        return name

    @property
    def filename(self) -> Optional[str]:
        """Name of the archive file. Returns ``None`` if it does not have a path.
        """
        path = self.filepath
        return None if path is None else path.name

    def _get_acbf_file(self) -> Optional[str]:
        """Returns the name of the first file with the ``.acbf`` extension at the root level of
        the archive or ``None`` if no file is found.
        """
        if self._arc_path is not None:
            for entry in self._arc_path.glob("*.acbf"):
                if entry.is_file():
                    # Return a string, like the read-only branches below do.
                    return str(entry.relative_to(self._arc_path))
            return None

        if self.type in (ArchiveTypes.Zip, ArchiveTypes.Rar):
            for info in self.archive.infolist():
                if not info.is_dir() and '/' not in info.filename and info.filename.endswith(".acbf"):
                    return info.filename
        elif self.type == ArchiveTypes.SevenZip:
            self.archive.reset()
            for info in self.archive.list():
                if not info.is_directory and '/' not in info.filename and info.filename.endswith(".acbf"):
                    return info.filename
        elif self.type == ArchiveTypes.Tar:
            for member in self.archive.getmembers():
                if member.isfile() and '/' not in member.name and member.name.endswith(".acbf"):
                    return member.name
        return None

    def list_files(self) -> Set[str]:
        """Returns a set of the names of all the files in the archive.
        """
        if self._arc_path is not None:
            return {str(x.relative_to(self._arc_path)) for x in self._arc_path.rglob('*') if x.is_file()}

        if self.type in (ArchiveTypes.Zip, ArchiveTypes.Rar):
            return {x.filename for x in self.archive.infolist() if not x.is_dir()}
        elif self.type == ArchiveTypes.Tar:
            return {x.name for x in self.archive.getmembers() if x.isfile()}
        elif self.type == ArchiveTypes.SevenZip:
            self.archive.reset()
            return {x.filename for x in self.archive.list() if not x.is_directory}

    def list_dirs(self) -> Set[str]:
        """Returns a set of the names of all the directories in the archive.
        """
        if self._arc_path is not None:
            return {str(x.relative_to(self._arc_path)) for x in self._arc_path.rglob('*') if x.is_dir()}

        if self.type in (ArchiveTypes.Zip, ArchiveTypes.Rar):
            return {x.filename for x in self.archive.infolist() if x.is_dir()}
        elif self.type == ArchiveTypes.Tar:
            return {x.name for x in self.archive.getmembers() if x.isdir()}
        elif self.type == ArchiveTypes.SevenZip:
            self.archive.reset()
            return {x.filename for x in self.archive.list() if x.is_directory}

    def read(self, target: str) -> Optional[bytes]:
        """Get file as bytes from archive.

        Parameters
        ----------
        target : str
            Path relative to root of archive.

        Returns
        -------
        bytes
            Contents of file. ``None`` if a Tar member has no readable data (it is not a regular
            file or a link to one).

        Raises
        ------
        ValueError
            Raised in write mode if ``target`` resolves to a location outside the archive.
        """
        if self._arc_path is not None:
            # Same containment rule as `write()` and `delete()`.
            if not (self._arc_path / target).resolve().is_relative_to(self._arc_path.resolve()):
                raise ValueError("`target` does not resolve to a file inside the archive.")
            with open(self._arc_path / target, 'rb') as file:
                return file.read()

        contents = None
        if self.type in (ArchiveTypes.Zip, ArchiveTypes.Rar):
            with self.archive.open(target, 'r') as file:
                contents = file.read()
        elif self.type == ArchiveTypes.SevenZip:
            self.archive.reset()
            with self.archive.read([target])[target] as file:
                contents = file.read()
        elif self.type == ArchiveTypes.Tar:
            member = self.archive.extractfile(target)
            # `extractfile` returns None for members that carry no data.
            if member is not None:
                with member as file:
                    contents = file.read()
        return contents

    def write(self, target: Union[str, Path, bytes], arcname: Optional[str] = None):
        """Write file to archive.

        Parameters
        ----------
        target : str | Path | bytes
            File to be written. Reads a file on disk if string or path is passed. Writes data
            directly if bytes is passed.

        arcname : str, default=Name of target file
            Name of file in archive.

        Raises
        ------
        io.UnsupportedOperation
            Raised if the archive was opened in read-only mode.

        AttributeError
            Raised if ``target`` is bytes and ``arcname`` is not given.

        TypeError
            Raised if ``target`` is not a string, path or bytes.

        ValueError
            Raised if ``arcname`` resolves to a location outside the archive.
        """
        if self.mode == 'r':
            raise UnsupportedOperation("Archive is not writeable.")

        if isinstance(target, str):
            target = Path(target)

        if isinstance(target, Path):
            target = target.resolve(True)
            with open(target, 'rb') as src:
                contents = src.read()
            if arcname is None:
                arcname = target.name
        elif isinstance(target, bytes):
            contents = target
            if arcname is None:
                raise AttributeError("`arcname` is required if `target` is bytes.")
        else:
            raise TypeError("`target` must be a string, a path or bytes.")

        destination = self._arc_path / arcname
        if not destination.resolve().is_relative_to(self._arc_path.resolve()):
            raise ValueError("`arcname` does not resolve to a file inside the archive.")

        os.makedirs(destination.parent, exist_ok=True)
        with open(destination, 'wb') as file:
            file.write(contents)

    def delete(self, target: Union[str, Path], recursive: bool = False):
        """File to delete from archive.

        Parameters
        ----------
        target : str | Path
            Path of file to delete relative to root of archive.

        recursive : bool, default=False
            Whether to remove directories recursively.

        Raises
        ------
        io.UnsupportedOperation
            Raised if the archive was opened in read-only mode.

        FileNotFoundError
            Raised if ``target`` does not exist in the archive.

        ValueError
            Raised if ``target`` resolves to a location outside the archive.
        """
        if self.mode == 'r':
            raise UnsupportedOperation("Archive is not writeable.")

        if isinstance(target, str):
            target = Path(target)

        # Strict resolution: a missing target raises FileNotFoundError here.
        target = (self._arc_path / target).resolve(True)
        if not target.is_relative_to(self._arc_path.resolve()):
            raise ValueError("`target` does not resolve to a file inside the archive.")

        if target.is_file():
            try:
                os.remove(target)
            except FileNotFoundError:
                pass
        elif recursive:
            shutil.rmtree(target)
        else:
            try:
                os.rmdir(target)
            except FileNotFoundError:
                pass

    def _repack(self):
        """Rebuild the source archive from the contents of the extraction directory."""
        source = self._source
        is_stream = not isinstance(source, (str, Path))
        if is_stream:
            # Start from an empty stream; otherwise the new archive is written at the current
            # position and bytes of the old archive are left around it.
            source.seek(0)
            source.truncate()

        names = sorted(self.list_files())
        if self.type == ArchiveTypes.Zip:
            with ZipFile(source, 'w') as arc:
                for name in names:
                    arc.write(self._arc_path / name, name)
        elif self.type == ArchiveTypes.SevenZip:
            with SevenZipFile(source, 'w') as arc:
                for name in names:
                    arc.write(self._arc_path / name, name)
        elif self.type == ArchiveTypes.Tar:
            # `tar.open` only accepts a stream through the `fileobj` argument.
            if is_stream:
                writer = tar.open(fileobj=source, mode='w')
            else:
                writer = tar.open(source, 'w')
            with writer as arc:
                for name in names:
                    arc.add(self._arc_path / name, name)

    def close(self):
        """Close archive file. Save changes if writeable.

        Changes are saved only while the extraction directory exists, so closing an archive a
        second time does not rewrite it.
        """
        self.archive.close()

        if self._arc_path is None:
            return

        try:
            self._repack()
        finally:
            self._discard_workdir()

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        if exception_type is None:
            self.close()
        else:
            # An error occurred inside the `with` block: drop pending changes without saving.
            self._discard_workdir()
            self.archive.close()