ONVIF:Example:Client
zeep을 사용한 간단한 명령행 클라이언트.
Source code
# -*- coding: utf-8 -*-
import os
import sys
from argparse import ArgumentParser, Namespace, RawDescriptionHelpFormatter
from datetime import datetime, timedelta
from json import JSONEncoder, dumps
from logging import DEBUG, StreamHandler, getLogger
from pathlib import Path
from pprint import pprint
from typing import Any, Dict, Final, List, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse
from requests import Session
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from wsdiscovery.discovery import ThreadedWSDiscovery
from zeep import Client
from zeep.cache import Base as ZeepCacheBase
from zeep.helpers import serialize_object
from zeep.proxy import ServiceProxy
from zeep.transports import Transport
from zeep.wsse.username import UsernameToken
PROG: Final[str] = "opy-onvif"
EPILOG = f"""
ONVIF device discovery:
{PROG} -j WS-Discovery | jq '.[].XAddrs[]'
Obtain a list of device capabilities:
{PROG} -j -a [URL] GetCapabilities | jq '.[].XAddr'
If authentication is required:
{PROG} -u 'admin' -p '1q2w3e4r5t!' -a [URL] ...
{PROG} -u 'admin' -p '1q2w3e4r5t!' --use-digest -a [URL] ...
{PROG} -u 'admin' -p '1q2w3e4r5t!' --with-http-basic -a [URL] ...
{PROG} -u 'admin' -p '1q2w3e4r5t!' --use-digest --with-http-digest -a [URL] ...
Obtain a list of media profiles:
{PROG} -j -a [URL] GetProfiles | jq '.[].token'
Obtain a stream URI:
{PROG} -j -a [URL] GetStreamUri [TOKEN] | jq '.Uri'
Obtain a snapshot URI:
{PROG} -j -a [URL] GetSnapshotUri [TOKEN] | jq '.Uri'
"""
ONVIF_V10_SCHEMA_URL: Final[str] = "http://www.onvif.org/ver10/schema"
TRANSPORT_PROTOCOL_UDP: Final[str] = "UDP"
TRANSPORT_PROTOCOL_TCP: Final[str] = "TCP" # Deprecated
TRANSPORT_PROTOCOL_RTSP: Final[str] = "RTSP"
TRANSPORT_PROTOCOL_HTTP: Final[str] = "HTTP"
TRANSPORT_PROTOCOLS: Sequence[str] = (
TRANSPORT_PROTOCOL_UDP,
TRANSPORT_PROTOCOL_TCP,
TRANSPORT_PROTOCOL_RTSP,
TRANSPORT_PROTOCOL_HTTP,
)
_UDP: Final[str] = TRANSPORT_PROTOCOL_UDP
_TCP: Final[str] = TRANSPORT_PROTOCOL_TCP
_RTSP: Final[str] = TRANSPORT_PROTOCOL_RTSP
_HTTP: Final[str] = TRANSPORT_PROTOCOL_HTTP
_PROTOCOLS: Sequence[str] = TRANSPORT_PROTOCOLS
STREAM_TYPE_RTP_UNICAST: Final[str] = "RTP-Unicast"
STREAM_TYPE_RTP_MULTICAST: Final[str] = "RTP-Multicast"
STREAM_TYPES: Sequence[str] = (STREAM_TYPE_RTP_UNICAST, STREAM_TYPE_RTP_MULTICAST)
_RTP_UNICAST: Final[str] = STREAM_TYPE_RTP_UNICAST
_RTP_MULTICAST: Final[str] = STREAM_TYPE_RTP_MULTICAST
_STREAMS: Sequence[str] = STREAM_TYPES
PROFILE_TOKEN_MAX_LENGTH: Final[int] = 64
logger = getLogger("opm.onvif")
def default_logger_setup():
logger.addHandler(StreamHandler(sys.stderr))
logger.setLevel(DEBUG)
def find_default_wsdl_cache_dir() -> str:
"""
Default cache folder location to use when used within an OPM library.
"""
temp_cache_dir = os.path.join(os.path.dirname(__file__), "wsdl")
opm_home_dir = os.environ.get("OPM_HOME", None)
if opm_home_dir is None:
current_dir = os.path.dirname(__file__)
current_dirname = os.path.basename(current_dir)
if current_dirname != "python3":
return temp_cache_dir
parent_dir = os.path.dirname(current_dir)
parent_dirname = os.path.basename(parent_dir)
if parent_dirname != "lib":
return temp_cache_dir
opm_home_dir = os.path.dirname(parent_dir)
var_dir = os.path.join(opm_home_dir, "var")
if not os.path.isdir(var_dir):
return temp_cache_dir
wsdl_dir = os.path.join(var_dir, "wsdl")
if not os.path.isdir(wsdl_dir):
return temp_cache_dir
return wsdl_dir
class ZeepFileCache(ZeepCacheBase):
def __init__(self, prefix: str):
super().__init__()
self._prefix = prefix
def get_cache_path(self, url: str) -> Path:
o = urlparse(url)
return Path(os.path.join(self._prefix, o.hostname, *o.path.split("/")))
def add(self, url: str, content: Any):
filepath = self.get_cache_path(url)
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
if not filepath.exists():
with filepath.open("wb") as f:
f.write(content)
except BaseException as e: # noqa
logger.error(f"ZeepFileCache.add(url={url}) error: {e}")
else:
logger.debug(f"ZeepFileCache.add(url={url}) ok")
def get(self, url: str):
filepath = self.get_cache_path(url)
try:
if filepath.is_file():
with filepath.open("rb") as f:
return f.read()
except BaseException as e: # noqa
logger.error(f"ZeepFileCache.get(url={url}) error: {e}")
else:
logger.debug(f"ZeepFileCache.get(url={url}) ok")
return None
class OnvifWsdlDeclaration:
def __init__(
self,
declaration: str,
http_sub: str,
wsdl_file: str,
subclass: str,
binding_names: Optional[List[str]] = None,
):
self.declaration = declaration
self.http_sub = http_sub
self.wsdl_file = wsdl_file
self.subclass = subclass
# <wsdl:binding name="???" ...> ... </wsdl:binding>
self.binding_names = binding_names if binding_names else list()
@property
def wsdl_file_url(self) -> str:
return self.declaration + "/" + self.wsdl_file
def create_client(
self,
wsse: Optional[UsernameToken] = None,
cache_dir: Optional[str] = None,
with_http_basic=False,
with_http_digest=False,
):
cache = ZeepFileCache(cache_dir) if cache_dir else None
session = Session()
if wsse:
if with_http_basic and with_http_digest:
raise ValueError(
"The 'with_http_basic' and 'with_http_digest' flags cannot coexist"
)
if with_http_basic:
assert not with_http_digest
session.auth = HTTPBasicAuth(wsse.username, wsse.password)
if with_http_digest:
assert not with_http_basic
if not wsse.use_digest:
logger.warning("<UsernameToken> should be encoded as a digest.")
session.auth = HTTPDigestAuth(wsse.username, wsse.password)
transport = Transport(cache=cache, session=session)
return Client(wsdl=self.wsdl_file_url, wsse=wsse, transport=transport)
def get_service_binding_name(self, index_or_name: Union[str, int] = 0) -> str:
if isinstance(index_or_name, int):
return "{" + self.declaration + "}" + self.binding_names[index_or_name]
else:
return "{" + self.declaration + "}" + index_or_name
ONVIF_DECL_DEVICE_MANAGEMENT = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver10/device/wsdl",
http_sub="device_service",
wsdl_file="devicemgmt.wsdl",
subclass="DeviceManagement",
binding_names=["DeviceBinding"],
)
ONVIF_DECL_MEDIA = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver10/media/wsdl",
http_sub="Media",
wsdl_file="media.wsdl",
subclass="Media",
binding_names=["MediaBinding"],
)
ONVIF_DECL_EVENTS = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver10/events/wsdl",
http_sub="Events",
wsdl_file="events.wsdl",
subclass="Events",
)
ONVIF_DECL_PTZ = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver20/ptz/wsdl",
http_sub="PTZ",
wsdl_file="ptz.wsdl",
subclass="PTZ",
binding_names=["PTZBinding"],
)
ONVIF_DECL_IMAGING = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver20/imaging/wsdl",
http_sub="Imaging",
wsdl_file="imaging.wsdl",
subclass="Imaging",
binding_names=["ImagingBinding"],
)
ONVIF_DECL_DEVICE_IO = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver10/deviceIO/wsdl",
http_sub="DeviceIO",
wsdl_file="deviceio.wsdl",
subclass="DeviceIO",
)
ONVIF_DECL_ANALYTICS = OnvifWsdlDeclaration(
declaration="http://www.onvif.org/ver20/analytics/wsdl",
http_sub="Analytics",
wsdl_file="analytics.wsdl",
subclass="Analytics",
binding_names=["RuleEngineBinding", "AnalyticsEngineBinding"],
)
ONVIF_DECLS: Sequence[OnvifWsdlDeclaration] = (
ONVIF_DECL_DEVICE_MANAGEMENT,
ONVIF_DECL_MEDIA,
# ONVIF_DECL_EVENTS,
ONVIF_DECL_PTZ,
ONVIF_DECL_IMAGING,
# ONVIF_DECL_DEVICE_IO,
ONVIF_DECL_ANALYTICS,
)
def dumps_default(o: Any) -> Any:
if isinstance(o, datetime):
return o.isoformat()
elif isinstance(o, timedelta):
return o.total_seconds()
try:
return JSONEncoder().default(o)
except TypeError:
return str(o)
def create_username_token(
username: Optional[str],
password: Optional[str],
use_digest=False,
) -> Optional[UsernameToken]:
if not username:
return None
if not password:
return None
return UsernameToken(username=username, password=password, use_digest=use_digest)
def create_username_token_with_namespace(args: Namespace) -> Optional[UsernameToken]:
assert isinstance(args.username, (type(None), str))
assert isinstance(args.password, (type(None), str))
assert isinstance(args.use_digest, bool)
return create_username_token(args.username, args.password, args.use_digest)
def create_client_and_service(
decl: OnvifWsdlDeclaration,
args: Namespace,
) -> Tuple[Client, ServiceProxy]:
if not args.address:
raise ValueError("The 'address' argument is required.")
assert isinstance(args.no_cache, bool)
assert isinstance(args.cache_dir, str)
assert isinstance(args.with_http_basic, bool)
assert isinstance(args.with_http_digest, bool)
client = decl.create_client(
wsse=create_username_token_with_namespace(args),
cache_dir=None if args.no_cache else args.cache_dir,
with_http_basic=args.with_http_basic,
with_http_digest=args.with_http_digest,
)
binding_name = decl.get_service_binding_name()
service = client.create_service(binding_name, args.address)
return client, service
def create_service(decl: OnvifWsdlDeclaration, args: Namespace) -> ServiceProxy:
return create_client_and_service(decl, args)[1]
def ws_discovery_dict() -> List[Dict[str, any]]:
wsd = ThreadedWSDiscovery()
wsd.start()
try:
result = list()
for service in wsd.searchServices():
item = dict()
item["EPR"] = service.getEPR()
item["InstanceId"] = service.getInstanceId()
item["MessageNumber"] = service.getMessageNumber()
item["MetadataVersion"] = service.getMetadataVersion()
item["Scopes"] = [s.getValue() for s in service.getScopes()]
item["Types"] = [t.getFullname() for t in service.getTypes()]
item["XAddrs"] = [a for a in service.getXAddrs()]
result.append(item)
return result
finally:
wsd.stop()
def ws_discovery(args: Namespace):
assert args
return ws_discovery_dict()
# ------------------------------------------------------
# http://www.onvif.org/ver10/device/wsdl/devicemgmt.wsdl
# ------------------------------------------------------
def get_system_date_and_time(args: Namespace):
service = create_service(ONVIF_DECL_DEVICE_MANAGEMENT, args)
return service.GetSystemDateAndTime()
def get_capabilities(args: Namespace):
"""
This method has been replaced by the more generic GetServices method.
For capabilities of individual services refer to the GetServiceCapabilities methods.
"""
service = create_service(ONVIF_DECL_DEVICE_MANAGEMENT, args)
return service.GetCapabilities()
def get_services(args: Namespace):
assert isinstance(args.IncludeCapability, bool)
service = create_service(ONVIF_DECL_DEVICE_MANAGEMENT, args)
return service.GetServices(IncludeCapability=args.IncludeCapability)
def get_device_information(args: Namespace):
service = create_service(ONVIF_DECL_DEVICE_MANAGEMENT, args)
return service.GetDeviceInformation()
# ------------------------------------------------------
# http://www.onvif.org/ver10/media/wsdl/media.wsdl
# ------------------------------------------------------
def get_profiles(args: Namespace):
service = create_service(ONVIF_DECL_MEDIA, args)
return service.GetProfiles()
def get_stream_uri(args: Namespace):
assert isinstance(args.Protocol, str)
assert isinstance(args.Stream, str)
assert isinstance(args.ProfileToken, str)
assert args.Protocol in TRANSPORT_PROTOCOLS
if args.Protocol == TRANSPORT_PROTOCOL_TCP:
logger.warning(f"'{TRANSPORT_PROTOCOL_TCP}' protocol is deprecated")
assert args.Stream in STREAM_TYPES
assert len(args.ProfileToken) <= PROFILE_TOKEN_MAX_LENGTH
client, service = create_client_and_service(ONVIF_DECL_MEDIA, args)
schema = client.type_factory(namespace=ONVIF_V10_SCHEMA_URL)
transport = schema.Transport(Protocol=args.Protocol)
setup = schema.StreamSetup(Stream=args.Stream, Transport=transport)
return service.GetStreamUri(StreamSetup=setup, ProfileToken=args.ProfileToken)
def get_snapshot_uri(args: Namespace):
assert isinstance(args.ProfileToken, str)
assert len(args.ProfileToken) <= PROFILE_TOKEN_MAX_LENGTH
service = create_service(ONVIF_DECL_MEDIA, args)
return service.GetSnapshotUri(ProfileToken=args.ProfileToken)
def get_default_arguments(
cmdline: Optional[List[str]] = None,
namespace: Optional[Namespace] = None,
default_cache_dir: Optional[str] = None,
) -> Namespace:
parser = ArgumentParser(
prog=PROG,
epilog=EPILOG,
formatter_class=RawDescriptionHelpFormatter,
)
parser.add_argument(
"--address",
"-a",
metavar="{uri}",
default="",
help="URI to request ONVIF service",
)
parser.add_argument(
"--username",
"-u",
metavar="{id}",
default=None,
help="Username for <UsernameToken>",
)
parser.add_argument(
"--password",
"-p",
metavar="{pw}",
default=None,
help="Password for <UsernameToken>",
)
parser.add_argument(
"--use-digest",
"-d",
action="store_true",
default=False,
help="Digest-encode <UsernameToken>",
)
parser.add_argument(
"--with-http-basic",
"-B",
action="store_true",
default=False,
help="Add Basic authentication to the HTTP Authorization header.",
)
parser.add_argument(
"--with-http-digest",
"-D",
action="store_true",
default=False,
help="Add Digest authentication to the HTTP Authorization header.",
)
parser.add_argument(
"--use-json",
"-j",
action="store_true",
default=False,
help="The final result is output in JSON format.",
)
parser.add_argument(
"--json-indent",
"-i",
metavar="{indent}",
type=int,
default=4,
help="When outputting JSON, pretty-printed with that indent level."
)
parser.add_argument(
"--json-sort",
"-s",
action="store_true",
default=False,
help="When outputting JSON, sort by key.",
)
parser.add_argument(
"--no-cache",
action="store_true",
default=False,
help="Do not save the WSDL schema as a file.",
)
parser.add_argument(
"--cache-dir",
metavar="{dir}",
default=default_cache_dir,
help=f"Specify WSDL cache directory (default: '{default_cache_dir}')",
)
subparsers = parser.add_subparsers(dest="cmd")
discovery = subparsers.add_parser(
name="WS-Discovery",
help="Discover ONVIF devices.",
)
discovery.set_defaults(func=ws_discovery)
dt = subparsers.add_parser(
name="GetSystemDateAndTime",
help="This operation gets the device system date and time.",
)
dt.set_defaults(func=get_system_date_and_time)
caps = subparsers.add_parser(
name="GetCapabilities",
help="Returns the capabilities of the device service.",
)
caps.set_defaults(func=get_capabilities)
services = subparsers.add_parser(
name="GetServices",
help="Returns information about services on the device.",
)
services.add_argument("-IncludeCapability", action="store_true", default=False)
services.set_defaults(func=get_services)
info = subparsers.add_parser(
name="GetDeviceInformation",
help="Gets basic device information from the device.",
)
info.set_defaults(func=get_device_information)
profiles = subparsers.add_parser(
name="GetProfiles",
help="Get a list of media profiles.",
)
profiles.set_defaults(func=get_profiles)
stream_uri = subparsers.add_parser(
name="GetStreamUri",
help="Obtain the RTSP stream address.",
)
stream_uri.add_argument("-Protocol", choices=_PROTOCOLS, default=_RTSP)
stream_uri.add_argument("-Stream", choices=_STREAMS, default=_RTP_UNICAST)
stream_uri.add_argument("ProfileToken")
stream_uri.set_defaults(func=get_stream_uri)
snapshot_uri = subparsers.add_parser(
name="GetSnapshotUri",
help="Obtain a JPEG snapshot from the device.",
)
snapshot_uri.add_argument("ProfileToken")
snapshot_uri.set_defaults(func=get_snapshot_uri)
return parser.parse_known_args(cmdline, namespace)[0]
def main() -> None:
default_logger_setup()
default_cache_dir = find_default_wsdl_cache_dir()
args = get_default_arguments(default_cache_dir=default_cache_dir)
if args.cmd is None:
print("Empty command", file=sys.stderr)
sys.exit(1)
try:
o = serialize_object(args.func(args), dict)
assert isinstance(args.use_json, bool)
if args.use_json:
assert isinstance(args.json_indent, int)
assert isinstance(args.json_sort, bool)
indent = args.json_indent
sort = args.json_sort
json = dumps(o, indent=indent, sort_keys=sort, default=dumps_default)
print(json)
else:
pprint(o)
except Exception as e:
logger.exception(e)
sys.exit(1)
else:
sys.exit(0)
if __name__ == "__main__":
main()