Skip to content

Elasticsearch:Python

오픈소스 포크버전 호환성 문제

ElasticSearch 공식 파이썬 라이브러리가 더 이상 오픈소스 포크버전과 동작하지 않음 ==

  • Verify connection to Elasticsearch #1623
  • 첫번째 API 호출 전에 버전 번호와 특정 HTTP 헤더를 체크해서 에러를 발생시키도록 수정
    • 버전 필드 없거나 6.0 이하는 에러
    • 7.14 이상의 버전은 X-Elastic-Product HTTP 헤더가 없거나 값이 "Elasticsearch" 가 아니면 에러
    • 7.14.0 아래 7.x 버전은 tagline 에 "You know, for Search", build_flavor 가 "default" 값이 없으면 에러

recc Example

pip install:

pip install elasticsearch[async]>=7.0.0,<8.0.0

AsyncElasticLogger class:

# -*- coding: utf-8 -*-

import os
import time
from typing import Optional
from elasticsearch import AsyncElasticsearch

DEFAULT_ELASTICSEARCH_INDEX_NAME = "answer"
DEFAULT_ELASTICSEARCH_HOST = "localhost"
DEFAULT_ELASTICSEARCH_PORT = 9200

DEFAULT_ELASTICSEARCH_PROP_NAME = "name"
DEFAULT_ELASTICSEARCH_PROP_PID = "pid"
DEFAULT_ELASTICSEARCH_PROP_TIME = "time"
DEFAULT_ELASTICSEARCH_PROP_LEVEL = "level"
DEFAULT_ELASTICSEARCH_PROP_MSG = "msg"

DEFAULT_ELASTICSEARCH_MAPPING = {
    "mappings": {
        "properties": {
            DEFAULT_ELASTICSEARCH_PROP_NAME: {
                "type": "keyword",
            },
            DEFAULT_ELASTICSEARCH_PROP_PID: {
                "type": "integer",
            },
            DEFAULT_ELASTICSEARCH_PROP_TIME: {
                "type": "date",
                "format": "epoch_millis",
            },
            DEFAULT_ELASTICSEARCH_PROP_LEVEL: {
                "type": "short",
            },
            DEFAULT_ELASTICSEARCH_PROP_MSG: {
                "type": "text",
            },
        },
    },
}


class AsyncElasticLogger:
    def __init__(
        self,
        name: str,
        host: Optional[str] = None,
        port: Optional[int] = None,
        index: Optional[str] = None,
    ):
        self.name = name
        self.host = host if host else DEFAULT_ELASTICSEARCH_HOST
        self.port = port if port else DEFAULT_ELASTICSEARCH_PORT
        self.index = index if index else DEFAULT_ELASTICSEARCH_INDEX_NAME

        self._pid = os.getpid()
        self._mapping = DEFAULT_ELASTICSEARCH_MAPPING
        self._ready = False
        self._es = AsyncElasticsearch(f"{self.host}:{self.port}")

    async def open(self) -> bool:
        try:
            if not await self._es.indices.exists(index=self.index):
                await self._es.indices.create(index=self.index, body=self._mapping)
            self._ready = True
            return True
        except Exception:  # noqa
            self._ready = False
            return False

    async def close(self) -> None:
        if self._ready:
            try:
                await self._es.close()
            except Exception:  # noqa
                pass
            finally:
                self._ready = False

    async def write(self, level: int, msg: str):
        return await self._es.index(
            index=self.index,
            doc_type="_doc",
            body={
                DEFAULT_ELASTICSEARCH_PROP_NAME: self.name,
                DEFAULT_ELASTICSEARCH_PROP_PID: self._pid,
                DEFAULT_ELASTICSEARCH_PROP_TIME: int(time.time() * 1000),
                DEFAULT_ELASTICSEARCH_PROP_LEVEL: level,
                DEFAULT_ELASTICSEARCH_PROP_MSG: msg,
            },
        )

See also

Favorite site