Skip to content

MIME:Python

MIME를 Python으로 구현.

mime_codec.py Code

# -*- coding: utf-8 -*-

from typing import Any, Callable, Optional, Union

from recc.mime.mime_type import MimeType

# Signature of an encoder: takes any Python object and returns its bytes form.
MimeEncoder = Callable[[Any], bytes]
# Signature of a decoder: takes bytes and returns the reconstructed object.
MimeDecoder = Callable[[bytes], Any]


class MimeCodec:
    """
    Pairs one MIME type with an optional encoder and an optional decoder.

    Attributes:
        mime: The ``MimeType`` this codec handles.
        encoder: Callable turning an object into bytes, or ``None``.
        decoder: Callable turning bytes into an object, or ``None``.
    """

    def __init__(
        self,
        mime: Union[str, MimeType],
        encoder: Optional[MimeEncoder] = None,
        decoder: Optional[MimeDecoder] = None,
    ):
        """
        Args:
            mime: MIME text (parsed with ``MimeType.parse``) or a ready-made
                ``MimeType`` instance (stored as given).
            encoder: Optional encoder callable.
            decoder: Optional decoder callable.

        Raises:
            ValueError: If ``mime`` is neither a ``str`` nor a ``MimeType``.
        """
        if not isinstance(mime, (str, MimeType)):
            raise ValueError(f"Invalid `mime` type: {type(mime).__name__}")

        self.mime = MimeType.parse(mime) if isinstance(mime, str) else mime
        self.encoder, self.decoder = encoder, decoder

    def exist_encoder(self) -> bool:
        """Return ``True`` when an encoder is currently set."""
        has_encoder = self.encoder is not None
        return has_encoder

    def exist_decoder(self) -> bool:
        """Return ``True`` when a decoder is currently set."""
        has_decoder = self.decoder is not None
        return has_decoder

    def set_encoder(self, encoder: MimeEncoder) -> None:
        """Replace the encoder callable."""
        self.encoder = encoder

    def set_decoder(self, decoder: MimeDecoder) -> None:
        """Replace the decoder callable."""
        self.decoder = decoder

    def remove_encoder(self) -> None:
        """Unset the encoder; later ``encode`` calls will raise."""
        self.encoder = None

    def remove_decoder(self) -> None:
        """Unset the decoder; later ``decode`` calls will raise."""
        self.decoder = None

    def encode(self, data: Any) -> bytes:
        """
        Encode ``data`` with the configured encoder.

        Raises:
            NotImplementedError: If no encoder is set.
        """
        encoder = self.encoder
        if encoder is not None:
            return encoder(data)
        raise NotImplementedError(f"Not implemented {str(self.mime)} encoder")

    def decode(self, data: bytes) -> Any:
        """
        Decode ``data`` with the configured decoder.

        Raises:
            NotImplementedError: If no decoder is set.
        """
        decoder = self.decoder
        if decoder is not None:
            return decoder(data)
        raise NotImplementedError(f"Not implemented {str(self.mime)} decoder")

mime_codec_register.py Code

# -*- coding: utf-8 -*-

import pickle
from typing import Any, Callable, Dict, Optional

from recc.driver.json import global_json_byte_decoder, global_json_byte_encoder
from recc.mime.mime_codec import MimeCodec
from recc.mime.mime_type import APPLICATION_JSON, APPLICATION_OCTET_STREAM, TEXT_PLAIN

# Signature of an encoder: takes any Python object and returns its bytes form.
MimeEncoder = Callable[[Any], bytes]
# Signature of a decoder: takes bytes and returns the reconstructed object.
MimeDecoder = Callable[[bytes], Any]


def create_pickle_codec() -> MimeCodec:
    """
    Build a codec for ``APPLICATION_OCTET_STREAM`` backed by :mod:`pickle`.

    Returns:
        A ``MimeCodec`` whose encoder calls ``pickle.dumps`` and whose decoder
        calls ``pickle.loads``.
    """

    def _dump(obj):
        return pickle.dumps(obj)

    def _load(raw):
        # NOTE(review): unpickling can execute arbitrary code; only decode
        # payloads that come from a trusted source.
        return pickle.loads(raw)

    return MimeCodec(APPLICATION_OCTET_STREAM, encoder=_dump, decoder=_load)


