PyAV:Examples:HLS
PyAV를 사용하여 HLS 스트리밍을 하는 예제.
demo_av_muxing.html
hls.js를 사용한다.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Demo AV Muxing</title>
</head>
<body>
<video id="video" loop playsinline autoplay muted controls></video>
<script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
<script>
var video = document.getElementById('video');
// var videoSrc = './live_stream.m3u8';
// var videoSrc = 'https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_adv_example_hevc/master.m3u8';
var videoSrc = 'https://stream.mux.com/v69RSHhFelSm4701snP22dYz2jICy4E4FUyk02rW4gxRM.m3u8';
if (Hls.isSupported()) {
var hls = new Hls();
hls.loadSource(videoSrc);
hls.attachMedia(video);
console.debug("1")
hls.on(Hls.Events.MANIFEST_PARSED, () => {
video.muted = 'muted';
video.autoplay = 'autoplay';
video.playsinline = 'true';
video.play();
console.debug("MANIFEST_PARSED")
});
// hls.on(Hls.Events.SUBTITLE_TRACKS_UPDATED, (data, d2) => {
// console.debug("SUBTITLE_TRACKS_UPDATED")
// console.dir(data)
// console.dir(d2)
// });
// hls.on(Hls.Events.FRAG_PARSED, (data, d2) => {
// console.debug("FRAG_PARSED")
// console.dir(data)
// console.dir(d2)
// });
hls.on(Hls.Events.FRAG_CHANGED, (data, d2, d3) => {
console.debug("FRAG_CHANGED")
console.dir(data)
console.dir(d2)
console.dir(d3)
});
// hls.on(Hls.Events.SUBTITLE_FRAG_PROCESSED, (data, d2, d3, d4, d5) => {
// console.debug("SUBTITLE_FRAG_PROCESSED")
// console.dir(data)
// console.dir(d2)
// console.dir(d3)
// console.dir(d4)
// console.dir(d5)
// });
} else {
alert('Unsupported HLS.js component');
}
</script>
</body>
</html>
demo_av_muxing.py
PyAV와 OpenCV-Python을 설치하면 된다.
# -*- coding: utf-8 -*-
import os
import cv2
from asyncio import run
from av import open as av_open # noqa
from av import VideoFrame # noqa
from av.stream import Stream
from av.frame import Frame
from av.container.input import InputContainer
from av.container.output import OutputContainer
from av.video.stream import VideoStream
from av.video.codeccontext import VideoCodecContext
from av.error import InvalidDataError # noqa
from math import floor
from fractions import Fraction
from multiprocessing import Process
from datetime import datetime
KEYCODE_ESC = 27
DEMO_RTSP_URLS = [
"rtsp://192.168.0.50:554/media/1/1/Profile1",
"rtsp://192.168.0.95:8554/live.sdp",
]
DEMO_URL = DEMO_RTSP_URLS[0]
WINDOW_NAME = "Demo AV Muxing"
# noinspection SpellCheckingInspection
DEFAULT_RTSP_OPTIONS = {
"rtsp_transport": "tcp",
"fflags": "nobuffer",
}
# noinspection SpellCheckingInspection
HLS_OPTIONS = {
"strftime": "1",
"strftime_mkdir": "1",
# "hls_list_size": "0",
"hls_playlist_type": "vod",
"hls_flags": "second_level_segment_index",
"hls_segment_filename": "live_stream/video-%Y-%m-%d_%H-%M-%S-%%02d.ts",
}
OUTPUT_FILENAME = "live_stream.m3u8"
def _rounding(a: int, b: int, c: int) -> int:
return (a * b) // c
def rescale_q_rnd(a: int, bq: Fraction, cq: Fraction) -> int:
"""
Rescale a 64-bit integer by 2 rational numbers with specified rounding.
"""
b = bq.numerator * cq.denominator
c = cq.numerator * bq.denominator
return _rounding(a, b, c)
def _go_faster_stream(stream: Stream) -> None:
assert hasattr(stream, "thread_type")
setattr(stream, "thread_type", "AUTO")
def _low_delay_stream(stream: Stream) -> None:
assert hasattr(stream.codec_context, "flags")
setattr(stream.codec_context, "flags", "LOW_DELAY")
class DemoAvMuxing:
input_container: InputContainer
input_stream: VideoStream
output_container: OutputContainer
def __init__(
self,
waiting_seconds=0.001,
exit_key="q",
):
self.waiting_seconds = waiting_seconds
self.exit_key = exit_key
try:
self.input_container = av_open(
DEMO_URL,
options=DEFAULT_RTSP_OPTIONS,
timeout=(8, 8),
)
except InvalidDataError as e1:
# maybe timeout ...
print(f"InvalidDataError: {e1}")
raise
except BaseException as e2:
print(f"Error[{type(e2).__name__}]: {e2}")
raise
assert isinstance(self.input_container, InputContainer)
self.input_stream_index = 0
self.input_stream = self.input_container.streams.get(video=0)[0]
assert isinstance(self.input_stream, VideoStream)
_go_faster_stream(self.input_stream)
_low_delay_stream(self.input_stream)
input_stream_codec_context = self.input_stream.codec_context
assert isinstance(input_stream_codec_context, VideoCodecContext)
# input_stream_codec = input_stream_codec_context.codec
self.output_container = av_open(OUTPUT_FILENAME, "w", options=HLS_OPTIONS)
self.output_stream = self.output_container.add_stream(template=self.input_stream) # noqa
# self.subtitle_output_stream = self.output_container.add_stream("text")
@property
def waiting_milliseconds(self) -> int:
return floor(self.waiting_seconds * 1000)
def test_exit(self, keycode: int) -> bool:
keycode_byte = keycode & 0xFF
if keycode_byte == ord(self.exit_key.lower()):
return True
elif keycode_byte == ord(self.exit_key.upper()):
return True
elif keycode_byte == KEYCODE_ESC:
return True
else:
return False
def run(self) -> None:
cv2.namedWindow(WINDOW_NAME)
video_first_dts = None
video_first_pts = None
for packet in self.input_container.demux(self.input_stream):
# print(packet)
if video_first_dts is None:
video_first_dts = packet.dts
if video_first_pts is None:
video_first_pts = packet.pts
now = datetime.now().astimezone()
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
if (
packet.stream.type == "video"
and packet.stream_index == self.input_stream_index
):
# We need to assign the packet to the new stream.
packet.stream = self.output_stream
packet.dts -= video_first_dts
packet.pts -= video_first_pts
try:
self.output_container.mux(packet)
except BaseException as e:
print(e)
files = os.listdir("live_stream")
frame_now = datetime.now().astimezone()
# print(f"Packet dts: {packet.dts}")
# print(f"Packet pts: {packet.pts}")
loop_count = 0
for frame in packet.decode():
assert isinstance(frame, Frame)
# print(f"- Frame time: {frame.time}")
# print(f"- Frame dts: {frame.dts}")
# print(f"- Frame pts: {frame.pts}")
# print(f"------------------------")
loop_count += 1
image = frame.to_ndarray(format="bgr24") # noqa
cv2.imshow(WINDOW_NAME, image)
keycode = cv2.waitKey(self.waiting_milliseconds)
if keycode != -1:
return
# new_frame = VideoFrame.from_ndarray(image, format="bgr24")
# new_frame.pts = frame.pts - video_first_pts
# new_frame.time_base = frame.time_base
# for p in self.output_stream.encode(new_frame):
# self.output_container.mux(p)
end = datetime.now().astimezone()
# print(f"now: {now}")
# print(f"files: {files}")
# print(f"frame_now: {frame_now}")
# print(f"end: {end}")
# print("--------------------")
def close(self) -> None:
for packet in self.output_stream.encode(None):
self.output_container.mux(packet)
self.output_container.close()
self.input_container.close()
cv2.destroyWindow(WINDOW_NAME)
async def main() -> None:
demo = DemoAvMuxing()
demo.run()
demo.close()
async def subprocess_main() -> None:
process = Process(target=main)
process.start()
process.join()
if __name__ == "__main__":
try:
run(main())
except BaseException as _e: # noqa
print(_e)