GitBucket
4.21.2
Toggle navigation
Snippets
Sign in
Files
Branches
2
Releases
1
Issues
Pull requests
Labels
Priorities
Milestones
Wiki
Forks
nigel.stanger
/
process_podcast
Browse code
• Fixed parsing bug that produced incorrectly structured timestamp lists when using a duration file (introduced by restructuring of grammar).
Python2.7
master
LAST_2.7_RELEASE
1 parent
d399ac8
commit
339e66202ff9d2c03cb1d1958b8c067c20ff8cb8
Nigel Stanger
authored
on 5 Sep 2016
Patch
Showing
2 changed files
config_parser.py
process_podcast.py
Ignore Space
Show notes
View
config_parser.py
#!/usr/bin/env python
"""Parser for podcast segment configuration files, built on pyparsing."""

import sys
from pyparsing import *

# pyparsing documentation:
# https://sourceforge.net/p/pyparsing/code/HEAD/tree/trunk/src/HowToUsePyparsing.txt#l302

INPUTSPEC_DEFAULTS = {"type": None, "filename": None, "num": None}
# "ss" needs a default too: the grammar accepts a bare ".ms" timestamp
# with no seconds part, and consumers index every timestamp by "ss".
TIMESTAMP_DEFAULTS = {"hh": 0, "mm": 0, "ss": 0, "ms": 0}


# see http://stackoverflow.com/questions/11180622/optional-string-segment-in-pyparsing
def default_input_fields(fields):
    """Set missing input specification values to defaults."""
    set_defaults(fields, INPUTSPEC_DEFAULTS)


def default_timestamp_fields(fields):
    """Set missing timestamp values to defaults."""
    set_defaults(fields, TIMESTAMP_DEFAULTS)


def set_defaults(fields, defaults):
    """Set missing field values to defaults.

    fields is the parse result handed to a parse action; it is updated
    in place. defaults maps each result name to the value to use when
    that name is absent from fields.
    """
    present = set(fields.keys())
    # Sorted so that the order of the appended tokens is deterministic.
    for k in sorted(defaults):
        if k in present:
            continue
        v = defaults[k]
        # see http://pyparsing.wikispaces.com/share/view/71042464
        # Assigning by name alone does not add a token, so append too.
        fields[k] = v
        fields.append(v)


def parser_bnf():
    """Grammar for parsing podcast configuration files.

    Returns the top-level pyparsing element (zero or more segment
    specifications), configured to ignore Python-style comments.
    """
    at = Literal("@").suppress()
    caret = Literal("^")
    colon = Literal(":").suppress()
    left_bracket = Literal("[").suppress()
    period = Literal(".").suppress()
    right_bracket = Literal("]").suppress()

    # zero_index ::= [0-9]+
    zero_index = Word(nums).setParseAction(lambda s, l, t: int(t[0]))

    # filename ::= [A-Za-z0-9][-A-Za-z0-9._ ]+
    filename_first = Word(alphanums, exact=1)
    filename_rest = Word(alphanums + "-_/. ")
    filename = Combine(filename_first + Optional(filename_rest))

    # millisecs ::= "." [0-9]+
    # Only the first three digits are kept, right-padded with zeros,
    # so ".5" becomes 500 and ".12345" becomes 123.
    millisecs = (Word(nums).setParseAction(
        lambda s, l, t: int(t[0][:3].ljust(3, "0")))
        .setResultsName("ms"))

    # hours, minutes, seconds ::= zero_index
    hours = zero_index.setResultsName("hh")
    minutes = zero_index.setResultsName("mm")
    seconds = zero_index.setResultsName("ss")

    hours_minutes = hours + colon + minutes + colon | minutes + colon
    secs_millisecs = (seconds + Optional(period + millisecs) |
                      period + millisecs)

    # timestamp ::= [[hours ":"] minutes ":"] seconds ["." millisecs]
    timestamp = Optional(hours_minutes) + secs_millisecs

    # duration_file ::= "@", filename
    # We need a separate item for a lonely duration file timestamp so
    # that we can attach a parse action just to the lonely case. Using
    # duration_file alone means the parse action is attached to all
    # instances of duration_file.
    duration_file = at + filename.setResultsName("filename")
    lonely_duration_file = at + filename.setResultsName("filename")

    # timespecs ::= timestamp [duration_file | {timestamp}]
    # If duration_file timestamp is lonely, prepend a zero timestamp.
    timespecs = Or(
        [lonely_duration_file.setParseAction(
            lambda s, l, t: [timestamp.parseString("00:00:00.000"), t]),
         Group(timestamp) + duration_file,
         OneOrMore(Group(timestamp.setParseAction(default_timestamp_fields)))])

    # last_frame ::= "-1" | "last"
    last_frame = oneOf(["-1", "last"]).setParseAction(replaceWith(-1))

    # frame_number ::= ":" (zero_index | last_frame)
    frame_number = colon - (zero_index | last_frame).setResultsName("num")

    # stream_number ::= ":" zero_index
    stream_number = colon - zero_index.setResultsName("num")

    # input_file ::= ":" [filename]
    input_file = colon - Optional(filename).setResultsName("filename")

    # previous_segment ::= ":" "^"
    previous_segment = colon - caret.setResultsName("filename")

    # frame_input_file ::= input_file | previous_segment
    frame_input_file = Or([input_file, previous_segment])

    # av_trailer ::= input_file [stream_number]
    av_trailer = input_file + Optional(stream_number)

    # frame_type ::= "frame" | "f"
    frame_type = oneOf(["f", "frame"]).setParseAction(replaceWith("frame"))

    # frame_input ::= frame_type [frame_input_file [frame_number]]
    frame_input = (frame_type.setResultsName("type") +
                   Optional(frame_input_file + Optional(frame_number)))

    # video_type ::= "video" | "v"
    video_type = oneOf(["v", "video"]).setParseAction(replaceWith("video"))

    # audio_type ::= "audio" | "a"
    audio_type = oneOf(["a", "audio"]).setParseAction(replaceWith("audio"))

    # av_input ::= (audio_type | video_type) [av_trailer]
    av_input = ((audio_type | video_type).setResultsName("type") +
                Optional(av_trailer))

    # inputspec ::= "[" (av_input | frame_input) "]"
    inputspec = (left_bracket +
                 delimitedList(av_input | frame_input, delim=":")
                 .setParseAction(default_input_fields) -
                 right_bracket)

    # segmentspec ::= inputspec [timespecs]
    segmentspec = Group(inputspec +
                        Group(Optional(timespecs)).setResultsName("times"))

    # config ::= {segmentspec}
    config = ZeroOrMore(segmentspec)
    config.ignore(pythonStyleComment)

    return config


