RTSP:Examples:SimplePythonServer
순수 Python 으로 RTSP 로 비디오 영상 (노이즈 영상) 을 스트리밍 해주는 RTSPServer 클래스를 구현하고 실행하는 main.py 파일
MJPEG 포맷으로 스트리밍. 영상 크기는 800x600 이고, 빨간색 배경.
WARNING |
아직 개발중... 서버는 열리지만 클라이언트 접속시 재생 안되더라... |
Code
import socket
import threading
import time
import random
import struct
from PIL import Image
import io
class RTSPServer:
def __init__(self, host="0.0.0.0", port=8554):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.sessions = {}
self.sequence_number = 0
def start(self):
self.server_socket.listen(5)
print(f"RTSP 서버가 {self.host}:{self.port}에서 실행 중입니다.")
while True:
client_socket, address = self.server_socket.accept()
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address),
)
client_thread.start()
def handle_client(self, client_socket, address):
print(f"클라이언트 연결됨: {address}")
session = None
try:
while True:
data = client_socket.recv(1024).decode()
print("---------------")
print(f"[DATA] {data}")
print("---------------")
if not data:
break
lines = data.split("\r\n")
request_type = lines[0].split()[0]
if request_type == "OPTIONS":
response = self.handle_options()
elif request_type == "DESCRIBE":
response = self.handle_describe(lines[0].split()[1])
elif request_type == "SETUP":
session, response = self.handle_setup(lines)
elif request_type == "PLAY":
response = self.handle_play(session)
elif request_type == "TEARDOWN":
response = self.handle_teardown(session)
break
else:
response = self.create_response(400, "Bad Request")
print("---------------")
print(f"[RESPONSE] {response}")
print("---------------")
client_socket.send(response.encode())
if request_type == "PLAY":
self.stream_video(client_socket, session)
except Exception as e:
print(f"Error: {e}")
finally:
if session in self.sessions:
del self.sessions[session]
client_socket.close()
print(f"클라이언트 연결 종료: {address}")
def handle_options(self):
return self.create_response(
200, "OK", {"Public": "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN"}
)
def handle_describe(self, url):
sdp = self.create_sdp_description(url)
headers = {
"Content-Base": url,
"Content-Type": "application/sdp",
"Content-Length": len(sdp),
}
return self.create_response(200, "OK", headers, sdp)
def handle_setup(self, lines):
session = str(random.randint(100000, 999999))
self.sessions[session] = {"state": "READY"}
transport = (
[line for line in lines if line.startswith("Transport:")][0]
.split(":")[1]
.strip()
)
return session, self.create_response(
200, "OK", {"Session": session, "Transport": transport}
)
def handle_play(self, session):
if session in self.sessions:
self.sessions[session]["state"] = "PLAYING"
return self.create_response(200, "OK", {"Session": session})
return self.create_response(454, "Session Not Found")
def handle_teardown(self, session):
if session in self.sessions:
del self.sessions[session]
return self.create_response(200, "OK")
return self.create_response(454, "Session Not Found")
def create_response(self, status_code, status_text, headers=None, body=None):
response = f"RTSP/1.0 {status_code} {status_text}\r\n"
response += f"CSeq: {self.sequence_number}\r\n"
self.sequence_number += 1
if headers:
for key, value in headers.items():
response += f"{key}: {value}\r\n"
response += "\r\n"
if body:
response += body
return response
def create_sdp_description(self, url):
return (
"v=0\r\n"
"o=- 0 0 IN IP4 0.0.0.0\r\n"
"s=Red Background MJPEG Stream\r\n"
f"u={url}\r\n"
"[email protected]\r\n"
"c=IN IP4 0.0.0.0\r\n"
"t=0 0\r\n"
"m=video 0 RTP/AVP 26\r\n"
"a=rtpmap:26 JPEG/90000\r\n"
"a=control:trackID=1\r\n"
)
def stream_video(self, client_socket, session):
rtp_seq_num = 0
start_time = time.time()
try:
print(f"-------[stream_video]-------")
while (
session in self.sessions
and self.sessions[session]["state"] == "PLAYING"
):
print(f"-------[loop...]-------")
frame = self.generate_red_frame()
rtp_packet = self.create_rtp_packet(frame, rtp_seq_num, start_time)
try:
print(f"-------[packet]: {len(rtp_packet)} bytes")
client_socket.send(rtp_packet)
except:
break
rtp_seq_num = (rtp_seq_num + 1) % 65536
print(f"-------[sleep]")
time.sleep(1 / 30) # 30 FPS
except Exception as e:
print(f"Streaming error: {e}")
def generate_red_frame(self):
image = Image.new("RGB", (800, 600), color="red")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
return buffer.getvalue()
def create_rtp_packet(self, payload, seq_num, start_time):
version = 2
padding = 0
extension = 0
cc = 0
marker = 1
pt = 26 # MJPEG 페이로드 타입
ssrc = 0x12345678
timestamp = int(
(time.time() - start_time) * 90000
) # RTP 타임스탬프 (90kHz 클럭 레이트)
header = struct.pack(
"!BBHII",
(version << 6) | (padding << 5) | (extension << 4) | cc,
(marker << 7) | pt,
seq_num,
timestamp,
ssrc,
)
return header + payload
def main():
server = RTSPServer()
server.start()
if __name__ == "__main__":
main()
클라이언트 접속:
See also
Favorite site
- Github - vladpen/python-rtsp-server - Lightweight, zero-dependency proxy and storage RTSP server
- runtheops/rtsp-rtp: RTSP client and RTP packet parser