[ create a new paste ] login | about

Link: http://codepad.org/unquCIu3    [ raw code | fork ]

Python, pasted on Jun 14:
import hashlib
# audioread is a third-party dependency: stop early with installation hints
# when it is missing.  Only ImportError is caught so that Ctrl-C or a genuine
# error inside the package is not misreported as "not installed".
try:
    import audioread
except ImportError:
    print("ERROR: audioread not installed.")
    print("Try one of these commands (stop trying if one succeeds):")
    print("  'sudo pip install audioread'")
    print("  'sudo easy_install audioread'")
    print("  'sudo apt-get install python-audioread'")
    print("  'sudo yum install python-audioread'")
    # The exit() helper is injected by the site module for interactive use;
    # raising SystemExit works regardless of how the interpreter was started.
    raise SystemExit(1)
import os
import argparse
import signal
import pickle
import re
from coroutine import coroutine, broadcast, threaded, sinks
from threading import Thread, Lock
from Queue import Queue, Empty

# Make audioread incapable of using GStreamer by replacing its availability
# check with a function that always returns False.  Reason given by the
# original author: GStreamer is not thread-safe (main() below sets up several
# producer threads for the hashing pipeline).
audioread._gst_available = lambda : False

@coroutine
def filterFileNames(regex, target):
    """Coroutine stage that forwards only the paths matched by ``regex``.

    Every value sent in is tested with ``regex.search``.  Paths that produce
    a match are sent on to ``target``; the others are dropped.  Closing this
    coroutine closes ``target`` as well.
    """
    try:
        while True:
            candidate = (yield)
            if regex.search(candidate) is None:
                continue
            target.send(candidate)
    except GeneratorExit:
        # Pass the shutdown on to the downstream stage.
        target.close()

@coroutine
def skip(hashes, target):
    """Coroutine stage that drops paths already present in ``hashes``.

    Every value sent in is checked with ``in hashes``; only paths that are
    not contained are sent on to ``target``.  Closing this coroutine closes
    ``target`` as well.
    """
    try:
        while True:
            candidate = (yield)
            if candidate in hashes:
                continue
            target.send(candidate)
    except GeneratorExit:
        # Pass the shutdown on to the downstream stage.
        target.close()

@coroutine
def hashSink(queue):
    """Terminal coroutine stage: MD5-hash the decoded audio of each path.

    For every path sent in, the file is opened with ``audioread`` and each
    buffer it yields is fed into an MD5 digest.  The outcome is reported on
    ``queue``:

    * ``(filepath, hexdigest)`` when the file was read completely;
    * ``(filepath, None)`` when ``audioread.DecodeError`` or ``IOError``
      was raised while opening or reading it.

    When the coroutine is closed, the ``GeneratorExit`` class itself is put
    on ``queue`` as an end-of-stream marker for the reader of the queue.
    """
    while True:
        try:
            filepath = (yield)
            try:
                digest = hashlib.md5()
                with audioread.audio_open(filepath) as audio:
                    for buf in audio:
                        digest.update(buf)
            # The exceptions must be given as a tuple: the Python 2 form
            # ``except A, B`` catches only A and binds the instance to B.
            except (audioread.DecodeError, IOError):
                queue.put((filepath, None))
            else:
                queue.put((filepath, digest.hexdigest()))
        except GeneratorExit:
            queue.put(GeneratorExit)
            return

def walk(dirpath, target):
    """Send the path of every file below ``dirpath`` to ``target``.

    The tree is traversed with ``os.walk``.  Each file path is built with
    ``os.path.join`` so that a ``dirpath`` ending in a path separator does
    not produce a doubled separator; the same file therefore gets the same
    path string whether or not the caller typed a trailing slash.

    ``target`` must provide ``send(path)`` and ``close()``; ``close()`` is
    called once after the last path has been sent.
    """
    for root, _dirs, files in os.walk(dirpath):
        for filename in files:
            target.send(os.path.join(root, filename))
    target.close()

def readHashes(inputFile):
    """Return the object unpickled from the file named ``inputFile``.

    The file is opened in binary mode and closed again before returning.
    """
    with open(inputFile, 'rb') as stream:
        loaded = pickle.load(stream)
    return loaded

def writeOutput(results, output):
    """Pickle ``results`` into the file named ``output``.

    The file is opened in binary write mode, so existing content is
    replaced.
    """
    with open(output, 'wb') as sink:
        pickle.Pickler(sink).dump(results)

