GitBucket
4.21.2
Toggle navigation
Snippets
Sign in
Files
Branches
2
Releases
1
Issues
Pull requests
Labels
Priorities
Milestones
Wiki
Forks
nigel.stanger
/
process_podcast
Browse code
Converted all print statements to functions for Python 3 (ref #32).
master
1 parent
8960251
commit
0b13583a74358a08e2cd8b02b33314d86d00fc4e
Nigel Stanger
authored
on 26 Aug 2018
Patch
Showing
3 changed files
config_parser.py
progress_bar.py
shell_command.py
Ignore Space
Show notes
View
config_parser.py
#!/usr/bin/env python import sys from pyparsing import * # pyparsing documentation: # https://sourceforge.net/p/pyparsing/code/HEAD/tree/trunk/src/HowToUsePyparsing.txt#l302 INPUTSPEC_DEFAULTS = {"type": None, "filename": None, "num": None} TIMESTAMP_DEFAULTS = {"hh": 0, "mm": 0, "ms": 0} # see http://stackoverflow.com/questions/11180622/optional-string-segment-in-pyparsing def default_input_fields(fields): """Set missing input specification values to defaults.""" set_defaults(fields, INPUTSPEC_DEFAULTS) def default_timestamp_fields(fields): """Set missing timestamp values to defaults.""" set_defaults(fields, TIMESTAMP_DEFAULTS) def set_defaults(fields, defaults): """Set missing field values to defaults.""" undefined = set(defaults.keys()) - set(fields.keys()) for k in undefined: v = defaults[k] # see http://pyparsing.wikispaces.com/share/view/71042464 fields[k] = v fields.append(v) def parser_bnf(): """Grammar for parsing podcast configuration files.""" at = Literal("@").suppress() caret = Literal("^") colon = Literal(":").suppress() left_bracket = Literal("[").suppress() period = Literal(".").suppress() right_bracket = Literal("]").suppress() # zero_index ::= [0-9]+ zero_index = Word(nums).setParseAction(lambda s, l, t: int(t[0])) # filename ::= [A-Za-z0-9][-A-Za-z0-9._ ]+ filename_first = Word(alphanums, exact=1) filename_rest = Word(alphanums + "-_/. ") filename = Combine(filename_first + Optional(filename_rest)) # millisecs ::= "." [0-9]+ millisecs = (Word(nums).setParseAction( lambda s, l, t: int(t[0][:3].ljust(3, "0"))) .setResultsName("ms")) # hours, minutes, seconds ::= zero_index hours = zero_index.setResultsName("hh") minutes = zero_index.setResultsName("mm") seconds = zero_index.setResultsName("ss") hours_minutes = hours + colon + minutes + colon | minutes + colon secs_millisecs = (seconds + Optional(period + millisecs) | period + millisecs) # timestamp ::= [[hours ":"] minutes ":"] seconds ["." millisecs] timestamp = Optional(hours_minutes) + secs_millisecs # duration_file ::= "@", filename # We need a separate item for a lonely duration file timestamp so # that we can attach a parse action just to the lonely case. Using # duration_file alone means the parse action is attached to all # instances of duration_file. duration_file = at + filename.setResultsName("filename") lonely_duration_file = at + filename.setResultsName("filename") # timespecs ::= timestamp [duration_file | {timestamp}] # If duration_file timestamp is lonely, prepend a zero timestamp. timespecs = Or( [lonely_duration_file.setParseAction( lambda s, l, t: [timestamp.parseString("00:00:00.000"), t]), Group(timestamp) + duration_file, OneOrMore(Group(timestamp.setParseAction(default_timestamp_fields)))]) # last_frame ::= "-1" | "last" last_frame = oneOf(["-1", "last"]).setParseAction(replaceWith(-1)) # frame_number ::= ":" (zero_index | last_frame) frame_number = colon - (zero_index | last_frame).setResultsName("num") # stream_number ::= ":" zero_index stream_number = colon - zero_index.setResultsName("num") # input_file ::= ":" [filename] input_file = colon - Optional(filename).setResultsName("filename") # previous_segment ::= ":" "^" previous_segment = colon - caret.setResultsName("filename") # frame_input_file ::= input_file | previous_segment frame_input_file = Or([input_file, previous_segment]) # av_trailer ::= input_file [stream_number] av_trailer = input_file + Optional(stream_number) # frame_type ::= "frame" | "f" frame_type = oneOf(["f", "frame"]).setParseAction(replaceWith("frame")) # frame_input ::= frame_type [frame_input_file [frame_number]] frame_input = (frame_type.setResultsName("type") + Optional(frame_input_file + Optional(frame_number))) # video_type ::= "video" | "v" video_type = oneOf(["v", "video"]).setParseAction(replaceWith("video")) # audio_type ::= "audio" | "a" audio_type = oneOf(["a", "audio"]).setParseAction(replaceWith("audio")) # av_input ::= (audio_type | video_type) [av_trailer] av_input = ((audio_type | video_type).setResultsName("type") + Optional(av_trailer)) # inputspec ::= "[" (av_input | frame_input) "]" inputspec = (left_bracket + delimitedList(av_input | frame_input, delim=":") .setParseAction(default_input_fields) - right_bracket) # segmentspec ::= inputspec [timespecs] segmentspec = Group(inputspec + Group(Optional(timespecs)).setResultsName("times")) # config ::= {segmentspec} config = ZeroOrMore(segmentspec) config.ignore(pythonStyleComment) return config def parse_configuration_file(config_file): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseFile(config_file, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def parse_configuration_string(config_string): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseString(config_string, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def test_parser(): tests = ["test/config1.txt", "test/config2.txt", "test/config3.txt", "test/config4.txt", "test/config5.txt"] for t in tests: print("==={f}===".format(f=t)) r = parse_configuration_file(t) for s in r: print(s) print(" type = {t}".format(t=s["type"])) print(" filename = '{f}'".format(f=s["filename"])) print(" num = {n}".format(n=s["num"])) print(" times = {t}".format(t=s["times"])) for i, t in enumerate(s["times"]): if (isinstance(t, str)): print(" punch out after duration of '{f}'".format(f=t)) if (isinstance(t, ParseResults)): if (i % 2 == 0): print(" punch in at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"])) else: print(" punch out at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"])) print() if (__name__ == "__main__"): test_parser()
#!/usr/bin/env python import sys from pyparsing import * # pyparsing documentation: # https://sourceforge.net/p/pyparsing/code/HEAD/tree/trunk/src/HowToUsePyparsing.txt#l302 INPUTSPEC_DEFAULTS = {"type": None, "filename": None, "num": None} TIMESTAMP_DEFAULTS = {"hh": 0, "mm": 0, "ms": 0} # see http://stackoverflow.com/questions/11180622/optional-string-segment-in-pyparsing def default_input_fields(fields): """Set missing input specification values to defaults.""" set_defaults(fields, INPUTSPEC_DEFAULTS) def default_timestamp_fields(fields): """Set missing timestamp values to defaults.""" set_defaults(fields, TIMESTAMP_DEFAULTS) def set_defaults(fields, defaults): """Set missing field values to defaults.""" undefined = set(defaults.keys()) - set(fields.keys()) for k in undefined: v = defaults[k] # see http://pyparsing.wikispaces.com/share/view/71042464 fields[k] = v fields.append(v) def parser_bnf(): """Grammar for parsing podcast configuration files.""" at = Literal("@").suppress() caret = Literal("^") colon = Literal(":").suppress() left_bracket = Literal("[").suppress() period = Literal(".").suppress() right_bracket = Literal("]").suppress() # zero_index ::= [0-9]+ zero_index = Word(nums).setParseAction(lambda s, l, t: int(t[0])) # filename ::= [A-Za-z0-9][-A-Za-z0-9._ ]+ filename_first = Word(alphanums, exact=1) filename_rest = Word(alphanums + "-_/. ") filename = Combine(filename_first + Optional(filename_rest)) # millisecs ::= "." [0-9]+ millisecs = (Word(nums).setParseAction( lambda s, l, t: int(t[0][:3].ljust(3, "0"))) .setResultsName("ms")) # hours, minutes, seconds ::= zero_index hours = zero_index.setResultsName("hh") minutes = zero_index.setResultsName("mm") seconds = zero_index.setResultsName("ss") hours_minutes = hours + colon + minutes + colon | minutes + colon secs_millisecs = (seconds + Optional(period + millisecs) | period + millisecs) # timestamp ::= [[hours ":"] minutes ":"] seconds ["." millisecs] timestamp = Optional(hours_minutes) + secs_millisecs # duration_file ::= "@", filename # We need a separate item for a lonely duration file timestamp so # that we can attach a parse action just to the lonely case. Using # duration_file alone means the parse action is attached to all # instances of duration_file. duration_file = at + filename.setResultsName("filename") lonely_duration_file = at + filename.setResultsName("filename") # timespecs ::= timestamp [duration_file | {timestamp}] # If duration_file timestamp is lonely, prepend a zero timestamp. timespecs = Or( [lonely_duration_file.setParseAction( lambda s, l, t: [timestamp.parseString("00:00:00.000"), t]), Group(timestamp) + duration_file, OneOrMore(Group(timestamp.setParseAction(default_timestamp_fields)))]) # last_frame ::= "-1" | "last" last_frame = oneOf(["-1", "last"]).setParseAction(replaceWith(-1)) # frame_number ::= ":" (zero_index | last_frame) frame_number = colon - (zero_index | last_frame).setResultsName("num") # stream_number ::= ":" zero_index stream_number = colon - zero_index.setResultsName("num") # input_file ::= ":" [filename] input_file = colon - Optional(filename).setResultsName("filename") # previous_segment ::= ":" "^" previous_segment = colon - caret.setResultsName("filename") # frame_input_file ::= input_file | previous_segment frame_input_file = Or([input_file, previous_segment]) # av_trailer ::= input_file [stream_number] av_trailer = input_file + Optional(stream_number) # frame_type ::= "frame" | "f" frame_type = oneOf(["f", "frame"]).setParseAction(replaceWith("frame")) # frame_input ::= frame_type [frame_input_file [frame_number]] frame_input = (frame_type.setResultsName("type") + Optional(frame_input_file + Optional(frame_number))) # video_type ::= "video" | "v" video_type = oneOf(["v", "video"]).setParseAction(replaceWith("video")) # audio_type ::= "audio" | "a" audio_type = oneOf(["a", "audio"]).setParseAction(replaceWith("audio")) # av_input ::= (audio_type | video_type) [av_trailer] av_input = ((audio_type | video_type).setResultsName("type") + Optional(av_trailer)) # inputspec ::= "[" (av_input | frame_input) "]" inputspec = (left_bracket + delimitedList(av_input | frame_input, delim=":") .setParseAction(default_input_fields) - right_bracket) # segmentspec ::= inputspec [timespecs] segmentspec = Group(inputspec + Group(Optional(timespecs)).setResultsName("times")) # config ::= {segmentspec} config = ZeroOrMore(segmentspec) config.ignore(pythonStyleComment) return config def parse_configuration_file(config_file): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseFile(config_file, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def parse_configuration_string(config_string): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseString(config_string, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def test_parser(): tests = ["test/config1.txt", "test/config2.txt", "test/config3.txt", "test/config4.txt", "test/config5.txt"] for t in tests: print "==={f}===".format(f=t) r = parse_configuration_file(t) for s in r: print s print " type = {t}".format(t=s["type"]) print " filename = '{f}'".format(f=s["filename"]) print " num = {n}".format(n=s["num"]) print " times = {t}".format(t=s["times"]) for i, t in enumerate(s["times"]): if (isinstance(t, str)): print " punch out after duration of '{f}'".format(f=t) if (isinstance(t, ParseResults)): if (i % 2 == 0): print " punch in at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"]) else: print " punch out at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"]) print if (__name__ == "__main__"): test_parser()
Ignore Space
Show notes
View
progress_bar.py
#!/usr/bin/env python import sys import time class ProgressBar(object): """A simple progress bar with a percent completed value.""" def __init__(self, initial_value=0, max_value=100, print_width=50, newline="\r", quiet=False): self.value = self.initial_value = initial_value self.max_value = max_value self.print_width = print_width self.newline = newline self.quiet = quiet def set(self, value=0): """Set the current value of the progress bar.""" self.value = value def reset(self): """Reset the progress bar to its initial value.""" self.set(value=self.initial_value) def finish(self): """Complete the progress bar by setting it to its maximum value.""" self.update(value=self.max_value) if not self.quiet: print() def update(self, value=0): """Set the current value of the progress bar and redraw it.""" self.set(value) self.draw() def draw(self): """Draw the current state of the progress bar.""" if not self.quiet: percent = int(self.value * 100 / self.max_value) dots = int(self.value * self.print_width / self.max_value) bar = "{nl}[{c}{nc}] {p}% ".format( c="+" * dots, nc="." * (self.print_width - dots), p=percent, nl=self.newline) sys.stdout.write(bar) sys.stdout.flush() if __name__ == "__main__": p = ProgressBar() for i in range(0, 100): p.update(value=i) time.sleep(0.05) p.finish()
#!/usr/bin/env python import sys import time class ProgressBar(object): """A simple progress bar with a percent completed value.""" def __init__(self, initial_value=0, max_value=100, print_width=50, newline="\r", quiet=False): self.value = self.initial_value = initial_value self.max_value = max_value self.print_width = print_width self.newline = newline self.quiet = quiet def set(self, value=0): """Set the current value of the progress bar.""" self.value = value def reset(self): """Reset the progress bar to its initial value.""" self.set(value=self.initial_value) def finish(self): """Complete the progress bar by setting it to its maximum value.""" self.update(value=self.max_value) if not self.quiet: print def update(self, value=0): """Set the current value of the progress bar and redraw it.""" self.set(value) self.draw() def draw(self): """Draw the current state of the progress bar.""" if not self.quiet: percent = int(self.value * 100 / self.max_value) dots = int(self.value * self.print_width / self.max_value) bar = "{nl}[{c}{nc}] {p}% ".format( c="+" * dots, nc="." * (self.print_width - dots), p=percent, nl=self.newline) sys.stdout.write(bar) sys.stdout.flush() if __name__ == "__main__": p = ProgressBar() for i in range(0, 100): p.update(value=i) time.sleep(0.05) p.finish()
Ignore Space
Show notes
View
shell_command.py
#!/usr/bin/env python import datetime import distutils.spawn import json import tempfile import os.path import re import pexpect from progress_bar import (ProgressBar) class ShellCommand(object): """A shell command. _executable contains the full path to the relevant executable. _base_options is a list of standard general options for this command. """ _executable = "" _base_options = [] _expect_patterns = [] @staticmethod def shellquote(s): """Quote a string so it can be safely pasted into the shell.""" # Note: pipes/shlex.quote() only wraps '' around things, # it doesn't do things like \( \), which we also need. # We double-quote everything because that makes it easier # to deal with single quotes. if s: regexes = [ # grouping parentheses for convert re.compile(r"^(\(|\))$"), # double quote (in middle of string) re.compile(r'^(.+".*)$'), # quotes around entire string re.compile(r"""^((?P<quote>['"]).*(?P=quote))$"""), # command line switch (starts with one or two "-") re.compile(r"^(--?.+)$"), # command path (starts with "/") re.compile(r"^(/[\w/]+)$")] substitutions = [r"\\\1", r"'\1'", r"\1", r"\1", r"\1"] num_matches = 0 for subst in zip(regexes, substitutions): (s, n) = subst[0].subn(subst[1], s) num_matches += n if num_matches == 0: # catch-all for non-word characters when none of the other # substitutions apply s = re.compile(r"^(.*\W+.*)$").sub(r'"\1"', s) return s def __init__(self, input_options=[], output_options=[], quiet=False): self.input_options = input_options self.output_options = output_options self.progress = None self.process = None def __repr__(self): return "<{cls}: {cmd}>".format( cls=self.__class__.__name__, cmd=self.command_string(quote=True)) def append_input_options(self, items=[]): """Add a list of items to the end of the input options.""" self.input_options += items def prepend_input_options(self, items=[]): """Add a list of items at the front of the input options.""" self.input_options = items + self.input_options def append_output_options(self, items=[]): """Add a list of items to the end of the output options.""" self.output_options += items def prepend_output_options(self, items=[]): """Add a list of items at the front of the output options.""" self.output_options = items + self.output_options def executable_string(self, quote=False): """Return the executable as a string.""" if quote: return ShellCommand.shellquote(self._executable) else: return self._executable def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = self._base_options + self.input_options + self.output_options if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return self._base_options + self.input_options + self.output_options def command_string(self, quote=False): """Return the entire command as a string.""" return "{exe} {arg}".format(exe=self.executable_string(quote), arg=self.argument_string(quote)) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" return (pat == 0) def run(self): """Execute the command in a subprocess.""" self.process = pexpect.spawn(self.executable_string(), self.argument_list()) # EOF is *always* the first pattern. patterns = self.process.compile_pattern_list( [pexpect.EOF] + self._expect_patterns) try: while True: i = self.process.expect_list(patterns, timeout=None) if self.process_pattern(i): break except (KeyboardInterrupt): pass finally: if self.progress: self.progress.finish() self.process.close() return self.process.exitstatus def get_output(self): """Execute the command in a subprocess and return the output.""" return pexpect.run(self.command_string(quote=True)) class ConvertCommand(ShellCommand): """An ImageMagick convert command.""" _executable = distutils.spawn.find_executable("convert") _base_options = ["-density", "600", "xc:dimgrey", "null:", # dark grey background "("] def __init__(self, input_options=[], output_options=[], width=2048, height=1536): super(ConvertCommand, self).__init__(input_options, output_options) self._base_options = (["-size", "{w}x{h}".format(w=width, h=height)] + self._base_options) self.append_input_options( ["-resize", "{w}x{h}".format(w=width, h=height), "-background", "white", "-alpha", "remove", "-type", "truecolor", # force RGB (this and next line) "-define", "colorspace:auto-grayscale=off"]) self.prepend_output_options([")", "-gravity", "center", "-layers", "composite", "-flatten"]) class FFprobeCommand(ShellCommand): """An ffprobe shell command.""" _executable = distutils.spawn.find_executable("ffprobe") _base_options = ["-loglevel", "error", "-show_entries", "format:stream", "-print_format", "json"] def __init__(self, input_options=[], output_options=[]): super(FFprobeCommand, self).__init__(input_options, output_options) self.entries = None # The input file should be the last input option. assert(os.path.exists(self.input_options[-1])) self.last_modified = os.path.getmtime(self.input_options[-1]) def get_entries(self, section="stream", find_list=[]): """Fetch specified attributes from the input file.""" # Re-fetch if the file's changed since we last looked. modified = os.path.getmtime(self.input_options[-1]) if (not self.entries) or (modified > self.last_modified): js = json.loads(self.get_output()) self.entries = {"format": js["format"], "stream": js["streams"][0]} return [self.entries[section][f] for f in find_list] class FFmpegCommand(ShellCommand): """A "simple" ffmpeg shell command.""" _executable = distutils.spawn.find_executable("ffmpeg") _base_options = ["-y", "-nostdin"] class FFmpegConcatCommand(FFmpegCommand): """An ffmpeg shell command with a complex concat filter.""" _expect_patterns = [r"time=(\d\d):(\d\d):(\d\d\.\d\d)"] def __init__(self, input_options=[], output_options=[], quiet=False, max_progress=100, has_audio=False, has_video=False, process_audio=True, process_video=True, audio_codec="pcm_s16le", video_codec="h264"): super(FFmpegConcatCommand, self).__init__( input_options, output_options) self.progress = ProgressBar(max_value=max_progress, quiet=quiet) self.has_video = has_video self.process_video = process_video self.video_codec = video_codec if (self.has_video): self.prepend_output_options(["-map", "[vconc]"]) if (self.process_video): self.prepend_output_options(["-pix_fmt", "yuv420p"]) self.prepend_output_options(["-codec:v", self.video_codec]) self.has_audio = has_audio self.process_audio = process_audio self.audio_codec = audio_codec if (self.has_audio): if (self.process_audio): self.prepend_output_options(["-ac", "1", "-map", "[anorm]"]) else: self.prepend_output_options(["-map", "[aconc]"]) self.prepend_output_options(["-codec:a", self.audio_codec]) self.filters = [] def append_filter(self, filter): """Append a filter to the filters list.""" if (filter): self.filters.append(filter) def append_normalisation_filter(self): """Append a normalisation audio filter to the complex filter.""" if (self.has_audio): self.append_filter("[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]") def append_concat_filter(self, type, segments=[]): """Append a concat filter to the filters list""" if (len(segments) > 1): self.append_filter( "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format( inspecs=" ".join([s.output_stream_specifier() for s in segments]), n=len(segments), v=int(type == "v"), a=int(type == "a"), t=type)) elif (len(segments) == 1): self.append_filter( "{inspec} {a}null [{t}conc]".format( inspec=segments[0].output_stream_specifier(), a=type if type == "a" else "", t=type)) def build_complex_filter(self): """Build the complete complex filter. Filters in the filtergraph are separated by ";". """ return "{f}".format(f=";".join(self.filters)) def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" if (pat == 1): elapsed = datetime.timedelta( hours=int(self.process.match.group(1)), minutes=int(self.process.match.group(2)), seconds=float(self.process.match.group(3))) self.progress.update(elapsed.total_seconds()) return (pat == 0) if (__name__ == "__main__"): print(ShellCommand()) print(ConvertCommand(input_options=["in.pdf[12]"], output_options=["out.png"])) # FFprobeCommand expects the input file to exist. f = tempfile.NamedTemporaryFile() print(FFprobeCommand(input_options=["-i", f.name])) f.close() print(FFmpegCommand(input_options=["-i", "in.mov"], output_options=["out.mov"])) concat = FFmpegConcatCommand(input_options=["-i", "in.mov"], output_options=["out.mov"], has_audio=True) concat.append_normalisation_filter() print(concat) print("Quoting:") for s in ["foobar", "foo bar baz", "foo(bar)", "[foobar]", "foo[bar", "foo 'bar'", '"foobar"', "'foobar'", 'foo"bar', "foo.bar", "(", ")", "'", "/foo/bar/baz/quux", "-f", "--foobar"]: print(" {s} => echo {sub}".format(s=s, sub=ShellCommand.shellquote(s)))
#!/usr/bin/env python import datetime import distutils.spawn import json import tempfile import os.path import re import pexpect from progress_bar import (ProgressBar) class ShellCommand(object): """A shell command. _executable contains the full path to the relevant executable. _base_options is a list of standard general options for this command. """ _executable = "" _base_options = [] _expect_patterns = [] @staticmethod def shellquote(s): """Quote a string so it can be safely pasted into the shell.""" # Note: pipes/shlex.quote() only wraps '' around things, # it doesn't do things like \( \), which we also need. # We double-quote everything because that makes it easier # to deal with single quotes. if s: regexes = [ # grouping parentheses for convert re.compile(r"^(\(|\))$"), # double quote (in middle of string) re.compile(r'^(.+".*)$'), # quotes around entire string re.compile(r"""^((?P<quote>['"]).*(?P=quote))$"""), # command line switch (starts with one or two "-") re.compile(r"^(--?.+)$"), # command path (starts with "/") re.compile(r"^(/[\w/]+)$")] substitutions = [r"\\\1", r"'\1'", r"\1", r"\1", r"\1"] num_matches = 0 for subst in zip(regexes, substitutions): (s, n) = subst[0].subn(subst[1], s) num_matches += n if num_matches == 0: # catch-all for non-word characters when none of the other # substitutions apply s = re.compile(r"^(.*\W+.*)$").sub(r'"\1"', s) return s def __init__(self, input_options=[], output_options=[], quiet=False): self.input_options = input_options self.output_options = output_options self.progress = None self.process = None def __repr__(self): return "<{cls}: {cmd}>".format( cls=self.__class__.__name__, cmd=self.command_string(quote=True)) def append_input_options(self, items=[]): """Add a list of items to the end of the input options.""" self.input_options += items def prepend_input_options(self, items=[]): """Add a list of items at the front of the input options.""" self.input_options = items + self.input_options def append_output_options(self, items=[]): """Add a list of items to the end of the output options.""" self.output_options += items def prepend_output_options(self, items=[]): """Add a list of items at the front of the output options.""" self.output_options = items + self.output_options def executable_string(self, quote=False): """Return the executable as a string.""" if quote: return ShellCommand.shellquote(self._executable) else: return self._executable def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = self._base_options + self.input_options + self.output_options if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return self._base_options + self.input_options + self.output_options def command_string(self, quote=False): """Return the entire command as a string.""" return "{exe} {arg}".format(exe=self.executable_string(quote), arg=self.argument_string(quote)) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" return (pat == 0) def run(self): """Execute the command in a subprocess.""" self.process = pexpect.spawn(self.executable_string(), self.argument_list()) # EOF is *always* the first pattern. patterns = self.process.compile_pattern_list( [pexpect.EOF] + self._expect_patterns) try: while True: i = self.process.expect_list(patterns, timeout=None) if self.process_pattern(i): break except (KeyboardInterrupt): pass finally: if self.progress: self.progress.finish() self.process.close() return self.process.exitstatus def get_output(self): """Execute the command in a subprocess and return the output.""" return pexpect.run(self.command_string(quote=True)) class ConvertCommand(ShellCommand): """An ImageMagick convert command.""" _executable = distutils.spawn.find_executable("convert") _base_options = ["-density", "600", "xc:dimgrey", "null:", # dark grey background "("] def __init__(self, input_options=[], output_options=[], width=2048, height=1536): super(ConvertCommand, self).__init__(input_options, output_options) self._base_options = (["-size", "{w}x{h}".format(w=width, h=height)] + self._base_options) self.append_input_options( ["-resize", "{w}x{h}".format(w=width, h=height), "-background", "white", "-alpha", "remove", "-type", "truecolor", # force RGB (this and next line) "-define", "colorspace:auto-grayscale=off"]) self.prepend_output_options([")", "-gravity", "center", "-layers", "composite", "-flatten"]) class FFprobeCommand(ShellCommand): """An ffprobe shell command.""" _executable = distutils.spawn.find_executable("ffprobe") _base_options = ["-loglevel", "error", "-show_entries", "format:stream", "-print_format", "json"] def __init__(self, input_options=[], output_options=[]): super(FFprobeCommand, self).__init__(input_options, output_options) self.entries = None # The input file should be the last input option. assert(os.path.exists(self.input_options[-1])) self.last_modified = os.path.getmtime(self.input_options[-1]) def get_entries(self, section="stream", find_list=[]): """Fetch specified attributes from the input file.""" # Re-fetch if the file's changed since we last looked. modified = os.path.getmtime(self.input_options[-1]) if (not self.entries) or (modified > self.last_modified): js = json.loads(self.get_output()) self.entries = {"format": js["format"], "stream": js["streams"][0]} return [self.entries[section][f] for f in find_list] class FFmpegCommand(ShellCommand): """A "simple" ffmpeg shell command.""" _executable = distutils.spawn.find_executable("ffmpeg") _base_options = ["-y", "-nostdin"] class FFmpegConcatCommand(FFmpegCommand): """An ffmpeg shell command with a complex concat filter.""" _expect_patterns = [r"time=(\d\d):(\d\d):(\d\d\.\d\d)"] def __init__(self, input_options=[], output_options=[], quiet=False, max_progress=100, has_audio=False, has_video=False, process_audio=True, process_video=True, audio_codec="pcm_s16le", video_codec="h264"): super(FFmpegConcatCommand, self).__init__( input_options, output_options) self.progress = ProgressBar(max_value=max_progress, quiet=quiet) self.has_video = has_video self.process_video = process_video self.video_codec = video_codec if (self.has_video): self.prepend_output_options(["-map", "[vconc]"]) if (self.process_video): self.prepend_output_options(["-pix_fmt", "yuv420p"]) self.prepend_output_options(["-codec:v", self.video_codec]) self.has_audio = has_audio self.process_audio = process_audio self.audio_codec = audio_codec if (self.has_audio): if (self.process_audio): self.prepend_output_options(["-ac", "1", "-map", "[anorm]"]) else: self.prepend_output_options(["-map", "[aconc]"]) self.prepend_output_options(["-codec:a", self.audio_codec]) self.filters = [] def append_filter(self, filter): """Append a filter to the filters list.""" if (filter): self.filters.append(filter) def append_normalisation_filter(self): """Append a normalisation audio filter to the complex filter.""" if (self.has_audio): self.append_filter("[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]") def append_concat_filter(self, type, segments=[]): """Append a concat filter to the filters list""" if (len(segments) > 1): self.append_filter( "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format( inspecs=" ".join([s.output_stream_specifier() for s in segments]), n=len(segments), v=int(type == "v"), a=int(type == "a"), t=type)) elif (len(segments) == 1): self.append_filter( "{inspec} {a}null [{t}conc]".format( inspec=segments[0].output_stream_specifier(), a=type if type == "a" else "", t=type)) def build_complex_filter(self): """Build the complete complex filter. Filters in the filtergraph are separated by ";". """ return "{f}".format(f=";".join(self.filters)) def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" if (pat == 1): elapsed = datetime.timedelta( hours=int(self.process.match.group(1)), minutes=int(self.process.match.group(2)), seconds=float(self.process.match.group(3))) self.progress.update(elapsed.total_seconds()) return (pat == 0) if (__name__ == "__main__"): print ShellCommand() print ConvertCommand(input_options=["in.pdf[12]"], output_options=["out.png"]) # FFprobeCommand expects the input file to exist. f = tempfile.NamedTemporaryFile() print FFprobeCommand(input_options=["-i", f.name]) f.close() print FFmpegCommand(input_options=["-i", "in.mov"], output_options=["out.mov"]) concat = FFmpegConcatCommand(input_options=["-i", "in.mov"], output_options=["out.mov"], has_audio=True) concat.append_normalisation_filter() print concat print "Quoting:" for s in ["foobar", "foo bar baz", "foo(bar)", "[foobar]", "foo[bar", "foo 'bar'", '"foobar"', "'foobar'", 'foo"bar', "foo.bar", "(", ")", "'", "/foo/bar/baz/quux", "-f", "--foobar"]: print " {s} => echo {sub}".format(s=s, sub=ShellCommand.shellquote(s))
Show line notes below