# process_podcast / test_ffmpeg_concat_command.py
from datetime import timedelta
from pathlib import Path
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock

from segment import Segment, AudioSegment, VideoSegment
from shell_command import FFmpegConcatCommand
from test_shared import ShellCommandSharedTestCase


class FFmpegConcatCommandTestCase(ShellCommandSharedTestCase):
    """Test the FFmpegConcatCommand class."""

    NORMALISATION_FILTER = "[aconc] dynaudnorm=r=0.25:f=10:b=y [anorm]"

    # TODO: What about testing different permutations of audio and video?
    def setUp(self):
        """Create a fresh command plus the option lists expected from it."""
        # Pass the input and output options explicitly; otherwise they
        # hang around from previous tests.
        self.command = FFmpegConcatCommand(
            input_options=["-i", "in.mov"],
            output_options=["out.mov"],
            has_video=True,
            has_audio=True,
            quiet=True,
        )
        self.expected_executable = shutil.which("ffmpeg")
        self.expected_base_options = ["-y", "-nostdin"]
        self.expected_input_options = ["-i", "in.mov"]
        self.expected_filter_options = ["-filter_complex", ""]
        # The expected output options are the audio settings, then the
        # video settings, then the output file name.
        audio_options = [
            "-codec:a", self.command.audio_codec,
            "-ac", "1",
            "-map", "[anorm]",
        ]
        video_options = [
            "-codec:v", self.command.video_codec,
            "-pix_fmt", "yuv420p",
            "-map", "[vconc]",
        ]
        self.expected_output_options = (
            audio_options + video_options + ["out.mov"])

    def tearDown(self):
        """Drop the reference to the command under test."""
        self.command = None

    def test_append_filter(self):
        """Test appending to the filter list.

        The sub-tests share one command: the empty filter is expected
        to leave the list empty, and the non-empty filter is then
        expected to be its only entry.
        """
        cases = (
            ("", [], "append empty filter"),
            ("normal_filter", ["normal_filter"], "append normal filter"),
        )
        for new_filter, filters_after, label in cases:
            with self.subTest(msg=label):
                self.command.append_filter(new_filter)
                self.assertEqual(self.command.filters, filters_after)

    def test_append_normalisation_filter(self):
        """Test that the normalisation filter becomes the only filter."""
        self.command.append_normalisation_filter()
        self.assertEqual(self.command.filters, [self.NORMALISATION_FILTER])

    def test_append_concat_filter(self):
        """Test appending various concat filters.

        For audio and video, zero segments are expected to add nothing,
        a single segment a pass-through ("anull"/"null") filter, and
        several segments a "concat" filter. The "f" frame type is
        expected to add nothing at all.
        """
        segment_classes = {"a": AudioSegment, "v": VideoSegment}
        cases = (
            ("a", "audio"),
            ("v", "video"),
            ("f", "frame (should be ignored)"),
        )
        # The same command is reused for every case, so the expected
        # filter list accumulates alongside the command's own list.
        expected_filters = []
        for kind, label in cases:
            for count in range(3):
                if kind in segment_classes:
                    # One segment object, repeated count times.
                    segment = segment_classes[kind](
                        file="file.in", punch_in=timedelta(),
                        punch_out=timedelta(seconds=20), input_stream=0)
                    segments = count * [segment]
                elif kind == "f":
                    # Frame segments should be ignored by
                    # append_concat_filter() regardless.
                    segments = []
                else:
                    raise TypeError

                self.command.append_concat_filter(
                    frame_type=kind, segments=segments)

                if kind in segment_classes and count == 1:
                    expected_filters.append(
                        "{inspec} {a}null [{t}conc]".format(
                            inspec=segments[0].output_stream_specifier(),
                            a="a" if kind == "a" else "",
                            t=kind))
                elif kind in segment_classes and count > 1:
                    specifiers = [
                        s.output_stream_specifier() for s in segments]
                    expected_filters.append(
                        "{inspecs} concat=n={n}:v={v}:a={a} [{t}conc]".format(
                            inspecs=" ".join(specifiers),
                            n=count,
                            v=int(kind == "v"),
                            a=int(kind == "a"),
                            t=kind))

                with self.subTest(msg="{d}: {n}".format(d=label, n=count)):
                    self.assertEqual(self.command.filters, expected_filters)

    def test_build_complex_filter(self):
        """Test building the complex filter string from the filter list.

        Starting from the normalisation filter alone, an appended empty
        filter is expected to leave the string unchanged, while a
        non-empty one is expected to be joined on with ";".
        """
        self.command.append_normalisation_filter()
        self.assertEqual(
            self.command.build_complex_filter(), self.NORMALISATION_FILTER)

        cases = (
            ("", self.NORMALISATION_FILTER, "append empty filter"),
            (
                "normal_filter",
                "{n};normal_filter".format(n=self.NORMALISATION_FILTER),
                "append normal filter",
            ),
        )
        for new_filter, built, label in cases:
            with self.subTest(msg=label):
                self.command.append_filter(new_filter)
                self.assertEqual(self.command.build_complex_filter(), built)

# process_pattern() updates the internal ProgressBar object.


# Unbind the shared base class from this module's namespace so that its
# tests are not run a second time from here. See
# <https://stackoverflow.com/a/22836015>.
del ShellCommandSharedTestCase