Python:multiprocessing
multiprocessing 은 threading 모듈과 유사한 API를 사용하여 프로세스 스포닝(spawning)을 지원하는 패키지입니다. multiprocessing 패키지는 지역과 원격 동시성을 모두 제공하며 스레드 대신 서브 프로세스를 사용하여 전역 인터프리터 록 을 효과적으로 피합니다. 이것 때문에, multiprocessing 모듈은 프로그래머가 주어진 기계에서 다중 프로세서를 최대한 활용할 수 있게 합니다. 유닉스와 윈도우에서 모두 실행됩니다.
multiprocessing 모듈은 threading 모듈에 대응 물이 없는 API도 제공합니다. 이것의 대표적인 예가 Pool 객체입니다. 이 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시키는 편리한 방법을 제공합니다(데이터 병렬 처리). 다음 예제는 자식 프로세스가 해당 모듈을 성공적으로 임포트 할 수 있도록, 모듈에서 이러한 함수를 정의하는 일반적인 방법을 보여줍니다.
Example
다음은 Pool 를 사용하는 데이터 병렬 처리의 기본 예제입니다:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
표준 출력으로 다음과 같은 것을 인쇄합니다
Shared Memory
Python:multiprocessing.shared_memory 항목 참조.
Capture, Standard output
from multiprocessing import Process
import os
import sys
def info(title):
print title
print 'module name:', __name__
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
sys.stdout = open(str(os.getpid()) + ".out", "w")
info('function f')
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
q = Process(target=f, args=('fred',))
q.start()
p.join()
q.join()
Locking example
from multiprocessing import Process, Lock, Manager
class Worker:
def __init__(self):
self.cache = Manager().list()
def pop_data(self, lock, redis):
lock.acquire()
while True:
item = redis.pop_data()
if item == None: # 큐가 비워졌으면 끝
item = None
break
if item in self.cache:
print 'already finished! dump it = {}'.format(item)
continue
if len(self.cache) > 10:
self.cache.pop(0)
self.cache.append(item)
break
lock.release()
return each_item
def working(self, lock, redis):
while True:
data = self.pop_data(lock, redis)
if not data:
break
~~~~~ # 어떤 작업
def main(argv):
redis = ~~~~
process_list = []
lock = Lock()
process_count = 5
for i in xrange(process_count):
worker = Worker()
ps = Process(target=worker.working, args=(lock, redis, i))
process_list.append(ps)
ps.start()
for i in xrange(len(process_list)):
process_list[i].join()
if '__name__' == '__main__':
main()
Troubleshooting
Deadlock at the end of the program
multiprocessing.Queue
를 사용할 때, 프로그램 종료시 데드락이 걸린다면 Queue.close()
사용 여부를 확인하는 것이 좋다.
UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown
다음과 같은 에러 발생:
/usr/local/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 10 leaked shared_memory objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
/usr/local/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/psm_e27e5f9e': [Errno 2] No such file or directory: '/psm_e27e5f9e'
warnings.warn('resource_tracker: %r: %s' % (name, e))
/usr/local/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/psm_2cf099ac': [Errno 2] No such file or directory: '/psm_2cf099ac'
<8 more similar messages omitted>
... 아직 문제해결 안됨.
See also
Favorite site
- multiprocessing --- 프로세스 기반 병렬 처리
- [추천] HAMA 블로그 - 파이썬 동시성 프로그래밍 - (4) multiprocessing
- (Python) Multiprocessing (Thread vs Process)
- (Python) 파이썬 멀티 쓰레드(thread)와 멀티 프로세스(process)
- Python Multi-Threading, Multi-Processing
- [추천] On Sharing Large Arrays When Using Python's Multiprocessing 1
References
-
On_Sharing_Large_Arrays_When_Using_Pythons_Multiprocessing.pdf ↩