PyAV:Examples:HLS

An example of HLS streaming with PyAV.

demo_av_muxing.html

It uses hls.js.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Demo AV Muxing</title>
</head>
<body>
    <video id="video" loop playsinline autoplay muted controls></video>
    <script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
    <script>
        var video = document.getElementById('video');
        // var videoSrc = './live_stream.m3u8';
        // var videoSrc = 'https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_adv_example_hevc/master.m3u8';
        var videoSrc = 'https://stream.mux.com/v69RSHhFelSm4701snP22dYz2jICy4E4FUyk02rW4gxRM.m3u8';
        if (Hls.isSupported()) {
            var hls = new Hls();
            hls.loadSource(videoSrc);
            hls.attachMedia(video);
            console.debug("hls.js: source loaded, media attached")
            hls.on(Hls.Events.MANIFEST_PARSED, () => {
                video.muted = true;
                video.autoplay = true;
                video.playsInline = true;
                video.play();
                console.debug("MANIFEST_PARSED")
            });

            // hls.on(Hls.Events.SUBTITLE_TRACKS_UPDATED, (data, d2) => {
            //     console.debug("SUBTITLE_TRACKS_UPDATED")
            //     console.dir(data)
            //     console.dir(d2)
            // });

            // hls.on(Hls.Events.FRAG_PARSED, (data, d2) => {
            //     console.debug("FRAG_PARSED")
            //     console.dir(data)
            //     console.dir(d2)
            // });
            // hls.js listeners receive (event, data).
            hls.on(Hls.Events.FRAG_CHANGED, (event, data) => {
                console.debug("FRAG_CHANGED")
                console.dir(event)
                console.dir(data)
            });
            // hls.on(Hls.Events.SUBTITLE_FRAG_PROCESSED, (data, d2, d3, d4, d5) => {
            //     console.debug("SUBTITLE_FRAG_PROCESSED")
            //     console.dir(data)
            //     console.dir(d2)
            //     console.dir(d3)
            //     console.dir(d4)
            //     console.dir(d5)
            // });
        } else {
            alert('HLS.js is not supported in this browser.');
        }
    </script>
</body>
</html>
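
To play the output of demo_av_muxing.py below, change videoSrc back to './live_stream.m3u8' and serve this page together with live_stream.m3u8 and the live_stream/ segment directory over HTTP (hls.js cannot load the playlist from file://). Any static file server works; for example (a sketch, not part of the demo):

python -m http.server 8000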

demo_av_muxing.py

Just install PyAV and OpenCV-Python.
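
For example (the PyPI package names are av and opencv-python; any recent pip will do):

pip install av opencv-python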

# -*- coding: utf-8 -*-

import os
import cv2
from asyncio import run
from av import open as av_open  # noqa
from av import VideoFrame  # noqa
from av.stream import Stream
from av.frame import Frame
from av.container.input import InputContainer
from av.container.output import OutputContainer
from av.video.stream import VideoStream
from av.video.codeccontext import VideoCodecContext
from av.error import InvalidDataError  # noqa
from math import floor
from fractions import Fraction
from multiprocessing import Process
from datetime import datetime

KEYCODE_ESC = 27
DEMO_RTSP_URLS = [
    "rtsp://192.168.0.50:554/media/1/1/Profile1",
    "rtsp://192.168.0.95:8554/live.sdp",
]
DEMO_URL = DEMO_RTSP_URLS[0]
WINDOW_NAME = "Demo AV Muxing"

# noinspection SpellCheckingInspection
DEFAULT_RTSP_OPTIONS = {
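    # Force RTP over TCP and skip input buffering to reduce startup latency
    # (standard libavformat/RTSP demuxer options).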
    "rtsp_transport": "tcp",
    "fflags": "nobuffer",
}

# noinspection SpellCheckingInspection
HLS_OPTIONS = {
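    # FFmpeg HLS muxer options: strftime/strftime_mkdir expand the date pattern
    # in hls_segment_filename and create its directory, and the
    # second_level_segment_index flag enables the extra %%02d segment counter.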
    "strftime": "1",
    "strftime_mkdir": "1",
    # "hls_list_size": "0",
    "hls_playlist_type": "vod",
    "hls_flags": "second_level_segment_index",
    "hls_segment_filename": "live_stream/video-%Y-%m-%d_%H-%M-%S-%%02d.ts",
}
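
# The playlist is written to the working directory; the segments go into the
# live_stream/ directory configured by hls_segment_filename above.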

OUTPUT_FILENAME = "live_stream.m3u8"


def _rounding(a: int, b: int, c: int) -> int:
    return (a * b) // c


def rescale_q_rnd(a: int, bq: Fraction, cq: Fraction) -> int:
    """
    Rescale a 64-bit integer from one rational time base to another, rounding down.
    """

    b = bq.numerator * cq.denominator
    c = cq.numerator * bq.denominator
    return _rounding(a, b, c)
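
# For example, a pts of 90000 in a 1/90000 time base becomes 1000 in a 1/1000
# time base: rescale_q_rnd(90000, Fraction(1, 90000), Fraction(1, 1000)) == 1000.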


def _go_faster_stream(stream: Stream) -> None:
    assert hasattr(stream, "thread_type")
    setattr(stream, "thread_type", "AUTO")


def _low_delay_stream(stream: Stream) -> None:
    assert hasattr(stream.codec_context, "flags")
    setattr(stream.codec_context, "flags", "LOW_DELAY")


class DemoAvMuxing:

    input_container: InputContainer
    input_stream: VideoStream

    output_container: OutputContainer

    def __init__(
        self,
        waiting_seconds=0.001,
        exit_key="q",
    ):
        self.waiting_seconds = waiting_seconds
        self.exit_key = exit_key

        try:
            self.input_container = av_open(
                DEMO_URL,
                options=DEFAULT_RTSP_OPTIONS,
                timeout=(8, 8),
            )
        except InvalidDataError as e1:
            # maybe timeout ...
            print(f"InvalidDataError: {e1}")
            raise
        except BaseException as e2:
            print(f"Error[{type(e2).__name__}]: {e2}")
            raise
        assert isinstance(self.input_container, InputContainer)

        self.input_stream_index = 0
        self.input_stream = self.input_container.streams.get(video=0)[0]
        assert isinstance(self.input_stream, VideoStream)

        _go_faster_stream(self.input_stream)
        _low_delay_stream(self.input_stream)

        input_stream_codec_context = self.input_stream.codec_context
        assert isinstance(input_stream_codec_context, VideoCodecContext)

        # input_stream_codec = input_stream_codec_context.codec

        self.output_container = av_open(OUTPUT_FILENAME, "w", options=HLS_OPTIONS)
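        # `add_stream(template=...)` copies the codec parameters of the input
        # stream so packets can be remuxed as-is without re-encoding.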
        self.output_stream = self.output_container.add_stream(template=self.input_stream)  # noqa
        # self.subtitle_output_stream = self.output_container.add_stream("text")

    @property
    def waiting_milliseconds(self) -> int:
        return floor(self.waiting_seconds * 1000)

    def test_exit(self, keycode: int) -> bool:
        keycode_byte = keycode & 0xFF
        if keycode_byte == ord(self.exit_key.lower()):
            return True
        elif keycode_byte == ord(self.exit_key.upper()):
            return True
        elif keycode_byte == KEYCODE_ESC:
            return True
        else:
            return False

    def run(self) -> None:
        cv2.namedWindow(WINDOW_NAME)

        video_first_dts = None
        video_first_pts = None

        for packet in self.input_container.demux(self.input_stream):
            # print(packet)

            # We need to skip the "flushing" packets that `demux` generates.
            if packet.dts is None:
                continue

            # Remember the first timestamps so the output can be rebased to zero.
            if video_first_dts is None:
                video_first_dts = packet.dts
            if video_first_pts is None:
                video_first_pts = packet.pts

            now = datetime.now().astimezone()

            # Decode and display first: `mux()` hands the packet's data over to
            # the output container, and after `packet.stream` is reassigned the
            # packet can no longer be decoded with the input codec context.
            loop_count = 0
            for frame in packet.decode():
                assert isinstance(frame, Frame)
                # print(f"- Frame time: {frame.time}")
                # print(f"- Frame dts: {frame.dts}")
                # print(f"- Frame pts: {frame.pts}")
                # print(f"------------------------")

                loop_count += 1

                image = frame.to_ndarray(format="bgr24")  # noqa

                cv2.imshow(WINDOW_NAME, image)
                keycode = cv2.waitKey(self.waiting_milliseconds)
                if keycode != -1:
                    return

                # new_frame = VideoFrame.from_ndarray(image, format="bgr24")
                # new_frame.pts = frame.pts - video_first_pts
                # new_frame.time_base = frame.time_base
                # for p in self.output_stream.encode(new_frame):
                #     self.output_container.mux(p)

            if (
                packet.stream.type == "video"
                and packet.stream_index == self.input_stream_index
            ):
                # We need to assign the packet to the new stream.
                packet.stream = self.output_stream

                # Rebase the timestamps so the HLS output starts at zero.
                packet.dts -= video_first_dts
                packet.pts -= video_first_pts
                try:
                    self.output_container.mux(packet)
                except BaseException as e:
                    print(e)

            # The segment directory may not exist until the muxer writes the
            # first segment.
            files = os.listdir("live_stream") if os.path.isdir("live_stream") else []
            frame_now = datetime.now().astimezone()

            # print(f"Packet dts: {packet.dts}")
            # print(f"Packet pts: {packet.pts}")

            end = datetime.now().astimezone()
            # print(f"now: {now}")
            # print(f"files: {files}")
            # print(f"frame_now: {frame_now}")
            # print(f"end: {end}")
            # print("--------------------")

    def close(self) -> None:
        # The output stream was created from a template, so there is no encoder
        # to flush; the flush below only matters for the commented-out
        # re-encoding path in `run()`.
        # for packet in self.output_stream.encode(None):
        #     self.output_container.mux(packet)
        self.output_container.close()
        self.input_container.close()
        cv2.destroyWindow(WINDOW_NAME)


async def main() -> None:
    demo = DemoAvMuxing()
    demo.run()
    demo.close()


def _subprocess_entry() -> None:
    run(main())  # `Process` needs a synchronous target


async def subprocess_main() -> None:
    process = Process(target=_subprocess_entry)
    process.start()
    process.join()


if __name__ == "__main__":
    try:
        run(main())
    except BaseException as _e:  # noqa
        print(_e)

See also