def create_text_codec() -> MimeCodec:
    """
    Build a codec for ``TEXT_PLAIN`` that converts through UTF-8.

    Returns:
        A ``MimeCodec`` whose encoder stringifies the object with ``str()``
        and encodes it as UTF-8, and whose decoder decodes UTF-8 bytes into
        a ``str``.
    """

    def _to_bytes(obj):
        return str(obj).encode(encoding="utf-8")

    def _to_text(raw):
        return str(raw, encoding="utf-8")

    return MimeCodec(TEXT_PLAIN, encoder=_to_bytes, decoder=_to_text)


def create_json_codec() -> MimeCodec:
    """
    Build a codec for ``APPLICATION_JSON``.

    Returns:
        A ``MimeCodec`` wired to the project's ``global_json_byte_encoder``
        and ``global_json_byte_decoder`` functions.
    """
    json_codec = MimeCodec(
        APPLICATION_JSON,
        encoder=global_json_byte_encoder,
        decoder=global_json_byte_decoder,
    )
    return json_codec


class MimeCodecRegister:
    """
    Registry mapping MIME strings to ``MimeCodec`` instances.

    Keys are compared as plain strings, exactly as given. ``encode`` and
    ``decode`` fall back to the default codec when the MIME string is not
    registered.

    NOTE(review): unless another default is supplied, the fallback codec is
    pickle based, so ``decode`` of an unregistered MIME string unpickles the
    payload. Do not feed it untrusted data.
    """

    def __init__(self, default: Optional[MimeCodec] = None):
        """
        Args:
            default: Fallback codec; when falsy a pickle codec is created.
        """
        self._mimes: Dict[str, MimeCodec] = {}
        self._default = default or create_pickle_codec()

    @property
    def mimes(self):
        """Live view of the registered MIME string keys."""
        return self._mimes.keys()

    def exist(self, mime: str) -> bool:
        """Return ``True`` when ``mime`` is a registered key."""
        return mime in self._mimes

    def exist_encoder(self, mime: str) -> bool:
        """Return ``True`` when ``mime`` is registered and has an encoder."""
        if not self.exist(mime):
            return False
        return self._mimes[mime].exist_encoder()

    def exist_decoder(self, mime: str) -> bool:
        """Return ``True`` when ``mime`` is registered and has a decoder."""
        if not self.exist(mime):
            return False
        return self._mimes[mime].exist_decoder()

    def set_encoder(self, mime: str, encoder: MimeEncoder) -> None:
        """Set the encoder for ``mime``, registering the MIME if needed."""
        if not self.exist(mime):
            self.add(mime, encoder=encoder)
            return
        self._mimes[mime].set_encoder(encoder)

    def set_decoder(self, mime: str, decoder: MimeDecoder) -> None:
        """Set the decoder for ``mime``, registering the MIME if needed."""
        if not self.exist(mime):
            self.add(mime, decoder=decoder)
            return
        self._mimes[mime].set_decoder(decoder)

    def set_default_encoder(self, encoder: MimeEncoder) -> None:
        """Replace the encoder of the fallback codec."""
        self._default.set_encoder(encoder)

    def set_default_decoder(self, decoder: MimeDecoder) -> None:
        """Replace the decoder of the fallback codec."""
        self._default.set_decoder(decoder)

    def remove_encoder(self, mime: str) -> None:
        """Unset the encoder of ``mime``; unknown keys are ignored."""
        if not self.exist(mime):
            return
        self._mimes[mime].remove_encoder()

    def remove_decoder(self, mime: str) -> None:
        """Unset the decoder of ``mime``; unknown keys are ignored."""
        if not self.exist(mime):
            return
        self._mimes[mime].remove_decoder()

    def add(
        self,
        mime: str,
        encoder: Optional[MimeEncoder] = None,
        decoder: Optional[MimeDecoder] = None,
    ) -> None:
        """
        Register a new codec under the key ``mime``.

        Any codec already stored under the same key is replaced.
        """
        codec = MimeCodec(mime, encoder=encoder, decoder=decoder)
        self._mimes[mime] = codec

    def add_mime_codec(self, codec: MimeCodec) -> None:
        """Register ``codec`` under the string form of its MIME type."""
        key = str(codec.mime)
        self._mimes[key] = codec

    def remove(self, mime: str) -> None:
        """
        Drop the codec registered under ``mime``.

        Raises:
            KeyError: If ``mime`` is not registered.
        """
        del self._mimes[mime]

    def encode(self, mime: str, data: Any) -> bytes:
        """Encode ``data`` with the codec for ``mime`` or the fallback codec."""
        codec = self._mimes.get(mime, self._default)
        return codec.encode(data)

    def decode(self, mime: str, data: bytes) -> Any:
        """Decode ``data`` with the codec for ``mime`` or the fallback codec."""
        codec = self._mimes.get(mime, self._default)
        return codec.decode(data)

    def encode_json(self, data: Any) -> bytes:
        """Shortcut for ``encode(APPLICATION_JSON, data)``."""
        return self.encode(APPLICATION_JSON, data)

    def decode_json(self, data: bytes) -> Any:
        """Shortcut for ``decode(APPLICATION_JSON, data)``."""
        return self.decode(APPLICATION_JSON, data)

    def encode_text(self, data: Any) -> bytes:
        """Shortcut for ``encode(TEXT_PLAIN, data)``."""
        return self.encode(TEXT_PLAIN, data)

    def decode_text(self, data: bytes) -> Any:
        """Shortcut for ``decode(TEXT_PLAIN, data)``."""
        return self.decode(TEXT_PLAIN, data)

    def encode_binary(self, data: Any) -> bytes:
        """Shortcut for ``encode(APPLICATION_OCTET_STREAM, data)``."""
        return self.encode(APPLICATION_OCTET_STREAM, data)

    def decode_binary(self, data: bytes) -> Any:
        """Shortcut for ``decode(APPLICATION_OCTET_STREAM, data)``."""
        return self.decode(APPLICATION_OCTET_STREAM, data)

    def encode_default(self, data: Any) -> bytes:
        """Encode ``data`` with the fallback codec, bypassing the registry."""
        fallback = self._default
        return fallback.encode(data)

    def decode_default(self, data: bytes) -> Any:
        """Decode ``data`` with the fallback codec, bypassing the registry."""
        fallback = self._default
        return fallback.decode(data)

    @classmethod
    def default(cls):
        """
        Create a register preloaded with the text and JSON codecs.

        The fallback codec is the one chosen by ``__init__`` when no
        ``default`` argument is given.
        """
        register = cls()
        for factory in (create_text_codec, create_json_codec):
            register.add_mime_codec(factory())
        return register


# Process-wide register, built once at import time with the codecs that
# `MimeCodecRegister.default()` installs.
_GLOBAL_MIME_CODEC_REGISTER = MimeCodecRegister.default()


def get_global_mime_register() -> MimeCodecRegister:
    """Return the module-level register (the same instance on every call)."""
    return _GLOBAL_MIME_CODEC_REGISTER


def global_mime_encoder(mime: str, data: Any) -> bytes:
    """Encode ``data`` for ``mime`` through the module-level register."""
    register = _GLOBAL_MIME_CODEC_REGISTER
    return register.encode(mime, data)


def global_mime_decoder(mime: str, data: bytes) -> Any:
    """Decode ``data`` for ``mime`` through the module-level register."""
    register = _GLOBAL_MIME_CODEC_REGISTER
    return register.decode(mime, data)

mime_register.py Code

# -*- coding: utf-8 -*-

import os
from typing import List

from recc.mime.mime_type import MimeType


def read_csv_mimes(csv_path: str, encoding="utf-8", start_line=1) -> List[MimeType]:
    """
    Load MIME types from a three-column CSV file.

    Each usable row is ``name, template, reference``; the template is parsed
    with ``MimeType.parse`` and the other two columns are attached to the
    result. Rows that do not have exactly three fields, and rows whose
    template fails to parse, are skipped.

    Args:
        csv_path: Path of the CSV file.
        encoding: Text encoding used to open the file.
        start_line: Index of the first record to read; the default of ``1``
            skips one leading (header) record.

    Returns:
        The parsed ``MimeType`` objects, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Function-scope import keeps this block self-contained.
    import csv

    result = list()
    # The csv module is used instead of ``str.split(",")`` so that quoted
    # fields containing commas are kept as one column; ``newline=""`` is the
    # mode the csv module expects for correct newline handling.
    with open(csv_path, "r", encoding=encoding, newline="") as f:
        rows = list(csv.reader(f))

    for row in rows[start_line:]:
        if len(row) != 3:
            continue

        name, template, reference = (item.strip() for item in row)
        try:
            result.append(MimeType.parse(template, name, reference))
        except Exception:
            # Best effort: one malformed row must not discard the whole
            # table. KeyboardInterrupt/SystemExit are no longer swallowed.
            continue
    return result


def read_csv_mimes_by_default(name: str) -> List[MimeType]:
    """
    Load ``<name>.csv`` from the directory that contains this module.

    Args:
        name: Base name of the CSV file, without the ``.csv`` extension.

    Returns:
        The parsed MIME types, or an empty list when the file is missing or
        cannot be read.
    """
    csv_path = os.path.join(os.path.dirname(__file__), f"{name}.csv")
    try:
        return read_csv_mimes(csv_path)
    except Exception:
        # Deliberately best effort, but catch ``Exception`` rather than
        # ``BaseException`` so KeyboardInterrupt/SystemExit still propagate.
        return list()


# Top-level media type names. Each name is also the base name of the CSV file
# that `read_csv_mimes_by_default` looks for next to this module.
TYPE_APPLICATION = "application"
TYPE_AUDIO = "audio"
TYPE_FONT = "font"
TYPE_EXAMPLE = "example"
TYPE_IMAGE = "image"
TYPE_MESSAGE = "message"
TYPE_MODEL = "model"
TYPE_MULTIPART = "multipart"
TYPE_TEXT = "text"
TYPE_VIDEO = "video"

# All top-level type names for which a table is loaded below.
REGISTERED_TYPES = (
    TYPE_APPLICATION,
    TYPE_AUDIO,
    TYPE_FONT,
    TYPE_EXAMPLE,
    TYPE_IMAGE,
    TYPE_MESSAGE,
    TYPE_MODEL,
    TYPE_MULTIPART,
    TYPE_TEXT,
    TYPE_VIDEO,
)
# Built eagerly at import time: type name -> list of parsed MIME types.
# A type whose CSV file cannot be read maps to an empty list.
REGISTERED_MIMES = {t: read_csv_mimes_by_default(t) for t in REGISTERED_TYPES}

mime_type.py Code

# -*- coding: utf-8 -*-

from typing import Final, List, Optional, Tuple

# Separator between the top-level type and the subtype ("text/plain").
TYPE_SEPARATOR: Final[str] = "/"
# Separator between the type part and each parameter ("text/plain;q=0.9").
PARAMETER_SEPARATOR: Final[str] = ";"
PARAM_SEP: Final[str] = PARAMETER_SEPARATOR
# Wildcard accepted in place of a type or a subtype.
ANY_WILDCARD: Final[str] = "*"


class MimeType:
    """
    A media type (formerly known as MIME type) is a two-part identifier
    for file formats and format contents transmitted on the Internet.

    The type and subtype are stored stripped and lower-cased. The parameter
    section is kept as raw text; individual ``key=value`` parameters are
    extracted on demand.

    See also:
    * https://en.wikipedia.org/wiki/Media_type
    * https://datatracker.ietf.org/doc/html/rfc2045#section-5.1
    * https://www.iana.org/assignments/media-types/media-types.xhtml
    """

    def __init__(
        self,
        family: str,
        subtype: str,
        parameter: Optional[str] = None,
        original: Optional[str] = None,
        name: Optional[str] = None,
        reference: Optional[str] = None,
    ):
        """
        Args:
            family: Top-level type, e.g. ``"text"``.
            subtype: Subtype, e.g. ``"plain"``.
            parameter: Raw parameter text (everything after the first ``;``).
            original: Full text this instance was parsed from, if any.
            name: Optional descriptive name.
            reference: Optional reference text.
        """
        # `type` is a Python builtin name, so the top-level MIME type is
        # stored under the name `family` instead.
        self.family = family.strip().lower()

        # Does not parse `tree` and `suffix`.
        self.subtype = subtype.strip().lower()

        # Kept as raw text; see `parameter_tuple` / `get_parameter_value`.
        self.parameter = parameter

        # Full original text.
        self.original = original

        self.name = name
        self.reference = reference

    @property
    def mime(self) -> str:
        """Return ``family/subtype``, followed by ``;parameter`` when set."""
        prefix = f"{self.family}{TYPE_SEPARATOR}{self.subtype}"
        if self.parameter:
            return f"{prefix}{PARAM_SEP}{self.parameter}"
        return prefix

    @staticmethod
    def _split_parameters(text: str) -> List[str]:
        """Split raw parameter text on ``;`` outside of double quotes."""
        chunks: List[str] = []
        current: List[str] = []
        quoted = False
        escaped = False
        for char in text:
            if escaped:
                # Character following a backslash inside a quoted string.
                escaped = False
            elif quoted and char == "\\":
                escaped = True
            elif char == '"':
                quoted = not quoted
            elif char == PARAM_SEP and not quoted:
                chunks.append("".join(current))
                current = []
                continue
            current.append(char)
        chunks.append("".join(current))
        return chunks

    def _parameter_pairs(self) -> List[Tuple[str, Optional[str]]]:
        """Return every non-blank parameter as a ``(key, value)`` pair.

        The value is ``None`` for a parameter written without ``=``.
        Values are returned as written (surrounding quotes are kept).
        """
        if not self.parameter:
            return []
        pairs: List[Tuple[str, Optional[str]]] = []
        for chunk in self._split_parameters(self.parameter):
            chunk = chunk.strip()
            if not chunk:
                continue
            key, sep, value = chunk.partition("=")
            pairs.append((key.strip(), value.strip() if sep else None))
        return pairs

    @property
    def parameter_tuple(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Return the first parameter as ``(key, value)``.

        Returns ``(None, None)`` when there is no parameter, and
        ``(key, None)`` when the first parameter has no ``=``.
        """
        pairs = self._parameter_pairs()
        if not pairs:
            return None, None
        return pairs[0]

    def get_parameter_value(
        self,
        key: str,
        default_value: Optional[str] = None,
        lower_key: bool = True,
    ) -> Optional[str]:
        """
        Look up the value of the parameter named ``key``.

        All ``;``-separated parameters are searched, so this also works for
        text such as ``charset=utf-8; q=0.9``.

        Args:
            key: Parameter name to look for.
            default_value: Returned when no parameter with a value matches.
            lower_key: When true, names are compared case-insensitively.

        Returns:
            The first matching value, or ``default_value``.
        """
        wanted = key.lower() if lower_key else key
        for pkey, pval in self._parameter_pairs():
            if pval is None:
                continue
            candidate = pkey.lower() if lower_key else pkey
            if candidate == wanted:
                return pval
        return default_value

    @property
    def charset(self) -> Optional[str]:
        """Value of the ``charset`` parameter, or ``None``."""
        return self.get_parameter_value("charset", lower_key=True)

    @property
    def q(self) -> Optional[float]:
        """Value of the ``q`` parameter as a float, or ``None``.

        ``None`` is also returned when the value is not a valid float.
        """
        val = self.get_parameter_value("q", lower_key=True)
        if val is None:
            return None
        try:
            return float(val)
        except ValueError:
            return None

    @classmethod
    def parse(
        cls,
        text: str,
        name: Optional[str] = None,
        reference: Optional[str] = None,
    ) -> "MimeType":
        """
        Parse text such as ``"text/plain;charset=utf-8"``.

        Text without a ``/`` yields an empty subtype, except for the lone
        wildcard ``"*"``, which is treated as ``"*/*"``.

        Args:
            text: MIME text to parse; also stored as ``original``.
            name: Optional descriptive name stored on the result.
            reference: Optional reference text stored on the result.
        """
        head, param_sep, tail = text.partition(PARAM_SEP)
        parameter: Optional[str] = tail.strip() if param_sep else None

        family, type_sep, subtype = head.strip().partition(TYPE_SEPARATOR)
        if not type_sep:
            subtype = ANY_WILDCARD if family == ANY_WILDCARD else str()

        return cls(family, subtype, parameter, text, name, reference)

    def __str__(self) -> str:
        return self.mime

    @staticmethod
    def _part_matches(mine: str, theirs: str) -> bool:
        """Return True when two type parts are equal or either is ``*``."""
        return mine == theirs or ANY_WILDCARD in (mine, theirs)

    def test_from_accept(self, accept: "MimeType") -> bool:
        """
        Return ``True`` when this type is compatible with ``accept``.

        Family and subtype must each be equal or a wildcard on either side.
        Parameters are not compared.
        """
        return self._part_matches(
            self.family, accept.family
        ) and self._part_matches(self.subtype, accept.subtype)

    def test_from_accepts(self, accepts: List["MimeType"]) -> bool:
        """Return ``True`` when any entry of ``accepts`` is compatible."""
        return any(self.test_from_accept(accept) for accept in accepts)


