Skip to content

Python:multiprocessing

multiprocessing 은 threading 모듈과 유사한 API를 사용하여 프로세스 스포닝(spawning)을 지원하는 패키지입니다. multiprocessing 패키지는 지역과 원격 동시성을 모두 제공하며 스레드 대신 서브 프로세스를 사용하여 전역 인터프리터 록 을 효과적으로 피합니다. 이것 때문에, multiprocessing 모듈은 프로그래머가 주어진 기계에서 다중 프로세서를 최대한 활용할 수 있게 합니다. 유닉스와 윈도우에서 모두 실행됩니다.

multiprocessing 모듈은 threading 모듈에 대응 물이 없는 API도 제공합니다. 이것의 대표적인 예가 Pool 객체입니다. 이 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시키는 편리한 방법을 제공합니다(데이터 병렬 처리). 다음 예제는 자식 프로세스가 해당 모듈을 성공적으로 임포트 할 수 있도록, 모듈에서 이러한 함수를 정의하는 일반적인 방법을 보여줍니다.

Example

다음은 Pool 를 사용하는 데이터 병렬 처리의 기본 예제입니다:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

표준 출력으로 다음과 같은 것을 인쇄합니다

[1, 4, 9]

Shared Memory

Python:multiprocessing.shared_memory 항목 참조.

Capture, Standard output

from multiprocessing import Process
import os
import sys

def info(title):
    print title
    print 'module name:', __name__
    print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    sys.stdout = open(str(os.getpid()) + ".out", "w")
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    q = Process(target=f, args=('fred',))
    q.start()
    p.join()
    q.join()

Locking example

from multiprocessing import Process, Lock, Manager

class Worker:
    def __init__(self):
        self.cache = Manager().list()

    def pop_data(self, lock, redis):
        lock.acquire()
        while True:
            item = redis.pop_data()
            if item == None:  # 큐가 비워졌으면 끝
                item = None
                break
            if item in self.cache:
                print 'already finished! dump it = {}'.format(item)
                continue
            if len(self.cache) > 10:
                self.cache.pop(0)
            self.cache.append(item)
            break
        lock.release()
        return each_item

    def working(self, lock, redis):
        while True:
            data = self.pop_data(lock, redis)
            if not data:
                break
            ~~~~~  # 어떤 작업

def main(argv):
    redis = ~~~~
    process_list = []
    lock = Lock()
    process_count = 5
    for i in xrange(process_count):
        worker = Worker()
        ps = Process(target=worker.working, args=(lock, redis, i))
        process_list.append(ps)
        ps.start()

    for i in xrange(len(process_list)):
        process_list[i].join()

if '__name__' == '__main__':
    main()

Troubleshooting

Deadlock at the end of the program

multiprocessing.Queue를 사용할 때, 프로그램 종료시 데드락이 걸린다면 Queue.close()사용 여부를 확인하는 것이 좋다.

UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown

다음과 같은 에러 발생:

/usr/local/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 10 leaked shared_memory objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
/usr/local/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/psm_e27e5f9e': [Errno 2] No such file or directory: '/psm_e27e5f9e'
  warnings.warn('resource_tracker: %r: %s' % (name, e))
/usr/local/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/psm_2cf099ac': [Errno 2] No such file or directory: '/psm_2cf099ac'
<8 more similar messages omitted>

... 아직 문제해결 안됨.

See also

Favorite site

References


  1. On_Sharing_Large_Arrays_When_Using_Pythons_Multiprocessing.pdf