def parse_configuration_file(config_file):
    """Parse a podcast configuration file.

    On a parse error the message is printed and the program exits with
    status 1; otherwise the parse results are returned.
    """
    try:
        parser = parser_bnf()
        result = parser.parseFile(config_file, parseAll=True)
    except (ParseException, ParseSyntaxException) as e:
        print("ERROR: {m}".format(m=str(e)))
        sys.exit(1)
    return result


def parse_configuration_string(config_string):
    """Parse a podcast configuration string.

    On a parse error the message is printed and the program exits with
    status 1; otherwise the parse results are returned.
    """
    try:
        parser = parser_bnf()
        result = parser.parseString(config_string, parseAll=True)
    except (ParseException, ParseSyntaxException) as e:
        print("ERROR: {m}".format(m=str(e)))
        sys.exit(1)
    return result


def test_parser():
    """Parse the sample configuration files and print what was found."""
    tests = ["test/config1.txt", "test/config2.txt", "test/config3.txt",
             "test/config4.txt", "test/config5.txt"]

    stamp = "{hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}"
    for path in tests:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("==={f}===".format(f=path))
        r = parse_configuration_file(path)
        for s in r:
            print(s)
            print(" type = {t}".format(t=s["type"]))
            print(" filename = '{f}'".format(f=s["filename"]))
            print(" num = {n}".format(n=s["num"]))
            print(" times = {t}".format(t=s["times"]))
            for i, t in enumerate(s["times"]):
                if isinstance(t, str):
                    print(" punch out after duration of "
                          "'{f}'".format(f=t))
                elif isinstance(t, ParseResults):
                    if "filename" in t:
                        # A lonely "@file" arrives as a group carrying
                        # only the "filename" result name.
                        print(" punch out after duration of "
                              "'{f}'".format(f=t["filename"]))
                        continue
                    label = "in" if i % 2 == 0 else "out"
                    print((" punch " + label + " at: " + stamp).format(
                        hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"]))
        print("")


if (__name__ == "__main__"):
    test_parser()
#!/usr/bin/env python
"""Parser for podcast segment configuration files, built on pyparsing."""

import sys
from pyparsing import *

# pyparsing documentation:
# https://sourceforge.net/p/pyparsing/code/HEAD/tree/trunk/src/HowToUsePyparsing.txt#l302

INPUTSPEC_DEFAULTS = {"type": None, "filename": None, "num": None}
# "ss" needs a default too: the grammar accepts a bare ".ms" timestamp
# with no seconds part, and consumers index every timestamp by "ss".
TIMESTAMP_DEFAULTS = {"hh": 0, "mm": 0, "ss": 0, "ms": 0}


# see http://stackoverflow.com/questions/11180622/optional-string-segment-in-pyparsing
def default_input_fields(fields):
    """Set missing input specification values to defaults."""
    set_defaults(fields, INPUTSPEC_DEFAULTS)


def default_timestamp_fields(fields):
    """Set missing timestamp values to defaults."""
    set_defaults(fields, TIMESTAMP_DEFAULTS)


def set_defaults(fields, defaults):
    """Set missing field values to defaults.

    fields is the parse result handed to a parse action; it is updated
    in place. defaults maps each result name to the value to use when
    that name is absent from fields.
    """
    present = set(fields.keys())
    # Sorted so that the order of the appended tokens is deterministic.
    for k in sorted(defaults):
        if k in present:
            continue
        v = defaults[k]
        # see http://pyparsing.wikispaces.com/share/view/71042464
        # Assigning by name alone does not add a token, so append too.
        fields[k] = v
        fields.append(v)


def parser_bnf():
    """Grammar for parsing podcast configuration files.

    Returns the top-level pyparsing element (zero or more segment
    specifications), configured to ignore Python-style comments.
    """
    at = Literal("@").suppress()
    caret = Literal("^")
    colon = Literal(":").suppress()
    left_bracket = Literal("[").suppress()
    period = Literal(".").suppress()
    right_bracket = Literal("]").suppress()

    # zero_index ::= [0-9]+
    zero_index = Word(nums).setParseAction(lambda s, l, t: int(t[0]))

    # filename ::= [A-Za-z0-9][-A-Za-z0-9._ ]+
    filename_first = Word(alphanums, exact=1)
    filename_rest = Word(alphanums + "-_/. ")
    filename = Combine(filename_first + Optional(filename_rest))

    # millisecs ::= "." [0-9]+
    # Only the first three digits are kept, right-padded with zeros,
    # so ".5" becomes 500 and ".12345" becomes 123.
    millisecs = (Word(nums).setParseAction(
        lambda s, l, t: int(t[0][:3].ljust(3, "0")))
        .setResultsName("ms"))

    # hours, minutes, seconds ::= zero_index
    hours = zero_index.setResultsName("hh")
    minutes = zero_index.setResultsName("mm")
    seconds = zero_index.setResultsName("ss")

    hours_minutes = hours + colon + minutes + colon | minutes + colon
    secs_millisecs = (seconds + Optional(period + millisecs) |
                      period + millisecs)

    # timestamp ::= [[hours ":"] minutes ":"] seconds ["." millisecs]
    timestamp = Optional(hours_minutes) + secs_millisecs

    # duration_file ::= "@", filename
    # We need a separate item for a lonely duration file timestamp so
    # that we can attach a parse action just to the lonely case. Using
    # duration_file alone means the parse action is attached to all
    # instances of duration_file.
    duration_file = at + filename
    lonely_duration_file = at + filename

    # timespecs ::= timestamp [duration_file | {timestamp}]
    # If duration_file timestamp is lonely, prepend a zero timestamp.
    timespecs = Or(
        [lonely_duration_file.setParseAction(
            lambda s, l, t: timestamp.parseString("00:00:00.000") + t),
         timestamp + duration_file,
         OneOrMore(Group(timestamp.setParseAction(default_timestamp_fields)))])

    # last_frame ::= "-1" | "last"
    last_frame = oneOf(["-1", "last"]).setParseAction(replaceWith(-1))

    # frame_number ::= ":" (zero_index | last_frame)
    frame_number = colon - (zero_index | last_frame).setResultsName("num")

    # stream_number ::= ":" zero_index
    stream_number = colon - zero_index.setResultsName("num")

    # input_file ::= ":" [filename]
    input_file = colon - Optional(filename).setResultsName("filename")

    # previous_segment ::= ":" "^"
    previous_segment = colon - caret.setResultsName("filename")

    # frame_input_file ::= input_file | previous_segment
    frame_input_file = Or([input_file, previous_segment])

    # av_trailer ::= input_file [stream_number]
    av_trailer = input_file + Optional(stream_number)

    # frame_type ::= "frame" | "f"
    frame_type = oneOf(["f", "frame"]).setParseAction(replaceWith("frame"))

    # frame_input ::= frame_type [frame_input_file [frame_number]]
    frame_input = (frame_type.setResultsName("type") +
                   Optional(frame_input_file + Optional(frame_number)))

    # video_type ::= "video" | "v"
    video_type = oneOf(["v", "video"]).setParseAction(replaceWith("video"))

    # audio_type ::= "audio" | "a"
    audio_type = oneOf(["a", "audio"]).setParseAction(replaceWith("audio"))

    # av_input ::= (audio_type | video_type) [av_trailer]
    av_input = ((audio_type | video_type).setResultsName("type") +
                Optional(av_trailer))

    # inputspec ::= "[" (av_input | frame_input) "]"
    inputspec = (left_bracket +
                 delimitedList(av_input | frame_input, delim=":")
                 .setParseAction(default_input_fields) -
                 right_bracket)

    # segmentspec ::= inputspec [timespecs]
    segmentspec = Group(inputspec +
                        Group(Optional(timespecs)).setResultsName("times"))

    # config ::= {segmentspec}
    config = ZeroOrMore(segmentspec)
    config.ignore(pythonStyleComment)

    return config


def parse_configuration_file(config_file):
    """Parse a podcast configuration file.

    On a parse error the message is printed and the program exits with
    status 1; otherwise the parse results are returned.
    """
    try:
        parser = parser_bnf()
        result = parser.parseFile(config_file, parseAll=True)
    except (ParseException, ParseSyntaxException) as e:
        print("ERROR: {m}".format(m=str(e)))
        sys.exit(1)
    return result


def parse_configuration_string(config_string):
    """Parse a podcast configuration string.

    On a parse error the message is printed and the program exits with
    status 1; otherwise the parse results are returned.
    """
    try:
        parser = parser_bnf()
        result = parser.parseString(config_string, parseAll=True)
    except (ParseException, ParseSyntaxException) as e:
        print("ERROR: {m}".format(m=str(e)))
        sys.exit(1)
    return result


def test_parser():
    """Parse the sample configuration files and print what was found."""
    tests = ["test/config1.txt", "test/config2.txt", "test/config3.txt",
             "test/config4.txt", "test/config5.txt"]

    stamp = "{hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}"
    for path in tests:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("==={f}===".format(f=path))
        r = parse_configuration_file(path)
        for s in r:
            print(s)
            print(" type = {t}".format(t=s["type"]))
            print(" filename = '{f}'".format(f=s["filename"]))
            print(" num = {n}".format(n=s["num"]))
            print(" times = {t}".format(t=s["times"]))
            for i, t in enumerate(s["times"]):
                if isinstance(t, str):
                    print(" punch out after duration of "
                          "'{f}'".format(f=t))
                elif isinstance(t, ParseResults):
                    label = "in" if i % 2 == 0 else "out"
                    print((" punch " + label + " at: " + stamp).format(
                        hh=t["hh"], mm=t["mm"], ss=t["ss"], ms=t["ms"]))
        print("")


if (__name__ == "__main__"):
    test_parser()
Ignore Space
Show notes
View
process_podcast.py
#!/usr/bin/env python
"""Assemble a podcast from audio, video and frame segments via FFmpeg."""

import argparse
import datetime
import logging
import os.path
import sys

from pyparsing import ParseResults

import globals
from config_parser import (
    parse_configuration_file, parse_configuration_string)
from shell_command import (FFprobeCommand, FFmpegConcatCommand)
from segment import (Segment, AudioSegment, VideoSegment, FrameSegment)


# Text types accepted as a duration file name (Python 2 and 3).
try:
    _STRING_TYPES = (basestring,)
except NameError:
    _STRING_TYPES = (str,)


class InputStreamAction(argparse.Action):
    """Split a FILE[:STREAM] option value into file name and stream.

    The file name is stored under the option's destination. The stream
    part (a string, or None when absent) is stored as
    audio_stream_number or video_stream_number according to the option
    that was used.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        parts = values.split(":")
        file_name = parts[0]
        stream = None if (len(parts) == 1) else parts[1]
        setattr(namespace, self.dest, file_name)
        if (option_string in ["--audio", "-a"]):
            setattr(namespace, 'audio_stream_number', stream)
        elif (option_string in ["--video", "-v"]):
            setattr(namespace, 'video_stream_number', stream)


def parse_command_line():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        usage="%(prog)s [options] <output>",
        description="where: <output> is the name of the output file "
            "(note: .mov seems generally best)",
        epilog="Default input files can be specified using either of "
            "--audio or --video. If neither of these are specified, "
            "then you must supply a configuration file using --config. "
            "(Of course, you can always supply a configuration file "
            "regardless.)\n\n"
            "Input streams can be taken from the same input file.\n\n"
            "If no segments are specified, the entire input stream is "
            "processed as one segment. The number and duration of segments "
            "can differ, but the total duration across all input streams "
            "should ideally be the same.")

    parser.set_defaults(audio_stream_number=None, video_stream_number=None)

    parser.add_argument(
        "output",
        help="name of the output file (note: .mov is best)")

    parser.add_argument(
        "--audio", "-a", metavar="FILE[:STREAM]", action=InputStreamAction,
        help="File name for the default audio input stream (can be the "
            "same as for other input streams). You can optionally specify "
            "the default audio stream number to use if the file contains "
            "more than one (this can be overidden in a configuration "
            "file). If you don't specify a stream number, it defaults "
            "to 0 (i.e., the first audio stream in the file).")

    parser.add_argument(
        "--configuration", "--config", "-c", dest="config", metavar="FILE",
        help="File name for the podcast segment configuration (plain text). "
            "See config_help.md details on the file "
            "format.")

    parser.add_argument(
        "--debug", "-d", action="store_true",
        help="Print debugging output (overrides --quiet).")

    parser.add_argument(
        "--keep", "-k", action="store_true",
        help="Don't delete any generated temporary files.")

    parser.add_argument(
        "--quiet", "-q", action="store_true",
        help="Mute all console output (overridden by --debug).")

    parser.add_argument(
        "--video", "-v", metavar="FILE[:STREAM]", action=InputStreamAction,
        help="File name for the default video input stream (can be the "
            "same as for other input streams). You can optionally specify "
            "the default video stream number to use if the file contains "
            "more than one (this can be overidden in a configuration "
            "file). If you don't specify a stream number, it defaults "
            "to 0 (i.e., the first video stream in the file).")

    args = parser.parse_args()

    return args


def check_arguments(args):
    """Sanity check the command line arguments."""
    fn = "check_arguments"

    if (args.quiet):
        globals.log.setLevel(logging.WARNING)

    # --debug overrides --quiet.
    if (args.debug):
        globals.log.setLevel(logging.DEBUG)
        globals.log.debug("{fn}(): args = {a}".format(fn=fn, a=args))

    # Must specify at least one of --audio, --video, --config.
    if (not any([args.audio, args.video, args.config])):
        globals.log.error("must specify at least one of --audio, --video, "
                          "or --config")
        sys.exit(1)


def get_configuration(args):
    """Build the segment configuration from the command line arguments.

    With --config, the file is parsed and any audio/video entry lacking
    a file name or stream number is completed from --audio/--video
    (stream 0 when none was given). Without --config, a one-line
    configuration is synthesised for each of --audio/--video supplied.
    Exits with status 1 if an entry needs a default input file that was
    not given on the command line.
    """
    # Fill in missing file names for default input streams.
    fn = "get_configuration"
    type_mapping = {
        "audio": {"file": args.audio, "stream": args.audio_stream_number},
        "video": {"file": args.video, "stream": args.video_stream_number}}

    globals.log.info("Processing configuration...")

    if (args.config):
        config = parse_configuration_file(args.config)
        # Check that applicable default input streams have been specified.
        for i, c in enumerate(config):
            kind = c["type"]
            if (kind not in type_mapping):
                continue
            default_file = type_mapping[kind]["file"]
            default_stream = type_mapping[kind]["stream"]
            # No filename in configuration.
            if (not c["filename"]):
                if (default_file):
                    config[i]["filename"] = default_file
                # No filename on command line either.
                else:
                    globals.log.error(
                        "attempting to use default {s} input file, but "
                        "--{s} hasn't been specified".format(s=kind))
                    sys.exit(1)
            # No stream number in configuration. Note: 0 is a valid
            # stream number, so explicitly check for None.
            if (c["num"] is None):
                # Assume 0 if no stream on command line either.
                config[i]["num"] = (
                    0 if default_stream is None else default_stream)
    else:
        conf_list = []
        for m in type_mapping:
            default_file = type_mapping[m]["file"]
            default_stream = type_mapping[m]["stream"]
            if (not default_file):
                continue
            # A file given without a stream number defaults to stream 0,
            # as the --audio/--video help promises; otherwise the input
            # would be dropped and nothing would be processed.
            if (default_stream is None):
                default_stream = 0
            conf_list += [
                "[{type}:{file}:{stream}]".format(
                    type=m, file=default_file, stream=default_stream)]
        globals.log.debug("{fn}(): default config = "
                          "{c}".format(fn=fn, c=conf_list))
        config = parse_configuration_string("\n".join(conf_list))

    return config


def get_file_duration(file):
    """Calculate the duration a media file as a timedelta object."""
    command = FFprobeCommand(
        ["-show_entries", "format=duration",
         "-print_format", "default=noprint_wrappers=1:nokey=1",
         file])
    # partition() tolerates a whole number of seconds with no fractional
    # part, where unpacking split(".") would raise ValueError.
    ss, _, ms = command.get_output().strip().partition(".")
    # Keep millisecond precision only: "5" -> 500, "123456" -> 123.
    ms = ms[:3].ljust(3, "0")
    return datetime.timedelta(seconds=int(ss), milliseconds=int(ms))


def make_new_segment(type, filename, punch_in, punch_out, num):
    """Make a new segment instance of the correct class.

    num is passed as the input stream for audio and video segments and
    as the frame number for frame segments. Returns None for any other
    type.
    """
    fn = "make_new_segment"
    globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=type))
    globals.log.debug("{fn}(): filename = {f}".format(fn=fn, f=filename))
    globals.log.debug("{fn}(): punch in = {i}".format(fn=fn, i=punch_in))
    globals.log.debug("{fn}(): punch out = {o}".format(fn=fn, o=punch_out))
    globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=num))

    if (type == "audio"):
        return AudioSegment(file=filename, punch_in=punch_in,
                            punch_out=punch_out, input_stream=num)
    elif (type == "video"):
        return VideoSegment(file=filename, punch_in=punch_in,
                            punch_out=punch_out, input_stream=num)
    elif (type == "frame"):
        return FrameSegment(file=filename, punch_in=punch_in,
                            punch_out=punch_out, frame_number=num)
    else:
        return None


def _timestamp_to_timedelta(timestamp):
    """Convert a parsed timestamp (hh/mm/ss/ms fields) to a timedelta."""
    return datetime.timedelta(
        hours=timestamp["hh"],
        minutes=timestamp["mm"],
        # The grammar accepts ".ms" with no seconds part, so "ss" may
        # be absent from the parse result.
        seconds=timestamp.get("ss", 0),
        milliseconds=timestamp["ms"])


def process_timestamp_pair(times):
    """Constructs timedelta instances from a pair of config timestamps.

    times[0] must be a parsed timestamp. times[1] may be a parsed
    timestamp, a duration file (either a bare file name or a parse
    result carrying a "filename" field), or None when there is no
    punch out. Returns (t0, t1); t1 is None when times[1] is None.
    Exits with status 1 if times[1] is none of these.
    """
    fn = "process_timestamp_pair"
    globals.log.debug("{fn}(): t0 = {t}".format(fn=fn, t=times[0]))
    globals.log.debug("{fn}(): t1 = {t}".format(fn=fn, t=times[1]))

    # If the first item in the timestamp list in the configuration file
    # is a filename, the parser inserts a zero timestamp before it. We
    # can therefore guarantee that the first item of the pair will
    # always be a timestamp.
    t0 = _timestamp_to_timedelta(times[0])

    end = times[1]
    if (end is None):
        # Unpaired trailing timestamp: the caller supplies the punch out.
        t1 = None
    elif (isinstance(end, _STRING_TYPES)):  # bare filename
        t1 = t0 + get_file_duration(end)
    elif (isinstance(end, ParseResults) and "filename" in end):
        t1 = t0 + get_file_duration(end["filename"])
    elif (isinstance(end, ParseResults)):  # normal timestamp
        t1 = _timestamp_to_timedelta(end)
    else:
        globals.log.error("unreadable timestamp {t}".format(t=times[1]))
        # There is no usable punch out time to return.
        sys.exit(1)

    return t0, t1


def process_time_list(type, filename, num, time_list):
    """Process an audio or video stream and build a list of segments."""
    if (os.path.exists(filename) and type in ["audio", "video"]):
        stream_duration = get_file_duration(filename)
    else:
        stream_duration = 0
    segments = []

    # No timestamps: punch in at 0, out at stream duration.
    if (len(time_list) == 0):
        punch_in = datetime.timedelta()
        punch_out = stream_duration
        segments.append(make_new_segment(type, filename, punch_in,
                                         punch_out, num))
    else:
        # Process each pair of timestamps as punch in, out. If there's
        # an odd number of items, the last one is processed separately.
        for t in zip(time_list[::2], time_list[1::2]):
            punch_in, punch_out = process_timestamp_pair(t)
            if (punch_in == punch_out):
                globals.log.warning(
                    "punch in ({i}s) and punch out ({o}s) times are "
                    "equal; no segment will be "
                    "generated".format(i=punch_in.total_seconds(),
                                       o=punch_out.total_seconds()))
                continue
            elif (punch_out < punch_in):
                globals.log.error(
                    "punch out time ({o}s) falls before punch in time "
                    "({i}s); can't generate a valid "
                    "segment".format(i=punch_in.total_seconds(),
                                     o=punch_out.total_seconds()))
                sys.exit(1)
            segments.append(make_new_segment(type, filename, punch_in,
                                             punch_out, num))

        # Odd number of timestamps: punch in at last timestamp,
        # out at stream duration.
        if (len(time_list) % 2 != 0):
            punch_in, _ = process_timestamp_pair([time_list[-1], None])
            # Punch out is a position in the stream, as for the paired
            # timestamps above, not the remaining length.
            punch_out = stream_duration
            segments.append(make_new_segment(type, filename, punch_in,
                                             punch_out, num))

    return segments


def process_input_streams(config):
    """Process a list of stream specifications and build a list of segments."""
    fn = "process_input_streams"
    globals.log.info("Processing input streams...")
    segments = []
    for cnf in config:
        globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=cnf["type"]))
        globals.log.debug(
            "{fn}(): filename = {f}".format(fn=fn, f=cnf["filename"]))
        globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=cnf["num"]))
        globals.log.debug("{fn}(): times = {t}".format(fn=fn, t=cnf["times"]))

        segments += process_time_list(cnf["type"], cnf["filename"],
                                      cnf["num"], cnf["times"])
    return segments


def print_progress(count, total, line_end="\r"):
    """Write a 40-character progress bar and percentage to stdout."""
    # Floor division keeps both values integral on Python 2 and 3; the
    # bar width is used as a string repeat count.
    percent = count * 100 // total
    outof = count * 40 // total
    bar = "|{c}{nc}| {p}%{nl}".format(
        c="+" * outof, nc="." * (40 - outof), p=percent, nl=line_end)
    sys.stdout.write(bar)
    sys.stdout.flush()


def process_frame_segments(args, segments):
    """Post-process frame segments to set frame images, etc."""
    fn = "process_frame_segments"
    globals.log.info("Processing frames...")
    frame_segments = [s for s in segments if isinstance(s, FrameSegment)]
    n = len(frame_segments)
    # The progress bar is shown only when neither --debug nor --quiet
    # is in effect.
    show_progress = not any([args.debug, args.quiet])
    for i, f in enumerate(frame_segments):
        if (show_progress):
            print_progress(i, n)
        globals.log.debug("{fn}(): frame (before) = {b}".format(fn=fn, b=f))

        # Frame segments that use a frame from the previous segment.
        if (f.input_file == "^"):
            if (f.segment_number > 0):
                prev = segments[f.segment_number - 1]
                globals.log.debug("{fn}(): prev = {p}".format(fn=fn, p=prev))
                prev.generate_temp_file(args.output)
                f.use_frame(prev.generate_frame(f.frame_number, args.output))
            else:
                globals.log.error(
                    "frame segment {s} is attempting to use the last frame "
                    "of a non-existent previous "
                    "segment".format(s=f.segment_number))
                sys.exit(1)
        # Frame segments whose frame comes from a PDF file.
        else:
            _, suffix = os.path.splitext(f.input_file)
            if (suffix.lower() == ".pdf"):
                f.use_frame(f.generate_temp_file(args.output))
            else:
                globals.log.error(
                    'unexpected input file type "{s}" for frame segment '
                    "{f}".format(s=suffix, f=f.segment_number))
                sys.exit(1)
        globals.log.debug("{fn}(): frame (after) = "
                          "{a}".format(fn=fn, a=f))

    if (n > 0 and show_progress):
        print_progress(n, n, "\n")


def render_podcast(audio_segments, video_segments, output):
    """Stitch together the various input components into the final podcast."""
    fn = "render_podcast"
    globals.log.info("Rendering final podcast...")
    command = FFmpegConcatCommand(has_audio=len(audio_segments) > 0,
                                  has_video=len(video_segments) > 0)
    input_files = Segment.input_files()
    for f in input_files:
        # Any per-file input options go before that file's "-i".
        if (input_files[f]):
            command.append_input_options(input_files[f])
        command.append_input_options(["-i", f])
    for s in (audio_segments + video_segments):
        command.append_filter(s.trim_filter())
    command.append_concat_filter("a", list(audio_segments))
    command.append_normalisation_filter()
    command.append_concat_filter("v", list(video_segments))
    command.append_output_options([output])
    globals.log.debug("{fn}(): {c}".format(fn=fn, c=command))
    command.run()


def cleanup(segments):
    """Clean up generated temporary files."""
    globals.log.info("Cleaning up...")
    for s in segments:
        s.delete_temp_files()


def main():
    """Run the whole pipeline: parse, build segments, render, clean up."""
    fn = "main"
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s: {p}: %(message)s".format(p=globals.PROGRAM))

    try:
        args = parse_command_line()
        check_arguments(args)

        config = get_configuration(args)

        segments = process_input_streams(config)
        audio_segments = [s for s in segments if isinstance(s, AudioSegment)]
        video_segments = [s for s in segments if isinstance(s, VideoSegment)]
        globals.log.debug("{fn}(): audio segments = {a}".format(
            fn=fn, a=audio_segments))
        globals.log.debug("{fn}(): video segments = {v}".format(
            fn=fn, v=video_segments))

        audio_duration = sum([s.get_duration() for s in audio_segments])
        video_duration = sum([s.get_duration() for s in video_segments])
        globals.log.debug("{fn}(): audio duration = "
                          "{a}".format(fn=fn, a=audio_duration))
        globals.log.debug("{fn}(): video duration = "
                          "{v}".format(fn=fn, v=video_duration))

        # A duration mismatch is only a warning; rendering proceeds.
        if (len(audio_segments) and len(video_segments)):
            if (audio_duration != video_duration):
                globals.log.warning("total video duration ({v}s) doesn't match "
                                    "total audio duration "
                                    "({a}s)".format(v=video_duration,
                                                    a=audio_duration))

        process_frame_segments(args, segments)

        globals.log.debug("{fn}(): input files = "
                          "{i}".format(fn=fn, i=Segment.input_files()))

        render_podcast(audio_segments, video_segments, args.output)

        if (not args.keep):
            cleanup(segments)

    except (KeyboardInterrupt):
        # Ctrl-C ends the run without a traceback.
        pass


if (__name__ == "__main__"):
    main()
#!/usr/bin/env python

"""Command-line driver that assembles a podcast from input segments.

Input streams are described by the --audio/--video options and/or a
configuration file. The audio, video and frame segments built from them
are passed to an FFmpeg concat command that renders the output file.
"""

import argparse
import datetime
import logging
import os.path
import sys

from pyparsing import ParseResults

import globals
from config_parser import (
    parse_configuration_file, parse_configuration_string)
from shell_command import (FFprobeCommand, FFmpegConcatCommand)
from segment import (Segment, AudioSegment, VideoSegment, FrameSegment)


class InputStreamAction(argparse.Action):
    """argparse action for options of the form FILE[:STREAM].

    The file name is stored under the option's own destination. The
    optional stream number is stored as ``audio_stream_number`` or
    ``video_stream_number``, depending on which option was used, and is
    None when no stream number was given.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        parts = values.split(":")
        file_name = parts[0]
        stream = None
        if len(parts) > 1:
            # The configuration grammar converts its numeric fields to
            # int, so do the same here instead of passing a string on
            # (presumably stream numbers are among those fields).
            try:
                stream = int(parts[1])
                if stream < 0:
                    raise ValueError(parts[1])
            except ValueError:
                # parser.error() prints the usage message and exits.
                parser.error(
                    'invalid stream number "{s}" for {o}'.format(
                        s=parts[1], o=option_string))
        setattr(namespace, self.dest, file_name)
        if option_string in ["--audio", "-a"]:
            setattr(namespace, 'audio_stream_number', stream)
        elif option_string in ["--video", "-v"]:
            setattr(namespace, 'video_stream_number', stream)


def parse_command_line():
    """Parse command line arguments.

    Returns the argparse namespace. ``audio_stream_number`` and
    ``video_stream_number`` are always present and default to None.
    """
    parser = argparse.ArgumentParser(
        usage="%(prog)s [options] <output>",
        description="where: <output> is the name of the output file "
            "(note: .mov seems generally best)",
        epilog="Default input files can be specified using either of "
            "--audio or --video. If neither of these are specified, "
            "then you must supply a configuration file using --config. "
            "(Of course, you can always supply a configuration file "
            "regardless.)\n\n"
            "Input streams can be taken from the same input file.\n\n"
            "If no segments are specified, the entire input stream is "
            "processed as one segment. The number and duration of segments "
            "can differ, but the total duration across all input streams "
            "should ideally be the same.")

    parser.set_defaults(audio_stream_number=None, video_stream_number=None)

    parser.add_argument(
        "output",
        help="name of the output file (note: .mov is best)")

    parser.add_argument(
        "--audio", "-a", metavar="FILE[:STREAM]", action=InputStreamAction,
        help="File name for the default audio input stream (can be the "
            "same as for other input streams). You can optionally specify "
            "the default audio stream number to use if the file contains "
            "more than one (this can be overidden in a configuration "
            "file). If you don't specify a stream number, it defaults "
            "to 0 (i.e., the first audio stream in the file).")

    # The help text has no replacement fields, so it needs no .format().
    parser.add_argument(
        "--configuration", "--config", "-c", dest="config", metavar="FILE",
        help="File name for the podcast segment configuration (plain text). "
            "See config_help.md details on the file "
            "format.")

    parser.add_argument(
        "--debug", "-d", action="store_true",
        help="Print debugging output (overrides --quiet).")

    parser.add_argument(
        "--keep", "-k", action="store_true",
        help="Don't delete any generated temporary files.")

    parser.add_argument(
        "--quiet", "-q", action="store_true",
        help="Mute all console output (overridden by --debug).")

    parser.add_argument(
        "--video", "-v", metavar="FILE[:STREAM]", action=InputStreamAction,
        help="File name for the default video input stream (can be the "
            "same as for other input streams). You can optionally specify "
            "the default video stream number to use if the file contains "
            "more than one (this can be overidden in a configuration "
            "file). If you don't specify a stream number, it defaults "
            "to 0 (i.e., the first video stream in the file).")

    return parser.parse_args()


def check_arguments(args):
    """Sanity check the command line arguments.

    Sets the log level from --quiet/--debug and exits with status 1 if
    none of --audio, --video or --config was supplied.
    """
    fn = "check_arguments"

    if args.quiet:
        globals.log.setLevel(logging.WARNING)

    # --debug overrides --quiet.
    if args.debug:
        globals.log.setLevel(logging.DEBUG)
        globals.log.debug("{fn}(): args = {a}".format(fn=fn, a=args))

    # Must specify at least one of --audio, --video, --config.
    if not any([args.audio, args.video, args.config]):
        globals.log.error("must specify at least one of --audio, --video, "
                          "or --config")
        sys.exit(1)


def get_configuration(args):
    """Build the list of input stream specifications.

    With --config, the configuration file is parsed and any audio/video
    entry lacking a file name or stream number is completed from the
    command line defaults (exiting with status 1 if the required default
    file was not given). Without --config, a configuration is generated
    from the --audio/--video defaults alone.
    """
    fn = "get_configuration"
    type_mapping = {
        "audio": {"file": args.audio, "stream": args.audio_stream_number},
        "video": {"file": args.video, "stream": args.video_stream_number}}

    globals.log.info("Processing configuration...")

    if args.config:
        config = parse_configuration_file(args.config)
        # Check that applicable default input streams have been specified.
        for i, c in enumerate(config):
            stream_type = c["type"]
            if stream_type not in type_mapping:
                continue
            default_file = type_mapping[stream_type]["file"]
            default_stream = type_mapping[stream_type]["stream"]
            # No filename in configuration.
            if not c["filename"]:
                if default_file:
                    config[i]["filename"] = default_file
                # No filename on command line either.
                else:
                    globals.log.error(
                        "attempting to use default {s} input file, but "
                        "--{s} hasn't been specified".format(s=stream_type))
                    sys.exit(1)
            # No stream number in configuration. Note: 0 is a valid
            # stream number, so explicitly check for None.
            if c["num"] is None:
                # Assume 0 if no stream on command line either.
                config[i]["num"] = (
                    0 if default_stream is None else default_stream)
    else:
        conf_list = []
        for m in type_mapping:
            default_file = type_mapping[m]["file"]
            default_stream = type_mapping[m]["stream"]
            # A default file without an explicit stream number must still
            # produce an entry: the stream number defaults to 0, as the
            # command line help states.
            if default_file:
                conf_list.append(
                    "[{type}:{file}:{stream}]".format(
                        type=m, file=default_file,
                        stream=0 if default_stream is None
                        else default_stream))
        globals.log.debug("{fn}(): default config = "
                          "{c}".format(fn=fn, c=conf_list))
        config = parse_configuration_string("\n".join(conf_list))

    return config


def get_file_duration(file):
    """Calculate the duration of a media file as a timedelta object.

    The duration is read from the output of an FFprobe command as
    "seconds[.fraction]"; the fraction is truncated or zero-padded to
    milliseconds.
    """
    command = FFprobeCommand(
        ["-show_entries", "format=duration",
         "-print_format", "default=noprint_wrappers=1:nokey=1", file])
    # partition() also copes with output that has no fractional part.
    ss, _, ms = command.get_output().strip().partition(".")
    ms = ms[:3].ljust(3, "0")
    return datetime.timedelta(seconds=int(ss), milliseconds=int(ms))


def make_new_segment(type, filename, punch_in, punch_out, num):
    """Make a new segment instance of the correct class.

    ``num`` is passed as the input stream for audio and video segments,
    and as the frame number for frame segments. Returns None for any
    other ``type``.
    """
    fn = "make_new_segment"

    globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=type))
    globals.log.debug("{fn}(): filename = {f}".format(fn=fn, f=filename))
    globals.log.debug("{fn}(): punch in = {i}".format(fn=fn, i=punch_in))
    globals.log.debug("{fn}(): punch out = {o}".format(fn=fn, o=punch_out))
    globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=num))

    if type == "audio":
        return AudioSegment(file=filename, punch_in=punch_in,
                            punch_out=punch_out, input_stream=num)
    if type == "video":
        return VideoSegment(file=filename, punch_in=punch_in,
                            punch_out=punch_out, input_stream=num)
    if type == "frame":
        return FrameSegment(file=filename, punch_in=punch_in,
                            punch_out=punch_out, frame_number=num)
    return None


