[pp:exec] implement 'commands' option

to run multiple commands in succession
and stopping if one fails
This commit is contained in:
Mike Fährmann
2025-06-13 20:19:01 +02:00
parent 9d3cf67f3e
commit fa1fc39a36
3 changed files with 93 additions and 16 deletions

View File

@@ -6717,6 +6717,25 @@ Description
and ``{_filename}``.
exec.commands
-------------
Type
``list`` of `commands <exec.command_>`__
Example
.. code:: json
[
["echo", "{user[account]}", "{id}"]
["magick", "convert" "{_path}", "\fF {_path.rpartition('.')[0]}.png"],
"rm {}",
]
Description
Multiple `commands <exec.command_>`__ to run in succession.
All `commands <exec.command_>`__ after the first returning with a non-zero
exit status will not be run.
exec.event
----------
Type

View File

@@ -11,7 +11,6 @@
from .common import PostProcessor
from .. import util, formatter
import os
import re
if util.WINDOWS:
@@ -26,17 +25,14 @@ class ExecPP(PostProcessor):
def __init__(self, job, options):
PostProcessor.__init__(self, job)
if options.get("async", False):
self._exec = self._exec_async
args = options["command"]
if isinstance(args, str):
self.args = args
self._sub = re.compile(r"\{(_directory|_filename|_path|)\}").sub
execute = self.exec_string
cmds = options.get("commands")
if cmds:
self.cmds = [self._prepare_cmd(c) for c in cmds]
execute = self.exec_many
else:
self.args = [formatter.parse(arg) for arg in args]
execute = self.exec_list
execute, self.args = self._prepare_cmd(options["command"])
if options.get("async", False):
self._exec = self._exec_async
events = options.get("event")
if events is None:
@@ -47,6 +43,13 @@ class ExecPP(PostProcessor):
self._init_archive(job, options)
def _prepare_cmd(self, cmd):
if isinstance(cmd, str):
self._sub = util.re(r"\{(_directory|_filename|_path|)\}").sub
return self.exec_string, cmd
else:
return self.exec_list, [formatter.parse(arg) for arg in cmd]
def exec_list(self, pathfmt):
archive = self.archive
kwdict = pathfmt.kwdict
@@ -60,10 +63,11 @@ class ExecPP(PostProcessor):
args = [arg.format_map(kwdict) for arg in self.args]
args[0] = os.path.expanduser(args[0])
self._exec(args, False)
retcode = self._exec(args, False)
if archive:
archive.add(kwdict)
return retcode
def exec_string(self, pathfmt):
archive = self.archive
@@ -72,10 +76,31 @@ class ExecPP(PostProcessor):
self.pathfmt = pathfmt
args = self._sub(self._replace, self.args)
self._exec(args, True)
retcode = self._exec(args, True)
if archive:
archive.add(pathfmt.kwdict)
return retcode
def exec_many(self, pathfmt):
archive = self.archive
if archive:
if archive.check(pathfmt.kwdict):
return
self.archive = False
retcode = 0
for execute, args in self.cmds:
self.args = args
retcode = execute(pathfmt)
if retcode:
# non-zero exit status
break
if archive:
self.archive = archive
archive.add(pathfmt.kwdict)
return retcode
def _exec(self, args, shell):
self.log.debug("Running '%s'", args)
@@ -83,6 +108,7 @@ class ExecPP(PostProcessor):
if retcode:
self.log.warning("'%s' returned with non-zero exit status (%d)",
args, retcode)
return retcode
def _exec_async(self, args, shell):
self.log.debug("Running '%s'", args)

View File

@@ -10,7 +10,7 @@
import os
import sys
import unittest
from unittest.mock import Mock, mock_open, patch
from unittest.mock import Mock, mock_open, patch, call
import shutil
import logging
@@ -233,6 +233,38 @@ class ExecTest(BasePostprocessorTest):
shell=False,
)
def test_command_many(self):
self._create({
"commands": [
"echo {} {_path} {_directory} {_filename} && rm {};",
["~/script.sh", "{category}", "\fE _directory.upper()"],
]
})
with patch("gallery_dl.util.Popen") as p:
i = Mock()
i.wait.return_value = 0
p.return_value = i
self._trigger(("after",))
self.assertEqual(p.call_args_list, [
call(
"echo {0} {0} {1} {2} && rm {0};".format(
self.pathfmt.realpath,
self.pathfmt.realdirectory,
self.pathfmt.filename),
shell=True,
),
call(
[
os.path.expanduser("~/script.sh"),
self.pathfmt.kwdict["category"],
self.pathfmt.realdirectory.upper(),
],
shell=False,
),
])
def test_command_returncode(self):
self._create({
"command": "echo {}",
@@ -944,8 +976,8 @@ class ZipTest(BasePostprocessorTest):
self._trigger(("finalize",))
self.assertEqual(pp.zfile.write.call_count, 3)
for call in pp.zfile.write.call_args_list:
args, kwargs = call
for call_args in pp.zfile.write.call_args_list:
args, kwargs = call_args
self.assertEqual(len(args), 2)
self.assertEqual(len(kwargs), 0)
self.assertEqual(args[0], self.pathfmt.temppath)