# Wildcard spellings: a lone "*" and the explicit "*/*".
SINGLE_ANY: Final[str] = "*"
BOTH_ANY: Final[str] = "*/*"

# Frequently used MIME strings.
APPLICATION_OCTET_STREAM: Final[str] = "application/octet-stream"
APPLICATION_JSON: Final[str] = "application/json"
APPLICATION_XML: Final[str] = "application/xml"
APPLICATION_YAML: Final[str] = "application/x-yaml"
APPLICATION_FORM: Final[str] = "application/x-www-form-urlencoded"
TEXT_PLAIN: Final[str] = "text/plain"

# Parameter text appended to build the "...;charset=utf-8" variants below.
CHARSET_UTF8: Final[str] = "charset=utf-8"

# Parsed `MimeType` instances for the strings above, created at import time.
MIME_ANY = MimeType.parse(SINGLE_ANY)
MIME_ANY_BOTH = MimeType.parse(BOTH_ANY)
MIME_APPLICATION_OCTET_STREAM = MimeType.parse(APPLICATION_OCTET_STREAM)
MIME_APPLICATION_JSON = MimeType.parse(APPLICATION_JSON)
MIME_APPLICATION_XML = MimeType.parse(APPLICATION_XML)
MIME_APPLICATION_YAML = MimeType.parse(APPLICATION_YAML)
MIME_APPLICATION_FORM = MimeType.parse(APPLICATION_FORM)
MIME_TEXT_PLAIN = MimeType.parse(TEXT_PLAIN)

# The same types carrying an explicit UTF-8 charset parameter.
MIME_APPLICATION_JSON_UTF8 = MimeType.parse(APPLICATION_JSON + PARAM_SEP + CHARSET_UTF8)
MIME_APPLICATION_XML_UTF8 = MimeType.parse(APPLICATION_XML + PARAM_SEP + CHARSET_UTF8)
MIME_APPLICATION_YAML_UTF8 = MimeType.parse(APPLICATION_YAML + PARAM_SEP + CHARSET_UTF8)
MIME_APPLICATION_FORM_UTF8 = MimeType.parse(APPLICATION_FORM + PARAM_SEP + CHARSET_UTF8)
MIME_TEXT_PLAIN_UTF8 = MimeType.parse(TEXT_PLAIN + PARAM_SEP + CHARSET_UTF8)

TestCase

from unittest import TestCase, main

from recc.mime.mime_codec import MimeCodec
from recc.mime.mime_codec_register import get_global_mime_register
from recc.mime.mime_register import (
    REGISTERED_MIMES,
    TYPE_APPLICATION,
    TYPE_AUDIO,
    TYPE_EXAMPLE,
    TYPE_FONT,
    TYPE_IMAGE,
    TYPE_MESSAGE,
    TYPE_MODEL,
    TYPE_MULTIPART,
    TYPE_TEXT,
    TYPE_VIDEO,
)
from recc.mime.mime_type import (
    MIME_ANY,
    MIME_ANY_BOTH,
    MIME_APPLICATION_JSON,
    MIME_APPLICATION_OCTET_STREAM,
    MIME_TEXT_PLAIN,
    MimeType,
)


class MimeCodecTestCase(TestCase):
    """Checks that a ``MimeCodec`` can be built from plain MIME text."""

    def test_default(self):
        expected = "unknown/text"
        codec = MimeCodec(expected)
        self.assertEqual(expected, codec.mime.mime)


class _ComplexObject:
    """Plain attribute holder used to round-trip a custom object in tests."""

    def __init__(self):
        # One string, one int and one float attribute, assigned in order.
        self.test_data1, self.test_data2, self.test_data3 = "text", 100, 3.14


class MimeCodecRegisterTestCase(TestCase):
    """Round-trip checks against the module-level codec register."""

    def setUp(self):
        # Every test uses the shared global register.
        self.register = get_global_mime_register()

    def test_binary(self):
        payload = {"aa": 11, "bb": 22.5, "cc": [1, 2, 3]}
        raw = self.register.encode_binary(payload)
        self.assertIsInstance(raw, bytes)
        restored = self.register.decode_binary(raw)
        self.assertIsInstance(restored, dict)
        self.assertEqual(restored, payload)

    def test_binary_complex(self):
        payload = _ComplexObject()
        raw = self.register.encode_binary(payload)
        self.assertIsInstance(raw, bytes)
        restored = self.register.decode_binary(raw)
        self.assertIsInstance(restored, _ComplexObject)
        # Compare attribute by attribute, in the original order.
        for attribute in ("test_data1", "test_data2", "test_data3"):
            self.assertEqual(
                getattr(restored, attribute),
                getattr(payload, attribute),
            )

    def test_json(self):
        payload = {"aa": 11, "bb": 22.5, "cc": [1, 2, 3]}
        raw = self.register.encode_json(payload)
        self.assertIsInstance(raw, bytes)
        restored = self.register.decode_json(raw)
        self.assertIsInstance(restored, dict)
        self.assertEqual(restored, payload)

    def test_text(self):
        payload = "Hello, World!"
        raw = self.register.encode_text(payload)
        self.assertIsInstance(raw, bytes)
        restored = self.register.decode_text(raw)
        self.assertIsInstance(restored, str)
        self.assertEqual(restored, payload)