def _timestamp_to_timedelta(timestamp):
    """Convert one parsed config timestamp (hh/mm/ss/ms) to a timedelta."""
    return datetime.timedelta(
        hours=timestamp["hh"], minutes=timestamp["mm"],
        seconds=timestamp["ss"], milliseconds=timestamp["ms"])


def process_timestamp_pair(times):
    """Constructs timedelta instances from a pair of config timestamps.

    ``times[0]`` must be a parsed timestamp. ``times[1]`` may be a parsed
    timestamp, a file name (the end time is then the start time plus that
    file's duration), or None when there is no second item.

    Returns a tuple (t0, t1); t1 is None when ``times[1]`` is None. Exits
    with status 1 if ``times[1]`` is of any other kind.
    """
    fn = "process_timestamp_pair"

    globals.log.debug("{fn}(): t0 = {t}".format(fn=fn, t=times[0]))
    globals.log.debug("{fn}(): t1 = {t}".format(fn=fn, t=times[1]))

    # If the first item in the timestamp list in the configuration file
    # is a filename, the parser inserts a zero timestamp before it. We
    # can therefore guarantee that the first item of the pair will
    # always be a timestamp.
    t0 = _timestamp_to_timedelta(times[0])

    end = times[1]
    if end is None:
        # Lone trailing timestamp: there is no end time to convert.
        t1 = None
    elif isinstance(end, str):
        t1 = t0 + get_file_duration(end)
    elif isinstance(end, ParseResults):
        t1 = _timestamp_to_timedelta(end)
    else:
        globals.log.error("unreadable timestamp {t}".format(t=end))
        sys.exit(1)

    return t0, t1


