`````` ```# -*- coding: utf-8 -*- """ Created on Fri Jan 31 13:34:25 2014 @author:Paul Hofstra http://www.cs.umd.edu/~samir/498/vitter.pdf Random Sampling with a Reservoir by JEFFREY SCOTT VITTER ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985. http://pastebin.com/14LTxtPF """ from __future__ import division from itertools import islice, count, izip from random import randrange, random from math import sqrt from operator import mul from time import clock from collections import defaultdict from string import ascii_lowercase def f(s, k, t): """probabillity function for skip - formula 3.2 from Vitter t: number of processed records k: reservoir size s: number of records to skip t >= k """ if s < k: mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1) return reduce(mul, ((t - k + i) / (t + 1 + i) for i in xrange(start,s+1)), mult) else: return reduce(mul, ((t - i) / (t + s - i) for i in xrange(k)), k / (t + s + 1)) def algorithm_R(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) for t, item in izip(count(k), it): j = randrange(0, t + 1) if j < k: reservoir[j] = item return reservoir def algorithm_X(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) t = k num = 0 while 1: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item except StopIteration: break return reservoir def algorithm_Z(seq, k, T=22): """take a random sample with k items fom seq """ it = iter(seq) reservoir = list(islice(it, k)) t = k tresh = T * k num = 0 while t <= tresh: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t try: next_item = islice(it, S, S + 1).next() if next_item: reservoir[randrange(0, k)] = next_item except StopIteration: break W = random() ** (-1/k) term = t - k + 1 while 1: # items in sequence while 1: # rejection sampling loop U = random() X = t * (W - 1) S = int(X) # first try if h(S) / cg(S) > random lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k) rhs = (((t + X) / (term + S)) * term) / t if lhs <= rhs: W = rhs / lhs break # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X) denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1) for numer in xrange(t + S, numer_lim - 1, -1): y *= numer / denom denom -= 1 W = random() ** (-1/k) if y ** (1 / k) <= (t + X) / t: break try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item t += S + 1 term += S + 1 except StopIteration: break return reservoir def algorithm_Z_naive(seq, k, T=5): it = iter(seq) reservoir = list(islice(it, k)) t = k # do first k*T steps with algorithm R if T > 0: for t, item in islice(izip(count(k), it), k*T): j = randrange(0, t + 1) if j < k: reservoir[j] = item t += 1 while 1: # loop over records in sequence while 1: # rejection sampling loop X = t * (random() ** (-1/k) - 1) # use inverse of G S = int(X) # first try if h(S) / cg(X) >= random cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k rg = random() * cgs hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1) if rg <= hs: break # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random if rg <= f(S, k, t): break try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item t += S + 1 except StopIteration: break return reservoir def big_range(M): 'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647' return islice(count(), M) def test_moment(k, counts): 'Test if the distribution in counts (i, n) is uniform' mini = min(i for i,n in counts.iteritems()) maxi = max(i for i,n in counts.iteritems()) exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1) N = sum(n for i, n in counts.iteritems()) t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N s = 1 / sqrt(N) form = "actual:%6.3f expected:%6.3f tolerance:%6.3f" assert abs(t - exp) < s, form % (t, exp, s) def test(method, k, M, time=10): """check if all numbers have equal chance to be chosen The records are [0, 1, 2, ....] method: method for Reservoir Sampling k: reservoir size M; total number of records time: length of the test in seconds Test if moments 1-4 of resulting distribution is uniform Make sure that T parameter is set low enough to test algo Z, otherwise only the startup methods are tested """ counts = defaultdict(int) t0 = clock() for i in xrange(100): x = method(big_range(M), k) for xi in x: counts[xi] += 1 N = int(time * 100 / t0 + 0.5) for _ in xrange(N): x = method(big_range(M), k) for xi in x: counts[xi] += 1 print "Test", method.__name__, test_moment(1, counts) test_moment(2, counts) test_moment(3, counts) test_moment(4, counts) print "OK" def do_nothing(seq, k): 'pass through the data and do nothing' it = iter(seq) reservoir = list(islice(it, k)) for record in it: pass return reservoir def benchmark(M=10000000, k=10): print "Benchmark - number of records =", M, ", k =", k for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]: t0 = clock() if M > 10000000 and method == algorithm_R: continue if M > 100000000 and method == algorithm_X: continue method(big_range(M), k) print " %20s time:%10.3f secs" % (method.__name__, clock() - t0) benchmark(M=100000, k=10) #test(algorithm_Z_naive, 10, 100, 100) print algorithm_R(ascii_lowercase, 3) print algorithm_X(ascii_lowercase, 3) print algorithm_Z(ascii_lowercase, 3) print algorithm_Z_naive(ascii_lowercase, 3) ```
 ```1 2 3 4 5 6 7 8 9 10 ``` ```Benchmark - number of records = 100000 , k = 10 algorithm_R time: 0.160 secs algorithm_X time: 0.040 secs algorithm_Z time: 0.000 secs algorithm_Z_naive time: 0.000 secs do_nothing time: 0.010 secs ['h', 'g', 'l'] ['r', 'b', 'q'] ['r', 'p', 'q'] ['o', 's', 'y'] ```