Skip to content

RTSP:Examples:SimplePythonServer

순수 Python 으로 RTSP 비디오 영상 (빨간색 단색 영상) 을 스트리밍하는 RTSPServer 클래스를 구현하고 실행하는 main.py 파일.

MJPEG 포맷으로 스트리밍. 영상 크기는 800x600 이고, 빨간색 배경.

WARNING

아직 개발 중입니다. 서버는 정상적으로 실행되지만, 클라이언트가 접속해도 영상이 재생되지 않습니다.

Code

import socket
import threading
import time
import random
import struct
from PIL import Image
import io


class RTSPServer:
    """Minimal RTSP server that streams a solid red 800x600 JPEG frame.

    One thread is started per accepted TCP connection. Each connection
    understands OPTIONS, DESCRIBE, SETUP, PLAY and TEARDOWN; after a PLAY
    response the connection thread writes RTP packets to the same socket.

    NOTE(review): known protocol gaps that are NOT addressed here and that
    probably still prevent real players from rendering the stream:
      * RTP packets are written to the RTSP socket without the
        ``$``/channel/length interleaved framing (RFC 2326 section 10.12),
        and the transport requested by the client is echoed back without
        actually being honoured (no UDP sockets are opened).
      * Every RTP payload is a complete JPEG file; RFC 2435 expects a JPEG
        payload header and fragmentation instead.
      * ``stream_video`` blocks the connection thread, so requests sent
        while playing (e.g. TEARDOWN) are not read until sending fails.
    """

    def __init__(self, host="0.0.0.0", port=8554):
        """Create and bind the listening socket.

        Args:
            host: Interface address to bind to.
            port: TCP port to bind to.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        # Maps session id (str) -> {"state": "READY" | "PLAYING"}.
        self.sessions = {}
        # Fallback CSeq counter, used only when a request carried no CSeq.
        self.sequence_number = 0

    def start(self):
        """Accept connections forever, serving each client on its own thread."""
        self.server_socket.listen(5)
        print(f"RTSP 서버가 {self.host}:{self.port}에서 실행 중입니다.")
        while True:
            client_socket, address = self.server_socket.accept()
            client_thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket, address),
                # Daemon threads do not keep the process alive, so Ctrl-C in
                # the accept loop actually stops the server.
                daemon=True,
            )
            client_thread.start()

    @staticmethod
    def _find_header(lines, name):
        """Return the stripped value of header *name* in *lines*, or None.

        Header names are compared case-insensitively and only the first
        ``:`` separates name from value, so values may contain colons.
        """
        wanted = name.lower()
        for line in lines:
            key, sep, value = line.partition(":")
            if sep and key.strip().lower() == wanted:
                return value.strip()
        return None

    def handle_client(self, client_socket, address):
        """Serve RTSP requests on one connection until it ends.

        Args:
            client_socket: Connected socket for this client.
            address: Peer address, used only for log output.
        """
        print(f"클라이언트 연결됨: {address}")
        session = None
        try:
            while True:
                # Assumes one complete request arrives per recv() call.
                data = client_socket.recv(1024).decode()
                print("---------------")
                print(f"[DATA] {data}")
                print("---------------")

                if not data:
                    break

                lines = data.split("\r\n")
                request_parts = lines[0].split()
                # A blank request line yields "" and falls through to 400.
                request_type = request_parts[0] if request_parts else ""
                # The response must carry the CSeq of the request it answers.
                cseq = self._find_header(lines[1:], "CSeq")

                if request_type == "OPTIONS":
                    response = self.handle_options(cseq)
                elif request_type == "DESCRIBE" and len(request_parts) > 1:
                    response = self.handle_describe(request_parts[1], cseq)
                elif request_type == "SETUP":
                    new_session, response = self.handle_setup(lines, cseq)
                    if new_session is not None:
                        # Drop a previous session of this connection so it
                        # does not stay registered forever.
                        self.sessions.pop(session, None)
                        session = new_session
                elif request_type == "PLAY":
                    response = self.handle_play(session, cseq)
                elif request_type == "TEARDOWN":
                    response = self.handle_teardown(session, cseq)
                else:
                    response = self.create_response(
                        400, "Bad Request", cseq=cseq
                    )

                print("---------------")
                print(f"[RESPONSE] {response}")
                print("---------------")
                # sendall: send() may transmit only part of the buffer.
                client_socket.sendall(response.encode())

                if request_type == "TEARDOWN":
                    # Leave only after the TEARDOWN response has been sent.
                    break

                if request_type == "PLAY":
                    self.stream_video(client_socket, session)

        except Exception as e:
            # Per-connection boundary: log and fall through to cleanup.
            print(f"Error: {e}")
        finally:
            self.sessions.pop(session, None)
            client_socket.close()
            print(f"클라이언트 연결 종료: {address}")

    def handle_options(self, cseq=None):
        """Build the OPTIONS response listing the supported methods."""
        return self.create_response(
            200,
            "OK",
            {"Public": "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN"},
            cseq=cseq,
        )

    def handle_describe(self, url, cseq=None):
        """Build the DESCRIBE response carrying the SDP for *url*."""
        sdp = self.create_sdp_description(url)
        headers = {
            "Content-Base": url,
            "Content-Type": "application/sdp",
            # Content-Length counts bytes on the wire, not characters.
            "Content-Length": len(sdp.encode()),
        }
        return self.create_response(200, "OK", headers, sdp, cseq=cseq)

    def handle_setup(self, lines, cseq=None):
        """Register a new session for a SETUP request.

        Args:
            lines: Request split into lines; must contain a Transport header.
            cseq: CSeq value of the request, echoed in the response.

        Returns:
            ``(session_id, response)`` on success, or ``(None, response)``
            with a 400 response when no Transport header is present.
        """
        # Parse before registering so a malformed request leaks no session.
        transport = self._find_header(lines, "Transport")
        if transport is None:
            return None, self.create_response(400, "Bad Request", cseq=cseq)

        session = str(random.randint(100000, 999999))
        self.sessions[session] = {"state": "READY"}
        return session, self.create_response(
            200, "OK", {"Session": session, "Transport": transport}, cseq=cseq
        )

    def handle_play(self, session, cseq=None):
        """Mark *session* as PLAYING, or answer 454 if it is unknown."""
        if session in self.sessions:
            self.sessions[session]["state"] = "PLAYING"
            return self.create_response(
                200, "OK", {"Session": session}, cseq=cseq
            )
        return self.create_response(454, "Session Not Found", cseq=cseq)

    def handle_teardown(self, session, cseq=None):
        """Remove *session*, or answer 454 if it is unknown."""
        if session in self.sessions:
            del self.sessions[session]
            return self.create_response(200, "OK", cseq=cseq)
        return self.create_response(454, "Session Not Found", cseq=cseq)

    def create_response(
        self, status_code, status_text, headers=None, body=None, cseq=None
    ):
        """Serialize an RTSP/1.0 response.

        Args:
            status_code: Numeric status code.
            status_text: Reason phrase.
            headers: Optional mapping of extra header names to values.
            body: Optional message body appended after the blank line.
            cseq: CSeq of the request being answered. When None, the
                server's own counter is used and incremented, which is the
                behaviour callers that do not pass ``cseq`` already relied on.

        Returns:
            The response as a ``str`` with CRLF line endings.
        """
        if cseq is None:
            cseq = self.sequence_number
            self.sequence_number += 1

        parts = [
            f"RTSP/1.0 {status_code} {status_text}\r\n",
            f"CSeq: {cseq}\r\n",
        ]
        if headers:
            parts.extend(f"{key}: {value}\r\n" for key, value in headers.items())
        parts.append("\r\n")
        if body:
            parts.append(body)
        return "".join(parts)

    def create_sdp_description(self, url):
        """Return the SDP text describing the single JPEG video track."""
        return (
            "v=0\r\n"
            "o=- 0 0 IN IP4 0.0.0.0\r\n"
            "s=Red Background MJPEG Stream\r\n"
            f"u={url}\r\n"
            # Placeholder contact address; every SDP line must be "<type>=".
            "e=admin@example.com\r\n"
            "c=IN IP4 0.0.0.0\r\n"
            "t=0 0\r\n"
            "m=video 0 RTP/AVP 26\r\n"
            "a=rtpmap:26 JPEG/90000\r\n"
            "a=control:trackID=1\r\n"
        )

    def stream_video(self, client_socket, session):
        """Send one RTP packet per frame while *session* is PLAYING.

        Blocks the calling thread; returns when the session is no longer
        PLAYING or when writing to the socket fails.
        """
        rtp_seq_num = 0
        start_time = time.time()
        try:
            print("-------[stream_video]-------")
            while (
                session in self.sessions
                and self.sessions[session]["state"] == "PLAYING"
            ):
                print("-------[loop...]-------")
                frame = self.generate_red_frame()
                rtp_packet = self.create_rtp_packet(frame, rtp_seq_num, start_time)
                try:
                    print(f"-------[packet]: {len(rtp_packet)} bytes")
                    client_socket.sendall(rtp_packet)
                except OSError:
                    # Peer went away or the socket was closed: stop quietly.
                    break
                rtp_seq_num = (rtp_seq_num + 1) % 65536
                print("-------[sleep]")
                time.sleep(1 / 30)  # 30 FPS
        except Exception as e:
            print(f"Streaming error: {e}")

    def generate_red_frame(self):
        """Return the bytes of an 800x600 solid red image encoded as JPEG."""
        image = Image.new("RGB", (800, 600), color="red")
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG")
        return buffer.getvalue()

    def create_rtp_packet(self, payload, seq_num, start_time):
        """Prefix *payload* with a 12-byte RTP header.

        Args:
            payload: Bytes placed after the header.
            seq_num: RTP sequence number; only the low 16 bits are used.
            start_time: ``time.time()`` value taken when streaming started;
                the timestamp is the elapsed time in 90 kHz ticks.

        Returns:
            Header and payload concatenated as ``bytes``.
        """
        version = 2
        padding = 0
        extension = 0
        cc = 0
        marker = 1
        pt = 26  # MJPEG payload type
        ssrc = 0x12345678

        # RTP timestamp on a 90 kHz clock. The field is 32 bits wide, so it
        # must wrap (after ~13 hours) instead of overflowing struct.pack.
        timestamp = int((time.time() - start_time) * 90000) & 0xFFFFFFFF

        header = struct.pack(
            "!BBHII",
            (version << 6) | (padding << 5) | (extension << 4) | cc,
            (marker << 7) | pt,
            seq_num & 0xFFFF,
            timestamp,
            ssrc,
        )

        return header + payload


def main():
    """Build an RTSPServer with its default host/port and run its accept loop."""
    RTSPServer().start()


# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

클라이언트 접속:

mpv rtsp://localhost:8554/

See also

Favorite site