def process_time_list(type, filename, num, time_list):
    """Process an audio or video stream and build a list of segments.

    Timestamps are consumed in (punch in, punch out) pairs. An empty list
    yields one segment covering the whole stream; a trailing unpaired
    timestamp yields a segment from that time to the end of the stream.
    Exits with status 1 if a punch out time precedes its punch in time.
    """
    if os.path.exists(filename) and type in ["audio", "video"]:
        stream_duration = get_file_duration(filename)
    else:
        # Note: an int, not a timedelta, when no duration is available.
        stream_duration = 0

    segments = []

    # No timestamps: punch in at 0, out at stream duration.
    if len(time_list) == 0:
        segments.append(make_new_segment(
            type, filename, datetime.timedelta(), stream_duration, num))
        return segments

    # Process each pair of timestamps as punch in, out. If there's
    # an odd number of items, the last one is processed separately.
    for pair in zip(time_list[::2], time_list[1::2]):
        punch_in, punch_out = process_timestamp_pair(pair)
        if punch_in == punch_out:
            globals.log.warning(
                "punch in ({i}s) and punch out ({o}s) times are "
                "equal; no segment will be "
                "generated".format(i=punch_in.total_seconds(),
                                   o=punch_out.total_seconds()))
            continue
        if punch_out < punch_in:
            globals.log.error(
                "punch out time ({o}s) falls before punch in time "
                "({i}s); can't generate a valid "
                "segment".format(i=punch_in.total_seconds(),
                                 o=punch_out.total_seconds()))
            sys.exit(1)
        segments.append(make_new_segment(type, filename, punch_in,
                                         punch_out, num))

    # Odd number of timestamps: punch in at last timestamp,
    # out at stream duration.
    if len(time_list) % 2 != 0:
        punch_in, _ = process_timestamp_pair([time_list[-1], None])
        # punch_out is an absolute time everywhere else in this function,
        # so the end of the stream is the stream duration itself.
        punch_out = stream_duration
        segments.append(make_new_segment(type, filename, punch_in,
                                         punch_out, num))

    return segments


