Pyrtmp
Pure python RTMP server.
Sample code
import asyncio
import logging
import os
import tempfile
from pyrtmp import StreamClosedException, RTMPProtocol
from pyrtmp.messages import SessionManager
from pyrtmp.messages.audio import AudioMessage
from pyrtmp.messages.command import NCConnect, NCCreateStream, NSPublish, NSCloseStream, NSDeleteStream
from pyrtmp.messages.data import MetaDataMessage
from pyrtmp.messages.protocolcontrol import WindowAcknowledgementSize, SetChunkSize, SetPeerBandwidth
from pyrtmp.messages.usercontrol import StreamBegin
from pyrtmp.messages.video import VideoMessage
from pyrtmp.misc.flvdump import FLVFile, FLVMediaType
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
async def simple_controller(reader, writer):
session = SessionManager(reader=reader, writer=writer)
flv = None
try:
logger.debug(f'Client connected {session.peername}')
# do handshake
await session.handshake()
# read chunks
async for chunk in session.read_chunks_from_stream():
message = chunk.as_message()
logger.debug(f"Receiving {str(message)} {message.chunk_id}")
if isinstance(message, NCConnect):
session.write_chunk_to_stream(WindowAcknowledgementSize(ack_window_size=5000000))
session.write_chunk_to_stream(SetPeerBandwidth(ack_window_size=5000000, limit_type=2))
session.write_chunk_to_stream(StreamBegin(stream_id=0))
session.write_chunk_to_stream(SetChunkSize(chunk_size=8192))
session.writer_chunk_size = 8192
session.write_chunk_to_stream(message.create_response())
await session.drain()
logger.debug("Response to NCConnect")
elif isinstance(message, WindowAcknowledgementSize):
pass
elif isinstance(message, NCCreateStream):
session.write_chunk_to_stream(message.create_response())
await session.drain()
logger.debug("Response to NCCreateStream")
elif isinstance(message, NSPublish):
# create flv file at temp
flv = FLVFile(os.path.join(tempfile.gettempdir(), message.publishing_name))
session.write_chunk_to_stream(StreamBegin(stream_id=1))
session.write_chunk_to_stream(message.create_response())
await session.drain()
logger.debug("Response to NSPublish")
elif isinstance(message, MetaDataMessage):
# Write meta data to file
flv.write(0, message.to_raw_meta(), FLVMediaType.OBJECT)
elif isinstance(message, SetChunkSize):
session.reader_chunk_size = message.chunk_size
elif isinstance(message, VideoMessage):
# Write video data to file
flv.write(message.timestamp, message.payload, FLVMediaType.VIDEO)
elif isinstance(message, AudioMessage):
# Write data data to file
flv.write(message.timestamp, message.payload, FLVMediaType.AUDIO)
elif isinstance(message, NSCloseStream):
pass
elif isinstance(message, NSDeleteStream):
pass
else:
logger.debug(f"Unknown message {str(message)}")
except StreamClosedException as ex:
logger.debug(f"Client {session.peername} disconnected!")
finally:
if flv:
flv.close()
async def serve_rtmp(use_protocol=True):
loop = asyncio.get_running_loop()
if use_protocol is True:
server = await loop.create_server(lambda: RTMPProtocol(controller=simple_controller, loop=loop), '0.0.0.0', 1935)
else:
server = await asyncio.start_server(simple_controller, '0.0.0.0', 1935)
addr = server.sockets[0].getsockname()
logger.info(f'Serving on {addr}')
async with server:
await server.serve_forever()
def wrapper(port: int):
asyncio.run(serve_rtmp(port=port))
IS_DEBUG=True
NUM_PROCESS=2
if __name__ == "__main__":
if IS_DEBUG is True:
wrapper(1935)
else:
from multiprocessing import Process
import uvloop
uvloop.install()
process = []
for i in range(NUM_PROCESS):
p = Process(target=wrapper, args=(1935 + i + 1,))
p.start()
process.append(p)
for p in process:
p.join()
See also
Favorite site