GitBucket
4.21.2
Toggle navigation
Snippets
Sign in
Files
Branches
2
Releases
1
Issues
Pull requests
Labels
Priorities
Milestones
Wiki
Forks
nigel.stanger
/
process_podcast
Browse code
Converted os.path to pathlib and all #! lines to python3 (ref #32).
master
1 parent
0b13583
commit
d1034ee6c00824906390f787dfd061e8a221d559
Nigel Stanger
authored
on 26 Aug 2018
Patch
Showing
6 changed files
config_parser.py
globals.py
process_podcast.py
progress_bar.py
segment.py
shell_command.py
Ignore Space
Show notes
View
config_parser.py
#!/usr/bin/env python3 import sys from pyparsing import * # pyparsing documentation: # https://sourceforge.net/p/pyparsing/code/HEAD/tree/trunk/src/HowToUsePyparsing.txt#l302 INPUTSPEC_DEFAULTS = {"type": None, "filename": None, "num": None} TIMESTAMP_DEFAULTS = {"hh": 0, "mm": 0, "ms": 0} # see http://stackoverflow.com/questions/11180622/optional-string-segment-in-pyparsing def default_input_fields(fields): """Set missing input specification values to defaults.""" set_defaults(fields, INPUTSPEC_DEFAULTS) def default_timestamp_fields(fields): """Set missing timestamp values to defaults.""" set_defaults(fields, TIMESTAMP_DEFAULTS) def set_defaults(fields, defaults): """Set missing field values to defaults.""" undefined = set(defaults.keys()) - set(fields.keys()) for k in undefined: v = defaults[k] # see http://pyparsing.wikispaces.com/share/view/71042464 fields[k] = v fields.append(v) def parser_bnf(): """Grammar for parsing podcast configuration files.""" at = Literal("@").suppress() caret = Literal("^") colon = Literal(":").suppress() left_bracket = Literal("[").suppress() period = Literal(".").suppress() right_bracket = Literal("]").suppress() # zero_index ::= [0-9]+ zero_index = Word(nums).setParseAction(lambda s, l, t: int(t[0])) # filename ::= [A-Za-z0-9][-A-Za-z0-9._ ]+ filename_first = Word(alphanums, exact=1) filename_rest = Word(alphanums + "-_/. ") filename = Combine(filename_first + Optional(filename_rest)) # millisecs ::= "." [0-9]+ millisecs = (Word(nums).setParseAction( lambda s, l, t: int(t[0][:3].ljust(3, "0"))) .setResultsName("ms")) # hours, minutes, seconds ::= zero_index hours = zero_index.setResultsName("hh") minutes = zero_index.setResultsName("mm") seconds = zero_index.setResultsName("ss") hours_minutes = hours + colon + minutes + colon | minutes + colon secs_millisecs = (seconds + Optional(period + millisecs) | period + millisecs) # timestamp ::= [[hours ":"] minutes ":"] seconds ["." millisecs] timestamp = Optional(hours_minutes) + secs_millisecs # duration_file ::= "@", filename # We need a separate item for a lonely duration file timestamp so # that we can attach a parse action just to the lonely case. Using # duration_file alone means the parse action is attached to all # instances of duration_file. duration_file = at + filename.setResultsName("filename") lonely_duration_file = at + filename.setResultsName("filename") # timespecs ::= timestamp [duration_file | {timestamp}] # If duration_file timestamp is lonely, prepend a zero timestamp. timespecs = Or( [lonely_duration_file.setParseAction( lambda s, l, t: [timestamp.parseString("00:00:00.000"), t]), Group(timestamp) + duration_file, OneOrMore(Group(timestamp.setParseAction(default_timestamp_fields)))]) # last_frame ::= "-1" | "last" last_frame = oneOf(["-1", "last"]).setParseAction(replaceWith(-1)) # frame_number ::= ":" (zero_index | last_frame) frame_number = colon - (zero_index | last_frame).setResultsName("num") # stream_number ::= ":" zero_index stream_number = colon - zero_index.setResultsName("num") # input_file ::= ":" [filename] input_file = colon - Optional(filename).setResultsName("filename") # previous_segment ::= ":" "^" previous_segment = colon - caret.setResultsName("filename") # frame_input_file ::= input_file | previous_segment frame_input_file = Or([input_file, previous_segment]) # av_trailer ::= input_file [stream_number] av_trailer = input_file + Optional(stream_number) # frame_type ::= "frame" | "f" frame_type = oneOf(["f", "frame"]).setParseAction(replaceWith("frame")) # frame_input ::= frame_type [frame_input_file [frame_number]] frame_input = (frame_type.setResultsName("type") + Optional(frame_input_file + Optional(frame_number))) # video_type ::= "video" | "v" video_type = oneOf(["v", "video"]).setParseAction(replaceWith("video")) # audio_type ::= "audio" | "a" audio_type = oneOf(["a", "audio"]).setParseAction(replaceWith("audio")) # av_input ::= (audio_type | video_type) [av_trailer] av_input = ((audio_type | video_type).setResultsName("type") + Optional(av_trailer)) # inputspec ::= "[" (av_input | frame_input) "]" inputspec = (left_bracket + delimitedList(av_input | frame_input, delim=":") .setParseAction(default_input_fields) - right_bracket) # segmentspec ::= inputspec [timespecs] segmentspec = Group(inputspec + Group(Optional(timespecs)).setResultsName("times")) # config ::= {segmentspec} config = ZeroOrMore(segmentspec) config.ignore(pythonStyleComment) return config def parse_configuration_file(config_file): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseFile(config_file, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def parse_configuration_string(config_string): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseString(config_string, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def test_parser(): tests = ["test/config1.txt", "test/config2.txt", "test/config3.txt", "test/config4.txt", "test/config5.txt"] for t in tests: print("==={f}===".format(f=t)) r = parse_configuration_file(t) for s in r: print(s) print(" type = {t}".format(t=s["type"])) print(" filename = '{f}'".format(f=s["filename"])) print(" num = {n}".format(n=s["num"])) print(" times = {t}".format(t=s["times"])) for i, t in enumerate(s["times"]): if (isinstance(t, str)): print(" punch out after duration of '{f}'".format(f=t)) if (isinstance(t, ParseResults)): if (i % 2 == 0): print(" punch in at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"])) else: print(" punch out at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"])) print() if (__name__ == "__main__"): test_parser()
#!/usr/bin/env python import sys from pyparsing import * # pyparsing documentation: # https://sourceforge.net/p/pyparsing/code/HEAD/tree/trunk/src/HowToUsePyparsing.txt#l302 INPUTSPEC_DEFAULTS = {"type": None, "filename": None, "num": None} TIMESTAMP_DEFAULTS = {"hh": 0, "mm": 0, "ms": 0} # see http://stackoverflow.com/questions/11180622/optional-string-segment-in-pyparsing def default_input_fields(fields): """Set missing input specification values to defaults.""" set_defaults(fields, INPUTSPEC_DEFAULTS) def default_timestamp_fields(fields): """Set missing timestamp values to defaults.""" set_defaults(fields, TIMESTAMP_DEFAULTS) def set_defaults(fields, defaults): """Set missing field values to defaults.""" undefined = set(defaults.keys()) - set(fields.keys()) for k in undefined: v = defaults[k] # see http://pyparsing.wikispaces.com/share/view/71042464 fields[k] = v fields.append(v) def parser_bnf(): """Grammar for parsing podcast configuration files.""" at = Literal("@").suppress() caret = Literal("^") colon = Literal(":").suppress() left_bracket = Literal("[").suppress() period = Literal(".").suppress() right_bracket = Literal("]").suppress() # zero_index ::= [0-9]+ zero_index = Word(nums).setParseAction(lambda s, l, t: int(t[0])) # filename ::= [A-Za-z0-9][-A-Za-z0-9._ ]+ filename_first = Word(alphanums, exact=1) filename_rest = Word(alphanums + "-_/. ") filename = Combine(filename_first + Optional(filename_rest)) # millisecs ::= "." [0-9]+ millisecs = (Word(nums).setParseAction( lambda s, l, t: int(t[0][:3].ljust(3, "0"))) .setResultsName("ms")) # hours, minutes, seconds ::= zero_index hours = zero_index.setResultsName("hh") minutes = zero_index.setResultsName("mm") seconds = zero_index.setResultsName("ss") hours_minutes = hours + colon + minutes + colon | minutes + colon secs_millisecs = (seconds + Optional(period + millisecs) | period + millisecs) # timestamp ::= [[hours ":"] minutes ":"] seconds ["." millisecs] timestamp = Optional(hours_minutes) + secs_millisecs # duration_file ::= "@", filename # We need a separate item for a lonely duration file timestamp so # that we can attach a parse action just to the lonely case. Using # duration_file alone means the parse action is attached to all # instances of duration_file. duration_file = at + filename.setResultsName("filename") lonely_duration_file = at + filename.setResultsName("filename") # timespecs ::= timestamp [duration_file | {timestamp}] # If duration_file timestamp is lonely, prepend a zero timestamp. timespecs = Or( [lonely_duration_file.setParseAction( lambda s, l, t: [timestamp.parseString("00:00:00.000"), t]), Group(timestamp) + duration_file, OneOrMore(Group(timestamp.setParseAction(default_timestamp_fields)))]) # last_frame ::= "-1" | "last" last_frame = oneOf(["-1", "last"]).setParseAction(replaceWith(-1)) # frame_number ::= ":" (zero_index | last_frame) frame_number = colon - (zero_index | last_frame).setResultsName("num") # stream_number ::= ":" zero_index stream_number = colon - zero_index.setResultsName("num") # input_file ::= ":" [filename] input_file = colon - Optional(filename).setResultsName("filename") # previous_segment ::= ":" "^" previous_segment = colon - caret.setResultsName("filename") # frame_input_file ::= input_file | previous_segment frame_input_file = Or([input_file, previous_segment]) # av_trailer ::= input_file [stream_number] av_trailer = input_file + Optional(stream_number) # frame_type ::= "frame" | "f" frame_type = oneOf(["f", "frame"]).setParseAction(replaceWith("frame")) # frame_input ::= frame_type [frame_input_file [frame_number]] frame_input = (frame_type.setResultsName("type") + Optional(frame_input_file + Optional(frame_number))) # video_type ::= "video" | "v" video_type = oneOf(["v", "video"]).setParseAction(replaceWith("video")) # audio_type ::= "audio" | "a" audio_type = oneOf(["a", "audio"]).setParseAction(replaceWith("audio")) # av_input ::= (audio_type | video_type) [av_trailer] av_input = ((audio_type | video_type).setResultsName("type") + Optional(av_trailer)) # inputspec ::= "[" (av_input | frame_input) "]" inputspec = (left_bracket + delimitedList(av_input | frame_input, delim=":") .setParseAction(default_input_fields) - right_bracket) # segmentspec ::= inputspec [timespecs] segmentspec = Group(inputspec + Group(Optional(timespecs)).setResultsName("times")) # config ::= {segmentspec} config = ZeroOrMore(segmentspec) config.ignore(pythonStyleComment) return config def parse_configuration_file(config_file): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseFile(config_file, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def parse_configuration_string(config_string): """Parse a podcast configuration file.""" try: parser = parser_bnf() result = parser.parseString(config_string, parseAll=True) except (ParseException, ParseSyntaxException) as e: print("ERROR: {m}".format(m=str(e))) sys.exit(1) return result def test_parser(): tests = ["test/config1.txt", "test/config2.txt", "test/config3.txt", "test/config4.txt", "test/config5.txt"] for t in tests: print("==={f}===".format(f=t)) r = parse_configuration_file(t) for s in r: print(s) print(" type = {t}".format(t=s["type"])) print(" filename = '{f}'".format(f=s["filename"])) print(" num = {n}".format(n=s["num"])) print(" times = {t}".format(t=s["times"])) for i, t in enumerate(s["times"]): if (isinstance(t, str)): print(" punch out after duration of '{f}'".format(f=t)) if (isinstance(t, ParseResults)): if (i % 2 == 0): print(" punch in at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"])) else: print(" punch out at: {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}".format(hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"])) print() if (__name__ == "__main__"): test_parser()
Ignore Space
Show notes
View
globals.py
#!/usr/bin/env python3 import logging PROGRAM = "process_podcast" log = logging.getLogger(PROGRAM)
#!/usr/bin/env python import logging PROGRAM = "process_podcast" log = logging.getLogger(PROGRAM)
Ignore Space
Show notes
View
process_podcast.py
#!/usr/bin/env python3 import argparse import datetime import logging from pathlib import Path, PurePath import sys from pyparsing import ParseResults import globals from config_parser import ( parse_configuration_file, parse_configuration_string) from progress_bar import (ProgressBar) from segment import (Segment, AudioSegment, VideoSegment, FrameSegment, SegmentException) from shell_command import (FFprobeCommand, FFmpegConcatCommand) class InputStreamAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): input = values.split(":") file = input[0] stream = None if (len(input) == 1) else input[1] setattr(namespace, self.dest, file) if (option_string in ["--audio", "-a"]): setattr(namespace, 'audio_stream_number', stream) elif (option_string in ["--video", "-v"]): setattr(namespace, 'video_stream_number', stream) def parse_command_line(): """Parse command line arguments.""" parser = argparse.ArgumentParser( usage="%(prog)s [options] <output>", description="where: <output> is the name of the output file " "(note: .mov seems generally best)", epilog="Default input files can be specified using either of " "--audio or --video. If neither of these are specified, " "then you must supply a configuration file using --config. " "(Of course, you can always supply a configuration file " "regardless.)\n\n" "Input streams can be taken from the same input file.\n\n" "If no segments are specified, the entire input stream is " "processed as one segment. The number and duration of segments " "can differ, but the total duration across all input streams " "should ideally be the same.") parser.set_defaults(audio_stream_number=None, video_stream_number=None) parser.add_argument( "output", help="name of the output file (note: .mov is best)") parser.add_argument( "--audio", "-a", metavar="FILE[:STREAM]", action=InputStreamAction, help="File name for the default audio input stream (can be the " "same as for other input streams). You can optionally specify " "the default audio stream number to use if the file contains " "more than one (this can be overidden in a configuration " "file). If you don't specify a stream number, it defaults " "to 0 (i.e., the first audio stream in the file).") parser.add_argument( "--video", "-v", metavar="FILE[:STREAM]", action=InputStreamAction, help="File name for the default video input stream (can be the " "same as for other input streams). You can optionally specify " "the default video stream number to use if the file contains " "more than one (this can be overidden in a configuration " "file). If you don't specify a stream number, it defaults " "to 0 (i.e., the first video stream in the file).") parser.add_argument( "--configuration", "--config", "-c", dest="config", metavar="FILE", help="File name for the podcast segment configuration (plain text). " "See config_help.md details on the file " "format.".format(p=globals.PROGRAM)) parser.add_argument( "--copy-audio", dest="process_audio", action="store_false", default=True, help="Disable additional processing of the source audio. The audio " "will still be re-encoded using the specified audio codec " "because the concatenation filter requires it, but extra " "processing such as reduction of channels or normalisation " "will not be carried out. (Implies --no-normalise.)") parser.add_argument( "--copy-video", dest="process_video", action="store_false", default=True, help="Disable additional processing of the source video. The video " "will still be re-encoded using the specified video codec " "because the concatenation filter requires it, but extra " "processing such as remapping of colours will not be " "carried out.") parser.add_argument( "--no-normalise", dest="normalise", action="store_false", default=True, help="Disable normalisation of the source audio level (implied by " "--copy-audio).") parser.add_argument( "--audio-codec", dest="audio_codec", metavar="CODEC", default="pcm_s16le", help="Specify ffmpeg audio codec for output (default pcm_s16le). " "See the output of ffmpeg -codecs for possible codecs.") parser.add_argument( "--video-codec", dest="video_codec", metavar="CODEC", default="h264", help="Specify ffmpeg video codec for output (default h264). " "See the output of ffmpeg -codecs for possible codecs.") parser.add_argument( "--input-prefix", "-i", dest="prefix", metavar="PATH", default=".", help="Path to be prefixed to all INPUT files. This includes the " "configuration file, if applicable, and any files specified " "within the configuration file. Input files that already " "include the input prefix will not have it added again.") parser.add_argument( "--preview", "-p", metavar="RATE", nargs="?", const="1", help="Generate a preview of the podcast by only rendering a " "subset of the video frames. RATE is the number of frames " "per second to render (default 1 fps). You can specify " "fractions (e.g., 1/10 for one frame every 10 seconds). " 'NOTE: if you get a "too few arguments" error when using ' "this option, you've probably not provided an fps value and " "placed the option just before the output filename. " "Either move --preview earlier in the option list, add " 'a "--" between it and the output filename, or provide a ' "fps value.") parser.add_argument( "--debug", "-d", action="store_true", help="Print debugging output (overrides --quiet).") parser.add_argument( "--keep", "-k", action="store_true", help="Don't delete any generated temporary files.") parser.add_argument( "--quiet", "-q", action="store_true", help="Mute all console output (overridden by --debug).") args = parser.parse_args() return args def prefix_path(prefix, path): """Prepend prefix to path, unless path already starts with prefix""" assert(prefix is not None) if not path: return None elif path == "." or path.startswith(prefix): return path else: return Path(prefix, path) def check_arguments(args): """Sanity check the command line arguments.""" fn = "check_arguments" # Prepend input files with --input-prefix where applicable. args.audio, args.video, args.config = map( prefix_path, [args.prefix] * 3, [args.audio, args.video, args.config]) if args.quiet: globals.log.setLevel(logging.WARNING) # --copy-audio implies --no-normalise. if not args.process_audio: args.normalise = False # --debug overrides --quiet. if args.debug: globals.log.setLevel(logging.DEBUG) globals.log.debug("{fn}(): args = {a}".format(fn=fn, a=args)) # Must specify at least one of --audio, --video, --config. if not any([args.audio, args.video, args.config]): globals.log.error("must specify at least one of --audio, --video, " "or --config") sys.exit(1) if not Path(args.prefix).exists(): globals.log.error('input prefix "{p}" does not ' "exist".format(p=args.prefix)) sys.exit(1) def get_configuration(args): """Load podcast configuration.""" # Fill in missing file names for default input streams. fn = "get_configuration" # These types can have default input files and streams. type_mapping = { "audio": {"file": args.audio, "stream": args.audio_stream_number}, "video": {"file": args.video, "stream": args.video_stream_number}} globals.log.info("Processing configuration...") if (args.config): config = parse_configuration_file(args.config) # Check that applicable default input streams have been specified. for i, c in enumerate(config): type = c["type"] # Add prefix to filename, if applicable. if c["filename"] and (c["filename"] != "^"): config[i]["filename"] = prefix_path(args.prefix, config[i]["filename"]) if (type in type_mapping): default_file = type_mapping[type]["file"] default_stream = type_mapping[type]["stream"] error_string = ("attempting to use default {s} input file, " "but --{s} hasn't been specified".format(s=type)) else: default_file = None default_stream = 0 error_string = ("attempting to use a default input file, " "but the {s} type doesn't support this".format(s=type)) # No filename in configuration. if (not c["filename"]): if (default_file): config[i]["filename"] = default_file # No filename on command line either. else: globals.log.error(error_string) sys.exit(1) # No stream number in configuration. Note: 0 is a valid # stream number, so explicitly check for None. if (c["num"] is None): # Assume 0 if no stream on command line either. if (default_stream is None): config[i]["num"] = 0 else: config[i]["num"] = default_stream # No configuration file. else: conf_list = [] for m in type_mapping: default_file = type_mapping[m]["file"] default_stream = type_mapping[m]["stream"] if (default_file and default_stream is not None): conf_list += ["[{type}:{file}:{stream}]".format( type=m, file=default_file, stream=default_stream)] globals.log.debug("{fn}(): default config = " "{c}".format(fn=fn, c=conf_list)) config = parse_configuration_string("\n".join(conf_list)) return config def get_file_duration(file): """Calculate the duration a media file as a timedelta object.""" fn = "get_file_duration" command = FFprobeCommand([file]) globals.log.debug("{fn}(): {cmd}".format(fn=fn, cmd=command)) # Only consider the first stream. If it's the only stream in the # file, great; otherwise it seems reasonable to assume that all # streams in the same file will have the same duration. ss, ms = command.get_entries( section="format", find_list=["duration"])[0].split(".") ms = ms[:3].ljust(3, "0") globals.log.debug("{fn}(): ss = {ss}, ms = {ms}".format(fn=fn, ss=ss, ms=ms)) return datetime.timedelta(seconds=int(ss), milliseconds=int(ms)) def make_new_segment(type, filename, punch_in, punch_out, num): """Make a new segment instance of the correct class.""" fn = "make_new_segment" globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=type)) globals.log.debug("{fn}(): filename = {f}".format(fn=fn, f=filename)) globals.log.debug("{fn}(): punch in = {i}".format(fn=fn, i=punch_in)) globals.log.debug("{fn}(): punch out = {o}".format(fn=fn, o=punch_out)) globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=num)) if (type == "audio"): return AudioSegment(file=filename, punch_in=punch_in, punch_out=punch_out, input_stream=num) elif (type == "video"): return VideoSegment(file=filename, punch_in=punch_in, punch_out=punch_out, input_stream=num) elif (type == "frame"): return FrameSegment(file=filename, punch_in=punch_in, punch_out=punch_out, frame_number=num) else: return None def process_timestamp_pair(args, times): """Constructs timedelta instances from a pair of config timestamps.""" fn = "process_timestamp_pair" globals.log.debug("{fn}(): times[0] = {t}".format(fn=fn, t=times[0])) globals.log.debug("{fn}(): times[1] = {t}".format(fn=fn, t=times[1])) # If the first item in the timestamp list in the configuration file # is a filename, the parser inserts a zero timestamp before it. We # can therefore guarantee that the first item of the pair will # always be a timestamp. t0 = datetime.timedelta( hours=times[0]["hh"], minutes=times[0]["mm"], seconds=times[0]["ss"], milliseconds=times[0]["ms"]) if (times[1]): if (len(times[1]) == 1): # filename t1 = t0 + get_file_duration( prefix_path(args.prefix, times[1]["filename"])) elif (len(times[1]) == 4): # normal timestamp t1 = datetime.timedelta( hours=times[1]["hh"], minutes=times[1]["mm"], seconds=times[1]["ss"], milliseconds=times[1]["ms"]) else: globals.log.error("{fn}():unreadable timestamp {t}".format( fn=fn, t=times[1])) t1 = None else: t1 = None globals.log.debug("{fn}(): t0 = {t}".format(fn=fn, t=t0)) globals.log.debug("{fn}(): t1 = {t}".format(fn=fn, t=t1)) return t0, t1 def process_time_list(args, type, filename, num, time_list): """Process an audio or video stream and build a list of segments.""" fn = "process_time_list" if (Path(filename).exists() and type in ["audio", "video"]): stream_duration = get_file_duration(filename) else: stream_duration = 0 segments = [] globals.log.debug("{fn}(): stream duration = {d}".format( fn=fn, d=stream_duration)) # No timestamps: punch in at 0, out at stream duration. if (len(time_list) == 0): punch_in = datetime.timedelta() punch_out = stream_duration segments.append(make_new_segment(type, filename, punch_in, punch_out, num)) else: # Process each pair of timestamps as punch in, out. If there's # an odd number of items, the last one is processed separately. for t in zip(time_list[::2], time_list[1::2]): punch_in, punch_out = process_timestamp_pair(args, t) if (punch_in == punch_out): globals.log.warning( "punch in ({i}s) and punch out ({o}s) times are " "equal; no segment will be " "generated".format(i=punch_in.total_seconds(), o=punch_out.total_seconds())) continue elif (punch_out < punch_in): globals.log.error( "punch out time ({i}s) falls before punch in time " "({o}s); can't generate a valid " "segment".format(i=punch_in.total_seconds(), o=punch_out.total_seconds())) sys.exit(1) segments.append(make_new_segment(type, filename, punch_in, punch_out, num)) # Odd number of timestamps: punch in at last timestamp, # out at stream duration. if (len(time_list) % 2 != 0): globals.log.debug("{fn}(): odd number of timestamps".format(fn=fn)) punch_in, _ = process_timestamp_pair(args, [time_list[-1], None]) punch_out = stream_duration segments.append(make_new_segment(type, filename, punch_in, punch_out, num)) return segments def process_input_streams(args, config): """Process a list of stream specifications and build a list of segments.""" fn = "process_input_streams" globals.log.info("Processing input streams...") segments = [] for cnf in config: globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=cnf["type"])) globals.log.debug( "{fn}(): filename = {f}".format(fn=fn, f=cnf["filename"])) globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=cnf["num"])) globals.log.debug("{fn}(): times = {t}".format(fn=fn, t=cnf["times"])) segments += process_time_list(args, cnf["type"], cnf["filename"], cnf["num"], cnf["times"]) return segments def smallest_video_dimensions(args, segments): """Compute the smallest frame dimensions across all video inputs.""" fn = "smallest_video_dimensions" width = 2048 height = 1536 for s in segments: if isinstance(s, FrameSegment): continue command = FFprobeCommand([s.input_file]) globals.log.debug("{fn}(): {cmd}".format(fn=fn, cmd=command)) w, h = command.get_entries(find_list=["width", "height"]) if (w * h) < (width * height): width, height = w, h return width, height def process_frame_segments(args, segments, width, height): """Post-process frame segments to set frame images, etc.""" fn = "process_frame_segments" globals.log.info("Processing frames...") frame_segments = [s for s in segments if isinstance(s, FrameSegment)] n = len(frame_segments) globals.log.debug("{fn}(): num frames = {n}".format(fn=fn, n=n)) progress = ProgressBar(max_value=n, quiet=args.quiet or args.debug or n == 0) progress.update(0) for i, f in enumerate(frame_segments): try: globals.log.debug( "{fn}(): frame (before) = {b}".format(fn=fn, b=f)) # Frame segments that use a frame from the previous segment. if (f.input_file == "^"): if (f.segment_number > 0): prev = segments[f.segment_number - 1] globals.log.debug( "{fn}(): prev = {p}".format(fn=fn, p=prev)) prev.generate_temp_file(args.output, width=width, height=height) f.use_frame( prev.generate_frame(f.frame_number, args.output, width=width, height=height)) else: globals.log.error( "frame segment {s} is attempting to use the last " "frame of a non-existent previous " "segment".format(s=f.segment_number)) sys.exit(1) # Frame segments whose frame comes from a PDF file. else: suffix = PurePath(f.input_file).suffix if (suffix.lower() == ".pdf"): f.use_frame(f.generate_temp_file(args.output, width=width, height=height)) else: globals.log.error( 'unexpected input file type "{s}" for frame segment ' "{f}".format(s=suffix, f=f.segment_number)) sys.exit(1) progress.update(i) globals.log.debug("{fn}(): frame (after) = ""{a}".format(fn=fn, a=f)) except SegmentException as e: progress.finish() globals.log.error(e.message) sys.exit(1) else: progress.finish() def render_podcast(args, audio_segments, video_segments, output, duration): """Stitch together the various input components into the final podcast.""" fn = "render_podcast" globals.log.info("Rendering final podcast...") command = FFmpegConcatCommand(has_audio=len(audio_segments) > 0, has_video=len(video_segments) > 0, max_progress=duration, quiet=args.quiet and not args.debug, process_audio=args.process_audio, process_video=args.process_video, audio_codec=args.audio_codec, video_codec=args.video_codec) input_files = Segment.input_files() for f in input_files: if (input_files[f]): command.append_input_options(input_files[f]) command.append_input_options(["-i", f]) for s in (audio_segments + video_segments): command.append_filter(s.trim_filter()) command.append_concat_filter("a", [s for s in audio_segments]) if (args.normalise): command.append_normalisation_filter() command.append_concat_filter("v", [s for s in video_segments]) if args.preview: globals.log.info("PREVIEW MODE: {fps} fps".format(fps=args.preview)) command.append_output_options(["-r", args.preview]) command.append_output_options([output]) globals.log.debug("{fn}(): {c}".format(fn=fn, c=command)) if (command.run() != 0): globals.log.error("Failed to render final podcast") def cleanup(segments): """Clean up generated temporary files.""" globals.log.info("Cleaning up...") for s in segments: s.delete_temp_files() def main(): fn = "main" logging.basicConfig( level=logging.INFO, format="%(levelname)s: {p}: %(message)s".format(p=globals.PROGRAM)) segments = None try: args = parse_command_line() check_arguments(args) config = get_configuration(args) segments = process_input_streams(args, config) globals.log.debug("{fn}(): audio segments = {a}".format( fn=fn, a=[s for s in segments if isinstance(s, AudioSegment)])) globals.log.debug("{fn}(): video segments = {v}".format( fn=fn, v=[s for s in segments if isinstance(s, VideoSegment)])) audio_segments = [s for s in segments if isinstance(s, AudioSegment)] video_segments = [s for s in segments if isinstance(s, VideoSegment)] audio_duration = sum([s.get_duration() for s in audio_segments]) video_duration = sum([s.get_duration() for s in video_segments]) globals.log.debug("{fn}(): audio duration = " "{a}".format(fn=fn, a=audio_duration)) globals.log.debug("{fn}(): video duration = " "{v}".format(fn=fn, v=video_duration)) if (len(audio_segments) and len(video_segments)): if (audio_duration != video_duration): globals.log.warning("total video duration ({v}s) doesn't match " "total audio duration " "({a}s)".format(v=video_duration, a=audio_duration)) width, height = smallest_video_dimensions(args, video_segments) globals.log.debug("{fn}(): width = {w}, height = " "{h}".format(fn=fn, w=width, h=height)) process_frame_segments(args, segments, width, height) globals.log.debug("{fn}(): input files = " "{i}".format(fn=fn, i=Segment.input_files())) render_podcast(args, audio_segments, video_segments, args.output, max(audio_duration, video_duration)) except (KeyboardInterrupt): pass finally: if segments and not args.keep: cleanup(segments) if (__name__ == "__main__"): main()
#!/usr/bin/env python import argparse import datetime import logging import os.path import sys from pyparsing import ParseResults import globals from config_parser import ( parse_configuration_file, parse_configuration_string) from progress_bar import (ProgressBar) from segment import (Segment, AudioSegment, VideoSegment, FrameSegment, SegmentException) from shell_command import (FFprobeCommand, FFmpegConcatCommand) class InputStreamAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): input = values.split(":") file = input[0] stream = None if (len(input) == 1) else input[1] setattr(namespace, self.dest, file) if (option_string in ["--audio", "-a"]): setattr(namespace, 'audio_stream_number', stream) elif (option_string in ["--video", "-v"]): setattr(namespace, 'video_stream_number', stream) def parse_command_line(): """Parse command line arguments.""" parser = argparse.ArgumentParser( usage="%(prog)s [options] <output>", description="where: <output> is the name of the output file " "(note: .mov seems generally best)", epilog="Default input files can be specified using either of " "--audio or --video. If neither of these are specified, " "then you must supply a configuration file using --config. " "(Of course, you can always supply a configuration file " "regardless.)\n\n" "Input streams can be taken from the same input file.\n\n" "If no segments are specified, the entire input stream is " "processed as one segment. The number and duration of segments " "can differ, but the total duration across all input streams " "should ideally be the same.") parser.set_defaults(audio_stream_number=None, video_stream_number=None) parser.add_argument( "output", help="name of the output file (note: .mov is best)") parser.add_argument( "--audio", "-a", metavar="FILE[:STREAM]", action=InputStreamAction, help="File name for the default audio input stream (can be the " "same as for other input streams). You can optionally specify " "the default audio stream number to use if the file contains " "more than one (this can be overidden in a configuration " "file). If you don't specify a stream number, it defaults " "to 0 (i.e., the first audio stream in the file).") parser.add_argument( "--video", "-v", metavar="FILE[:STREAM]", action=InputStreamAction, help="File name for the default video input stream (can be the " "same as for other input streams). You can optionally specify " "the default video stream number to use if the file contains " "more than one (this can be overidden in a configuration " "file). If you don't specify a stream number, it defaults " "to 0 (i.e., the first video stream in the file).") parser.add_argument( "--configuration", "--config", "-c", dest="config", metavar="FILE", help="File name for the podcast segment configuration (plain text). " "See config_help.md details on the file " "format.".format(p=globals.PROGRAM)) parser.add_argument( "--copy-audio", dest="process_audio", action="store_false", default=True, help="Disable additional processing of the source audio. The audio " "will still be re-encoded using the specified audio codec " "because the concatenation filter requires it, but extra " "processing such as reduction of channels or normalisation " "will not be carried out. (Implies --no-normalise.)") parser.add_argument( "--copy-video", dest="process_video", action="store_false", default=True, help="Disable additional processing of the source video. The video " "will still be re-encoded using the specified video codec " "because the concatenation filter requires it, but extra " "processing such as remapping of colours will not be " "carried out.") parser.add_argument( "--no-normalise", dest="normalise", action="store_false", default=True, help="Disable normalisation of the source audio level (implied by " "--copy-audio).") parser.add_argument( "--audio-codec", dest="audio_codec", metavar="CODEC", default="pcm_s16le", help="Specify ffmpeg audio codec for output (default pcm_s16le). " "See the output of ffmpeg -codecs for possible codecs.") parser.add_argument( "--video-codec", dest="video_codec", metavar="CODEC", default="h264", help="Specify ffmpeg video codec for output (default h264). " "See the output of ffmpeg -codecs for possible codecs.") parser.add_argument( "--input-prefix", "-i", dest="prefix", metavar="PATH", default=".", help="Path to be prefixed to all INPUT files. This includes the " "configuration file, if applicable, and any files specified " "within the configuration file. Input files that already " "include the input prefix will not have it added again.") parser.add_argument( "--preview", "-p", metavar="RATE", nargs="?", const="1", help="Generate a preview of the podcast by only rendering a " "subset of the video frames. RATE is the number of frames " "per second to render (default 1 fps). You can specify " "fractions (e.g., 1/10 for one frame every 10 seconds). " 'NOTE: if you get a "too few arguments" error when using ' "this option, you've probably not provided an fps value and " "placed the option just before the output filename. " "Either move --preview earlier in the option list, add " 'a "--" between it and the output filename, or provide a ' "fps value.") parser.add_argument( "--debug", "-d", action="store_true", help="Print debugging output (overrides --quiet).") parser.add_argument( "--keep", "-k", action="store_true", help="Don't delete any generated temporary files.") parser.add_argument( "--quiet", "-q", action="store_true", help="Mute all console output (overridden by --debug).") args = parser.parse_args() return args def prefix_path(prefix, path): """Prepend prefix to path, unless path already starts with prefix""" assert(prefix is not None) if not path: return None elif path == "." or path.startswith(prefix): return path else: return os.path.join(prefix, path) def check_arguments(args): """Sanity check the command line arguments.""" fn = "check_arguments" # Prepend input files with --input-prefix where applicable. args.audio, args.video, args.config = map( prefix_path, [args.prefix] * 3, [args.audio, args.video, args.config]) if args.quiet: globals.log.setLevel(logging.WARNING) # --copy-audio implies --no-normalise. if not args.process_audio: args.normalise = False # --debug overrides --quiet. if args.debug: globals.log.setLevel(logging.DEBUG) globals.log.debug("{fn}(): args = {a}".format(fn=fn, a=args)) # Must specify at least one of --audio, --video, --config. if not any([args.audio, args.video, args.config]): globals.log.error("must specify at least one of --audio, --video, " "or --config") sys.exit(1) if not os.path.exists(args.prefix): globals.log.error('input prefix "{p}" does not ' "exist".format(p=args.prefix)) sys.exit(1) def get_configuration(args): """Load podcast configuration.""" # Fill in missing file names for default input streams. fn = "get_configuration" # These types can have default input files and streams. type_mapping = { "audio": {"file": args.audio, "stream": args.audio_stream_number}, "video": {"file": args.video, "stream": args.video_stream_number}} globals.log.info("Processing configuration...") if (args.config): config = parse_configuration_file(args.config) # Check that applicable default input streams have been specified. for i, c in enumerate(config): type = c["type"] # Add prefix to filename, if applicable. if c["filename"] and (c["filename"] != "^"): config[i]["filename"] = prefix_path(args.prefix, config[i]["filename"]) if (type in type_mapping): default_file = type_mapping[type]["file"] default_stream = type_mapping[type]["stream"] error_string = ("attempting to use default {s} input file, " "but --{s} hasn't been specified".format(s=type)) else: default_file = None default_stream = 0 error_string = ("attempting to use a default input file, " "but the {s} type doesn't support this".format(s=type)) # No filename in configuration. if (not c["filename"]): if (default_file): config[i]["filename"] = default_file # No filename on command line either. else: globals.log.error(error_string) sys.exit(1) # No stream number in configuration. Note: 0 is a valid # stream number, so explicitly check for None. if (c["num"] is None): # Assume 0 if no stream on command line either. if (default_stream is None): config[i]["num"] = 0 else: config[i]["num"] = default_stream # No configuration file. else: conf_list = [] for m in type_mapping: default_file = type_mapping[m]["file"] default_stream = type_mapping[m]["stream"] if (default_file and default_stream is not None): conf_list += ["[{type}:{file}:{stream}]".format( type=m, file=default_file, stream=default_stream)] globals.log.debug("{fn}(): default config = " "{c}".format(fn=fn, c=conf_list)) config = parse_configuration_string("\n".join(conf_list)) return config def get_file_duration(file): """Calculate the duration a media file as a timedelta object.""" fn = "get_file_duration" command = FFprobeCommand([file]) globals.log.debug("{fn}(): {cmd}".format(fn=fn, cmd=command)) # Only consider the first stream. If it's the only stream in the # file, great; otherwise it seems reasonable to assume that all # streams in the same file will have the same duration. ss, ms = command.get_entries( section="format", find_list=["duration"])[0].split(".") ms = ms[:3].ljust(3, "0") globals.log.debug("{fn}(): ss = {ss}, ms = {ms}".format(fn=fn, ss=ss, ms=ms)) return datetime.timedelta(seconds=int(ss), milliseconds=int(ms)) def make_new_segment(type, filename, punch_in, punch_out, num): """Make a new segment instance of the correct class.""" fn = "make_new_segment" globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=type)) globals.log.debug("{fn}(): filename = {f}".format(fn=fn, f=filename)) globals.log.debug("{fn}(): punch in = {i}".format(fn=fn, i=punch_in)) globals.log.debug("{fn}(): punch out = {o}".format(fn=fn, o=punch_out)) globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=num)) if (type == "audio"): return AudioSegment(file=filename, punch_in=punch_in, punch_out=punch_out, input_stream=num) elif (type == "video"): return VideoSegment(file=filename, punch_in=punch_in, punch_out=punch_out, input_stream=num) elif (type == "frame"): return FrameSegment(file=filename, punch_in=punch_in, punch_out=punch_out, frame_number=num) else: return None def process_timestamp_pair(args, times): """Constructs timedelta instances from a pair of config timestamps.""" fn = "process_timestamp_pair" globals.log.debug("{fn}(): times[0] = {t}".format(fn=fn, t=times[0])) globals.log.debug("{fn}(): times[1] = {t}".format(fn=fn, t=times[1])) # If the first item in the timestamp list in the configuration file # is a filename, the parser inserts a zero timestamp before it. We # can therefore guarantee that the first item of the pair will # always be a timestamp. t0 = datetime.timedelta( hours=times[0]["hh"], minutes=times[0]["mm"], seconds=times[0]["ss"], milliseconds=times[0]["ms"]) if (times[1]): if (len(times[1]) == 1): # filename t1 = t0 + get_file_duration( prefix_path(args.prefix, times[1]["filename"])) elif (len(times[1]) == 4): # normal timestamp t1 = datetime.timedelta( hours=times[1]["hh"], minutes=times[1]["mm"], seconds=times[1]["ss"], milliseconds=times[1]["ms"]) else: globals.log.error("{fn}():unreadable timestamp {t}".format( fn=fn, t=times[1])) t1 = None else: t1 = None globals.log.debug("{fn}(): t0 = {t}".format(fn=fn, t=t0)) globals.log.debug("{fn}(): t1 = {t}".format(fn=fn, t=t1)) return t0, t1 def process_time_list(args, type, filename, num, time_list): """Process an audio or video stream and build a list of segments.""" fn = "process_time_list" if (os.path.exists(filename) and type in ["audio", "video"]): stream_duration = get_file_duration(filename) else: stream_duration = 0 segments = [] globals.log.debug("{fn}(): stream duration = {d}".format( fn=fn, d=stream_duration)) # No timestamps: punch in at 0, out at stream duration. if (len(time_list) == 0): punch_in = datetime.timedelta() punch_out = stream_duration segments.append(make_new_segment(type, filename, punch_in, punch_out, num)) else: # Process each pair of timestamps as punch in, out. If there's # an odd number of items, the last one is processed separately. for t in zip(time_list[::2], time_list[1::2]): punch_in, punch_out = process_timestamp_pair(args, t) if (punch_in == punch_out): globals.log.warning( "punch in ({i}s) and punch out ({o}s) times are " "equal; no segment will be " "generated".format(i=punch_in.total_seconds(), o=punch_out.total_seconds())) continue elif (punch_out < punch_in): globals.log.error( "punch out time ({i}s) falls before punch in time " "({o}s); can't generate a valid " "segment".format(i=punch_in.total_seconds(), o=punch_out.total_seconds())) sys.exit(1) segments.append(make_new_segment(type, filename, punch_in, punch_out, num)) # Odd number of timestamps: punch in at last timestamp, # out at stream duration. if (len(time_list) % 2 != 0): globals.log.debug("{fn}(): odd number of timestamps".format(fn=fn)) punch_in, _ = process_timestamp_pair(args, [time_list[-1], None]) punch_out = stream_duration segments.append(make_new_segment(type, filename, punch_in, punch_out, num)) return segments def process_input_streams(args, config): """Process a list of stream specifications and build a list of segments.""" fn = "process_input_streams" globals.log.info("Processing input streams...") segments = [] for cnf in config: globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=cnf["type"])) globals.log.debug( "{fn}(): filename = {f}".format(fn=fn, f=cnf["filename"])) globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=cnf["num"])) globals.log.debug("{fn}(): times = {t}".format(fn=fn, t=cnf["times"])) segments += process_time_list(args, cnf["type"], cnf["filename"], cnf["num"], cnf["times"]) return segments def smallest_video_dimensions(args, segments): """Compute the smallest frame dimensions across all video inputs.""" fn = "smallest_video_dimensions" width = 2048 height = 1536 for s in segments: if isinstance(s, FrameSegment): continue command = FFprobeCommand([s.input_file]) globals.log.debug("{fn}(): {cmd}".format(fn=fn, cmd=command)) w, h = command.get_entries(find_list=["width", "height"]) if (w * h) < (width * height): width, height = w, h return width, height def process_frame_segments(args, segments, width, height): """Post-process frame segments to set frame images, etc.""" fn = "process_frame_segments" globals.log.info("Processing frames...") frame_segments = [s for s in segments if isinstance(s, FrameSegment)] n = len(frame_segments) globals.log.debug("{fn}(): num frames = {n}".format(fn=fn, n=n)) progress = ProgressBar(max_value=n, quiet=args.quiet or args.debug or n == 0) progress.update(0) for i, f in enumerate(frame_segments): try: globals.log.debug( "{fn}(): frame (before) = {b}".format(fn=fn, b=f)) # Frame segments that use a frame from the previous segment. if (f.input_file == "^"): if (f.segment_number > 0): prev = segments[f.segment_number - 1] globals.log.debug( "{fn}(): prev = {p}".format(fn=fn, p=prev)) prev.generate_temp_file(args.output, width=width, height=height) f.use_frame( prev.generate_frame(f.frame_number, args.output, width=width, height=height)) else: globals.log.error( "frame segment {s} is attempting to use the last " "frame of a non-existent previous " "segment".format(s=f.segment_number)) sys.exit(1) # Frame segments whose frame comes from a PDF file. else: _, suffix = os.path.splitext(f.input_file) if (suffix.lower() == ".pdf"): f.use_frame(f.generate_temp_file(args.output, width=width, height=height)) else: globals.log.error( 'unexpected input file type "{s}" for frame segment ' "{f}".format(s=suffix, f=f.segment_number)) sys.exit(1) progress.update(i) globals.log.debug("{fn}(): frame (after) = ""{a}".format(fn=fn, a=f)) except SegmentException as e: progress.finish() globals.log.error(e.message) sys.exit(1) else: progress.finish() def render_podcast(args, audio_segments, video_segments, output, duration): """Stitch together the various input components into the final podcast.""" fn = "render_podcast" globals.log.info("Rendering final podcast...") command = FFmpegConcatCommand(has_audio=len(audio_segments) > 0, has_video=len(video_segments) > 0, max_progress=duration, quiet=args.quiet and not args.debug, process_audio=args.process_audio, process_video=args.process_video, audio_codec=args.audio_codec, video_codec=args.video_codec) input_files = Segment.input_files() for f in input_files: if (input_files[f]): command.append_input_options(input_files[f]) command.append_input_options(["-i", f]) for s in (audio_segments + video_segments): command.append_filter(s.trim_filter()) command.append_concat_filter("a", [s for s in audio_segments]) if (args.normalise): command.append_normalisation_filter() command.append_concat_filter("v", [s for s in video_segments]) if args.preview: globals.log.info("PREVIEW MODE: {fps} fps".format(fps=args.preview)) command.append_output_options(["-r", args.preview]) command.append_output_options([output]) globals.log.debug("{fn}(): {c}".format(fn=fn, c=command)) if (command.run() != 0): globals.log.error("Failed to render final podcast") def cleanup(segments): """Clean up generated temporary files.""" globals.log.info("Cleaning up...") for s in segments: s.delete_temp_files() def main(): fn = "main" logging.basicConfig( level=logging.INFO, format="%(levelname)s: {p}: %(message)s".format(p=globals.PROGRAM)) segments = None try: args = parse_command_line() check_arguments(args) config = get_configuration(args) segments = process_input_streams(args, config) globals.log.debug("{fn}(): audio segments = {a}".format( fn=fn, a=[s for s in segments if isinstance(s, AudioSegment)])) globals.log.debug("{fn}(): video segments = {v}".format( fn=fn, v=[s for s in segments if isinstance(s, VideoSegment)])) audio_segments = [s for s in segments if isinstance(s, AudioSegment)] video_segments = [s for s in segments if isinstance(s, VideoSegment)] audio_duration = sum([s.get_duration() for s in audio_segments]) video_duration = sum([s.get_duration() for s in video_segments]) globals.log.debug("{fn}(): audio duration = " "{a}".format(fn=fn, a=audio_duration)) globals.log.debug("{fn}(): video duration = " "{v}".format(fn=fn, v=video_duration)) if (len(audio_segments) and len(video_segments)): if (audio_duration != video_duration): globals.log.warning("total video duration ({v}s) doesn't match " "total audio duration " "({a}s)".format(v=video_duration, a=audio_duration)) width, height = smallest_video_dimensions(args, video_segments) globals.log.debug("{fn}(): width = {w}, height = " "{h}".format(fn=fn, w=width, h=height)) process_frame_segments(args, segments, width, height) globals.log.debug("{fn}(): input files = " "{i}".format(fn=fn, i=Segment.input_files())) render_podcast(args, audio_segments, video_segments, args.output, max(audio_duration, video_duration)) except (KeyboardInterrupt): pass finally: if segments and not args.keep: cleanup(segments) if (__name__ == "__main__"): main()
Ignore Space
Show notes
View
progress_bar.py
#!/usr/bin/env python3 import sys import time class ProgressBar(object): """A simple progress bar with a percent completed value.""" def __init__(self, initial_value=0, max_value=100, print_width=50, newline="\r", quiet=False): self.value = self.initial_value = initial_value self.max_value = max_value self.print_width = print_width self.newline = newline self.quiet = quiet def set(self, value=0): """Set the current value of the progress bar.""" self.value = value def reset(self): """Reset the progress bar to its initial value.""" self.set(value=self.initial_value) def finish(self): """Complete the progress bar by setting it to its maximum value.""" self.update(value=self.max_value) if not self.quiet: print() def update(self, value=0): """Set the current value of the progress bar and redraw it.""" self.set(value) self.draw() def draw(self): """Draw the current state of the progress bar.""" if not self.quiet: percent = int(self.value * 100 / self.max_value) dots = int(self.value * self.print_width / self.max_value) bar = "{nl}[{c}{nc}] {p}% ".format( c="+" * dots, nc="." * (self.print_width - dots), p=percent, nl=self.newline) sys.stdout.write(bar) sys.stdout.flush() if __name__ == "__main__": p = ProgressBar() for i in range(0, 100): p.update(value=i) time.sleep(0.05) p.finish()
#!/usr/bin/env python import sys import time class ProgressBar(object): """A simple progress bar with a percent completed value.""" def __init__(self, initial_value=0, max_value=100, print_width=50, newline="\r", quiet=False): self.value = self.initial_value = initial_value self.max_value = max_value self.print_width = print_width self.newline = newline self.quiet = quiet def set(self, value=0): """Set the current value of the progress bar.""" self.value = value def reset(self): """Reset the progress bar to its initial value.""" self.set(value=self.initial_value) def finish(self): """Complete the progress bar by setting it to its maximum value.""" self.update(value=self.max_value) if not self.quiet: print() def update(self, value=0): """Set the current value of the progress bar and redraw it.""" self.set(value) self.draw() def draw(self): """Draw the current state of the progress bar.""" if not self.quiet: percent = int(self.value * 100 / self.max_value) dots = int(self.value * self.print_width / self.max_value) bar = "{nl}[{c}{nc}] {p}% ".format( c="+" * dots, nc="." * (self.print_width - dots), p=percent, nl=self.newline) sys.stdout.write(bar) sys.stdout.flush() if __name__ == "__main__": p = ProgressBar() for i in range(0, 100): p.update(value=i) time.sleep(0.05) p.finish()
Ignore Space
Show notes
View
segment.py
#!/usr/bin/env python3 from collections import OrderedDict import errno import itertools import logging import os from pathlib import Path import globals from shell_command import (ConvertCommand, FFprobeCommand, FFmpegCommand) class SegmentException(Exception): pass class Segment(object): """A segment within the podcast. A segment has an input file, and a punch-in and punch-out point (both in seconds). """ # Automatic segment number generator. _new_segment_num = itertools.count().next # Keep track of input files in the order they're loaded, so that we # can easily reference them by index in the ffmpeg command (i.e., # first input file is, 0, etc.). _input_files = OrderedDict() # A string representing the type of the segment (e.g., "audio"). # This is handy for generating temporary files and means that we # can implement this as a single method in the root class. _TYPE = "" # Which trim and setpts filters to use in ffmpeg complex filters. # As above, this lets us implement a single method in the root class. _TRIM = "" _SETPTS = "" @staticmethod def input_files(): return Segment._input_files @staticmethod def _rename_input_file(old, new): tmp = OrderedDict() for f in Segment._input_files: if (f == old): tmp[new] = Segment._input_files[f] else: tmp[f] = Segment._input_files[f] Segment._input_files = tmp def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0): self.segment_number = self.__class__._new_segment_num() self.input_file = file self.punch_in = punch_in self.punch_out = punch_out self.input_stream = input_stream self._temp_file = "" self._temp_suffix = "mov" # List of temporary files to delete when cleaning up. self._temp_files_list = [] if (file not in self.__class__._input_files): self.__class__._input_files[file] = None self._input_options = ["-ss", str(self.punch_in.total_seconds()), "-t", str(self.get_duration()), "-i", self.input_file] self._output_options = [] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out ' '{o}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out)) def get_duration(self): """Return the duration of the segment in seconds.""" return (self.punch_out - self.punch_in).total_seconds() def generate_temp_filename(self, output, suffix=None): """Generate a temporary filename for the segment.""" if not suffix: suffix = self._temp_suffix return Path("temp_{t}_{o}_{n:03d}".format( t=self._TYPE, o=Path(output).stem, n=self.segment_number)).with_suffix(suffix) def generate_temp_file(self, output, width=0, height=0): """Compile the segment from the original source file(s).""" # Note: width and height are currently ignored for segments # in general. This may (or may not) need to change if there # are multiple inputs of different dimensions. fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output) command = FFmpegCommand( input_options=self._input_options + ["-codec", "copy"], output_options=self._output_options + [self._temp_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentException( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def temp_file(self): """Return the temporary file associated with the segment.""" return self._temp_file def delete_temp_files(self): """Delete the temporary file(s) associated with the segment.""" # Note: sometimes segments (especially frame segments) may # share the same temporary file. Just ignore the file not # found exception that occurs in these cases. for f in self._temp_files_list: try: os.remove(f) except OSError as e: if (e.errno != errno.ENOENT): raise e def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:{t}]".format( n=self.__class__._input_files.keys().index(self.input_file), t=self._TYPE[0] if self._TYPE else "") def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return "[{t}{n}]".format(t=self._TYPE[0] if self._TYPE else "", n=self.segment_number) def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return ("{inspec} " "{trim}=start={pi}:duration={po},{setpts}=PTS-STARTPTS " "{outspec}".format( inspec=self.input_stream_specifier(), trim=self._TRIM, setpts=self._SETPTS, pi=self.punch_in.total_seconds(), po=self.get_duration(), outspec=self.output_stream_specifier())) class AudioSegment(Segment): """A segment of an audio input stream.""" _TYPE = "audio" _TRIM = "atrim" _SETPTS = "asetpts" def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0): super(AudioSegment, self).__init__(file, punch_in, punch_out, input_stream) self._temp_suffix = "wav" self._output_options = ["-ac", "1", "-map", "{n}:a".format(n=self.input_stream)] class VideoSegment(Segment): """A segment of a video input stream.""" _TYPE = "video" _TRIM = "trim" _SETPTS = "setpts" def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0): super(VideoSegment, self).__init__(file, punch_in, punch_out, input_stream) self._output_options = ["-map", "{n}:v".format(n=self.input_stream)] self._temp_frame_file = "" def get_last_frame_number(self): """Calculate frame number of segment's last frame using ffprobe.""" fn = "get_last_frame_number" if (self._temp_file): self._temp_frame_file = "__{f}".format(f=self._temp_file) # To speed things up, grab up to the last 5 seconds of the # segment's temporary file, as we otherwise have to scan the # entire temporary file to find the last frame, which can # take a while. command = FFmpegCommand( input_options=["-ss", str(max(self.get_duration() - 5, 0)), "-i", self._temp_file], output_options=["-codec:v", "copy", "-map", "0:v", self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_frame_file) command = FFprobeCommand([self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) return int(command.get_entries( section="stream", find_list=["nb_frames"])[0]) - 1 else: raise SegmentException( "Failed to generate temporary file to get last frame " "number for {s}".format(s=self)) else: raise SegmentException( "Can't get last frame of {s} because it has no temporary " "file".format(s=self)) def generate_frame(self, frame_number, output, width=2048, height=1536): """Create a JPEG file from the specified frame of the segment.""" # Note: width and height are currently ignored for video # segments. This will need to change if there are multiple # video inputs of different dimensions. fn = "generate_frame" temp_frame = self.generate_temp_filename(output, suffix="jpg") if (frame_number == -1): frame_number = self.get_last_frame_number() command = FFmpegCommand( input_options=["-i", self._temp_frame_file], output_options=["-filter:v", "select='eq(n, {n})'".format(n=frame_number), "-frames:v", "1", "-f", "image2", "-map", "0:v", temp_frame]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(temp_frame) return temp_frame else: raise SegmentException( "Failed to create JPEG for frame {n} of " "{s}".format(n=frame_number, s=self)) class FrameSegment(VideoSegment): """A video segment derived from a single still frame.""" _TYPE = "frame" def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0, frame_number=0): super(FrameSegment, self).__init__(file, punch_in, punch_out, input_stream) self.frame_number = frame_number self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[file] = self._input_options[:4] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out {o}, frame number ' '{fn}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out, fn=self.frame_number)) def generate_temp_file(self, output, width=2048, height=1536): """Compile the segment from the original source file(s).""" fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output, suffix="jpg") command = ConvertCommand( input_options=["{f}[{n}]".format( f=self.input_file, n=self.frame_number)], output_options=["{f}".format(f=self._temp_file)], width=width, height=height) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentException( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def use_frame(self, frame): """Set the image to use for generating the frame video.""" self.__class__._rename_input_file(self.input_file, frame) self.input_file = frame self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[frame] = self._input_options[:4] def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:v]".format( n=self.__class__._input_files.keys().index(self.input_file)) def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return self.input_stream_specifier() def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return "" if (__name__ == "__main__"): pass
#!/usr/bin/env python from collections import OrderedDict import errno import itertools import logging import os import os.path import globals from shell_command import (ConvertCommand, FFprobeCommand, FFmpegCommand) class SegmentException(Exception): pass class Segment(object): """A segment within the podcast. A segment has an input file, and a punch-in and punch-out point (both in seconds). """ # Automatic segment number generator. _new_segment_num = itertools.count().next # Keep track of input files in the order they're loaded, so that we # can easily reference them by index in the ffmpeg command (i.e., # first input file is, 0, etc.). _input_files = OrderedDict() # A string representing the type of the segment (e.g., "audio"). # This is handy for generating temporary files and means that we # can implement this as a single method in the root class. _TYPE = "" # Which trim and setpts filters to use in ffmpeg complex filters. # As above, this lets us implement a single method in the root class. _TRIM = "" _SETPTS = "" @staticmethod def input_files(): return Segment._input_files @staticmethod def _rename_input_file(old, new): tmp = OrderedDict() for f in Segment._input_files: if (f == old): tmp[new] = Segment._input_files[f] else: tmp[f] = Segment._input_files[f] Segment._input_files = tmp def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0): self.segment_number = self.__class__._new_segment_num() self.input_file = file self.punch_in = punch_in self.punch_out = punch_out self.input_stream = input_stream self._temp_file = "" self._temp_suffix = "mov" # List of temporary files to delete when cleaning up. self._temp_files_list = [] if (file not in self.__class__._input_files): self.__class__._input_files[file] = None self._input_options = ["-ss", str(self.punch_in.total_seconds()), "-t", str(self.get_duration()), "-i", self.input_file] self._output_options = [] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out ' '{o}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out)) def get_duration(self): """Return the duration of the segment in seconds.""" return (self.punch_out - self.punch_in).total_seconds() def generate_temp_filename(self, output, suffix=None): """Generate a temporary filename for the segment.""" if not suffix: suffix = self._temp_suffix return os.path.extsep.join( ["temp_{t}_{o}_{n:03d}".format( t=self._TYPE, o=os.path.splitext(output)[0], n=self.segment_number), suffix]) def generate_temp_file(self, output, width=0, height=0): """Compile the segment from the original source file(s).""" # Note: width and height are currently ignored for segments # in general. This may (or may not) need to change if there # are multiple inputs of different dimensions. fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output) command = FFmpegCommand( input_options=self._input_options + ["-codec", "copy"], output_options=self._output_options + [self._temp_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentException( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def temp_file(self): """Return the temporary file associated with the segment.""" return self._temp_file def delete_temp_files(self): """Delete the temporary file(s) associated with the segment.""" # Note: sometimes segments (especially frame segments) may # share the same temporary file. Just ignore the file not # found exception that occurs in these cases. for f in self._temp_files_list: try: os.remove(f) except OSError as e: if (e.errno != errno.ENOENT): raise e def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:{t}]".format( n=self.__class__._input_files.keys().index(self.input_file), t=self._TYPE[0] if self._TYPE else "") def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return "[{t}{n}]".format(t=self._TYPE[0] if self._TYPE else "", n=self.segment_number) def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return ("{inspec} " "{trim}=start={pi}:duration={po},{setpts}=PTS-STARTPTS " "{outspec}".format( inspec=self.input_stream_specifier(), trim=self._TRIM, setpts=self._SETPTS, pi=self.punch_in.total_seconds(), po=self.get_duration(), outspec=self.output_stream_specifier())) class AudioSegment(Segment): """A segment of an audio input stream.""" _TYPE = "audio" _TRIM = "atrim" _SETPTS = "asetpts" def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0): super(AudioSegment, self).__init__(file, punch_in, punch_out, input_stream) self._temp_suffix = "wav" self._output_options = ["-ac", "1", "-map", "{n}:a".format(n=self.input_stream)] class VideoSegment(Segment): """A segment of a video input stream.""" _TYPE = "video" _TRIM = "trim" _SETPTS = "setpts" def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0): super(VideoSegment, self).__init__(file, punch_in, punch_out, input_stream) self._output_options = ["-map", "{n}:v".format(n=self.input_stream)] self._temp_frame_file = "" def get_last_frame_number(self): """Calculate frame number of segment's last frame using ffprobe.""" fn = "get_last_frame_number" if (self._temp_file): self._temp_frame_file = "__{f}".format(f=self._temp_file) # To speed things up, grab up to the last 5 seconds of the # segment's temporary file, as we otherwise have to scan the # entire temporary file to find the last frame, which can # take a while. command = FFmpegCommand( input_options=["-ss", str(max(self.get_duration() - 5, 0)), "-i", self._temp_file], output_options=["-codec:v", "copy", "-map", "0:v", self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_frame_file) command = FFprobeCommand([self._temp_frame_file]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) return int(command.get_entries( section="stream", find_list=["nb_frames"])[0]) - 1 else: raise SegmentException( "Failed to generate temporary file to get last frame " "number for {s}".format(s=self)) else: raise SegmentException( "Can't get last frame of {s} because it has no temporary " "file".format(s=self)) def generate_frame(self, frame_number, output, width=2048, height=1536): """Create a JPEG file from the specified frame of the segment.""" # Note: width and height are currently ignored for video # segments. This will need to change if there are multiple # video inputs of different dimensions. fn = "generate_frame" temp_frame = self.generate_temp_filename(output, suffix="jpg") if (frame_number == -1): frame_number = self.get_last_frame_number() command = FFmpegCommand( input_options=["-i", self._temp_frame_file], output_options=["-filter:v", "select='eq(n, {n})'".format(n=frame_number), "-frames:v", "1", "-f", "image2", "-map", "0:v", temp_frame]) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(temp_frame) return temp_frame else: raise SegmentException( "Failed to create JPEG for frame {n} of " "{s}".format(n=frame_number, s=self)) class FrameSegment(VideoSegment): """A video segment derived from a single still frame.""" _TYPE = "frame" def __init__(self, file="", punch_in=0, punch_out=0, input_stream=0, frame_number=0): super(FrameSegment, self).__init__(file, punch_in, punch_out, input_stream) self.frame_number = frame_number self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[file] = self._input_options[:4] def __repr__(self): return('<{c} {n}: file "{f}", in {i}, out {o}, frame number ' '{fn}>'.format(c=self.__class__.__name__, n=self.segment_number, t=self._TYPE, f=self.input_file, i=self.punch_in, o=self.punch_out, fn=self.frame_number)) def generate_temp_file(self, output, width=2048, height=1536): """Compile the segment from the original source file(s).""" fn = "generate_temp_file" self._temp_file = self.generate_temp_filename(output, suffix="jpg") command = ConvertCommand( input_options=["{f}[{n}]".format( f=self.input_file, n=self.frame_number)], output_options=["{f}".format(f=self._temp_file)], width=width, height=height) globals.log.debug("{cls}.{fn}(): {cmd}".format( cls=self.__class__.__name__, fn=fn, cmd=command)) if (command.run() == 0): self._temp_files_list.append(self._temp_file) return self._temp_file else: raise SegmentException( "Failed to generate temporary file {f} for " "{s}".format(f=self._temp_file, s=self)) def use_frame(self, frame): """Set the image to use for generating the frame video.""" self.__class__._rename_input_file(self.input_file, frame) self.input_file = frame self._input_options = ["-loop", "1", "-t", str(self.get_duration()), "-i", self.input_file] self.__class__._input_files[frame] = self._input_options[:4] def input_stream_specifier(self): """Return the segment's ffmpeg stream input specifier.""" return "[{n}:v]".format( n=self.__class__._input_files.keys().index(self.input_file)) def output_stream_specifier(self): """Return the segment's ffmpeg audio stream output specifier.""" return self.input_stream_specifier() def trim_filter(self): """Return an FFMPEG trim filter for this segment.""" return "" if (__name__ == "__main__"): pass
Ignore Space
Show notes
View
shell_command.py
#!/usr/bin/env python3 import datetime import distutils.spawn import json import tempfile from pathlib import Path import re import pexpect from progress_bar import (ProgressBar) class ShellCommand(object): """A shell command. _executable contains the full path to the relevant executable. _base_options is a list of standard general options for this command. """ _executable = "" _base_options = [] _expect_patterns = [] @staticmethod def shellquote(s): """Quote a string so it can be safely pasted into the shell.""" # Note: pipes/shlex.quote() only wraps '' around things, # it doesn't do things like \( \), which we also need. # We double-quote everything because that makes it easier # to deal with single quotes. if s: regexes = [ # grouping parentheses for convert re.compile(r"^(\(|\))$"), # double quote (in middle of string) re.compile(r'^(.+".*)$'), # quotes around entire string re.compile(r"""^((?P<quote>['"]).*(?P=quote))$"""), # command line switch (starts with one or two "-") re.compile(r"^(--?.+)$"), # command path (starts with "/") re.compile(r"^(/[\w/]+)$")] substitutions = [r"\\\1", r"'\1'", r"\1", r"\1", r"\1"] num_matches = 0 for subst in zip(regexes, substitutions): (s, n) = subst[0].subn(subst[1], s) num_matches += n if num_matches == 0: # catch-all for non-word characters when none of the other # substitutions apply s = re.compile(r"^(.*\W+.*)$").sub(r'"\1"', s) return s def __init__(self, input_options=[], output_options=[], quiet=False): self.input_options = input_options self.output_options = output_options self.progress = None self.process = None def __repr__(self): return "<{cls}: {cmd}>".format( cls=self.__class__.__name__, cmd=self.command_string(quote=True)) def append_input_options(self, items=[]): """Add a list of items to the end of the input options.""" self.input_options += items def prepend_input_options(self, items=[]): """Add a list of items at the front of the input options.""" self.input_options = items + self.input_options def append_output_options(self, items=[]): """Add a list of items to the end of the output options.""" self.output_options += items def prepend_output_options(self, items=[]): """Add a list of items at the front of the output options.""" self.output_options = items + self.output_options def executable_string(self, quote=False): """Return the executable as a string.""" if quote: return ShellCommand.shellquote(self._executable) else: return self._executable def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = self._base_options + self.input_options + self.output_options if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return self._base_options + self.input_options + self.output_options def command_string(self, quote=False): """Return the entire command as a string.""" return "{exe} {arg}".format(exe=self.executable_string(quote), arg=self.argument_string(quote)) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" return (pat == 0) def run(self): """Execute the command in a subprocess.""" self.process = pexpect.spawn(self.executable_string(), self.argument_list()) # EOF is *always* the first pattern. patterns = self.process.compile_pattern_list( [pexpect.EOF] + self._expect_patterns) try: while True: i = self.process.expect_list(patterns, timeout=None) if self.process_pattern(i): break except (KeyboardInterrupt): pass finally: if self.progress: self.progress.finish() self.process.close() return self.process.exitstatus def get_output(self): """Execute the command in a subprocess and return the output.""" return pexpect.run(self.command_string(quote=True)) class ConvertCommand(ShellCommand): """An ImageMagick convert command.""" _executable = distutils.spawn.find_executable("convert") _base_options = ["-density", "600", "xc:dimgrey", "null:", # dark grey background "("] def __init__(self, input_options=[], output_options=[], width=2048, height=1536): super(ConvertCommand, self).__init__(input_options, output_options) self._base_options = (["-size", "{w}x{h}".format(w=width, h=height)] + self._base_options) self.append_input_options( ["-resize", "{w}x{h}".format(w=width, h=height), "-background", "white", "-alpha", "remove", "-type", "truecolor", # force RGB (this and next line) "-define", "colorspace:auto-grayscale=off"]) self.prepend_output_options([")", "-gravity", "center", "-layers", "composite", "-flatten"]) class FFprobeCommand(ShellCommand): """An ffprobe shell command.""" _executable = distutils.spawn.find_executable("ffprobe") _base_options = ["-loglevel", "error", "-show_entries", "format:stream", "-print_format", "json"] def __init__(self, input_options=[], output_options=[]): super(FFprobeCommand, self).__init__(input_options, output_options) self.entries = None # The input file should be the last input option. assert(Path(self.input_options[-1]).exists()) self.last_modified = Path(self.input_options[-1]).stat().st_mtime def get_entries(self, section="stream", find_list=[]): """Fetch specified attributes from the input file.""" # Re-fetch if the file's changed since we last looked. modified = Path(self.input_options[-1]).stat().st_mtime if (not self.entries) or (modified > self.last_modified): js = json.loads(self.get_output()) self.entries = {"format": js["format"], "stream": js["streams"][0]} return [self.entries[section][f] for f in find_list] class FFmpegCommand(ShellCommand): """A "simple" ffmpeg shell command.""" _executable = distutils.spawn.find_executable("ffmpeg") _base_options = ["-y", "-nostdin"] class FFmpegConcatCommand(FFmpegCommand): """An ffmpeg shell command with a complex concat filter.""" _expect_patterns = [r"time=(\d\d):(\d\d):(\d\d\.\d\d)"] def __init__(self, input_options=[], output_options=[], quiet=False, max_progress=100, has_audio=False, has_video=False, process_audio=True, process_video=True, audio_codec="pcm_s16le", video_codec="h264"): super(FFmpegConcatCommand, self).__init__( input_options, output_options) self.progress = ProgressBar(max_value=max_progress, quiet=quiet) self.has_video = has_video self.process_video = process_video self.video_codec = video_codec if (self.has_video): self.prepend_output_options(["-map", "[vconc]"]) if (self.process_video): self.prepend_output_options(["-pix_fmt", "yuv420p"]) self.prepend_output_options(["-codec:v", self.video_codec]) self.has_audio = has_audio self.process_audio = process_audio self.audio_codec = audio_codec if (self.has_audio): if (self.process_audio): self.prepend_output_options(["-ac", "1", "-map", "[anorm]"]) else: self.prepend_output_options(["-map", "[aconc]"]) self.prepend_output_options(["-codec:a", self.audio_codec]) self.filters = [] def append_filter(self, filter): """Append a filter to the filters list.""" if (filter): self.filters.append(filter) def append_normalisation_filter(self): """Append a normalisation audio filter to the complex filter.""" if (self.has_audio): self.append_filter("[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]") def append_concat_filter(self, type, segments=[]): """Append a concat filter to the filters list""" if (len(segments) > 1): self.append_filter( "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format( inspecs=" ".join([s.output_stream_specifier() for s in segments]), n=len(segments), v=int(type == "v"), a=int(type == "a"), t=type)) elif (len(segments) == 1): self.append_filter( "{inspec} {a}null [{t}conc]".format( inspec=segments[0].output_stream_specifier(), a=type if type == "a" else "", t=type)) def build_complex_filter(self): """Build the complete complex filter. Filters in the filtergraph are separated by ";". """ return "{f}".format(f=";".join(self.filters)) def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" if (pat == 1): elapsed = datetime.timedelta( hours=int(self.process.match.group(1)), minutes=int(self.process.match.group(2)), seconds=float(self.process.match.group(3))) self.progress.update(elapsed.total_seconds()) return (pat == 0) if (__name__ == "__main__"): print(ShellCommand()) print(ConvertCommand(input_options=["in.pdf[12]"], output_options=["out.png"])) # FFprobeCommand expects the input file to exist. f = tempfile.NamedTemporaryFile() print(FFprobeCommand(input_options=["-i", f.name])) f.close() print(FFmpegCommand(input_options=["-i", "in.mov"], output_options=["out.mov"])) concat = FFmpegConcatCommand(input_options=["-i", "in.mov"], output_options=["out.mov"], has_audio=True) concat.append_normalisation_filter() print(concat) print("Quoting:") for s in ["foobar", "foo bar baz", "foo(bar)", "[foobar]", "foo[bar", "foo 'bar'", '"foobar"', "'foobar'", 'foo"bar', "foo.bar", "(", ")", "'", "/foo/bar/baz/quux", "-f", "--foobar"]: print(" {s} => echo {sub}".format(s=s, sub=ShellCommand.shellquote(s)))
#!/usr/bin/env python import datetime import distutils.spawn import json import tempfile import os.path import re import pexpect from progress_bar import (ProgressBar) class ShellCommand(object): """A shell command. _executable contains the full path to the relevant executable. _base_options is a list of standard general options for this command. """ _executable = "" _base_options = [] _expect_patterns = [] @staticmethod def shellquote(s): """Quote a string so it can be safely pasted into the shell.""" # Note: pipes/shlex.quote() only wraps '' around things, # it doesn't do things like \( \), which we also need. # We double-quote everything because that makes it easier # to deal with single quotes. if s: regexes = [ # grouping parentheses for convert re.compile(r"^(\(|\))$"), # double quote (in middle of string) re.compile(r'^(.+".*)$'), # quotes around entire string re.compile(r"""^((?P<quote>['"]).*(?P=quote))$"""), # command line switch (starts with one or two "-") re.compile(r"^(--?.+)$"), # command path (starts with "/") re.compile(r"^(/[\w/]+)$")] substitutions = [r"\\\1", r"'\1'", r"\1", r"\1", r"\1"] num_matches = 0 for subst in zip(regexes, substitutions): (s, n) = subst[0].subn(subst[1], s) num_matches += n if num_matches == 0: # catch-all for non-word characters when none of the other # substitutions apply s = re.compile(r"^(.*\W+.*)$").sub(r'"\1"', s) return s def __init__(self, input_options=[], output_options=[], quiet=False): self.input_options = input_options self.output_options = output_options self.progress = None self.process = None def __repr__(self): return "<{cls}: {cmd}>".format( cls=self.__class__.__name__, cmd=self.command_string(quote=True)) def append_input_options(self, items=[]): """Add a list of items to the end of the input options.""" self.input_options += items def prepend_input_options(self, items=[]): """Add a list of items at the front of the input options.""" self.input_options = items + self.input_options def append_output_options(self, items=[]): """Add a list of items to the end of the output options.""" self.output_options += items def prepend_output_options(self, items=[]): """Add a list of items at the front of the output options.""" self.output_options = items + self.output_options def executable_string(self, quote=False): """Return the executable as a string.""" if quote: return ShellCommand.shellquote(self._executable) else: return self._executable def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = self._base_options + self.input_options + self.output_options if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return self._base_options + self.input_options + self.output_options def command_string(self, quote=False): """Return the entire command as a string.""" return "{exe} {arg}".format(exe=self.executable_string(quote), arg=self.argument_string(quote)) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" return (pat == 0) def run(self): """Execute the command in a subprocess.""" self.process = pexpect.spawn(self.executable_string(), self.argument_list()) # EOF is *always* the first pattern. patterns = self.process.compile_pattern_list( [pexpect.EOF] + self._expect_patterns) try: while True: i = self.process.expect_list(patterns, timeout=None) if self.process_pattern(i): break except (KeyboardInterrupt): pass finally: if self.progress: self.progress.finish() self.process.close() return self.process.exitstatus def get_output(self): """Execute the command in a subprocess and return the output.""" return pexpect.run(self.command_string(quote=True)) class ConvertCommand(ShellCommand): """An ImageMagick convert command.""" _executable = distutils.spawn.find_executable("convert") _base_options = ["-density", "600", "xc:dimgrey", "null:", # dark grey background "("] def __init__(self, input_options=[], output_options=[], width=2048, height=1536): super(ConvertCommand, self).__init__(input_options, output_options) self._base_options = (["-size", "{w}x{h}".format(w=width, h=height)] + self._base_options) self.append_input_options( ["-resize", "{w}x{h}".format(w=width, h=height), "-background", "white", "-alpha", "remove", "-type", "truecolor", # force RGB (this and next line) "-define", "colorspace:auto-grayscale=off"]) self.prepend_output_options([")", "-gravity", "center", "-layers", "composite", "-flatten"]) class FFprobeCommand(ShellCommand): """An ffprobe shell command.""" _executable = distutils.spawn.find_executable("ffprobe") _base_options = ["-loglevel", "error", "-show_entries", "format:stream", "-print_format", "json"] def __init__(self, input_options=[], output_options=[]): super(FFprobeCommand, self).__init__(input_options, output_options) self.entries = None # The input file should be the last input option. assert(os.path.exists(self.input_options[-1])) self.last_modified = os.path.getmtime(self.input_options[-1]) def get_entries(self, section="stream", find_list=[]): """Fetch specified attributes from the input file.""" # Re-fetch if the file's changed since we last looked. modified = os.path.getmtime(self.input_options[-1]) if (not self.entries) or (modified > self.last_modified): js = json.loads(self.get_output()) self.entries = {"format": js["format"], "stream": js["streams"][0]} return [self.entries[section][f] for f in find_list] class FFmpegCommand(ShellCommand): """A "simple" ffmpeg shell command.""" _executable = distutils.spawn.find_executable("ffmpeg") _base_options = ["-y", "-nostdin"] class FFmpegConcatCommand(FFmpegCommand): """An ffmpeg shell command with a complex concat filter.""" _expect_patterns = [r"time=(\d\d):(\d\d):(\d\d\.\d\d)"] def __init__(self, input_options=[], output_options=[], quiet=False, max_progress=100, has_audio=False, has_video=False, process_audio=True, process_video=True, audio_codec="pcm_s16le", video_codec="h264"): super(FFmpegConcatCommand, self).__init__( input_options, output_options) self.progress = ProgressBar(max_value=max_progress, quiet=quiet) self.has_video = has_video self.process_video = process_video self.video_codec = video_codec if (self.has_video): self.prepend_output_options(["-map", "[vconc]"]) if (self.process_video): self.prepend_output_options(["-pix_fmt", "yuv420p"]) self.prepend_output_options(["-codec:v", self.video_codec]) self.has_audio = has_audio self.process_audio = process_audio self.audio_codec = audio_codec if (self.has_audio): if (self.process_audio): self.prepend_output_options(["-ac", "1", "-map", "[anorm]"]) else: self.prepend_output_options(["-map", "[aconc]"]) self.prepend_output_options(["-codec:a", self.audio_codec]) self.filters = [] def append_filter(self, filter): """Append a filter to the filters list.""" if (filter): self.filters.append(filter) def append_normalisation_filter(self): """Append a normalisation audio filter to the complex filter.""" if (self.has_audio): self.append_filter("[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]") def append_concat_filter(self, type, segments=[]): """Append a concat filter to the filters list""" if (len(segments) > 1): self.append_filter( "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format( inspecs=" ".join([s.output_stream_specifier() for s in segments]), n=len(segments), v=int(type == "v"), a=int(type == "a"), t=type)) elif (len(segments) == 1): self.append_filter( "{inspec} {a}null [{t}conc]".format( inspec=segments[0].output_stream_specifier(), a=type if type == "a" else "", t=type)) def build_complex_filter(self): """Build the complete complex filter. Filters in the filtergraph are separated by ";". """ return "{f}".format(f=";".join(self.filters)) def argument_string(self, quote=False): """Return the list of arguments as a string.""" args = (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) if quote: return " ".join([ShellCommand.shellquote(a) for a in args]) else: return " ".join(args) def argument_list(self): """Return a combined list of all arguments.""" return (self._base_options + self.input_options + ["-filter_complex", self.build_complex_filter()] + self.output_options) def process_pattern(self, pat): """Respond to a pexpect pattern. Return True on EOF.""" if (pat == 1): elapsed = datetime.timedelta( hours=int(self.process.match.group(1)), minutes=int(self.process.match.group(2)), seconds=float(self.process.match.group(3))) self.progress.update(elapsed.total_seconds()) return (pat == 0) if (__name__ == "__main__"): print(ShellCommand()) print(ConvertCommand(input_options=["in.pdf[12]"], output_options=["out.png"])) # FFprobeCommand expects the input file to exist. f = tempfile.NamedTemporaryFile() print(FFprobeCommand(input_options=["-i", f.name])) f.close() print(FFmpegCommand(input_options=["-i", "in.mov"], output_options=["out.mov"])) concat = FFmpegConcatCommand(input_options=["-i", "in.mov"], output_options=["out.mov"], has_audio=True) concat.append_normalisation_filter() print(concat) print("Quoting:") for s in ["foobar", "foo bar baz", "foo(bar)", "[foobar]", "foo[bar", "foo 'bar'", '"foobar"', "'foobar'", 'foo"bar', "foo.bar", "(", ")", "'", "/foo/bar/baz/quux", "-f", "--foobar"]: print(" {s} => echo {sub}".format(s=s, sub=ShellCommand.shellquote(s)))
Show line notes below