[ create a new paste ] login | about

Link: http://codepad.org/FXF2SWmg    [ raw code | output | fork ]

Python, pasted on Jun 11:
from __future__ import with_statement

import time
import threading


def iter_consumed(items, produce, consume, _producer_delay=0):
    '''Return an iterator over the consumed items.

    :param items: An iterable of objects to be `produce`()d and `consume`()d.

    :param produce: A callable `f(item)` that produces a single item; the return
        value is ignored. What "produce" exactly means is application-specific.

    :param consume: A callable `f()` that consumes a previously produced item
        and returns the consumed item. What "consume" exactly means is
        application-specific. The only assumption is that if `produce` is called
        `N` times, then the next `N` calls to `consume` will (eventually, though
        not necessarily immediatelly) return, i.e they will not block indefinitely.

    :param _producer_delay: If greater than zero, introduce a delay producer
        between two subsequent calls to produce(); otherwise everything will most
        likely be produced before even the first item is consumed, which is not
        very interesting.
    '''
    condition = threading.Condition()
    done_remaining = [False, 0]   # will be defined as 'nonlocal' in the future
    def abort():
        if not done_remaining[0]:
            with condition:
                done_remaining[0] = True
                condition.notifyAll()
    def produce_all():
        try:
            for item in items:
                with condition:
                    if done_remaining[0]:
                        break
                    produce(item)
                    done_remaining[1] += 1
                    condition.notify()
                time.sleep(_producer_delay)
        finally:
            abort()
    producer = threading.Thread(target=produce_all)
    producer.start()
    try:
        while True:
            with condition:
                if done_remaining[1] > 0:   # there is a pending item; yield it
                    done_remaining[1] -= 1
                    item = consume()
                elif done_remaining[0]: # no pending items and producer is done
                    return
                else:               # no pending items but producer is not done
                    condition.wait()
                    continue
            yield item
    finally:
        abort()
        producer.join()


if __name__ == '__main__':

    import collections
    available = collections.deque()

    def produce(item):
        available.append(item)
        print 'Produced', item

    def consume():
        return available.popleft() / 2.0

    producer_delay = 0.01
    for ratio in 0.5, 0.9, 1.0, 1.1, 1.5, 2, 5:
        consumer_delay = producer_delay * ratio
        print 'Producer delay:', producer_delay
        print 'Consumer delay:', consumer_delay
        for item in iter_consumed(range(10), produce, consume, producer_delay):
            print 'Consumed', item
            time.sleep(consumer_delay)
        raw_input(30*'=' + '\nPress any key to continue')


Output:
1
Disallowed system call: SYS_pipe


Create a new paste based on this one


Comments: