Python:multiprocessing.shared memory
프로세스 간 직접 액세스를 위한 공유 메모리를 제공합니다.
Usage
Producer Process:
from multiprocessing import shared_memory
server_sm = shared_memory.SharedMemory(create=True, size=10)
# use `server_sm.buf`
server_sm.close()
server_sm.unlink()
Consumer Process:
from multiprocessing import shared_memory
client_sm = shared_memory.SharedMemory(sm.name)
# use `client_sm.buf`
client_sm.close()
SharedMemory
새로운 공유 메모리 블록을 생성하거나 기존 공유 메모리 블록에 연결합니다.
모든 공유 메모리 블록에는 고유한 이름이 할당됩니다. 이를 통해 하나의 프로세스가 특정 이름을 가진 공유 메모리 블록을 생성할 수 있으므로 다른 프로세스가 동일한 이름을 사용하여 동일한 공유 메모리 블록에 연결할 수 있습니다.
프로세스 간에 데이터를 공유하기 위한 리소스로서 공유 메모리 블록은 이를 생성한 원래 프로세스보다 오래 지속될 수 있습니다. 한 프로세스가 다른 프로세스에 여전히 필요할 수 있는 공유 메모리 블록에 더 이상 액세스할 필요가 없으면 close()
메서드를 호출해야 합니다.
모든 프로세스에서 공유 메모리 블록이 더 이상 필요하지 않은 경우 적절한 정리를 위해 unlink()
메서드를 호출해야 합니다.
Simple Example
다음 예제는 SharedMemory 인스턴스의 저수준 사용을 보여줍니다:
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once
>>> buffer[4] = 100 # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5]) # Access via shm_a
b'howdy'
>>> shm_b.close() # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink() # Call unlink only once to release the shared memory
Numpy Shared Memory Example
다음 예제는 두 개의 다른 파이썬 셸에서 같은 numpy.ndarray에 액세스하는, NumPy 배열과 함께 SharedMemory 클래스를 사용하는 실용적인 방법을 보여줍니다:
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8]) # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:] # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name # We did not specify a name so one was chosen for us
'psm_21467_46075'
>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([ 1, 1, 2, 3, 5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([ 1, 1, 2, 3, 5, 888])
>>> # Clean up from within the second Python shell
>>> del c # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink() # Free and release the shared memory block at the very end
Troubleshooting
There appear to be 1 leaked shared_memory objects to clean up at shutdown
- Stackoverflow - Python 3.8 shared_memory resource_tracker producing unexpected warnings at application close
- Issue 39959: Bug on multiprocessing.shared_memory - Python tracker
프로그램 종료 시 다음과 같은 UserWarning 이 출력된다.
~: /usr/lib/python3.8/multiprocessing/resource_tracker.py:203: UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
문제 상황은 다음과 같다.
공유 메모리를 생성하는 프로세스에서 다음과 같은 코드가 사용된다.
server_sm = SharedMemory(create=True, size=10) # register tracker
# use `server_sm.buf`
server_sm.close()
server_sm.unlink() # unregister tracker
SharedMemory 생성시 Tracker를 등록하고, unlink()
에서 Tracker를 등록해제 한다.
하지만 공유 메모리를 사용하는(Consumer)에서는 unlink()
하지 않게 되는데,
Tracker는 등록되지만 해제되는 코드가 없어서 문제가 된다.
이를 해결하기 위해 이 곳에서 제시하는 것 처럼 원본 코드를 복사하여, 생성자에서 create=True
일 경우에만 등록하거나
(나의 경우) 다음과 같은 유틸리티 함수를 사용하면 된다.
# -*- coding: utf-8 -*-
import os
from multiprocessing.shared_memory import SharedMemory
def _unregister_shared_memory_tracker(sm: SharedMemory) -> None:
# https://bugs.python.org/issue39959
if os.name != "nt":
from multiprocessing.resource_tracker import unregister # noqa
unregister(getattr(sm, "_name"), "shared_memory")
class _AttachSharedMemoryContext:
def __init__(self, name: str):
self.name = name
def __enter__(self) -> SharedMemory:
self.sm = SharedMemory(name=self.name)
return self.sm
def __exit__(self, exc_type, exc_val, exc_tb):
_unregister_shared_memory_tracker(self.sm)
self.sm.close()
def attach_shared_memory(name: str):
return _AttachSharedMemoryContext(name)
Consumer 쪽에서 다음과 같이 사용하면 된다: