Skip to content

Numba

Numba is an open source JIT compiler that translates a subset of Python and NumPy code into fast machine code.

Install

pip install numba

jit

Option

설명

주의점

사용 예시

nopython

Python을 interpreter로 처리하지 않고, Compile을 진행

Python에서만 존재하는 라이브러리(pandas 등)를 사용하면 Error가 뜸

@jit(nopython=True)

nogil

GIL(Global Interpreter Lock)을 사용하지 않음.

Thread 간 안전성 문제가 있을 수 있음, 메모리 사용량이 많아질 수 있음

@jit(nogil=True)

cache

Compile 결과를 디스크에 캐싱하여, 이후에 재사용 할 수 있도록 함

Numba 버전이나, 코드 변경 시, 캐시 파일이 의미가 없을 수 있음.

@jit(cache=True)

parallel

병렬 처리를 위해 사용, 반복문과 배열의 연산을 병렬화하여 cpu 코어를 활용 가능

추가적인 메모리 사용과 오버헤드 발생 가능, 병렬화로 인해 항상 성능 향상을 보장하지 못함.

@jit(parallel=True)

njit

@njit == @jit(nopython=True) 이다.

페러럴 루프 최적화

#prange 항목 참조.

Caching

캐시 디렉터리의 위치를 ​​재정의합니다. 정의된 경우 이는 유효한 디렉터리 경로여야 합니다.

정의되지 않은 경우 Numba는 다음 순서로 캐시 디렉토리를 선택합니다.

  1. 트리 내 캐시. .pyc 파일이 저장되는 방식에 따라 __pycache__ 디렉터리 아래 해당 소스 파일 옆에 캐시를 넣습니다.
  2. 사용자 전체 캐시. Appdirs 패키지의 appdirs.user_cache_dir을 사용하여 사용자의 애플리케이션 디렉터리에 캐시를 넣습니다.
  3. IPython 캐시. 캐시를 IPython 특정 애플리케이션 디렉토리에 넣습니다. 저장소는 IPython.paths.get_ipython_cache_dir()이 반환한 디렉터리의 numba_cache 아래에 만들어집니다.

캐시 공유에 대한 문서와 캐시 삭제에 대한 문서도 참조하세요.

NumPy 지원

numpy.nditer
첫 번째 Argument 만 전달해야 한다. 2중 루프가 필요하다면 #prange를 사용하자.

Automatic parallelization with @jit

prange

from numba import njit, prange
import numpy as np

@njit(parallel=True)
def test(x):
    n = x.shape[0]
    a = np.sin(x)
    b = np.cos(a * a)
    acc = 0
    for i in prange(n - 2):
        for j in prange(n - 1):
            acc += b[i] + b[j + 1]
    return acc

test(np.arange(10))

test.parallel_diagnostics(level=4)

GIL 없는 Multithread

GIL을 해제하면 cpu bound작업시 multithread의 성능을 제대로 발휘할수 있다.

#!/usr/bin/env python
from __future__ import print_function, division, absolute_import

import math
import threading
from timeit import repeat

import numpy as np
from numba import jit

nthreads = 4
size = 10**6

def func_np(a, b):
    """
    Control function using Numpy.
    """
    return np.exp(2.1 * a + 3.2 * b)

@jit('void(double[:], double[:], double[:])', nopython=True, nogil=True)
def inner_func_nb(result, a, b):
    """
    Function under test.
    """
    for i in range(len(result)):
        result[i] = math.exp(2.1 * a[i] + 3.2 * b[i])

def timefunc(correct, s, func, *args, **kwargs):
    """
    Benchmark *func* and print out its runtime.
    """
    print(s.ljust(20), end=" ")
    # Make sure the function is compiled before we start the benchmark
    res = func(*args, **kwargs)
    if correct is not None:
        assert np.allclose(res, correct), (res, correct)
    # time it
    print('{:>5.0f} ms'.format(min(repeat(lambda: func(*args, **kwargs),
                                          number=5, repeat=2)) * 1000))
    return res

def make_singlethread(inner_func):
    """
    Run the given function inside a single thread.
    """
    def func(*args):
        length = len(args[0])
        result = np.empty(length, dtype=np.float64)
        inner_func(result, *args)
        return result
    return func

def make_multithread(inner_func, numthreads):
    """
    Run the given function inside *numthreads* threads, splitting its
    arguments into equal-sized chunks.
    """
    def func_mt(*args):
        length = len(args[0])
        result = np.empty(length, dtype=np.float64)
        args = (result,) + args
        chunklen = (length + numthreads - 1) // numthreads
        # Create argument tuples for each input chunk
        chunks = [[arg[i * chunklen:(i + 1) * chunklen] for arg in args]
                  for i in range(numthreads)]
        # Spawn one thread per chunk
        threads = [threading.Thread(target=inner_func, args=chunk)
                   for chunk in chunks]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return result
    return func_mt


