formula 3.2 from Vitter t: number of processed records k: reservoir size s: number of records to skip t >= k """ if s < k: mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1) return reduce(mul, ((t - k + i) / (t + 1 + i) for i in xrange(start,s+1)), mult) else: return reduce(mul, ((t - i) / (t + s - i) for i in xrange(k)), k / (t + s + 1)) def algorithm_R(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) for t, item in izip(count(k), it): j = randrange(0, t + 1) if j < k: reservoir[j] = item return reservoir def algorithm_X(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) t = k num = 0 while 1: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item except StopIteration: break return reservoir def algorithm_Z(seq, k, T=22): """take a random sample with k items fom seq """ it = iter(seq) reservoir = list(islice(it, k)) t = k tresh = T * k num = 0 while t <= tresh: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t try: next_item = islice(it, S, S + 1).next() if next_item: reservoir[randrange(0, k)] = next_item except StopIteration: break W = random() ** (-1/k) term = t - k + 1 while 1: # items in sequence while 1: # rejection sampling loop U = random() X = t * (W - 1) S = int(X) # first try if h(S) / cg(S) > random lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k) rhs = (((t + X) / (term + S)) * term) / t if lhs <= rhs: W = rhs / lhs break # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X) denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1) for numer in xrange(t + S, numer_lim - 1, -1): y *= numer / denom denom -= 1 W = random() ** (-1/k) if y ** (1 / k) <= (t + X) / t: break try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item t += S + 1 term += S + 1 except StopIteration: break return reservoir def algorithm_Z_naive(seq, k, T=5): it = iter(seq) reservoir = list(islice(it, k)) t = k # do first k*T steps with algorithm R if T > 0: for t, item in islice(izip(count(k), it), k*T): j = randrange(0, t + 1) if j < k: reservoir[j] = item t += 1 while 1: # loop over records in sequence while 1: # rejection sampling loop X = t * (random() ** (-1/k) - 1) # use inverse of G S = int(X) # first try if h(S) / cg(X) >= random cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k rg = random() * cgs hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1) if rg <= hs: break # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random if rg <= f(S, k, t): break try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item t += S + 1 except StopIteration: break return reservoir def big_range(M): 'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647' return islice(count(), M) def test_moment(k, counts): 'Test if the distribution in counts (i, n) is uniform' mini = min(i for i,n in counts.iteritems()) maxi = max(i for i,n in counts.iteritems()) exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1) N = sum(n for i, n in counts.iteritems()) t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N s = 1 / sqrt(N) form = "actual:%6.3f expected:%6.3f tolerance:%6.3f" assert abs(t - exp) < s, form % (t, exp, s) def test(method, k, M, time=10): """check if all numbers have equal chance to be chosen The records are [0, 1, 2, ....] method: method for Reservoir Sampling k: reservoir size M; total number of records time: length of the test in seconds Test if moments 1-4 of resulting distribution is uniform Make sure that T parameter is set low enough to test algo Z, otherwise only the startup methods are tested """ counts = defaultdict(int) t0 = clock() for i in xrange(100): x = method(big_range(M), k) for xi in x: counts[xi] += 1 N = int(time * 100 / t0 + 0.5) for _ in xrange(N): x = method(big_range(M), k) for xi in x: counts[xi] += 1 print "Test", method.__name__, test_moment(1, counts) test_moment(2, counts) test_moment(3, counts) test_moment(4, counts) print "OK" def do_nothing(seq, k): 'pass through the data and do nothing' it = iter(seq) reservoir = list(islice(it, k)) for record in it: pass return reservoir def benchmark(M=10000000, k=10): print "Benchmark - number of records =", M, ", k =", k for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]: t0 = clock() if M > 10000000 and method == algorithm_R: continue if M > 100000000 and method == algorithm_X: continue method(big_range(M), k) print " %20s time:%10.3f secs" % (method.__name__, clock() - t0) benchmark(M=100000, k=10) #test(algorithm_Z_naive, 10, 100, 100) print algorithm_R(ascii_lowercase, 3) print algorithm_X(ascii_lowercase, 3) print algorithm_Z(ascii_lowercase, 3) print algorithm_Z_naive(ascii_lowercase, 3) ```
 ```1 2 3 4 5 6 7 8 9 10 ``` ```Benchmark - number of records = 100000 , k = 10 algorithm_R time: 0.160 secs algorithm_X time: 0.040 secs algorithm_Z time: 0.000 secs algorithm_Z_naive time: 0.000 secs do_nothing time: 0.010 secs ['h', 'g', 'l'] ['r', 'b', 'q'] ['r', 'p', 'q'] ['o', 's', 'y'] ```