from __future__ import with_statement
import time
import threading
def iter_consumed(items, produce, consume, _producer_delay=0):
    '''Return an iterator over the consumed items.

    A background thread calls `produce(item)` for every element of `items`,
    while the thread iterating over the result calls `consume()` once per
    produced item and yields whatever it returns. Both callables are invoked
    while holding the same lock, so they never run concurrently with each
    other.

    The producer thread is started on the first iteration step and is joined
    when the iterator is exhausted or closed. If iterating `items` or calling
    `produce` raises in the producer thread, production stops and the iterator
    simply ends once the items produced so far have been consumed; the
    exception is not re-raised in the consuming thread.

    :param items: An iterable of objects to be `produce`()d and `consume`()d.
    :param produce: A callable `f(item)` that produces a single item; the
        return value is ignored. What "produce" exactly means is
        application-specific.
    :param consume: A callable `f()` that consumes a previously produced item
        and returns the consumed item. What "consume" exactly means is
        application-specific. The only assumption is that if `produce` is
        called `N` times, then the next `N` calls to `consume` will
        (eventually, though not necessarily immediately) return, i.e. they
        will not block indefinitely.
    :param _producer_delay: If greater than zero, the producer sleeps for this
        many seconds between two subsequent calls to produce(); otherwise
        everything will most likely be produced before even the first item is
        consumed, which is not very interesting. Zero and negative values
        mean "no delay".
    :return: A generator yielding the return value of each `consume()` call.
    '''
    condition = threading.Condition()
    # [producer is done, number of produced-but-not-yet-consumed items].
    # A mutable list is used so the nested functions can update the values
    # (will be defined as 'nonlocal' in the future).
    done_remaining = [False, 0]

    def abort():
        # Flag the production as finished (at most once) and wake up every
        # thread waiting on the condition so it can observe the flag.
        if not done_remaining[0]:
            with condition:
                done_remaining[0] = True
                condition.notifyAll()

    def produce_all():
        try:
            for item in items:
                with condition:
                    if done_remaining[0]:
                        # The consumer went away; stop producing.
                        break
                    produce(item)
                    done_remaining[1] += 1
                    condition.notify()
                # Sleep outside the lock, and only for a positive delay as
                # documented: time.sleep() raises ValueError for negative
                # values, which would kill this thread after the first item.
                if _producer_delay > 0:
                    time.sleep(_producer_delay)
        finally:
            # Always signal completion, even if `items` or `produce` raised,
            # so the consumer never waits forever.
            abort()

    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            with condition:
                if done_remaining[1] > 0:  # there is a pending item; yield it
                    done_remaining[1] -= 1
                    item = consume()
                elif done_remaining[0]:  # no pending items and producer is done
                    return
                else:  # no pending items but producer is not done
                    condition.wait()
                    continue
            # Yield outside the lock so the producer can keep working while
            # the caller processes the item.
            yield item
    finally:
        # Runs on exhaustion, on an exception and on generator close: tell
        # the producer to stop and wait for its thread to finish.
        abort()
        producer.join()
if __name__ == '__main__':
    # Interactive demo: run iter_consumed() over ten integers for several
    # consumer/producer delay ratios, printing every produce/consume event.
    import collections

    # Line input is `raw_input` on Python 2 and `input` on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    # Queue shared by the demo producer and consumer callables.
    available = collections.deque()

    def produce(item):
        available.append(item)
        print('Produced %s' % item)

    def consume():
        # Halve the value so consumed items are distinguishable from
        # produced ones in the output.
        return available.popleft() / 2.0

    producer_delay = 0.01
    for ratio in 0.5, 0.9, 1.0, 1.1, 1.5, 2, 5:
        consumer_delay = producer_delay * ratio
        # Single-argument print(...) with %-formatting gives the same output
        # under both the Python 2 print statement and the Python 3 function.
        print('Producer delay: %s' % producer_delay)
        print('Consumer delay: %s' % consumer_delay)
        for item in iter_consumed(range(10), produce, consume, producer_delay):
            print('Consumed %s' % item)
            time.sleep(consumer_delay)
        # Pause between runs; line input only returns once Enter is pressed.
        read_line(30*'=' + '\nPress Enter to continue')