Skip to content

PyAV

PyAV is a Pythonic binding for the FFmpeg libraries. We aim to provide all of the power and control of the underlying library, but manage the gritty details as much as possible.

APIs

Globals

Main entrypoint to opening files/streams:

av.open(file, mode='r', **kwargs)

Examples:

av.open("/path/to/some.mp4")  # File
av.open("https://file-examples-com.github.io/uploads/2017/04/file_example_MP4_480_1_5MG.mp4")  # URL
av.open("rtsp://192.168.0.50:554/media/1/1/Profile1")  # RTSP
av.open("rtsp://192.168.0.95:8554/live.sdp")  # RTSP
av.open("/dev/video0", format="v4l2", options={"video_size": "640x480"})  # Linux
av.open("default:none", format="avfoundation", options={"video_size": "640x480"})  # macOS
av.open("video=Integrated Camera", format="dshow", options={"video_size": "640x480"})  # Windows

av.VideoStream

object dir:

average_rate, base_rate, codec_context, container, decode, duration, encode, frames, guessed_rate, id, index, language, metadata, profile, start_time, time_base, type
time_base
타임스탬프가 표시되는 시간 단위(in Fractional 초)입니다.
Python:fractions#Fraction 클래스로 표현된다.
만약 Fraction(1, 60000) 라면 1/60000초가 된다.
start_time
이 스트림의 첫 번째 프레임에 대한 time_base 단위의 프레젠테이션 타임스탬프입니다.
duration
이 스트림의 지속 시간(time_base 단위)입니다.
time_base1/60000이고, duration2821819라면, duration * time_base = Fraction(2821819, 60000), 즉 2821819/60000인, 약 47.03초가 된다.
frames
이 스트림에 포함된 프레임 수입니다.
알 수 없는 경우 0을 반환합니다.
average_rate
이 비디오 스트림의 평균 프레임 속도입니다.
이것은 처음 몇 프레임을 보고 속도를 평균하여 파일을 열 때 계산됩니다.
base_rate
이 스트림의 기본 프레임 속도입니다.
이것은 프레임의 타임스탬프가 정확하게 표현될 수 있는 가장 낮은 프레임 속도로 계산됩니다. 자세한 내용은 AVStream.r_frame_rate를 참조하세요.
guessed_rate
이 스트림의 예상 프레임 속도입니다.
이것은 av_guess_frame_rate에 대한 래퍼이며, "the" 프레임 속도가 무엇인지 결정하기 위해 여러 경험적 방법을 사용합니다.

참고로 dir함수로 출력되지 않는 속성이 존재한다. PyAV 코드의 ${PYAV_ROOT_DIR}/av/stream.pyx 코드를 보면 다음과 같은 내용이 있다:

cdef class Stream(object):
    # ...

    def __getattr__(self, name):
        # avoid an infinite loop for unsupported codecs
        if self.codec_context is None:
            return

        try:
            return getattr(self.codec_context, name)
        except AttributeError:
            try:
                return getattr(self.codec_context.codec, name)
            except AttributeError:
                raise AttributeError(name)

    def __setattr__(self, name, value):
        setattr(self.codec_context, name, value)

즉, getattr의 경우 codec_context 에서, 여기에도 없다면 codec_context.codec 에서 속성을 획득한다.

반대로 setattr의 경우 codec_context 으로 저장한다.

av.VideoFrame

object dir:

dts, format, from_image, from_ndarray, height, index, interlaced_frame, is_corupt, key_frame, pict_type, planes, pts, reformat, side_data, time, time_base, to_image, to_nd_array, to_ndarray, to_rgb, width

av.VideoCodecContext

object dir:

