import hashlib
# audioread is a third-party dependency: fail early with installation hints
# instead of a bare traceback when it cannot be imported.
try:
    import audioread
except ImportError:
    # Catch only ImportError: a bare ``except`` would also swallow
    # KeyboardInterrupt/SystemExit and misreport unrelated failures as
    # "not installed".
    # Single-argument parenthesised prints behave the same whether ``print``
    # is a statement (Python 2) or a function.
    print("ERROR: audioread not installed.")
    print("Try one of these commands (stop trying if one succeeds):")
    print(" 'sudo pip install audioread'")
    print(" 'sudo easy_install audioread'")
    print(" 'sudo apt-get install python-audioread'")
    print(" 'sudo yum install python-audioread'")
    exit(1)
import os
import argparse
import signal
import pickle
import re
from coroutine import coroutine, broadcast, threaded, sinks
from threading import Thread, Lock
from Queue import Queue, Empty
# Render audioread incapable of using GStreamer, because GStreamer is not
# thread-safe and the hashing pipelines below are run on several producer
# threads.
# NOTE(review): this overrides a private audioread attribute; presumably
# audioread consults it when choosing a decoding backend -- confirm against
# the installed audioread version.
audioread._gst_available = lambda : False
@coroutine
def filterFileNames(regex, target):
    """Coroutine stage that forwards only the paths matched by ``regex``.

    Each value sent in is searched with ``regex.search``; paths that match
    are passed on to ``target`` via ``target.send``, the rest are dropped.
    When this coroutine is closed, ``target`` is closed as well.

    regex  -- compiled regular expression (anything with a ``search`` method)
    target -- downstream coroutine offering ``send`` and ``close``
    """
    try:
        while True:
            candidate = (yield)
            if regex.search(candidate) is None:
                continue
            target.send(candidate)
    except GeneratorExit:
        # Propagate shutdown to the next stage of the pipeline.
        target.close()
@coroutine
def skip(hashes, target):
    """Coroutine stage that drops paths which are already keys of ``hashes``.

    Every value sent in that is *not* contained in ``hashes`` is forwarded to
    ``target``; values already present are silently discarded.  Closing this
    coroutine closes ``target`` too.

    hashes -- container supporting ``in`` (here: a dict keyed by file path)
    target -- downstream coroutine offering ``send`` and ``close``
    """
    try:
        while True:
            candidate = (yield)
            if candidate in hashes:
                continue
            target.send(candidate)
    except GeneratorExit:
        # Propagate shutdown to the next stage of the pipeline.
        target.close()
@coroutine
def hashSink(queue):
    """Terminal coroutine stage: hash decoded audio and report via ``queue``.

    For every file path sent in, the file is opened with
    ``audioread.audio_open`` and an MD5 digest is computed over the buffers
    it yields.  One item is put on ``queue`` per path:

    * ``(filepath, hexdigest)`` when the file could be read, or
    * ``(filepath, None)`` when decoding or reading failed.

    When the coroutine is closed, the ``GeneratorExit`` class itself is put
    on the queue as an end-of-stream marker for the consumer.

    queue -- object with a ``put`` method (a ``Queue`` in this script)
    """
    while True:
        try:
            filepath = (yield)
            try:
                digest = hashlib.md5()
                # Consume the audio while the file is still open.
                with audioread.audio_open(filepath) as f:
                    for buf in f:
                        digest.update(buf)
            # The tuple is required: ``except A, B:`` catches only A and
            # binds the exception instance to the name B, so I/O errors used
            # to escape and terminate this coroutine.
            except (audioread.DecodeError, IOError):
                queue.put((filepath, None))
            else:
                queue.put((filepath, digest.hexdigest()))
        except GeneratorExit:
            # Tell the consumer that this producer is finished.
            queue.put(GeneratorExit)
            return
def walk(dirpath, target):
    """Send the path of every file below ``dirpath`` to ``target``.

    The tree is traversed with ``os.walk``; each file's path is built from
    the directory ``os.walk`` reports and the file name, then passed to
    ``target.send``.  After the traversal ``target.close()`` is called once.

    dirpath -- directory to scan recursively
    target  -- object offering ``send(path)`` and ``close()``
    """
    for root, _dirs, files in os.walk(dirpath):
        for filename in files:
            # os.path.join avoids the doubled separator that a plain
            # ``sep.join`` yields when ``root`` already ends with one
            # (e.g. when the directory was given as "music/").
            target.send(os.path.join(root, filename))
    target.close()
def readHashes(inputFile):
    """Load and return the pickled object stored in ``inputFile``.

    The file is opened in binary mode and read with ``pickle.load``.  Only
    use this on files written by this tool: unpickling untrusted data can
    execute arbitrary code.
    """
    with open(inputFile, 'rb') as stream:
        stored = pickle.load(stream)
    return stored
def writeOutput(results, output):
    """Pickle ``results`` into the file named ``output``.

    The file is opened in binary write mode, so any existing content is
    replaced.

    results -- object to serialise (a dict of path -> digest in this script)
    output  -- path of the file to write
    """
    with open(output, 'wb') as stream:
        pickle.dump(results, stream)
