[postprocessor:metadata] add 'event' and 'filename' options
This commit is contained in:
@@ -6,7 +6,7 @@
|
|||||||
# it under the terms of the GNU General Public License version 2 as
|
# it under the terms of the GNU General Public License version 2 as
|
||||||
# published by the Free Software Foundation.
|
# published by the Free Software Foundation.
|
||||||
|
|
||||||
"""Write metadata to JSON files"""
|
"""Write metadata to external files"""
|
||||||
|
|
||||||
from .common import PostProcessor
|
from .common import PostProcessor
|
||||||
from .. import util
|
from .. import util
|
||||||
@@ -24,7 +24,7 @@ class MetadataPP(PostProcessor):
|
|||||||
cfmt = options.get("content-format") or options.get("format")
|
cfmt = options.get("content-format") or options.get("format")
|
||||||
if isinstance(cfmt, list):
|
if isinstance(cfmt, list):
|
||||||
cfmt = "\n".join(cfmt) + "\n"
|
cfmt = "\n".join(cfmt) + "\n"
|
||||||
self.contentfmt = util.Formatter(cfmt).format_map
|
self._content_fmt = util.Formatter(cfmt).format_map
|
||||||
ext = "txt"
|
ext = "txt"
|
||||||
elif mode == "tags":
|
elif mode == "tags":
|
||||||
self.write = self._write_tags
|
self.write = self._write_tags
|
||||||
@@ -39,47 +39,64 @@ class MetadataPP(PostProcessor):
|
|||||||
if directory:
|
if directory:
|
||||||
self._directory = self._directory_custom
|
self._directory = self._directory_custom
|
||||||
sep = os.sep + (os.altsep or "")
|
sep = os.sep + (os.altsep or "")
|
||||||
self.metadir = directory.rstrip(sep) + os.sep
|
self._metadir = directory.rstrip(sep) + os.sep
|
||||||
|
|
||||||
|
filename = options.get("filename")
|
||||||
extfmt = options.get("extension-format")
|
extfmt = options.get("extension-format")
|
||||||
if extfmt:
|
if filename:
|
||||||
self._filename = self._filename_custom
|
self._filename = self._filename_custom
|
||||||
self.extfmt = util.Formatter(extfmt).format_map
|
self._filename_fmt = util.Formatter(filename).format_map
|
||||||
|
elif extfmt:
|
||||||
|
self._filename = self._filename_extfmt
|
||||||
|
self._extension_fmt = util.Formatter(extfmt).format_map
|
||||||
else:
|
else:
|
||||||
self.extension = options.get("extension", ext)
|
self.extension = options.get("extension", ext)
|
||||||
|
|
||||||
event = "metadata" if options.get("bypost") else "file"
|
events = options.get("event")
|
||||||
job.hooks[event].append(self.run)
|
if events is None:
|
||||||
|
events = ("file",)
|
||||||
|
elif isinstance(events, str):
|
||||||
|
events = events.split(",")
|
||||||
|
for event in events:
|
||||||
|
job.hooks[event].append(self.run)
|
||||||
|
|
||||||
def run(self, pathfmt):
|
def run(self, pathfmt):
|
||||||
path = self._directory(pathfmt) + self._filename(pathfmt)
|
directory = self._directory(pathfmt)
|
||||||
with open(path, "w", encoding="utf-8") as file:
|
path = directory + self._filename(pathfmt)
|
||||||
self.write(file, pathfmt.kwdict)
|
|
||||||
|
try:
|
||||||
|
with open(path, "w", encoding="utf-8") as fp:
|
||||||
|
self.write(fp, pathfmt.kwdict)
|
||||||
|
except FileNotFoundError:
|
||||||
|
os.makedirs(directory, exist_ok=True)
|
||||||
|
with open(path, "w", encoding="utf-8") as fp:
|
||||||
|
self.write(fp, pathfmt.kwdict)
|
||||||
|
|
||||||
def _directory(self, pathfmt):
|
def _directory(self, pathfmt):
|
||||||
return pathfmt.realdirectory
|
return pathfmt.realdirectory
|
||||||
|
|
||||||
def _directory_custom(self, pathfmt):
|
def _directory_custom(self, pathfmt):
|
||||||
directory = os.path.join(pathfmt.realdirectory, self.metadir)
|
return os.path.join(pathfmt.realdirectory, self._metadir)
|
||||||
os.makedirs(directory, exist_ok=True)
|
|
||||||
return directory
|
|
||||||
|
|
||||||
def _filename(self, pathfmt):
|
def _filename(self, pathfmt):
|
||||||
return pathfmt.filename + "." + self.extension
|
return (pathfmt.filename or "metadata") + "." + self.extension
|
||||||
|
|
||||||
def _filename_custom(self, pathfmt):
|
def _filename_custom(self, pathfmt):
|
||||||
|
return self._filename_fmt(pathfmt.kwdict)
|
||||||
|
|
||||||
|
def _filename_extfmt(self, pathfmt):
|
||||||
kwdict = pathfmt.kwdict
|
kwdict = pathfmt.kwdict
|
||||||
ext = kwdict["extension"]
|
ext = kwdict["extension"]
|
||||||
kwdict["extension"] = pathfmt.extension
|
kwdict["extension"] = pathfmt.extension
|
||||||
kwdict["extension"] = pathfmt.prefix + self.extfmt(kwdict)
|
kwdict["extension"] = pathfmt.prefix + self._extension_fmt(kwdict)
|
||||||
filename = pathfmt.build_filename()
|
filename = pathfmt.build_filename()
|
||||||
kwdict["extension"] = ext
|
kwdict["extension"] = ext
|
||||||
return filename
|
return filename
|
||||||
|
|
||||||
def _write_custom(self, file, kwdict):
|
def _write_custom(self, fp, kwdict):
|
||||||
file.write(self.contentfmt(kwdict))
|
fp.write(self._content_fmt(kwdict))
|
||||||
|
|
||||||
def _write_tags(self, file, kwdict):
|
def _write_tags(self, fp, kwdict):
|
||||||
tags = kwdict.get("tags") or kwdict.get("tag_string")
|
tags = kwdict.get("tags") or kwdict.get("tag_string")
|
||||||
|
|
||||||
if not tags:
|
if not tags:
|
||||||
@@ -91,11 +108,10 @@ class MetadataPP(PostProcessor):
|
|||||||
taglist = tags.split(" ")
|
taglist = tags.split(" ")
|
||||||
tags = taglist
|
tags = taglist
|
||||||
|
|
||||||
file.write("\n".join(tags))
|
fp.write("\n".join(tags) + "\n")
|
||||||
file.write("\n")
|
|
||||||
|
|
||||||
def _write_json(self, file, kwdict):
|
def _write_json(self, fp, kwdict):
|
||||||
util.dump_json(util.filter_dict(kwdict), file, self.ascii, self.indent)
|
util.dump_json(util.filter_dict(kwdict), fp, self.ascii, self.indent)
|
||||||
|
|
||||||
|
|
||||||
__postprocessor__ = MetadataPP
|
__postprocessor__ = MetadataPP
|
||||||
|
|||||||
@@ -244,7 +244,7 @@ class MetadataTest(BasePostprocessorTest):
|
|||||||
pp = self._create(pp_info, {"foo": "bar"})
|
pp = self._create(pp_info, {"foo": "bar"})
|
||||||
self.assertEqual(pp.write, pp._write_custom)
|
self.assertEqual(pp.write, pp._write_custom)
|
||||||
self.assertEqual(pp.extension, "txt")
|
self.assertEqual(pp.extension, "txt")
|
||||||
self.assertTrue(pp.contentfmt)
|
self.assertTrue(pp._content_fmt)
|
||||||
|
|
||||||
with patch("builtins.open", mock_open()) as m:
|
with patch("builtins.open", mock_open()) as m:
|
||||||
self._trigger()
|
self._trigger()
|
||||||
@@ -261,7 +261,7 @@ class MetadataTest(BasePostprocessorTest):
|
|||||||
"extension-format": "json",
|
"extension-format": "json",
|
||||||
})
|
})
|
||||||
|
|
||||||
self.assertEqual(pp._filename, pp._filename_custom)
|
self.assertEqual(pp._filename, pp._filename_extfmt)
|
||||||
|
|
||||||
with patch("builtins.open", mock_open()) as m:
|
with patch("builtins.open", mock_open()) as m:
|
||||||
self._trigger()
|
self._trigger()
|
||||||
@@ -304,6 +304,18 @@ class MetadataTest(BasePostprocessorTest):
|
|||||||
path = self.pathfmt.realdirectory + "metadata/file.json"
|
path = self.pathfmt.realdirectory + "metadata/file.json"
|
||||||
m.assert_called_once_with(path, "w", encoding="utf-8")
|
m.assert_called_once_with(path, "w", encoding="utf-8")
|
||||||
|
|
||||||
|
def test_metadata_filename(self):
|
||||||
|
self._create({
|
||||||
|
"filename" : "{category}_{filename}_meta.data",
|
||||||
|
"extension-format": "json",
|
||||||
|
})
|
||||||
|
|
||||||
|
with patch("builtins.open", mock_open()) as m:
|
||||||
|
self._trigger()
|
||||||
|
|
||||||
|
path = self.pathfmt.realdirectory + "test_file_meta.data"
|
||||||
|
m.assert_called_once_with(path, "w", encoding="utf-8")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _output(mock):
|
def _output(mock):
|
||||||
return "".join(
|
return "".join(
|
||||||
|
|||||||
Reference in New Issue
Block a user