GitBucket
4.21.2
Toggle navigation
Snippets
Sign in
Files
Branches
2
Releases
1
Issues
Pull requests
Labels
Priorities
Milestones
Wiki
Forks
nigel.stanger
/
process_podcast
Browse code
Updated superclass initialisation for Python 3 (ref #32).
master
1 parent
14850c5
commit
1819c4fcbb6d0870996fef69012ed2d26dcabb2b
Nigel Stanger
authored
on 23 Sep 2018
Patch
Showing
2 changed files
segment.py
shell_command.py
Ignore Space
Show notes
View
segment.py
#!/usr/bin/env python3 from collections import OrderedDict from datetime import timedelta import errno import itertools import logging import os from pathlib import Path import globals from shell_command import (ConvertCommand, FFprobeCommand, FFmpegCommand) class SegmentError(Exception): pass class Segment(object): """A segment within the podcast. A segment has an input file, and a punch-in and punch-out point (both in seconds). """ # Automatic segment number generator. _new_segment_num = itertools.count() # Keep track of input files in the order they're loaded, so that we # can easily reference them by index in the ffmpeg command (i.e., # first input file is, 0, etc.). _input_files = OrderedDict() # A string representing the type of the segment (e.g., "audio"). # This is handy for generating temporary files and means that we # can implement this as a single method in the root class. _TYPE = "" # Which trim and setpts filters to use in ffmpeg complex filters. # As above, this lets us implement a single method in the root class. _TRIM = "" _SETPTS = "" @staticmethod def input_files(): return Segment._input_files @staticmethod def _rename_input_file(old, new): tmp = OrderedDict() for f in Segment._input_files: if (f == old): tmp[new] = Segment._input_files[f] else: tmp[f] = Segment._input_files[f] Segment._input_files = tmp def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0): self.segment_number = next(self.__class__._new_segment_num) self.input_file = file self.punch_in = punch_in self.punch_out = punch_out self.input_stream = input_stream self._temp_file = "" self._temp_suffix = "mov" # List of temporary files to delete when cleaning up. self._temp_files_list = [] if (file not in self.__class__._input_files): self.__class__._input_files[file] = None self._input_options = ["-ss", str(self.punch_in.total_seconds()), "-t", str(self.get_duration()), "-i", self.input_file] self._output_options = [] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out ' '{o}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out)) def get_duration(self): """Return the duration of the segment in seconds.""" return (self.punch_out - self.punch_in).total_seconds() def generate_temp_filename(self, output, suffix=None): """Generate a temporary filename for the segment.""" if not suffix: suffix = self._temp_suffix return Path("temp_{t}_{o}_{n:03d}".format( t=self._TYPE, o=Path(output).stem, n=self.segment_number)).with_suffix(suffix) def generate_temp_file(self, output, width=0, height=0): """Compile the segment from the original source file(s).""" # Note: width and height are currently ignored for segments # in general. This may (or may not) need to change if there # are multiple inputs of different dimensions. fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output) command = FFmpegCommand( input_options=self._input_options + ["-codec", "copy"], output_options=self._output_options + [self._temp_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentError( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def temp_file(self): """Return the temporary file associated with the segment.""" return self._temp_file def delete_temp_files(self): """Delete the temporary file(s) associated with the segment.""" # Note: sometimes segments (especially frame segments) may # share the same temporary file. Just ignore the file not # found exception that occurs in these cases. for f in self._temp_files_list: try: os.remove(f) except OSError as e: if (e.errno != errno.ENOENT): raise e def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:{t}]".format( n=self.__class__._input_files.keys().index(self.input_file), t=self._TYPE[0] if self._TYPE else "") def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return "[{t}{n}]".format(t=self._TYPE[0] if self._TYPE else "", n=self.segment_number) def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return ("{inspec} " "{trim}=start={pi}:duration={po},{setpts}=PTS-STARTPTS " "{outspec}".format( inspec=self.input_stream_specifier(), trim=self._TRIM, setpts=self._SETPTS, pi=self.punch_in.total_seconds(), po=self.get_duration(), outspec=self.output_stream_specifier())) class AudioSegment(Segment): """A segment of an audio input stream.""" _TYPE = "audio" _TRIM = "atrim" _SETPTS = "asetpts" def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0): super().__init__(file, punch_in, punch_out, input_stream) self._temp_suffix = "wav" self._output_options = ["-ac", "1", "-map", "{n}:a".format(n=self.input_stream)] class VideoSegment(Segment): """A segment of a video input stream.""" _TYPE = "video" _TRIM = "trim" _SETPTS = "setpts" def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0): super().__init__(file, punch_in, punch_out, input_stream) self._output_options = ["-map", "{n}:v".format(n=self.input_stream)] self._temp_frame_file = "" def get_last_frame_number(self): """Calculate frame number of segment's last frame using ffprobe.""" fn = "get_last_frame_number" if (self._temp_file): self._temp_frame_file = "__{f}".format(f=self._temp_file) # To speed things up, grab up to the last 5 seconds of the # segment's temporary file, as we otherwise have to scan the # entire temporary file to find the last frame, which can # take a while. command = FFmpegCommand( input_options=["-ss", str(max(self.get_duration() - 5, 0)), "-i", self._temp_file], output_options=["-codec:v", "copy", "-map", "0:v", self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_frame_file) command = FFprobeCommand([self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) return int(command.get_entries( section="stream", find_list=["nb_frames"])[0]) - 1 else: raise SegmentError( "Failed to generate temporary file to get last frame " "number for {s}".format(s=self)) else: raise SegmentError( "Can't get last frame of {s} because it has no temporary " "file".format(s=self)) def generate_frame(self, frame_number, output, width=2048, height=1536): """Create a JPEG file from the specified frame of the segment.""" # Note: width and height are currently ignored for video # segments. This will need to change if there are multiple # video inputs of different dimensions. fn = "generate_frame" temp_frame = self.generate_temp_filename(output, suffix="jpg") if (frame_number == -1): frame_number = self.get_last_frame_number() command = FFmpegCommand( input_options=["-i", self._temp_frame_file], output_options=["-filter:v", "select='eq(n, {n})'".format(n=frame_number), "-frames:v", "1", "-f", "image2", "-map", "0:v", temp_frame]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(temp_frame) return temp_frame else: raise SegmentError( "Failed to create JPEG for frame {n} of " "{s}".format(n=frame_number, s=self)) class FrameSegment(VideoSegment): """A video segment derived from a single still frame.""" _TYPE = "frame" def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0, frame_number=0): super().__init__(file, punch_in, punch_out, input_stream) self.frame_number = frame_number self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[file] = self._input_options[:4] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out {o}, frame number ' '{fn}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out, fn=self.frame_number)) def generate_temp_file(self, output, width=2048, height=1536): """Compile the segment from the original source file(s).""" fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output, suffix="jpg") command = ConvertCommand( input_options=["{f}[{n}]".format( f=self.input_file, n=self.frame_number)], output_options=["{f}".format(f=self._temp_file)], width=width, height=height) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentError( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def use_frame(self, frame): """Set the image to use for generating the frame video.""" self.__class__._rename_input_file(self.input_file, frame) self.input_file = frame self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[frame] = self._input_options[:4] def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:v]".format( n=self.__class__._input_files.keys().index(self.input_file)) def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return self.input_stream_specifier() def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return "" if (__name__ == "__main__"): pass
#!/usr/bin/env python3 from collections import OrderedDict from datetime import timedelta import errno import itertools import logging import os from pathlib import Path import globals from shell_command import (ConvertCommand, FFprobeCommand, FFmpegCommand) class SegmentError(Exception): pass class Segment(object): """A segment within the podcast. A segment has an input file, and a punch-in and punch-out point (both in seconds). """ # Automatic segment number generator. _new_segment_num = itertools.count() # Keep track of input files in the order they're loaded, so that we # can easily reference them by index in the ffmpeg command (i.e., # first input file is, 0, etc.). _input_files = OrderedDict() # A string representing the type of the segment (e.g., "audio"). # This is handy for generating temporary files and means that we # can implement this as a single method in the root class. _TYPE = "" # Which trim and setpts filters to use in ffmpeg complex filters. # As above, this lets us implement a single method in the root class. _TRIM = "" _SETPTS = "" @staticmethod def input_files(): return Segment._input_files @staticmethod def _rename_input_file(old, new): tmp = OrderedDict() for f in Segment._input_files: if (f == old): tmp[new] = Segment._input_files[f] else: tmp[f] = Segment._input_files[f] Segment._input_files = tmp def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0): self.segment_number = next(self.__class__._new_segment_num) self.input_file = file self.punch_in = punch_in self.punch_out = punch_out self.input_stream = input_stream self._temp_file = "" self._temp_suffix = "mov" # List of temporary files to delete when cleaning up. self._temp_files_list = [] if (file not in self.__class__._input_files): self.__class__._input_files[file] = None self._input_options = ["-ss", str(self.punch_in.total_seconds()), "-t", str(self.get_duration()), "-i", self.input_file] self._output_options = [] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out ' '{o}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out)) def get_duration(self): """Return the duration of the segment in seconds.""" return (self.punch_out - self.punch_in).total_seconds() def generate_temp_filename(self, output, suffix=None): """Generate a temporary filename for the segment.""" if not suffix: suffix = self._temp_suffix return Path("temp_{t}_{o}_{n:03d}".format( t=self._TYPE, o=Path(output).stem, n=self.segment_number)).with_suffix(suffix) def generate_temp_file(self, output, width=0, height=0): """Compile the segment from the original source file(s).""" # Note: width and height are currently ignored for segments # in general. This may (or may not) need to change if there # are multiple inputs of different dimensions. fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output) command = FFmpegCommand( input_options=self._input_options + ["-codec", "copy"], output_options=self._output_options + [self._temp_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentError( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def temp_file(self): """Return the temporary file associated with the segment.""" return self._temp_file def delete_temp_files(self): """Delete the temporary file(s) associated with the segment.""" # Note: sometimes segments (especially frame segments) may # share the same temporary file. Just ignore the file not # found exception that occurs in these cases. for f in self._temp_files_list: try: os.remove(f) except OSError as e: if (e.errno != errno.ENOENT): raise e def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:{t}]".format( n=self.__class__._input_files.keys().index(self.input_file), t=self._TYPE[0] if self._TYPE else "") def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return "[{t}{n}]".format(t=self._TYPE[0] if self._TYPE else "", n=self.segment_number) def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return ("{inspec} " "{trim}=start={pi}:duration={po},{setpts}=PTS-STARTPTS " "{outspec}".format( inspec=self.input_stream_specifier(), trim=self._TRIM, setpts=self._SETPTS, pi=self.punch_in.total_seconds(), po=self.get_duration(), outspec=self.output_stream_specifier())) class AudioSegment(Segment): """A segment of an audio input stream.""" _TYPE = "audio" _TRIM = "atrim" _SETPTS = "asetpts" def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0): super(AudioSegment, self).__init__(file, punch_in, punch_out, input_stream) self._temp_suffix = "wav" self._output_options = ["-ac", "1", "-map", "{n}:a".format(n=self.input_stream)] class VideoSegment(Segment): """A segment of a video input stream.""" _TYPE = "video" _TRIM = "trim" _SETPTS = "setpts" def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0): super(VideoSegment, self).__init__(file, punch_in, punch_out, input_stream) self._output_options = ["-map", "{n}:v".format(n=self.input_stream)] self._temp_frame_file = "" def get_last_frame_number(self): """Calculate frame number of segment's last frame using ffprobe.""" fn = "get_last_frame_number" if (self._temp_file): self._temp_frame_file = "__{f}".format(f=self._temp_file) # To speed things up, grab up to the last 5 seconds of the # segment's temporary file, as we otherwise have to scan the # entire temporary file to find the last frame, which can # take a while. command = FFmpegCommand( input_options=["-ss", str(max(self.get_duration() - 5, 0)), "-i", self._temp_file], output_options=["-codec:v", "copy", "-map", "0:v", self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_frame_file) command = FFprobeCommand([self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) return int(command.get_entries( section="stream", find_list=["nb_frames"])[0]) - 1 else: raise SegmentError( "Failed to generate temporary file to get last frame " "number for {s}".format(s=self)) else: raise SegmentError( "Can't get last frame of {s} because it has no temporary " "file".format(s=self)) def generate_frame(self, frame_number, output, width=2048, height=1536): """Create a JPEG file from the specified frame of the segment.""" # Note: width and height are currently ignored for video # segments. This will need to change if there are multiple # video inputs of different dimensions. fn = "generate_frame" temp_frame = self.generate_temp_filename(output, suffix="jpg") if (frame_number == -1): frame_number = self.get_last_frame_number() command = FFmpegCommand( input_options=["-i", self._temp_frame_file], output_options=["-filter:v", "select='eq(n, {n})'".format(n=frame_number), "-frames:v", "1", "-f", "image2", "-map", "0:v", temp_frame]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(temp_frame) return temp_frame else: raise SegmentError( "Failed to create JPEG for frame {n} of " "{s}".format(n=frame_number, s=self)) class FrameSegment(VideoSegment): """A video segment derived from a single still frame.""" _TYPE = "frame" def __init__(self, file="", punch_in=timedelta(), punch_out=timedelta(), input_stream=0, frame_number=0): super(FrameSegment, self).__init__(file, punch_in, punch_out, input_stream) self.frame_number = frame_number self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[file] = self._input_options[:4] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out {o}, frame number ' '{fn}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out, fn=self.frame_number)) def generate_temp_file(self, output, width=2048, height=1536): """Compile the segment from the original source file(s).""" fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output, suffix="jpg") command = ConvertCommand( input_options=["{f}[{n}]".format( f=self.input_file, n=self.frame_number)], output_options=["{f}".format(f=self._temp_file)], width=width, height=height) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentError( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def use_frame(self, frame): """Set the image to use for generating the frame video.""" self.__class__._rename_input_file(self.input_file, frame) self.input_file = frame self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[frame] = self._input_options[:4] def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:v]".format( n=self.__class__._input_files.keys().index(self.input_file)) def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return self.input_stream_specifier() def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return "" if (__name__ == "__main__"): pass
Ignore Space
Show notes
View
shell_command.py
#!/usr/bin/env python3 import datetime import json import tempfile from pathlib import Path import re import shutil import pexpect from progress_bar import (ProgressBar) class ShellCommand(object): """A shell command. _executable contains the full path to the relevant executable. _base_options is a list of standard general options for this command. """ _executable = "" _base_options = [] _expect_patterns = [] @staticmethod def shellquote(s): """Quote a string so it can be safely pasted into the shell.""" # Note: pipes/shlex.quote() only wraps '' around things, # it doesn't do things like \( \), which we also need. # We double-quote everything because that makes it easier # to deal with single quotes. if s: regexes = [ # grouping parentheses for convert re.compile(r"^(\(|\))$"), # double quote (in middle of string) re.compile(r'^(.+".*)$'), # quotes around entire string re.compile(r"""^((?P<quote>['"]).*(?P=quote))$"""), # command line switch (starts with one or two "-") re.compile(r"^(--?.+)$"), # command path (starts with "/") re.compile(r"^(/[\w/]+)$")] substitutions = [r"\\\1", r"'\1'", r"\1", r"\1", r"\1"] num_matches = 0 for subst in zip(regexes, substitutions): (s, n) = subst[0].subn(subst[1], s) num_matches += n if num_matches == 0: # catch-all for non-word characters when none of the other # substitutions apply s = re.compile(r"^(.*\W+.*)$").sub(r'"\1"', s) return s def __init__(self, input_options=[], output_options=[], quiet=False): self.input_options = input_options self.output_options = output_options self.progress = None self.process = None def __repr__(self): return "<{cls}: {cmd}>".format( cls=self.__class__.__name__, cmd=self.command_string(quote=True)) def append_input_options(self, items=[]): """Add a list of items to the end of the input options.""" self.input_options += items def prepend_input_options(self, items=[]): """Add a list of items at the front of the input options.""" self.input_options = items + self.input_options def append_output_options(self, items=[]): """Add a list of items to the end of the output options.""" self.output_options += items def prepend_output_options(self, items=[]): """Add a list of items at the front of the output options.""" self.output_options = items + self.output_options def executable_string(self, quote=False): """Return the executable as a string.""" if quote: return ShellCommand.shellquote(self._executable) else: return self._executable def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = self._base_options + self.input_options + self.output_options if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return self._base_options + self.input_options + self.output_options def command_string(self, quote=False): """Return the entire command as a string.""" return "{exe} {arg}".format(exe=self.executable_string(quote), arg=self.argument_string(quote)) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" return (pat == 0) def run(self): """Execute the command in a subprocess.""" self.process = pexpect.spawn(self.executable_string(), self.argument_list()) # EOF is *always* the first pattern. patterns = self.process.compile_pattern_list( [pexpect.EOF] + self._expect_patterns) try: while True: i = self.process.expect_list(patterns, timeout=None) if self.process_pattern(i): break except (KeyboardInterrupt): pass finally: if self.progress: self.progress.finish() self.process.close() return self.process.exitstatus def get_output(self): """Execute the command in a subprocess and return the output.""" return pexpect.run(self.command_string(quote=True)) class ConvertCommand(ShellCommand): """An ImageMagick convert command.""" _executable = shutil.which("convert") _base_options = ["-density", "600", "xc:dimgrey", "null:", # dark grey background "("] def __init__(self, input_options=[], output_options=[], width=2048, height=1536): super().__init__(input_options, output_options) self._base_options = (["-size", "{w}x{h}".format(w=width, h=height)] + self._base_options) self.append_input_options( ["-resize", "{w}x{h}".format(w=width, h=height), "-background", "white", "-alpha", "remove", "-type", "truecolor", # force RGB (this and next line) "-define", "colorspace:auto-grayscale=off"]) self.prepend_output_options([")", "-gravity", "center", "-layers", "composite", "-flatten"]) class FFprobeCommand(ShellCommand): """An ffprobe shell command.""" _executable = shutil.which("ffprobe") _base_options = ["-loglevel", "error", "-show_entries", "format:stream", "-print_format", "json"] def __init__(self, input_options=[], output_options=[]): super().__init__(input_options, output_options) self.entries = None # The input file should be the last input option. assert(Path(self.input_options[-1]).exists()) self.last_modified = Path(self.input_options[-1]).stat().st_mtime def get_entries(self, section="stream", find_list=[]): """Fetch specified attributes from the input file.""" # Re-fetch if the file's changed since we last looked. modified = Path(self.input_options[-1]).stat().st_mtime if (not self.entries) or (modified > self.last_modified): js = json.loads(self.get_output()) self.entries = {"format": js["format"], "stream": js["streams"][0]} return [self.entries[section][f] for f in find_list] class FFmpegCommand(ShellCommand): """A "simple" ffmpeg shell command.""" _executable = shutil.which("ffmpeg") _base_options = ["-y", "-nostdin"] class FFmpegConcatCommand(FFmpegCommand): """An ffmpeg shell command with a complex concat filter.""" _expect_patterns = [r"time=(\d\d):(\d\d):(\d\d\.\d\d)"] def __init__(self, input_options=[], output_options=[], quiet=False, max_progress=100, has_audio=False, has_video=False, process_audio=True, process_video=True, audio_codec="pcm_s16le", video_codec="h264"): super().__init__(input_options, output_options) self.progress = ProgressBar(max_value=max_progress, quiet=quiet) self.has_video = has_video self.process_video = process_video self.video_codec = video_codec if (self.has_video): self.prepend_output_options(["-map", "[vconc]"]) if (self.process_video): self.prepend_output_options(["-pix_fmt", "yuv420p"]) self.prepend_output_options(["-codec:v", self.video_codec]) self.has_audio = has_audio self.process_audio = process_audio self.audio_codec = audio_codec if (self.has_audio): if (self.process_audio): self.prepend_output_options(["-ac", "1", "-map", "[anorm]"]) else: self.prepend_output_options(["-map", "[aconc]"]) self.prepend_output_options(["-codec:a", self.audio_codec]) self.filters = [] def append_filter(self, filter): """Append a filter to the filters list.""" if (filter): self.filters.append(filter) def append_normalisation_filter(self): """Append a normalisation audio filter to the complex filter.""" if (self.has_audio): self.append_filter("[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]") def append_concat_filter(self, frame_type, segments=[]): """Append a concat filter to the filters list""" # Ignore frame type. if frame_type in ["a", "v"]: if (len(segments) > 1): self.append_filter( "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format( inspecs=" ".join([s.output_stream_specifier() for s in segments]), n=len(segments), v=int(frame_type == "v"), a=int(frame_type == "a"), t=frame_type)) elif (len(segments) == 1): self.append_filter( "{inspec} {a}null [{t}conc]".format( inspec=segments[0].output_stream_specifier(), a=frame_type if frame_type == "a" else "", t=frame_type)) def build_complex_filter(self): """Build the complete complex filter. Filters in the filtergraph are separated by ";". """ return "{f}".format(f=";".join(self.filters)) def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" if (pat == 1): elapsed = datetime.timedelta( hours=int(self.process.match.group(1)), minutes=int(self.process.match.group(2)), seconds=float(self.process.match.group(3))) self.progress.update(elapsed.total_seconds()) return (pat == 0) if (__name__ == "__main__"): print(ShellCommand()) print(ConvertCommand(input_options=["in.pdf[12]"], output_options=["out.png"])) # FFprobeCommand expects the input file to exist. f = tempfile.NamedTemporaryFile() print(FFprobeCommand(input_options=["-i", f.name])) f.close() print(FFmpegCommand(input_options=["-i", "in.mov"], output_options=["out.mov"])) concat = FFmpegConcatCommand(input_options=["-i", "in.mov"], output_options=["out.mov"], has_audio=True) concat.append_normalisation_filter() print(concat) print("Quoting:") for s in ["foobar", "foo bar baz", "foo(bar)", "[foobar]", "foo[bar", "foo 'bar'", '"foobar"', "'foobar'", 'foo"bar', "foo.bar", "(", ")", "'", "/foo/bar/baz/quux", "-f", "--foobar"]: print(" {s} => echo {sub}".format(s=s, sub=ShellCommand.shellquote(s)))
#!/usr/bin/env python3 import datetime import json import tempfile from pathlib import Path import re import shutil import pexpect from progress_bar import (ProgressBar) class ShellCommand(object): """A shell command. _executable contains the full path to the relevant executable. _base_options is a list of standard general options for this command. """ _executable = "" _base_options = [] _expect_patterns = [] @staticmethod def shellquote(s): """Quote a string so it can be safely pasted into the shell.""" # Note: pipes/shlex.quote() only wraps '' around things, # it doesn't do things like \( \), which we also need. # We double-quote everything because that makes it easier # to deal with single quotes. if s: regexes = [ # grouping parentheses for convert re.compile(r"^(\(|\))$"), # double quote (in middle of string) re.compile(r'^(.+".*)$'), # quotes around entire string re.compile(r"""^((?P<quote>['"]).*(?P=quote))$"""), # command line switch (starts with one or two "-") re.compile(r"^(--?.+)$"), # command path (starts with "/") re.compile(r"^(/[\w/]+)$")] substitutions = [r"\\\1", r"'\1'", r"\1", r"\1", r"\1"] num_matches = 0 for subst in zip(regexes, substitutions): (s, n) = subst[0].subn(subst[1], s) num_matches += n if num_matches == 0: # catch-all for non-word characters when none of the other # substitutions apply s = re.compile(r"^(.*\W+.*)$").sub(r'"\1"', s) return s def __init__(self, input_options=[], output_options=[], quiet=False): self.input_options = input_options self.output_options = output_options self.progress = None self.process = None def __repr__(self): return "<{cls}: {cmd}>".format( cls=self.__class__.__name__, cmd=self.command_string(quote=True)) def append_input_options(self, items=[]): """Add a list of items to the end of the input options.""" self.input_options += items def prepend_input_options(self, items=[]): """Add a list of items at the front of the input options.""" self.input_options = items + self.input_options def append_output_options(self, items=[]): """Add a list of items to the end of the output options.""" self.output_options += items def prepend_output_options(self, items=[]): """Add a list of items at the front of the output options.""" self.output_options = items + self.output_options def executable_string(self, quote=False): """Return the executable as a string.""" if quote: return ShellCommand.shellquote(self._executable) else: return self._executable def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = self._base_options + self.input_options + self.output_options if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return self._base_options + self.input_options + self.output_options def command_string(self, quote=False): """Return the entire command as a string.""" return "{exe} {arg}".format(exe=self.executable_string(quote), arg=self.argument_string(quote)) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" return (pat == 0) def run(self): """Execute the command in a subprocess.""" self.process = pexpect.spawn(self.executable_string(), self.argument_list()) # EOF is *always* the first pattern. patterns = self.process.compile_pattern_list( [pexpect.EOF] + self._expect_patterns) try: while True: i = self.process.expect_list(patterns, timeout=None) if self.process_pattern(i): break except (KeyboardInterrupt): pass finally: if self.progress: self.progress.finish() self.process.close() return self.process.exitstatus def get_output(self): """Execute the command in a subprocess and return the output.""" return pexpect.run(self.command_string(quote=True)) class ConvertCommand(ShellCommand): """An ImageMagick convert command.""" _executable = shutil.which("convert") _base_options = ["-density", "600", "xc:dimgrey", "null:", # dark grey background "("] def __init__(self, input_options=[], output_options=[], width=2048, height=1536): super(ConvertCommand, self).__init__(input_options, output_options) self._base_options = (["-size", "{w}x{h}".format(w=width, h=height)] + self._base_options) self.append_input_options( ["-resize", "{w}x{h}".format(w=width, h=height), "-background", "white", "-alpha", "remove", "-type", "truecolor", # force RGB (this and next line) "-define", "colorspace:auto-grayscale=off"]) self.prepend_output_options([")", "-gravity", "center", "-layers", "composite", "-flatten"]) class FFprobeCommand(ShellCommand): """An ffprobe shell command.""" _executable = shutil.which("ffprobe") _base_options = ["-loglevel", "error", "-show_entries", "format:stream", "-print_format", "json"] def __init__(self, input_options=[], output_options=[]): super(FFprobeCommand, self).__init__(input_options, output_options) self.entries = None # The input file should be the last input option. assert(Path(self.input_options[-1]).exists()) self.last_modified = Path(self.input_options[-1]).stat().st_mtime def get_entries(self, section="stream", find_list=[]): """Fetch specified attributes from the input file.""" # Re-fetch if the file's changed since we last looked. modified = Path(self.input_options[-1]).stat().st_mtime if (not self.entries) or (modified > self.last_modified): js = json.loads(self.get_output()) self.entries = {"format": js["format"], "stream": js["streams"][0]} return [self.entries[section][f] for f in find_list] class FFmpegCommand(ShellCommand): """A "simple" ffmpeg shell command.""" _executable = shutil.which("ffmpeg") _base_options = ["-y", "-nostdin"] class FFmpegConcatCommand(FFmpegCommand): """An ffmpeg shell command with a complex concat filter.""" _expect_patterns = [r"time=(\d\d):(\d\d):(\d\d\.\d\d)"] def __init__(self, input_options=[], output_options=[], quiet=False, max_progress=100, has_audio=False, has_video=False, process_audio=True, process_video=True, audio_codec="pcm_s16le", video_codec="h264"): super(FFmpegConcatCommand, self).__init__( input_options, output_options) self.progress = ProgressBar(max_value=max_progress, quiet=quiet) self.has_video = has_video self.process_video = process_video self.video_codec = video_codec if (self.has_video): self.prepend_output_options(["-map", "[vconc]"]) if (self.process_video): self.prepend_output_options(["-pix_fmt", "yuv420p"]) self.prepend_output_options(["-codec:v", self.video_codec]) self.has_audio = has_audio self.process_audio = process_audio self.audio_codec = audio_codec if (self.has_audio): if (self.process_audio): self.prepend_output_options(["-ac", "1", "-map", "[anorm]"]) else: self.prepend_output_options(["-map", "[aconc]"]) self.prepend_output_options(["-codec:a", self.audio_codec]) self.filters = [] def append_filter(self, filter): """Append a filter to the filters list.""" if (filter): self.filters.append(filter) def append_normalisation_filter(self): """Append a normalisation audio filter to the complex filter.""" if (self.has_audio): self.append_filter("[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]") def append_concat_filter(self, frame_type, segments=[]): """Append a concat filter to the filters list""" # Ignore frame type. if frame_type in ["a", "v"]: if (len(segments) > 1): self.append_filter( "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format( inspecs=" ".join([s.output_stream_specifier() for s in segments]), n=len(segments), v=int(frame_type == "v"), a=int(frame_type == "a"), t=frame_type)) elif (len(segments) == 1): self.append_filter( "{inspec} {a}null [{t}conc]".format( inspec=segments[0].output_stream_specifier(), a=frame_type if frame_type == "a" else "", t=frame_type)) def build_complex_filter(self): """Build the complete complex filter. Filters in the filtergraph are separated by ";". """ return "{f}".format(f=";".join(self.filters)) def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" if (pat == 1): elapsed = datetime.timedelta( hours=int(self.process.match.group(1)), minutes=int(self.process.match.group(2)), seconds=float(self.process.match.group(3))) self.progress.update(elapsed.total_seconds()) return (pat == 0) if (__name__ == "__main__"): print(ShellCommand()) print(ConvertCommand(input_options=["in.pdf[12]"], output_options=["out.png"])) # FFprobeCommand expects the input file to exist. f = tempfile.NamedTemporaryFile() print(FFprobeCommand(input_options=["-i", f.name])) f.close() print(FFmpegCommand(input_options=["-i", "in.mov"], output_options=["out.mov"])) concat = FFmpegConcatCommand(input_options=["-i", "in.mov"], output_options=["out.mov"], has_audio=True) concat.append_normalisation_filter() print(concat) print("Quoting:") for s in ["foobar", "foo bar baz", "foo(bar)", "[foobar]", "foo[bar", "foo 'bar'", '"foobar"', "'foobar'", 'foo"bar', "foo.bar", "(", ")", "'", "/foo/bar/baz/quux", "-f", "--foobar"]: print(" {s} => echo {sub}".format(s=s, sub=ShellCommand.shellquote(s)))
Show line notes below