from datetime import timedelta
from pathlib import Path
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock

from segment import Segment, AudioSegment, VideoSegment
from shell_command import FFmpegConcatCommand
from test_shared import ShellCommandSharedTestCase


class FFmpegConcatCommandTestCase(ShellCommandSharedTestCase):
    """Unit tests for the FFmpegConcatCommand class."""

    NORMALISATION_FILTER = "[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]"

    # What about testing different permutations of audio and video?

    def setUp(self):
        """Build the command under test and the expected option lists."""
        # Make sure the input and output options are explicitly set,
        # otherwise they hang around from previous tests.
        self.command = FFmpegConcatCommand(
            input_options=["-i", "in.mov"],
            output_options=["out.mov"],
            has_video=True,
            has_audio=True,
            quiet=True,
        )
        self.expected_executable = shutil.which("ffmpeg")
        self.expected_base_options = ["-y", "-nostdin"]
        self.expected_input_options = ["-i", "in.mov"]
        self.expected_filter_options = ["-filter_complex", ""]

        # Expected output options: audio settings, then video settings,
        # then the output file itself.
        audio_options = [
            "-codec:a", self.command.audio_codec,
            "-ac", "1",
            "-map", "[anorm]",
        ]
        video_options = [
            "-codec:v", self.command.video_codec,
            "-pix_fmt", "yuv420p",
            "-map", "[vconc]",
        ]
        self.expected_output_options = (
            audio_options + video_options + ["out.mov"]
        )

    def tearDown(self):
        """Drop the reference to the command once the test is done."""
        self.command = None

    def test_append_filter(self):
        """Check the filter list after appending empty and normal filters."""
        cases = (
            ("", [], "append empty filter"),
            ("normal_filter", ["normal_filter"], "append normal filter"),
        )
        # The same command is reused, so each case builds on the last.
        for new_filter, wanted, label in cases:
            with self.subTest(msg=label):
                self.command.append_filter(new_filter)
                self.assertEqual(self.command.filters, wanted)

    def test_append_normalisation_filter(self):
        """Check that the normalisation filter lands in the filter list."""
        self.command.append_normalisation_filter()
        self.assertEqual(
            self.command.filters, [self.NORMALISATION_FILTER])

    def test_append_concat_filter(self):
        """Check concat filters for 0, 1 and 2 segments of each type."""
        cases = (
            ("a", "audio"),
            ("v", "video"),
            ("f", "frame (should be ignored)"),
        )
        segment_classes = {"a": AudioSegment, "v": VideoSegment}

        # The command is shared by every iteration, so the expected
        # filter list accumulates alongside it.
        expected = []
        for frame_type, description in cases:
            for count in range(3):
                if frame_type in segment_classes:
                    # A single segment instance, repeated "count" times.
                    segment = segment_classes[frame_type](
                        file="file.in",
                        punch_in=timedelta(),
                        punch_out=timedelta(seconds=20),
                        input_stream=0)
                    segments = count * [segment]
                elif frame_type == "f":
                    # Frame segments should be ignored by
                    # append_concat_filter() regardless.
                    segments = []
                else:
                    raise TypeError

                self.command.append_concat_filter(
                    frame_type=frame_type, segments=segments)

                # Only audio and video segments are expected to add a
                # filter, and only when there is at least one segment.
                if frame_type in segment_classes:
                    if count > 1:
                        specifiers = " ".join(
                            s.output_stream_specifier() for s in segments)
                        expected.append(
                            "{inspecs} concat=n={n}:v={v}:a={a} "
                            "[{t}conc]".format(
                                inspecs=specifiers,
                                n=count,
                                v=int(frame_type == "v"),
                                a=int(frame_type == "a"),
                                t=frame_type)
                        )
                    elif count == 1:
                        # A lone segment is expected to pass through
                        # "anull" (audio) or "null" (video).
                        prefix = "a" if frame_type == "a" else ""
                        expected.append(
                            "{inspec} {a}null [{t}conc]".format(
                                inspec=segments[0].output_stream_specifier(),
                                a=prefix,
                                t=frame_type)
                        )

                label = "{d}: {n}".format(d=description, n=count)
                with self.subTest(msg=label):
                    self.assertEqual(self.command.filters, expected)

    def test_build_complex_filter(self):
        """Check the complex filter string built from the filter list."""
        self.command.append_normalisation_filter()
        self.assertEqual(
            self.command.build_complex_filter(),
            self.NORMALISATION_FILTER)

        cases = (
            ("", self.NORMALISATION_FILTER, "append empty filter"),
            (
                "normal_filter",
                "{n};normal_filter".format(n=self.NORMALISATION_FILTER),
                "append normal filter",
            ),
        )
        for new_filter, wanted, label in cases:
            with self.subTest(msg=label):
                self.command.append_filter(new_filter)
                self.assertEqual(
                    self.command.build_complex_filter(), wanted)

    # process_pattern() updates the internal ProgressBar object.


# Remove ShellCommandSharedTestCase from the namespace so we don't run
# the shared tests twice. See <https://stackoverflow.com/a/22836015>.
del ShellCommandSharedTestCase