func_nb = make_singlethread(inner_func_nb)
func_nb_mt = make_multithread(inner_func_nb, nthreads)

a = np.random.rand(size)
b = np.random.rand(size)

correct = timefunc(None, "numpy (1 thread)", func_np, a, b)
timefunc(correct, "numba (1 thread)", func_nb, a, b)
timefunc(correct, "numba (%d threads)" % nthreads, func_nb_mt, a, b)

CUDA 디바이스 사용 방법

CUDA 메모리 해제

from numba import cuda

# 현재 사용 중인 디바이스 해제
cuda.select_device(0)
cuda.close()

Sharing CUDA Memory

프로세스 간 공유는 레거시 CUDA IPC API(함수 이름이 cuIpc로 시작함)를 사용하여 구현되며 Linux에서만 지원됩니다.

다른 프로세스로 장치 배열 내보내기

장치 배열은 CUDA IPC API를 사용하여 동일한 머신의 다른 프로세스와 공유될 수 있습니다.

이를 위해 .get_ipc_handle()장치 배열에서 메서드를 사용하여 #IpcArrayHandle객체를 가져오고, 이 객체를 다른 프로세스로 전송할 수 있습니다.

DeviceNDArray.get_ipc_handle()
로컬 할당을 공유하기 위해 직렬화하고 다른 프로세스로 전송하기에 안전한 #IpcArrayHandle 객체를 반환합니다.

IpcArrayHandle

클래스 원형은 class numba.cuda.cudadrv.devicearray.IpcArrayHandle(ipc_handle, array_desc) 이다.

IPC 배열 핸들은 직렬화되어 동일한 머신의 다른 프로세스에 전달될 수 있으며, GPU 할당을 공유하는 데 사용됩니다.

목적지 프로세스(공유를 받는 측)에서는 .open() 메서드를 사용하여 원래 프로세스의 할당을 공유하는 새로운 DeviceNDArray 객체를 생성합니다.

리소스를 해제하려면 .close() 메서드를 호출합니다. 그 후에는 목적지 프로세스(공유를 받는 측)에서 더 이상 공유된 배열 객체를 사용할 수 없습니다. 이 시점에서 리소스에 대한 약한 참조가 제거됩니다.

이 객체는 .open().close() 메서드를 자동으로 호출하는 컨텍스트 관리자 (with 문법) 를 지원합니다.

with the_ipc_array_handle as ipc_array:
    # use ipc_array here as a normal gpu array object
    some_code(ipc_array)

# ipc_array is dead at this point

다른 프로세스에서 IPC 메모리 가져오기

cuda.open_ipc_array(shape, dtype, strides=None, offset=0) 함수는 다른 프로세스의 IPC 핸들을 장치 배열로 여는 데 사용됩니다.

바이트 시퀀스 (예: bytes, int 튜플)로 표현된 IPC 핸들(CUipcMemHandle)을 열고, shape, dtype, strides 으로 배열로 나타내는 컨텍스트 관리자입니다.

strides는 생략할 수 있으며, 이 경우 1D C1 연속 배열로 간주됩니다.

장치 배열을 생성합니다.

컨텍스트 관리자가 종료되면 IPC 핸들은 자동으로 닫힙니다.

Example

호스트 측 (공유 주는 측) 프로세스:

import numpy as np
from numba import cuda
from time import sleep
from pickle import dumps

# CUDA 장치에서 배열을 생성합니다.
data = np.arange(10, dtype=np.float32)
d_data = cuda.to_device(data)

# IPC 핸들을 생성합니다.
ipc_handle = d_data.get_ipc_handle()

# IPC 핸들을 직렬화하여 다른 프로세스에 전달할 수 있습니다.
serialized_handle = dumps(ipc_handle)

# serialized_handle을 파일이나 다른 방법으로 다른 프로세스에 전달합니다.
with open('ipc_handle.bin', 'wb') as f:
    f.write(serialized_handle)


while True:
    sleep(1)
    print(".", end="", flush=True)

게스트 측 (공유 받는 측) 프로세스:

from numba import cuda
from pickle import loads

# serialized_handle을 파일이나 다른 방법으로 수신합니다.
with open('ipc_handle.bin', 'rb') as f:
    serialized_handle = f.read()

# IPC 핸들을 역직렬화합니다.
ipc_handle = loads(serialized_handle)

# 핸들을 통해 배열에 접근합니다.
d_data = ipc_handle.open()
data = d_data.copy_to_host()

print(data)

See also

Favorite site

References


  1. array.flags['C_CONTIGUOUS']True인, 1차원 C 스타일의 메모리 배치.