When the downloader detected a '.part' file and attempted to resume its download, a FileExistsError was raised while trying to create a directory path that was assumed to be missing. This happened when the '.part' file was moved or removed between the time it was detected and the time the downloader attempted to open it in 'a+b' mode.
388 lines
13 KiB
Python
388 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2021-2023 Mike Fährmann
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Filesystem path handling"""
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
import functools
|
|
from . import util, formatter, exception
|
|
|
|
# Module-level alias of util.WINDOWS; consulted below when choosing the
# "auto" defaults for path restriction/stripping and when deciding whether
# to build extended-length ("\\?\") paths.
WINDOWS = util.WINDOWS
|
|
# Default filename-extension normalization table:
# every alternative JPEG extension is mapped to plain "jpg".
# Used when the "extension-map" config option is not set.
EXTENSION_MAP = dict.fromkeys(
    ("jpeg", "jpe", "jfif", "jif", "jfi"),
    "jpg",
)
|
|
|
|
|
|
class PathFormat():
    """Build and track filesystem paths for files being downloaded

    An instance holds the compiled filename/directory formatters, the
    path-cleaning functions, and the state of the file currently being
    processed ('directory', 'filename', 'path', 'realpath', 'temppath').
    """

    def __init__(self, extractor):
        """Compile format strings and path rules from 'extractor' config

        Reads the options "keywords-default", "filename", "directory",
        "extension-map", "path-restrict", "path-replace", "path-remove",
        "path-strip", "path-extended" (Windows only) and "base-directory".

        Raises FilenameFormatError / DirectoryFormatError if the
        respective format strings cannot be processed.
        """
        config = extractor.config
        kwdefault = config("keywords-default")
        if kwdefault is None:
            kwdefault = util.NONE

        filename_fmt = config("filename")
        try:
            if filename_fmt is None:
                filename_fmt = extractor.filename_fmt
            elif isinstance(filename_fmt, dict):
                # dict form: {filter expression: format string};
                # the "" key (if any) provides the fallback format
                self.filename_conditions = [
                    (util.compile_filter(expr),
                     formatter.parse(fmt, kwdefault).format_map)
                    for expr, fmt in filename_fmt.items() if expr
                ]
                self.build_filename = self.build_filename_conditional
                filename_fmt = filename_fmt.get("", extractor.filename_fmt)

            self.filename_formatter = formatter.parse(
                filename_fmt, kwdefault).format_map
        except Exception as exc:
            raise exception.FilenameFormatError(exc)

        directory_fmt = config("directory")
        self.directory_conditions = ()
        try:
            if directory_fmt is None:
                directory_fmt = extractor.directory_fmt
            elif isinstance(directory_fmt, dict):
                # dict form: {filter expression: list of format strings};
                # the "" key (if any) provides the fallback formats
                self.directory_conditions = [
                    (util.compile_filter(expr), [
                        formatter.parse(fmt, kwdefault).format_map
                        for fmt in fmts
                    ])
                    for expr, fmts in directory_fmt.items() if expr
                ]
                self.build_directory = self.build_directory_conditional
                directory_fmt = directory_fmt.get("", extractor.directory_fmt)

            self.directory_formatters = [
                formatter.parse(dirfmt, kwdefault).format_map
                for dirfmt in directory_fmt
            ]
        except Exception as exc:
            raise exception.DirectoryFormatError(exc)

        # per-file state; filled in by set_directory(), set_filename(),
        # build_path(), etc.
        self.kwdict = {}
        self.delete = False
        self.prefix = ""
        self.filename = ""
        self.extension = ""
        self.directory = ""
        self.realdirectory = ""
        self.path = ""
        self.realpath = ""
        self.temppath = ""

        extension_map = config("extension-map")
        if extension_map is None:
            extension_map = EXTENSION_MAP
        # store the bound 'get' method: called as extension_map(ext, default)
        self.extension_map = extension_map.get

        restrict = config("path-restrict", "auto")
        replace = config("path-replace", "_")
        if restrict == "auto":
            restrict = "\\\\|/<>:\"?*" if WINDOWS else "/"
        elif restrict == "unix":
            restrict = "/"
        elif restrict == "windows":
            restrict = "\\\\|/<>:\"?*"
        elif restrict == "ascii":
            restrict = "^0-9A-Za-z_."
        elif restrict == "ascii+":
            restrict = "^0-9@-[\\]-{ #-)+-.;=!}~"
        self.clean_segment = self._build_cleanfunc(restrict, replace)

        remove = config("path-remove", "\x00-\x1f\x7f")
        self.clean_path = self._build_cleanfunc(remove, "")

        strip = config("path-strip", "auto")
        if strip == "auto":
            strip = ". " if WINDOWS else ""
        elif strip == "unix":
            strip = ""
        elif strip == "windows":
            strip = ". "
        self.strip = strip

        if WINDOWS:
            self.extended = config("path-extended", True)

        basedir = extractor._parentdir
        if not basedir:
            basedir = config("base-directory")
            sep = os.sep
            if basedir is None:
                basedir = "." + sep + "gallery-dl" + sep
            elif basedir:
                basedir = util.expand_path(basedir)
                altsep = os.altsep
                if altsep and altsep in basedir:
                    basedir = basedir.replace(altsep, sep)
                # the base directory always ends with a path separator so
                # that segments and filenames can simply be appended
                if basedir[-1] != sep:
                    basedir += sep
            basedir = self.clean_path(basedir)
        self.basedirectory = basedir

    def _build_cleanfunc(self, chars, repl):
        """Return a function that replaces/removes characters in a string

        'chars' may be
        - empty: the returned function is util.identity
        - a dict: used as a translation table (see _process_repl_dict)
        - a single character: replaced with 'repl' via str.replace()
        - a longer string: inserted into a regex character class, whose
          matches are substituted with 'repl'
        """
        if not chars:
            return util.identity
        elif isinstance(chars, dict):
            # the 0 key marks a dict that has already been processed
            if 0 not in chars:
                chars = self._process_repl_dict(chars)
                chars[0] = None

            def func(x, table=str.maketrans(chars)):
                return x.translate(table)
        elif len(chars) == 1:
            def func(x, c=chars, r=repl):
                return x.replace(c, r)
        else:
            return functools.partial(
                re.compile("[" + chars + "]").sub, repl)
        return func

    def _process_repl_dict(self, chars):
        """Expand multi-character keys of 'chars' in-place and return it

        A key of the form "a-z" is expanded to the code points of that
        range; any other multi-character key is split into its individual
        characters. Each resulting key maps to the original key's value.
        """
        # can't modify 'chars' while *directly* iterating over its keys
        for char in [c for c in chars if len(c) > 1]:
            if len(char) == 3 and char[1] == "-":
                citer = range(ord(char[0]), ord(char[2])+1)
            else:
                citer = char

            repl = chars.pop(char)
            for c in citer:
                chars[c] = repl

        return chars

    def open(self, mode="wb"):
        """Open file and return a corresponding file object

        If the file cannot be found and 'mode' is a read mode, a
        util.NullContext object is returned instead. For other modes the
        target directory is created before retrying.
        """
        try:
            return open(self.temppath, mode)
        except FileNotFoundError:
            if "r" in mode:
                # '.part' file no longer exists
                return util.NullContext()
            # The directory may already exist at this point (for example
            # when it was created in the meantime); 'exist_ok' prevents
            # a misleading FileExistsError in that case.
            os.makedirs(self.realdirectory, exist_ok=True)
            return open(self.temppath, mode)

    def exists(self):
        """Return True if the file exists on disk"""
        if self.extension and os.path.exists(self.realpath):
            return self.check_file()
        return False

    @staticmethod
    def check_file():
        """Default file check: report an existing file as present"""
        return True

    def _enum_file(self):
        """Find the first unused numbered filename ("1.ext", "2.ext", ...)

        Updates the current paths and 'prefix' accordingly
        and always returns False.
        """
        num = 1
        try:
            while True:
                prefix = format(num) + "."
                self.kwdict["extension"] = prefix + self.extension
                self.build_path()
                os.stat(self.realpath)  # raises OSError if file doesn't exist
                num += 1
        except OSError:
            pass
        self.prefix = prefix
        return False

    def set_directory(self, kwdict):
        """Build the directory path for 'kwdict'

        The directory itself is not created here; that happens lazily
        in open() and finalize().
        """
        self.kwdict = kwdict

        segments = self.build_directory(kwdict)
        if segments:
            self.directory = directory = self.basedirectory + self.clean_path(
                os.sep.join(segments) + os.sep)
        else:
            self.directory = directory = self.basedirectory

        if WINDOWS and self.extended:
            directory = self._extended_path(directory)
        self.realdirectory = directory

    def _extended_path(self, path):
        """Return 'path' as absolute path with a "\\\\?\\" prefix"""
        # Enable longer-than-260-character paths
        path = os.path.abspath(path)
        if not path.startswith("\\\\"):
            path = "\\\\?\\" + path
        elif not path.startswith("\\\\?\\"):
            path = "\\\\?\\UNC\\" + path[2:]

        # abspath() in Python 3.7+ removes trailing path separators (#402)
        if path[-1] != os.sep:
            return path + os.sep
        return path

    def set_filename(self, kwdict):
        """Set general filename data"""
        self.kwdict = kwdict
        self.filename = self.temppath = self.prefix = ""

        ext = kwdict["extension"]
        kwdict["extension"] = self.extension = self.extension_map(ext, ext)

    def set_extension(self, extension, real=True):
        """Set filename extension"""
        # NOTE: 'real' is not used here; kept for interface compatibility
        self.extension = extension = self.extension_map(extension, extension)
        self.kwdict["extension"] = self.prefix + extension

    def fix_extension(self, _=None):
        """Fix filenames without a given filename extension"""
        try:
            if not self.extension:
                self.kwdict["extension"] = \
                    self.prefix + self.extension_map("", "")
                self.build_path()
                # drop a dangling "." left over from the empty extension
                if self.path[-1] == ".":
                    self.path = self.path[:-1]
                    self.temppath = self.realpath = self.realpath[:-1]
            elif not self.temppath:
                self.build_path()
        except exception.GalleryDLException:
            raise
        except Exception:
            # fall back to placeholder paths on unexpected errors
            self.path = self.directory + "?"
            self.realpath = self.temppath = self.realdirectory + "?"
        return True

    def build_filename(self, kwdict):
        """Apply 'kwdict' to filename format string"""
        try:
            return self.clean_path(self.clean_segment(
                self.filename_formatter(kwdict)))
        except Exception as exc:
            raise exception.FilenameFormatError(exc)

    def build_filename_conditional(self, kwdict):
        """Apply 'kwdict' to the first matching conditional filename format

        Falls back to the default filename formatter
        if no condition matches.
        """
        try:
            for condition, fmt in self.filename_conditions:
                if condition(kwdict):
                    break
            else:
                fmt = self.filename_formatter
            return self.clean_path(self.clean_segment(fmt(kwdict)))
        except Exception as exc:
            raise exception.FilenameFormatError(exc)

    def build_directory(self, kwdict):
        """Apply 'kwdict' to directory format strings"""
        segments = []
        append = segments.append
        strip = self.strip

        try:
            for fmt in self.directory_formatters:
                segment = fmt(kwdict).strip()
                if strip and segment not in {".", ".."}:
                    # remove trailing dots and spaces (#647)
                    segment = segment.rstrip(strip)
                if segment:
                    append(self.clean_segment(segment))
            return segments
        except Exception as exc:
            raise exception.DirectoryFormatError(exc)

    def build_directory_conditional(self, kwdict):
        """Apply 'kwdict' to the first matching set of directory formats

        Falls back to the default directory formatters
        if no condition matches.
        """
        segments = []
        append = segments.append
        strip = self.strip

        try:
            for condition, formatters in self.directory_conditions:
                if condition(kwdict):
                    break
            else:
                formatters = self.directory_formatters
            for fmt in formatters:
                segment = fmt(kwdict).strip()
                # keep "." and ".." intact, same as build_directory()
                if strip and segment not in {".", ".."}:
                    # remove trailing dots and spaces (#647)
                    segment = segment.rstrip(strip)
                if segment:
                    append(self.clean_segment(segment))
            return segments
        except Exception as exc:
            raise exception.DirectoryFormatError(exc)

    def build_path(self):
        """Combine directory and filename to full paths"""
        self.filename = filename = self.build_filename(self.kwdict)
        self.path = self.directory + filename
        self.realpath = self.realdirectory + filename
        # an already set 'temppath' (e.g. a '.part' path) is left untouched
        if not self.temppath:
            self.temppath = self.realpath

    def part_enable(self, part_directory=None):
        """Enable .part file usage"""
        if self.extension:
            self.temppath += ".part"
        else:
            # no extension yet: use "part" as (mapped) extension instead
            self.kwdict["extension"] = self.prefix + self.extension_map(
                "part", "part")
            self.build_path()
        if part_directory:
            self.temppath = os.path.join(
                part_directory,
                os.path.basename(self.temppath),
            )

    def part_size(self):
        """Return size of .part file"""
        try:
            return os.stat(self.temppath).st_size
        except OSError:
            pass
        return 0

    def finalize(self):
        """Move tempfile to its target location

        Returns False if the tempfile could not be found,
        and None otherwise.
        """
        if self.delete:
            self.delete = False
            os.unlink(self.temppath)
            return

        if self.temppath != self.realpath:
            # Move temp file to its actual location
            while True:
                try:
                    os.replace(self.temppath, self.realpath)
                except FileNotFoundError:
                    try:
                        # delayed directory creation
                        os.makedirs(self.realdirectory)
                    except FileExistsError:
                        # file at self.temppath does not exist
                        return False
                    continue
                except OSError:
                    # move across different filesystems
                    try:
                        shutil.copyfile(self.temppath, self.realpath)
                    except FileNotFoundError:
                        try:
                            os.makedirs(self.realdirectory)
                        except FileExistsError:
                            # directory exists, so the tempfile is missing
                            return False
                        shutil.copyfile(self.temppath, self.realpath)
                    os.unlink(self.temppath)
                break

        mtime = self.kwdict.get("_mtime")
        if mtime:
            util.set_mtime(self.realpath, mtime)
|