class SignalHandler(object):
    """Saves partial results to a temporary file when a signal arrives.

    The caller is expected to fill in ``results``, ``queue``,
    ``consumer_thread`` and ``producers`` before registering ``handler`` as a
    signal handler; they all start out as ``None``.
    """

    def __init__(self, output):
        # Name of the final output file; the temporary name derives from it.
        self.output = output
        # Collaborators, assigned by the caller once they exist.
        self.results = None
        self.queue = None
        self.consumer_thread = None
        self.producers = None

    def getTmpName(self):
        """Return the temporary file name: the output name plus '.tmp'."""
        return '{}.tmp'.format(self.output)

    def handler(self, signum, frame):
        """Stop the workers, persist what was hashed so far, and exit.

        ``signum`` and ``frame`` are the standard signal-handler arguments
        and are not used.
        """
        # Ask every producer to stop.
        for worker in self.producers:
            worker.stop()
        # Wait for the consumer thread to finish; in this script it puts its
        # accumulated dict on the queue just before it ends.
        self.consumer_thread.join()
        self.results.update(self.queue.get())
        tmp_name = self.getTmpName()
        print('Writing temporary file {}...'.format(tmp_name))
        writeOutput(self.results, tmp_name)
        exit(0)
def main():
    """Hash every MP3 file below DIR and pickle the results to OUTPUT.

    Command line: ``DIR`` (directory to scan) and ``output`` (file that
    receives the pickled ``{filepath: hexdigest-or-None}`` dict).

    Flow:
    1. If ``<output>.tmp`` exists, offer to resume from it; files already
       listed there are skipped.
    2. Start a consumer thread that collects ``(filepath, hexdigest)`` items
       from a queue until every producer has sent its end marker.
    3. Feed all file paths through the filter/skip/hash pipeline.
    4. Merge new and recovered results, write them to ``output`` and remove
       the recovery file.

    SIGINT is routed to ``SignalHandler.handler``, which saves the partial
    results to ``<output>.tmp`` and exits.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('dir', metavar='DIR',
                        help='directory to scan')
    parser.add_argument('output',
                        help='Output filename')
    args = parser.parse_args()

    sighandler = SignalHandler(args.output)
    tmp_name = sighandler.getTmpName()

    # Hashes recovered from an interrupted run (empty when starting fresh).
    initial_data = {}
    if os.path.exists(tmp_name):
        prompt = 'This operation was interrupted, recover from temporary file? (y/N) '
        ans = None
        # Keep asking until the answer is y, n or empty (empty means "no").
        while ans not in ['y', 'n', '']:
            ans = raw_input(prompt).lower()
        if ans == 'y':
            initial_data = readHashes(tmp_name)

    def run_consumer(queue, num_producers):
        """Collect results until ``num_producers`` end markers were seen."""
        hashes = {}
        exits_seen = 0
        while exits_seen < num_producers:
            val = queue.get()
            if val is GeneratorExit:
                # End-of-stream marker put on the queue by hashSink.
                exits_seen += 1
                continue
            filepath, hexdigest = val
            hashes[filepath] = hexdigest
        # Hand the collected dict back through the same queue.
        queue.put(hashes)

    queue = Queue()
    num_producer_threads = 10
    consumer_thread = Thread(target=run_consumer,
                             args=(queue, num_producer_threads))
    consumer_thread.start()

    sighandler.results = initial_data
    sighandler.consumer_thread = consumer_thread
    sighandler.queue = queue

    lock = Lock()
    # Anchor on the extension: a bare 'mp3$' also matches names that merely
    # end in the letters "mp3" (e.g. "notmp3").
    mp3_pattern = re.compile(r'\.mp3$')

    def pipe():
        """Build one filter -> skip -> (print, hash) pipeline."""
        return filterFileNames(
            mp3_pattern,
            skip(initial_data.copy(),
                 broadcast(
                     sinks.locking_printer(lock),
                     hashSink(queue)
                 )
                 )
        )

    # NOTE(review): ``threaded`` comes from the project's coroutine module;
    # presumably it builds ``num_producer_threads`` pipelines via ``pipe`` --
    # confirm there.
    threaded_pipe = threaded(num_producer_threads, pipe)
    sighandler.producers = threaded_pipe
    # Register the handler only after every attribute it reads is set.
    signal.signal(signal.SIGINT, sighandler.handler)

    walk(args.dir, threaded_pipe)

    # Join with a timeout, in a loop, instead of one blocking join().
    # NOTE(review): presumably so the main thread stays responsive to SIGINT.
    while consumer_thread.is_alive():
        consumer_thread.join(0.25)

    results = queue.get()
    results.update(initial_data)
    writeOutput(results, args.output)

    # A completed run supersedes the recovery file; leaving it behind would
    # make the next run claim it was interrupted and offer stale hashes.
    if os.path.exists(tmp_name):
        os.remove(tmp_name)
# Run the scan only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()