def process_input_streams(config):
    """Process a list of stream specifications and build a list of segments."""
    fn = "process_input_streams"

    globals.log.info("Processing input streams...")

    segments = []
    for cnf in config:
        globals.log.debug("{fn}(): type = {t}".format(fn=fn, t=cnf["type"]))
        globals.log.debug(
            "{fn}(): filename = {f}".format(fn=fn, f=cnf["filename"]))
        globals.log.debug("{fn}(): num = {n}".format(fn=fn, n=cnf["num"]))
        globals.log.debug("{fn}(): times = {t}".format(fn=fn, t=cnf["times"]))

        segments.extend(process_time_list(cnf["type"], cnf["filename"],
                                          cnf["num"], cnf["times"]))

    return segments


def print_progress(count, total, line_end="\r"):
    """Write a 40-character text progress bar to standard output.

    Does nothing when ``total`` is not positive.
    """
    if total <= 0:
        return
    # Floor division keeps both values integral on Python 2 and 3; the
    # bar width is used as a string repetition count below.
    percent = count * 100 // total
    outof = count * 40 // total
    bar = "|{c}{nc}| {p}%{nl}".format(
        c="+" * outof, nc="." * (40 - outof), p=percent, nl=line_end)
    sys.stdout.write(bar)
    sys.stdout.flush()


def process_frame_segments(args, segments):
    """Post-process frame segments to set frame images, etc.

    A frame segment whose input file is "^" takes its frame from the
    previous segment; otherwise its input file must be a PDF. Exits with
    status 1 if there is no previous segment or the file is not a PDF.
    """
    fn = "process_frame_segments"

    globals.log.info("Processing frames...")

    frame_segments = [s for s in segments if isinstance(s, FrameSegment)]
    n = len(frame_segments)
    # The progress bar is suppressed by both --debug and --quiet.
    show_progress = n > 0 and not any([args.debug, args.quiet])

    for i, f in enumerate(frame_segments):
        if show_progress:
            print_progress(i, n)
        globals.log.debug("{fn}(): frame (before) = {b}".format(fn=fn, b=f))

        # Frame segments that use a frame from the previous segment.
        if f.input_file == "^":
            if f.segment_number > 0:
                prev = segments[f.segment_number - 1]
                globals.log.debug("{fn}(): prev = {p}".format(fn=fn, p=prev))
                prev.generate_temp_file(args.output)
                f.use_frame(prev.generate_frame(f.frame_number, args.output))
            else:
                globals.log.error(
                    "frame segment {s} is attempting to use the last frame "
                    "of a non-existent previous "
                    "segment".format(s=f.segment_number))
                sys.exit(1)
        # Frame segments whose frame comes from a PDF file.
        else:
            _, suffix = os.path.splitext(f.input_file)
            if suffix.lower() == ".pdf":
                f.use_frame(f.generate_temp_file(args.output))
            else:
                globals.log.error(
                    'unexpected input file type "{s}" for frame segment '
                    "{f}".format(s=suffix, f=f.segment_number))
                sys.exit(1)
        globals.log.debug("{fn}(): frame (after) = {a}".format(fn=fn, a=f))

    if show_progress:
        print_progress(n, n, "\n")


def render_podcast(audio_segments, video_segments, output):
    """Stitch together the various input components into the final podcast."""
    fn = "render_podcast"

    globals.log.info("Rendering final podcast...")

    command = FFmpegConcatCommand(has_audio=len(audio_segments) > 0,
                                  has_video=len(video_segments) > 0)
    input_files = Segment.input_files()
    for f in input_files:
        # Any per-file input options must precede that file's "-i".
        if input_files[f]:
            command.append_input_options(input_files[f])
        command.append_input_options(["-i", f])
    for s in (audio_segments + video_segments):
        command.append_filter(s.trim_filter())
    command.append_concat_filter("a", list(audio_segments))
    command.append_normalisation_filter()
    command.append_concat_filter("v", list(video_segments))
    command.append_output_options([output])
    globals.log.debug("{fn}(): {c}".format(fn=fn, c=command))
    command.run()


def cleanup(segments):
    """Clean up generated temporary files."""
    globals.log.info("Cleaning up...")
    for s in segments:
        s.delete_temp_files()


def main():
    """Run the full pipeline: parse arguments, build segments, render.

    Temporary files are deleted afterwards unless --keep was given. A
    keyboard interrupt ends the run silently.
    """
    fn = "main"

    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s: {p}: %(message)s".format(p=globals.PROGRAM))

    try:
        args = parse_command_line()
        check_arguments(args)

        config = get_configuration(args)

        segments = process_input_streams(config)

        audio_segments = [s for s in segments if isinstance(s, AudioSegment)]
        video_segments = [s for s in segments if isinstance(s, VideoSegment)]
        globals.log.debug("{fn}(): audio segments = {a}".format(
            fn=fn, a=audio_segments))
        globals.log.debug("{fn}(): video segments = {v}".format(
            fn=fn, v=video_segments))

        audio_duration = sum([s.get_duration() for s in audio_segments])
        video_duration = sum([s.get_duration() for s in video_segments])
        globals.log.debug("{fn}(): audio duration = "
                          "{a}".format(fn=fn, a=audio_duration))
        globals.log.debug("{fn}(): video duration = "
                          "{v}".format(fn=fn, v=video_duration))

        # Only compare durations when both kinds of stream are present.
        if len(audio_segments) and len(video_segments):
            if audio_duration != video_duration:
                globals.log.warning("total video duration ({v}s) doesn't match "
                                    "total audio duration "
                                    "({a}s)".format(v=video_duration,
                                                    a=audio_duration))

        process_frame_segments(args, segments)

        globals.log.debug("{fn}(): input files = "
                          "{i}".format(fn=fn, i=Segment.input_files()))

        render_podcast(audio_segments, video_segments, args.output)

        if not args.keep:
            cleanup(segments)

    except (KeyboardInterrupt):
        pass


if (__name__ == "__main__"):
    main()
Show line notes below