class SignalHandler(object):
    """Saves the work done so far to a temporary file when a signal arrives.

    Only ``output`` is set by the constructor.  ``results``, ``queue``,
    ``consumer_thread`` and ``producers`` start as ``None`` and must be
    assigned by the caller before ``handler`` is installed as a signal
    handler.
    """

    def __init__(self, output):
        self.output = output
        # Assigned by the caller before the handler is registered:
        self.results = None          # dict that receives the processed entries
        self.queue = None            # queue the final result dict is read from
        self.consumer_thread = None  # thread joined before reading the queue
        self.producers = None        # iterable of objects exposing stop()

    def handler(self, signum, frame):
        """Stop the workers, write the partial results and exit with status 0.

        ``signum`` and ``frame`` are the standard signal-handler arguments
        and are not used.
        """
        # signal the producer threads to stop
        for producer in self.producers:
            producer.stop()

        # wait for the consumer thread to finish
        self.consumer_thread.join()

        # One item is taken from the queue and merged into the results, so it
        # has to be something dict.update() accepts.
        processed = self.queue.get()
        self.results.update(processed)

        tmp_name = self.getTmpName()
        print('Writing temporary file {}...'.format(tmp_name))
        writeOutput(self.results, tmp_name)
        # The exit() helper is provided by the site module for interactive
        # sessions; raising SystemExit does not depend on it being present.
        raise SystemExit(0)

    def getTmpName(self):
        """Return the temporary file name: the output name plus ``.tmp``."""
        return '{}.tmp'.format(self.output)

def main():
    """Hash the mp3 files below a directory and pickle the results.

    Command line: ``DIR`` (directory to scan) and ``output`` (result file).

    If ``<output>.tmp`` exists, the user is asked whether to recover from
    it; on ``y`` its entries are loaded and those paths are skipped during
    the scan.  SIGINT is routed to ``SignalHandler.handler``, which writes
    the partial results to ``<output>.tmp``.  After a complete run the
    results are written to ``output`` and a leftover ``<output>.tmp`` is
    deleted, so the next run does not offer to recover from stale data.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('dir', metavar='DIR',
                        help='directory to scan')
    parser.add_argument('output',
                        help='Output filename')

    args = parser.parse_args()

    sighandler = SignalHandler(args.output)
    tmp_name = sighandler.getTmpName()
    initial_data = {}
    if os.path.exists(tmp_name):
        prompt = 'This operation was interrupted, recover from temporary file? (y/N) '
        ans = None
        # Ask again until the answer is y, n or empty (empty counts as "no").
        while ans not in ['y', 'n', '']:
            ans = raw_input(prompt).lower()
        if ans == 'y':
            initial_data = readHashes(tmp_name)

    def run_consumer(queue, num_producers):
        """Collect (filepath, hexdigest) pairs until every producer is done.

        Each producer announces its end by putting ``GeneratorExit`` on the
        queue.  After ``num_producers`` such markers the collected dict is
        put back on the same queue for whoever joins this thread.
        """
        hashes = {}
        exits_seen = 0
        while exits_seen < num_producers:
            val = queue.get()
            if val is GeneratorExit:
                exits_seen += 1
                continue

            filepath, hexdigest = val
            hashes[filepath] = hexdigest

        queue.put(hashes)

    queue = Queue()

    num_producer_threads = 10

    consumer_thread = Thread(target=run_consumer, args=(queue, num_producer_threads))
    consumer_thread.start()

    sighandler.results = initial_data
    sighandler.consumer_thread = consumer_thread
    sighandler.queue = queue
    lock = Lock()

    # Factory for one pipeline: keep names ending in "mp3", drop paths that
    # were already recovered, then both print and hash what is left.
    pipe = lambda : \
        filterFileNames(
            re.compile(r'mp3$'),
            skip(initial_data.copy(),
                 broadcast(
                    sinks.locking_printer(lock),
                    hashSink(queue)
                 )
            )
        )
    # NOTE(review): threaded() comes from the project's coroutine module;
    # presumably it runs num_producer_threads copies of the pipeline, each
    # on its own thread -- confirm there.
    threaded_pipe = threaded(num_producer_threads, pipe)

    sighandler.producers = threaded_pipe

    signal.signal(signal.SIGINT, sighandler.handler)

    walk(args.dir, threaded_pipe)

    # Join with a timeout in a loop instead of one blocking join().
    while consumer_thread.isAlive():
        consumer_thread.join(0.25)

    results = queue.get()
    results.update(initial_data)

    writeOutput(results, args.output)

    # The run completed, so a recovery file from an earlier interrupted run
    # is obsolete; leaving it would trigger the recovery prompt next time.
    if os.path.exists(tmp_name):
        os.remove(tmp_name)

# Run the scan only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()


Create a new paste based on this one


Comments: