Skip to content

GStreamer:Python

GStreamer의 Python 바인딩 지원에 대한 정리.

Python example

패키지 설치:

$ sudo apt-get install gir1.2-gst-rtsp-server-1.0

파이선 샘플코드:

import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GstRtspServer, GObject

if __name__ == '__main__':
    # GstRtspServer의 루프입니다.
    loop = GObject.MainLoop()
    GObject.threads_init()
    Gst.init(None)

    class MyFactory(GstRtspServer.RTSPMediaFactory):
        def __init__(self):
            GstRtspServer.RTSPMediaFactory.__init__(self)
            # 팩토리에 들어갈 영상의 파이프라인을 설정하는 부분입니다.
            # 아래에서 해당 함수를 콜하는 부분이 없지만 init 될때와 서버가 
            # 기동해서 루프돌고 있을때 참조해서 가동하게 됩니다.

        def do_create_element(self, url):
            # 여기에 파이프라인을 형성할 영상의 주소와 그 설정을 넣게 됩니다.
            spec = """
            filesrc location=test.mp4 ! qtdemux name=demux
            demux.video_0 ! queue ! rtph264pay pt=96 name=pay0
            demux.audio_0 ! queue ! rtpmp4apay pt=97 name=pay1
            demux.subtitle_0 ! queue ! rtpgstpay pt=98 name=pay2
            """
            return Gst.parse_launch(spec)

    class GstServer():
        def __init__(self):
            # GstRtspServer를 클래스 내에서 선언하고
            self.server = GstRtspServer.RTSPServer()
            # 포트를 지정해줍니다.
            self.server.set_service("3002")
            # 팩토리를 생성하는데 이 부분은 위의 MyFactory 클래스에서 설명하겠습니다.
            f = MyFactory()
            # 이 팩토리를 공유 할것이라 설정하고
            f.set_shared(True)
            # 서버 마운트 포인트를 선언하고 
            m = self.server.get_mount_points()
            # 마운트 포인트에 주소와 공유할 팩토리를 넣어줍니다.
            m.add_factory("/test", f)
            self.server.attach(None)

    # GstServer 클래스로 서버 설정을 마치고
    s = GstServer()

    # 서버 루프를 돌립니다.
    loop.run()

RTSP Example

#!/usr/bin/env python
# -*- coding:utf-8 vi:ts=4:noexpandtab
# Simple RTSP server. Run as-is or with a command-line to replace the default pipeline

import sys
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GstRtspServer, GObject

loop = GObject.MainLoop()
GObject.threads_init()
Gst.init(None)

class MyFactory(GstRtspServer.RTSPMediaFactory):
    def __init__(self):
        GstRtspServer.RTSPMediaFactory.__init__(self)

    def do_create_element(self, url):
        s_src = "v4l2src ! video/x-raw,rate=30,width=320,height=240 ! videoconvert ! video/x-raw,format=I420"
        s_h264 = "videoconvert ! vaapiencode_h264 bitrate=1000"
        s_src = "videotestsrc ! video/x-raw,rate=30,width=320,height=240,format=I420"
        s_h264 = "x264enc tune=zerolatency"
        pipeline_str = "( {s_src} ! queue max-size-buffers=1 name=q_enc ! {s_h264} ! rtph264pay name=pay0 pt=96 )".format(**locals())
        if len(sys.argv) > 1:
            pipeline_str = " ".join(sys.argv[1:])
        print(pipeline_str)
        return Gst.parse_launch(pipeline_str)

class GstServer():
    def __init__(self):
        self.server = GstRtspServer.RTSPServer()
        f = MyFactory()
        f.set_shared(True)
        m = self.server.get_mount_points()
        m.add_factory("/test", f)
        self.server.attach(None)

if __name__ == '__main__':
    s = GstServer()
    loop.run()

rtsp://localhost:8554/test로 접속하면 된다.

ETC examples

See also

Favorite site