Python:threading
Value
Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object.
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do
Assuming the associated lock is recursive (which it is by default) you can instead do
Note that lock is a keyword-only argument.
Example
import multiprocessing
def worker1(v):
with v.get_lock():
v.value += 1
def worker2(v):
with v.get_lock():
v.value += 2
ctypes_int = multiprocessing.Value("i", 0)
print ctypes_int.value
# Output: 0
process1 = multiprocessing.Process(
target=worker1, args=[ctypes_int])
process2 = multiprocessing.Process(
target=worker2, args=[ctypes_int])
process1.start()
process2.start()
process1.join()
process2.join()
print ctypes_int.value
RLock
"Reentrant Lock"
간혹 lock을 거는 함수가 재귀호출을 하는 경우 쓰레드가 Block되어 Lock을 해제할 수 없게 되어버림
RLock()은 쓰레드가 lock을 취득한 상태에서 lock을 다시 취득하면 lock count를 1 올리면서 즉시 return한다.
"lock 재 획득 문제를 해결"
Condition variable
import threading
import time
CONSUMER_COUNT = 10
PRODUCER_COUNT = CONSUMER_COUNT // 2
queue = []
cv = threading.Condition()
item_id = 0
class Consumer(threading.Thread):
def __init__(self, id):
threading.Thread.__init__(self)
self.id = id
def run(self):
for i in range(5):
cv.acquire()
while len(queue) < 1:
print('consumer({}) waiting...'.format(self.id))
cv.wait()
print('consumer({}) -> item({})'.format(self.id, queue.pop(0)))
cv.release()
time.sleep(0.5)
class Producer(threading.Thread):
def run(self):
global item_id
for i in range(10):
cv.acquire()
item_id += 1
queue.append(item_id)
cv.notify()
cv.release()
time.sleep(0.7)
threads = []
for i in range(CONSUMER_COUNT):
threads.append(Consumer(i))
for i in range(PRODUCER_COUNT):
threads.append(Producer())
for th in threads:
th.start()
for th in threads:
th.join()
print('<End>')