ac_pred, bit_rate, bit_rate_tolerance, bitexact, chunks, close, closed_gop, codec, codec_tag, coded_height, coded_width, create, decode, display_aspect_ratio, drop_changed, drop_frame_timecode, encode, encoded_frame_count, export_mvs, extradata, extradata_size, fast, flags, flags2, format, four_mv, framerate, global_header, gop_size, gray, has_b_frames, height, ignore_crop, interlaced_dct, interlaced_me, is_decoder, is_encoder, is_open, local_header, loop_filter, low_delay, max_bit_rate, name, no_output, open, options, output_corrupt, parse, pass1, pass2, pix_fmt, profile, psnr, qpel, qscale, rate, reformatter, ro_flush_noop, sample_aspect_ratio, show_all, skip_frame, skip_manual, thread_count, thread_type, ticks_per_frame, time_base, truncated, type, unaligned, width

av.codec.codec.Codec

object dir:

audio_formats, audio_rates, auto_threads, avoid_probing, bitmap_sub, capabilities, channel_conf, create, delay, descriptor, dr1, draw_horiz_band, encoder_reordered_opaque, experimental, frame_rates, frame_threads, hardware, hwaccel, hwaccel_vdpau, hybrid, id, intra_only, is_decoder, is_encoder, long_name, lossless, lossy, name, neg_linesizes, param_change, properties, reorder, slice_threads, small_last_frame, subframes, text_sub, truncated, type, variable_frame_size, video_formats

Examples

Video player

import numpy as np
import cv2

import av

video0 = 'small.mp4'
video1 = 'bus-fire.mp4'
video2 = 'rtsp://admin:[email protected]/stream1'

container = av.open(video1, options={'rtsp_transport': 'udp'})
container.streams.video[0].thread_type = 'AUTO'  # Go faster!

for frame in container.decode(video=0):

    print(frame)
    array = frame.to_ndarray(format='bgr24')

    cv2.imshow('frame', array)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

Obtain Total Frame

import av

container = av.open("My_Super_Video.mp4")
total_frames = container.streams.video[0].frames

Saving Keyframes

import av
import av.datasets

content = av.datasets.curated("pexels/time-lapse-video-of-night-sky-857195.mp4")
with av.open(content) as container:
    # Signal that we only want to look at keyframes.
    stream = container.streams.video[0]
    stream.codec_context.skip_frame = "NONKEY"

    for frame in container.decode(stream):

        print(frame)

        # We use `frame.pts` as `frame.index` won't make must sense with the `skip_frame`.
        frame.to_image().save(
            "night-sky.{:04d}.jpg".format(frame.pts),
            quality=80,
        )

Video player (Fast options)

RTSP 스트리밍을 최대한 빠르게 출력하는 옵션 FFmpeg#Minimize delay 항목 참조

import av
import time
import numpy as np
import cv2

video0 = 'small.mp4'
video1 = 'bus-fire.mp4'
video2 = 'rtsp://admin:[email protected]/stream1'

video_src = video2
video_index = 0
frame_format = 'bgr24'
frame_width = 1270
frame_height = 720
options = {
    'rtsp_transport': 'udp',
    'fflags': 'nobuffer'
}
container_options = {}
stream_options = []

container = av.open(video_src, options=options,
                    container_options=container_options,
                    stream_options=stream_options)
container.streams.video[video_index].thread_type = 'AUTO'  # Go faster! (SLICE, AUTO, FRAME, NONE)
container.streams.video[video_index].codec_context.flags = 'LOW_DELAY'
frames = container.decode(video=video_index)

while True:

    begin = time.time()
    frame = next(frames)

    last_frame = frame.to_ndarray(width=frame_width,
                                  height=frame_height,
                                  format=frame_format,
                                  interpolation='FAST_BILINEAR')
    last_index = frame.index
    last_pts = frame.pts

    end = time.time()
    elapsed = int((end-begin)*1000)
    print(f'index={last_index}, pts={last_pts}, shape={last_frame.shape}, elapsed={elapsed}ms')

    cv2.imshow('frame', last_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

RTSP to RTSP

# -*- coding: utf-8 -*-

from argparse import Namespace
from typing import Callable

import av
import cv2
import numpy as np

from ddrm.logging.logging import logger


class CamberApp:
    def __init__(self, args: Namespace, printer: Callable[..., None] = print):
        assert args is not None
        assert printer is not None

        assert isinstance(args.debug, bool)
        assert isinstance(args.verbose, int)
        assert isinstance(args.bind, str)
        assert isinstance(args.port, int)
        assert isinstance(args.timeout, float)
        assert isinstance(args.source, str)
        assert isinstance(args.format, str)
        assert isinstance(args.destination, str)

        self._debug = args.debug
        self._verbose = args.verbose
        self._bind = args.bind
        self._port = args.port
        self._timeout = args.timeout
        self._source = args.source
        self._format = args.format
        self._destination = args.destination

        logger.info(f"Debug enabled: {self._debug}")
        logger.info(f"Verbose level: {self._verbose}")
        logger.info(f"Web Server bind: {self._bind}")
        logger.info(f"Web Server port: {self._port}")
        logger.info(f"Web Server timeout: {self._timeout}s")
        logger.info(f"Source URL: {self._source}")
        logger.info(f"Destination format: {self._format}")
        logger.info(f"Destination URL: {self._destination}")

    def run(self) -> int:
        format_options = {'rtsp_transport': 'tcp', 'fflags': 'nobuffer'}

        logger.debug("Open input ..")
        input_container = av.open(self._source, options=format_options)

        logger.debug("Open output ..")
        output_container = av.open(self._destination, mode="w", format=self._format)

        video_stream = None
        for stream in input_container.streams:
            if stream.type == "video":
                video_stream = stream
                break

        if video_stream is None:
            logger.error("Not found video stream from source")
            return 1

        video_stream.thread_type = "AUTO"
        # video_stream.no_buffer = True
        # , 'flags': 'no_buffer'
        video_stream.codec_context.low_delay = True
        logger.debug(f"Find video stream: {video_stream}")
        # output_stream = output_container.add_stream(template=video_stream)

        output_stream = output_container.add_stream('libx264')
        output_stream.width = video_stream.width
        output_stream.height = video_stream.height
        output_stream.pix_fmt = 'yuv420p'
        output_stream.options = {
            'preset': 'fast',
            'crf': '28',
            'turn': 'zerolatency',
        }

        # ffmpeg time is complicated
        # more at https://github.com/PyAV-Org/PyAV/blob/main/docs/api/time.rst
        # our situation is the "encoding" one

        # this is independent of the "fps" you give above
        # 1/1000 means milliseconds (and you can use that, no problem)
        # 1/2 means half a second (would be okay for the delays we use below)
        # 1/30 means ~33 milliseconds
        # you should use the least fraction that makes sense for you

        logger.warning(f"input_stream.time_base: {video_stream.time_base}")
        logger.warning(f"output_stream.time_base: {output_stream.time_base}")
        # output_stream.time_base = video_stream.time_base

        # this says when to show the next frame
        # (increment by how long the current frame will be shown)
        my_pts = 0  # [seconds]
        # below we'll calculate that into our chosen time base

        frame_i = 0

        for packet in input_container.demux(video_stream):

            # We need to skip the "flushing" packets that `demux` generates.
            if packet.dts is None:
                continue

            # Discard frames from previous packets to get the latest frame.
            frames = [frame for frame in packet.decode()]
            if not frames:
                continue

            if len(frames) >= 2:
                # Discard frames from previous packets to get the latest frame.
                logger.debug(f"Discard {len(frames) - 1} frames")

            latest_frame = frames[-1]
            logger.debug(f"Latest frame: {latest_frame}")

            image = latest_frame.to_ndarray(format="bgr24")

            # gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            # colord_gray = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

            result_image = image
            av_frame = av.VideoFrame.from_ndarray(result_image, format="bgr24")

            # av_frame.pts = latest_frame.pts
            # av_frame.time_base = latest_frame.time_base

            # # seconds -> counts of time_base
            # av_frame.pts = int(round(my_pts / output_stream.time_base))
            #
            # # increment by display time to pre-determine next frame's PTS
            # my_pts += 1.0 if ((frame_i // 3) % 2 == 0) else 0.5
            # frame_i += 1

            logger.debug(f"Latest av frame: {av_frame}")

            for output_packet in output_stream.encode(av_frame):
                output_container.mux(output_packet)

        input_container.close()
        output_container.mux(output_stream.encode(None))
        output_container.close()

        return 0


def camber_main(args: Namespace, printer: Callable[..., None] = print) -> int:
    assert args is not None
    assert printer is not None
    return CamberApp(args).run()

다음과 같이 실행.

./run "rtsp://admin:[email protected]/stream1" rtsp "rtsp://localhost:8554/mystream2"

Disable Warnings

만약, aiortc를 사용한다면 다음과 같은 경고가 출력될 수 있다:

[swscaler @ 0x563c4a97ce80] deprecated pixel format used, make sure you did set range correctly

발생 지점은, swscaler를 사용하는 시점, VideoFrame를 취득하는 시점인,

from av.video.frame import VideoFrame
# ...
assert isinstance(frame, VideoFrame)
frame.to_ndarray(format="bgr24")

이다.

좀더 상세적으로 보면,

  1. frame.to_ndarray의 구현은, cython으로 되어 있다.
  2. 안에, cdef VideoFrame frame = self.reformat(**kwargs) 부분이다.
  3. frame.reformat을 보면, VideoReformatter객체를 생성하고, VideoReformatter.reformat을 호출한다.
  4. VideoReformatter.reformat를 보면, VideoFormat을 획득하고 sws_scale를 최종적으로 호출하게 된다.

FFmpeg를 직접 사용한다면 Libswscale#deprecated pixel format used, make sure you did set range correctly 를 참고하여 직접 수정해도 되지만 이 방법은 어렵다.

몇 가지 해결 방법이 있다.

av.logging.restore_default_callback 사용 금지

aiortc의 경우 aiortc.__init__.py에서 바로 호출된다. aiortc를 사용한다면 사용 불가. (아니면 직접 커스텀 lib 만들던가)

# Disable PyAV's logging framework as it can lead to thread deadlocks.
av.logging.restore_default_callback()

av.logging.Capture 사용

캡쳐 객체 사용. av.logging.restore_default_callback 를 사용한다면 사용 불가

import av.logging

def do_log(message):
    av.logging.log(av.logging.INFO, 'test', message)

with av.logging.Capture() as logs:
    frame.to_ndarray(format="bgr24")

libav 의 로그 레벨 변경

공식 문서 참조. 간단히:

av.logging.set_level(av.logging.ERROR)

위 방법은 아래와 방법과 동일한 효과를 내지만 위의 방법이 좀 더 효과적이다. (PyAV관점에서, 전역 변수를 관리하여 로깅 출력을 관리한다)

logging.getLogger('libav').setLevel(logging.ERROR)

이 방법도 av.logging.restore_default_callback 를 사용한다면 사용 불가

Standard Stream의 stderr redirection

문제는 Pythonsys.stdout을 제어하면 안된다. Pure C 레벨의 표준출력(stderr)을 제어해야 한다.

자세한 내용은 Python 에서 표준출력/표준에러 Redirection 이슈 항목 참조.

문제는 contextlib을 사용한다면 잦은 file open, file close 때문에 성능에 영향을 주며, 다른 Pure C 레벨의 출력또한 영향을 준다.

ctypes 사용한 후킹

python.ctypes를 사용하여, 라이브러리를 동적 로딩, av_log_set_level를 호출하여 ffmpeg 자체의 로깅 레벨를 조정하면 된다.

우선, cython으로 묶여 있는 PyAV의 libavutil라이브러리를 찾아야 한다. 이를 위해 av 모듈 위치를 검색하는 함수를 구현한다:

# -*- coding: utf-8 -*-

import os
from importlib import import_module

def get_module_directory(module) -> str:
    module_path = getattr(module, "__path__", None)
    if module_path:
        assert isinstance(module_path, list)
        return module_path[0]

    module_file = getattr(module, "__file__", None)
    if module_file:
        assert isinstance(module_file, str)
        return os.path.dirname(module_file)

    raise RuntimeError(f"The '{module.__name__}' module path is unknown")

PyAV에서 사용하는 libavutil 동적 라이브러리는 site-packages의 av.libs에 위치한다. 파일명은 libavutil-8508eea7.so.56.51.100와 같이 명명되므로 이 파일을 찾는 regex를 만든다:

# -*- coding: utf-8 -*-

import re
import os
from importlib import import_module
from recc.package.package_utils import get_module_directory


def find_ffmpeg_library_path(library_name: str) -> str:
    av_module = import_module("av")
    av_core_module = import_module("av._core")

    av_module_dir = get_module_directory(av_module)
    if av_module_dir[-1] == "/":
        av_module_dir = av_module_dir[:-1]
    av_lib_module_dir = av_module_dir + ".libs"
    if not os.path.isdir(av_lib_module_dir):
        raise FileNotFoundError(f"Not found module directory: {av_lib_module_dir}")

    versions = getattr(av_core_module, "library_versions")
    assert isinstance(versions, dict)
    if library_name not in versions:
        raise ValueError(f"Unknown library name: {library_name}")

    major = versions[library_name][0]
    minor = versions[library_name][1]
    patch = versions[library_name][2]
    assert isinstance(major, int)
    assert isinstance(minor, int)
    assert isinstance(patch, int)

    regex_pattern = r"^{name}-(.*)\.so\.{major}\.{minor}\.{patch}$".format(
        name=library_name, major=major, minor=minor, patch=patch
    )
    matcher = re.compile(regex_pattern)

    for file in os.listdir(av_lib_module_dir):
        if matcher.match(file):
            return os.path.join(av_lib_module_dir, file)

    raise FileNotFoundError(f"Not found ffmpeg library: {library_name}")


# noinspection SpellCheckingInspection
def find_libavutil_path() -> str:
    return find_ffmpeg_library_path("libavutil")

ctypes를 사용하여 libavutil의 av_log_set_level를 호출한다.

# -*- coding: utf-8 -*-

from ctypes import CDLL
from recc_plugin_vms.av.modules import find_libavutil_path
from av.logging import ERROR


# noinspection SpellCheckingInspection
def av_log_set_level(level: int) -> None:
    libavutil = CDLL(find_libavutil_path())
    try:
        libavutil.av_log_set_level(level)
    finally:
        del libavutil


# noinspection SpellCheckingInspection
def silent_av_warnings() -> None:
    # e.g.
    # [swscaler @ 0x5569f73ced40]
    # deprecated pixel format used, make sure you did set range correctly
    av_log_set_level(ERROR)

최종적으로 다음 함수를 호출하면 된다:

silent_av_warnings()

Troubleshooting

SIGSEGV

트랙 종료시 다음과 같은 현상이 발생된다.

SIGSEGV(11):
 #12 0x00007FFF6BD625FD _sigtramp+29 [libsystem_platform.dylib]
 #13 0x00007FFF6BD6E4B9 _pthread_cond_wait+846 [libsystem_pthread.dylib]

deprecated pixel format used, make sure you did set range correctly

#Disable Warnings 항목 참조.

Cannot rebase to zero time

OutputStream 에 encode 전달시 Cannot rebase to zero time에러가 발생한다. 패킷 또는 프레임에 time_base가 절달됐는지 확인해보자.

opencv-python의 imshow 에서 화면이 멎어버리는 현상

OpenCV:Troubleshooting#imshow 에서 화면이 멎어버리는 현상 항목 참조.

See also

Favorite site