class MimeRegisterTestCase(TestCase):
    """Checks the shape of the ``REGISTERED_MIMES`` table."""

    def test_registered_mimes(self):
        self.assertLess(0, len(REGISTERED_MIMES))

        # NOTE(review): every per-type list is expected to be empty here,
        # presumably because no CSV tables ship with the package - confirm.
        families = (
            TYPE_APPLICATION,
            TYPE_AUDIO,
            TYPE_FONT,
            TYPE_EXAMPLE,
            TYPE_IMAGE,
            TYPE_MESSAGE,
            TYPE_MODEL,
            TYPE_MULTIPART,
            TYPE_TEXT,
            TYPE_VIDEO,
        )
        for family in families:
            self.assertEqual(0, len(REGISTERED_MIMES[family]))


class MimeTypeTestCase(TestCase):
    """Tests for ``MimeType`` parsing and accept matching."""

    def test_default(self):
        """The predefined instances expose the expected family/subtype."""
        self.assertEqual("application", MIME_APPLICATION_OCTET_STREAM.family)
        self.assertEqual("octet-stream", MIME_APPLICATION_OCTET_STREAM.subtype)

        self.assertEqual("application", MIME_APPLICATION_JSON.family)
        self.assertEqual("json", MIME_APPLICATION_JSON.subtype)

        self.assertEqual("text", MIME_TEXT_PLAIN.family)
        self.assertEqual("plain", MIME_TEXT_PLAIN.subtype)

    def test_accept(self):
        """Wildcards match; a differing family or subtype does not."""
        text_any = MimeType.parse("text/*")
        any_plain = MimeType.parse("*/plain")
        self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([text_any]))
        self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([any_plain]))
        self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([MIME_ANY]))
        self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts([MIME_ANY_BOTH]))

        text_unknown = MimeType.parse("text/unknown")
        unknown_text = MimeType.parse("unknown/text")
        # Both non-matching kinds go in the list: a subtype mismatch and a
        # family mismatch. The list previously repeated `text_unknown`.
        error_mimes = [text_unknown, unknown_text]
        self.assertFalse(MIME_TEXT_PLAIN.test_from_accepts([text_unknown]))
        self.assertFalse(MIME_TEXT_PLAIN.test_from_accepts([unknown_text]))
        self.assertFalse(MIME_TEXT_PLAIN.test_from_accepts(error_mimes))

        # One matching entry after the mismatches is enough to accept.
        mixed_mimes = [text_unknown, unknown_text, MIME_ANY]
        self.assertTrue(MIME_TEXT_PLAIN.test_from_accepts(mixed_mimes))

    def test_parameters(self):
        """The parameter text after ``;`` is kept on the parsed instance."""
        xml_mime = MimeType.parse("application/xml;q=0.9")
        self.assertEqual("q=0.9", xml_mime.parameter)

        any_mime = MimeType.parse("*/*;q=0.8")
        self.assertEqual("q=0.8", any_mime.parameter)

See also