MIME:Python
MIME를 Python으로 구현.
mime_codec.py Code
# -*- coding: utf-8 -*-
from typing import Any, Callable, Optional, Union
from recc.mime.mime_type import MimeType
MimeEncoder = Callable[[Any], bytes]
MimeDecoder = Callable[[bytes], Any]
class MimeCodec:
def __init__(
self,
mime: Union[str, MimeType],
encoder: Optional[MimeEncoder] = None,
decoder: Optional[MimeDecoder] = None,
):
if isinstance(mime, str):
self.mime = MimeType.parse(mime)
elif isinstance(mime, MimeType):
self.mime = mime
else:
raise ValueError(f"Invalid `mime` type: {type(mime).__name__}")
self.encoder = encoder
self.decoder = decoder
def exist_encoder(self) -> bool:
return self.encoder is not None
def exist_decoder(self) -> bool:
return self.decoder is not None
def set_encoder(self, encoder: MimeEncoder) -> None:
self.encoder = encoder
def set_decoder(self, decoder: MimeDecoder) -> None:
self.decoder = decoder
def remove_encoder(self) -> None:
self.encoder = None
def remove_decoder(self) -> None:
self.decoder = None
def encode(self, data: Any) -> bytes:
if self.encoder is None:
raise NotImplementedError(f"Not implemented {str(self.mime)} encoder")
return self.encoder(data)
def decode(self, data: bytes) -> Any:
if self.decoder is None:
raise NotImplementedError(f"Not implemented {str(self.mime)} decoder")
return self.decoder(data)
mime_codec_register.py Code
# -*- coding: utf-8 -*-
import pickle
from typing import Any, Callable, Dict, Optional
from recc.driver.json import global_json_byte_decoder, global_json_byte_encoder
from recc.mime.mime_codec import MimeCodec
from recc.mime.mime_type import APPLICATION_JSON, APPLICATION_OCTET_STREAM, TEXT_PLAIN
MimeEncoder = Callable[[Any], bytes]
MimeDecoder = Callable[[bytes], Any]
def create_pickle_codec() -> MimeCodec:
return MimeCodec(
APPLICATION_OCTET_STREAM,
lambda x: pickle.dumps(x),
lambda x: pickle.loads(x),
)
def create_text_codec() -> MimeCodec:
return MimeCodec(
TEXT_PLAIN,
lambda x: str(x).encode(encoding="utf-8"),
lambda x: str(x, encoding="utf-8"),
)
def create_json_codec() -> MimeCodec:
return MimeCodec(
APPLICATION_JSON,
global_json_byte_encoder,
global_json_byte_decoder,
)
class MimeCodecRegister:
def __init__(self, default: Optional[MimeCodec] = None):
self._mimes: Dict[str, MimeCodec] = dict()
self._default = default if default else create_pickle_codec()
@property
def mimes(self):
return self._mimes.keys()
def exist(self, mime: str) -> bool:
return mime in self._mimes.keys()
def exist_encoder(self, mime: str) -> bool:
return self.exist(mime) and self._mimes[mime].exist_encoder()
def exist_decoder(self, mime: str) -> bool:
return self.exist(mime) and self._mimes[mime].exist_decoder()
def set_encoder(self, mime: str, encoder: MimeEncoder) -> None:
if self.exist(mime):
self._mimes[mime].set_encoder(encoder)
else:
self.add(mime, encoder=encoder)
def set_decoder(self, mime: str, decoder: MimeDecoder) -> None:
if self.exist(mime):
self._mimes[mime].set_decoder(decoder)
else:
self.add(mime, decoder=decoder)
def set_default_encoder(self, encoder: MimeEncoder) -> None:
self._default.set_encoder(encoder)
def set_default_decoder(self, decoder: MimeDecoder) -> None:
self._default.set_decoder(decoder)
def remove_encoder(self, mime: str) -> None:
if self.exist(mime):
self._mimes[mime].remove_encoder()
def remove_decoder(self, mime: str) -> None:
if self.exist(mime):
self._mimes[mime].remove_decoder()
def add(
self,
mime: str,
encoder: Optional[MimeEncoder] = None,
decoder: Optional[MimeDecoder] = None,
) -> None:
self._mimes[mime] = MimeCodec(mime, encoder=encoder, decoder=decoder)
def add_mime_codec(self, codec: MimeCodec) -> None:
self._mimes[str(codec.mime)] = codec
def remove(self, mime: str) -> None:
del self._mimes[mime]
def encode(self, mime: str, data: Any) -> bytes:
if mime in self._mimes:
return self._mimes[mime].encode(data)
return self._default.encode(data)
def decode(self, mime: str, data: bytes) -> Any:
if mime in self._mimes:
return self._mimes[mime].decode(data)
return self._default.decode(data)
def encode_json(self, data: Any) -> bytes:
return self.encode(APPLICATION_JSON, data)
def decode_json(self, data: bytes) -> Any:
return self.decode(APPLICATION_JSON, data)
def encode_text(self, data: Any) -> bytes:
return self.encode(TEXT_PLAIN, data)
def decode_text(self, data: bytes) -> Any:
return self.decode(TEXT_PLAIN, data)
def encode_binary(self, data: Any) -> bytes:
return self.encode(APPLICATION_OCTET_STREAM, data)
def decode_binary(self, data: bytes) -> Any:
return self.decode(APPLICATION_OCTET_STREAM, data)
def encode_default(self, data: Any) -> bytes:
return self._default.encode(data)
def decode_default(self, data: bytes) -> Any:
return self._default.decode(data)
@classmethod
def default(cls):
result = cls()
result.add_mime_codec(create_text_codec())
result.add_mime_codec(create_json_codec())
return result
_GLOBAL_MIME_CODEC_REGISTER = MimeCodecRegister.default()
def get_global_mime_register() -> MimeCodecRegister:
return _GLOBAL_MIME_CODEC_REGISTER
def global_mime_encoder(mime: str, data: Any) -> bytes:
return _GLOBAL_MIME_CODEC_REGISTER.encode(mime, data)
def global_mime_decoder(mime: str, data: bytes) -> Any:
return _GLOBAL_MIME_CODEC_REGISTER.decode(mime, data)
mime_register.py Code
# -*- coding: utf-8 -*-
import os
from typing import List
from recc.mime.mime_type import MimeType
def read_csv_mimes(csv_path: str, encoding="utf-8", start_line=1) -> List[MimeType]:
result = list()
with open(csv_path, "r", encoding=encoding) as f:
for line in f.readlines()[start_line:]:
items = line.split(",")
if len(items) != 3:
continue
name = items[0].strip()
template = items[1].strip()
reference = items[2].strip()
try:
result.append(MimeType.parse(template, name, reference))
except: # noqa
continue
return result
def read_csv_mimes_by_default(name: str) -> List[MimeType]:
csv_path = os.path.join(os.path.dirname(__file__), f"{name}.csv")
try:
return read_csv_mimes(csv_path)
except BaseException as e: # noqa
return list()
TYPE_APPLICATION = "application"
TYPE_AUDIO = "audio"
TYPE_FONT = "font"
TYPE_EXAMPLE = "example"
TYPE_IMAGE = "image"
TYPE_MESSAGE = "message"
TYPE_MODEL = "model"
TYPE_MULTIPART = "multipart"
TYPE_TEXT = "text"
TYPE_VIDEO = "video"
REGISTERED_TYPES = (
TYPE_APPLICATION,
TYPE_AUDIO,
TYPE_FONT,
TYPE_EXAMPLE,
TYPE_IMAGE,
TYPE_MESSAGE,
TYPE_MODEL,
TYPE_MULTIPART,
TYPE_TEXT,
TYPE_VIDEO,
)
REGISTERED_MIMES = {t: read_csv_mimes_by_default(t) for t in REGISTERED_TYPES}
mime_type.py Code
# -*- coding: utf-8 -*-
from typing import Final, List, Optional, Tuple
TYPE_SEPARATOR: Final[str] = "/"
PARAMETER_SEPARATOR: Final[str] = ";"
PARAM_SEP: Final[str] = PARAMETER_SEPARATOR
ANY_WILDCARD: Final[str] = "*"
class MimeType:
"""
A media type (formerly known as MIME type) is a two-part identifier
for file formats and format contents transmitted on the Internet.
See also:
* https://en.wikipedia.org/wiki/Media_type
* https://datatracker.ietf.org/doc/html/rfc2045#section-5.1
* https://www.iana.org/assignments/media-types/media-types.xhtml
"""
def __init__(
self,
family: str,
subtype: str,
parameter: Optional[str] = None,
original: Optional[str] = None,
name: Optional[str] = None,
reference: Optional[str] = None,
):
# The `type` of MIME is a Python reserved character.
# Therefore, the name will be changed to `family`.
self.family = family.strip().lower()
# Does not parse `tree` and `suffix`.
self.subtype = subtype.strip().lower()
# Does not parse `attribute` and `value`.
self.parameter = parameter
# Full original text.
self.original = original
self.name = name
self.reference = reference
@property
def mime(self) -> str:
prefix = f"{self.family}{TYPE_SEPARATOR}{self.subtype}"
if self.parameter:
return f"{prefix}{PARAM_SEP}{self.parameter}"
else:
return prefix
@property
def parameter_tuple(self) -> Tuple[Optional[str], Optional[str]]:
if not self.parameter:
return None, None
kv = self.parameter.split("=", 1)
key = kv[0].strip()
if len(kv) == 1:
return key, None
assert len(kv) == 2
return key, kv[1].strip()
def get_parameter_value(
self,
key: str,
default_value: Optional[str] = None,
lower_key=True,
) -> Optional[str]:
pkey, pval = self.parameter_tuple
if pkey is None or pval is None:
return default_value
pkey = pkey.lower() if lower_key else pkey
if pkey != key:
return default_value
return pval
@property
def charset(self) -> Optional[str]:
return self.get_parameter_value("charset", lower_key=True)
@property
def q(self) -> Optional[float]:
val = self.get_parameter_value("q", lower_key=True)
if val is None:
return None
try:
return float(val)
except ValueError:
return None
@classmethod
def parse(
cls,
text: str,
name: Optional[str] = None,
reference: Optional[str] = None,
) -> "MimeType":
types_and_param = text.split(PARAM_SEP, 1)
assert 1 <= len(types_and_param) <= 2
parameter: Optional[str] = None
if len(types_and_param) == 2:
parameter = types_and_param[1].strip()
types = types_and_param[0].strip().split(TYPE_SEPARATOR, 1)
assert 1 <= len(types) <= 2
family = types[0]
if len(types) == 1:
if family == ANY_WILDCARD:
subtype = ANY_WILDCARD
else:
subtype = str()
else:
assert len(types) == 2
subtype = types[1]
return cls(family, subtype, parameter, text, name, reference)
def __str__(self) -> str:
return self.mime
def test_from_accept(self, accept: "MimeType") -> bool:
if self.family == accept.family:
pass
elif accept.family == ANY_WILDCARD:
pass
elif self.family == ANY_WILDCARD:
pass
else:
assert self.family != accept.family
return False
if self.subtype == accept.subtype:
return True
elif accept.subtype == ANY_WILDCARD:
return True
elif self.subtype == ANY_WILDCARD:
return True
else:
assert self.subtype != accept.subtype
return False
def test_from_accepts(self, accepts: List["MimeType"]) -> bool:
for accept in accepts:
if self.test_from_accept(accept):
return True
return False
SINGLE_ANY: Final[str] = "*"
BOTH_ANY: Final[str] = "*/*"
APPLICATION_OCTET_STREAM: Final[str] = "application/octet-stream"
APPLICATION_JSON: Final[str] = "application/json"
APPLICATION_XML: Final[str] = "application/xml"
APPLICATION_YAML: Final[str] = "application/x-yaml"
APPLICATION_FORM: Final[str] = "application/x-www-form-urlencoded"
TEXT_PLAIN: Final[str] = "text/plain"
CHARSET_UTF8: Final[str] = "charset=utf-8"
MIME_ANY = MimeType.parse(SINGLE_ANY)
MIME_ANY_BOTH = MimeType.parse(BOTH_ANY)
MIME_APPLICATION_OCTET_STREAM = MimeType.parse(APPLICATION_OCTET_STREAM)
MIME_APPLICATION_JSON = MimeType.parse(APPLICATION_JSON)
MIME_APPLICATION_XML = MimeType.parse(APPLICATION_XML)
MIME_APPLICATION_YAML = MimeType.parse(APPLICATION_YAML)
MIME_APPLICATION_FORM = MimeType.parse(APPLICATION_FORM)
MIME_TEXT_PLAIN = MimeType.parse(TEXT_PLAIN)
MIME_APPLICATION_JSON_UTF8 = MimeType.parse(APPLICATION_JSON + PARAM_SEP + CHARSET_UTF8)
MIME_APPLICATION_XML_UTF8 = MimeType.parse(APPLICATION_XML + PARAM_SEP + CHARSET_UTF8)
MIME_APPLICATION_YAML_UTF8 = MimeType.parse(APPLICATION_YAML + PARAM_SEP + CHARSET_UTF8)
MIME_APPLICATION_FORM_UTF8 = MimeType.parse(APPLICATION_FORM + PARAM_SEP + CHARSET_UTF8)
MIME_TEXT_PLAIN_UTF8 = MimeType.parse(TEXT_PLAIN + PARAM_SEP + CHARSET_UTF8)
TestCase
from unittest import TestCase, main
from recc.mime.mime_codec import MimeCodec
from recc.mime.mime_codec_register import get_global_mime_register
from recc.mime.mime_register import (
REGISTERED_MIMES,
TYPE_APPLICATION,
TYPE_AUDIO,
TYPE_EXAMPLE,
TYPE_FONT,
TYPE_IMAGE,
TYPE_MESSAGE,
TYPE_MODEL,
TYPE_MULTIPART,
TYPE_TEXT,
TYPE_VIDEO,
)
from recc.mime.mime_type import (
MIME_ANY,
MIME_ANY_BOTH,
MIME_APPLICATION_JSON,
MIME_APPLICATION_OCTET_STREAM,
MIME_TEXT_PLAIN,
MimeType,
)
class MimeCodecTestCase(TestCase):
def test_default(self):
codec = MimeCodec("unknown/text")
self.assertEqual("unknown/text", codec.mime.mime)
class _ComplexObject:
def __init__(self):
self.test_data1 = "text"
self.test_data2 = 100
self.test_data3 = 3.14
class MimeCodecRegisterTestCase(TestCase):
def test_binary(self):
test_data = {"aa": 11, "bb": 22.5, "cc": [1, 2, 3]}
codec = get_global_mime_register()
encoded_data = codec.encode_binary(test_data)
self.assertIsInstance(encoded_data, bytes)
decoded_data = codec.decode_binary(encoded_data)
self.assertIsInstance(decoded_data, dict)
self.assertEqual(decoded_data, test_data)
def test_binary_complex(self):
test_data = _ComplexObject()
codec = get_global_mime_register()
encoded_data = codec.encode_binary(test_data)
self.assertIsInstance(encoded_data, bytes)
decoded_data = codec.decode_binary(encoded_data)
self.assertIsInstance(decoded_data, _ComplexObject)
self.assertEqual(decoded_data.test_data1, test_data.test_data1)
self.assertEqual(decoded_data.test_data2, test_data.test_data2)
self.assertEqual(decoded_data.test_data3, test_data.test_data3)
def test_json(self):
test_data = {"aa": 11, "bb": 22.5, "cc": [1, 2, 3]}
codec = get_global_mime_register()
encoded_data = codec.encode_json(test_data)
self.assertIsInstance(encoded_data, bytes)
decoded_data = codec.decode_json(encoded_data)
self.assertIsInstance(decoded_data, dict)
self.assertEqual(decoded_data, test_data)
def test_text(self):
test_data = "Hello, World!"
codec = get_global_mime_register()
encoded_data = codec.encode_text(test_data)
self.assertIsInstance(encoded_data, bytes)
decoded_data = codec.decode_text(encoded_data)
self.assertIsInstance(decoded_data, str)
self.assertEqual(decoded_data, test_data)
class MimeRegisterTestCase(TestCase):
def test_registered_mimes(self):
self.assertLess(0, len(REGISTERED_MIMES))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_APPLICATION]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_AUDIO]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_FONT]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_EXAMPLE]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_IMAGE]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_MESSAGE]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_MODEL]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_MULTIPART]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_TEXT]))
self.assertEqual(0, len(REGISTERED_MIMES[TYPE_VIDEO]))
class MimeTypeTestCase(TestCase):
def test_default(self):
self.assertEqual("application", MIME_APPLICATION_OCTET_STREAM.family)
self.assertEqual("octet-stream", MIME_APPLICATION_OCTET_STREAM.subtype)
self.assertEqual("application", MIME_APPLICATION_JSON.family)
self.assertEqual("json", MIME_APPLICATION_JSON.subtype)
self.assertEqual("text", MIME_TEXT_PLAIN.family)
self.assertEqual("plain", MIME_TEXT_PLAIN.subtype)
def test_accept(self):
text_any = MimeType.parse("text/*")
any_plain = MimeType.parse("*/plain")
self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([text_any]))
self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([any_plain]))
self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([MIME_ANY]))
self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([MIME_ANY_BOTH]))
text_unknown = MimeType.parse("text/unknown")
unknown_text = MimeType.parse("unknown/text")
error_mimes = [text_unknown, text_unknown]
self.assertFalse(MIME_TEXT_PLAIN.test_from_accepts([text_unknown]))
self.assertFalse(MIME_TEXT_PLAIN.test_from_accepts([unknown_text]))
self.assertFalse(MIME_TEXT_PLAIN.test_from_accepts(error_mimes))
mixed_mimes = [text_unknown, text_unknown, MIME_ANY]
self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts(mixed_mimes))
def test_parameters(self):
xml_mime = MimeType.parse("application/xml;q=0.9")
self.assertEqual("q=0.9", xml_mime.parameter)
any_mime = MimeType.parse("*/*;q=0.8")
self.assertEqual("q=0.8", any_mime.parameter)
See also