Elasticsearch:Python
오픈소스 포크버전 호환성 문제
ElasticSearch 공식 파이썬 라이브러리가 더 이상 오픈소스 포크버전과 동작하지 않음 ==
- Verify connection to Elasticsearch #1623
- 첫번째 API 호출 전에 버전 번호와 특정 HTTP 헤더를 체크해서 에러를 발생시키도록 수정
- 버전 필드 없거나 6.0 이하는 에러
- 7.14 이상의 버전은 X-Elastic-Product HTTP 헤더가 없거나 값이 "Elasticsearch" 가 아니면 에러
- 7.14.0 아래 7.x 버전은 tagline 에 "You know, for Search", build_flavor 가 "default" 값이 없으면 에러
recc Example
pip install:
AsyncElasticLogger
class:
# -*- coding: utf-8 -*-
import os
import time
from typing import Optional
from elasticsearch import AsyncElasticsearch
DEFAULT_ELASTICSEARCH_INDEX_NAME = "answer"
DEFAULT_ELASTICSEARCH_HOST = "localhost"
DEFAULT_ELASTICSEARCH_PORT = 9200
DEFAULT_ELASTICSEARCH_PROP_NAME = "name"
DEFAULT_ELASTICSEARCH_PROP_PID = "pid"
DEFAULT_ELASTICSEARCH_PROP_TIME = "time"
DEFAULT_ELASTICSEARCH_PROP_LEVEL = "level"
DEFAULT_ELASTICSEARCH_PROP_MSG = "msg"
DEFAULT_ELASTICSEARCH_MAPPING = {
"mappings": {
"properties": {
DEFAULT_ELASTICSEARCH_PROP_NAME: {
"type": "keyword",
},
DEFAULT_ELASTICSEARCH_PROP_PID: {
"type": "integer",
},
DEFAULT_ELASTICSEARCH_PROP_TIME: {
"type": "date",
"format": "epoch_millis",
},
DEFAULT_ELASTICSEARCH_PROP_LEVEL: {
"type": "short",
},
DEFAULT_ELASTICSEARCH_PROP_MSG: {
"type": "text",
},
},
},
}
class AsyncElasticLogger:
def __init__(
self,
name: str,
host: Optional[str] = None,
port: Optional[int] = None,
index: Optional[str] = None,
):
self.name = name
self.host = host if host else DEFAULT_ELASTICSEARCH_HOST
self.port = port if port else DEFAULT_ELASTICSEARCH_PORT
self.index = index if index else DEFAULT_ELASTICSEARCH_INDEX_NAME
self._pid = os.getpid()
self._mapping = DEFAULT_ELASTICSEARCH_MAPPING
self._ready = False
self._es = AsyncElasticsearch(f"{self.host}:{self.port}")
async def open(self) -> bool:
try:
if not await self._es.indices.exists(index=self.index):
await self._es.indices.create(index=self.index, body=self._mapping)
self._ready = True
return True
except Exception: # noqa
self._ready = False
return False
async def close(self) -> None:
if self._ready:
try:
await self._es.close()
except Exception: # noqa
pass
finally:
self._ready = False
async def write(self, level: int, msg: str):
return await self._es.index(
index=self.index,
doc_type="_doc",
body={
DEFAULT_ELASTICSEARCH_PROP_NAME: self.name,
DEFAULT_ELASTICSEARCH_PROP_PID: self._pid,
DEFAULT_ELASTICSEARCH_PROP_TIME: int(time.time() * 1000),
DEFAULT_ELASTICSEARCH_PROP_LEVEL: level,
DEFAULT_ELASTICSEARCH_PROP_MSG: msg,
},
)