```1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 ``` ```# -*- coding: utf-8 -*- """ Created on Fri Jan 31 13:34:25 2014 @author:Paul Hofstra http://www.cs.umd.edu/~samir/498/vitter.pdf Random Sampling with a Reservoir by JEFFREY SCOTT VITTER ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985. http://pastebin.com/14LTxtPF """ from __future__ import division from itertools import islice, count, izip from random import randrange, random from math import sqrt from operator import mul from time import clock from collections import defaultdict from string import ascii_lowercase def f(s, k, t): """probabillity function for skip - formula 3.2 from Vitter t: number of processed records k: reservoir size s: number of records to skip t >= k """ if s < k: mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1) return reduce(mul, ((t - k + i) / (t + 1 + i) for i in xrange(start,s+1)), mult) else: return reduce(mul, ((t - i) / (t + s - i) for i in xrange(k)), k / (t + s + 1)) def algorithm_R(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) for t, item in izip(count(k), it): j = randrange(0, t + 1) if j < k: reservoir[j] = item return reservoir def algorithm_X(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) t = k num = 0 while 1: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item except StopIteration: break return reservoir def algorithm_Z(seq, k, T=22): """take a random sample with k items fom seq """ it = iter(seq) reservoir = list(islice(it, k)) t = k tresh = T * k num = 0 while t <= tresh: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t try: next_item = islice(it, S, S + 1).next() if next_item: reservoir[randrange(0, k)] = next_item except StopIteration: break W = random() ** (-1/k) term = t - k + 1 while 1: # items in sequence while 1: # rejection sampling loop U = random() X = t * (W - 1) S = int(X) # first try if h(S) / cg(S) > random lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k) rhs = (((t + X) / (term + S)) * term) / t if lhs <= rhs: W = rhs / lhs break # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X) denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1) for numer in xrange(t + S, numer_lim - 1, -1): y *= numer / denom denom -= 1 W = random() ** (-1/k) if y ** (1 / k) <= (t + X) / t: break try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item t += S + 1 term += S + 1 except StopIteration: break return reservoir def algorithm_Z_naive(seq, k, T=5): it = iter(seq) reservoir = list(islice(it, k)) t = k # do first k*T steps with algorithm R if T > 0: for t, item in islice(izip(count(k), it), k*T): j = randrange(0, t + 1) if j < k: reservoir[j] = item t += 1 while 1: # loop over records in sequence while 1: # rejection sampling loop X = t * (random() ** (-1/k) - 1) # use inverse of G S = int(X) # first try if h(S) / cg(X) >= random cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k rg = random() * cgs hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1) if rg <= hs: break # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random if rg <= f(S, k, t): break try: next_item = islice(it, S, S + 1).next() # skip S records reservoir[randrange(0, k)] = next_item t += S + 1 except StopIteration: break return reservoir def big_range(M): 'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647' return islice(count(), M) def test_moment(k, counts): 'Test if the distribution in counts (i, n) is uniform' mini = min(i for i,n in counts.iteritems()) maxi = max(i for i,n in counts.iteritems()) exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1) N = sum(n for i, n in counts.iteritems()) t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N s = 1 / sqrt(N) form = "actual:%6.3f expected:%6.3f tolerance:%6.3f" assert abs(t - exp) < s, form % (t, exp, s) def test(method, k, M, time=10): """check if all numbers have equal chance to be chosen The records are [0, 1, 2, ....] method: method for Reservoir Sampling k: reservoir size M; total number of records time: length of the test in seconds Test if moments 1-4 of resulting distribution is uniform Make sure that T parameter is set low enough to test algo Z, otherwise only the startup methods are tested """ counts = defaultdict(int) t0 = clock() for i in xrange(100): x = method(big_range(M), k) for xi in x: counts[xi] += 1 N = int(time * 100 / t0 + 0.5) for _ in xrange(N): x = method(big_range(M), k) for xi in x: counts[xi] += 1 print "Test", method.__name__, test_moment(1, counts) test_moment(2, counts) test_moment(3, counts) test_moment(4, counts) print "OK" def do_nothing(seq, k): 'pass through the data and do nothing' it = iter(seq) reservoir = list(islice(it, k)) for record in it: pass return reservoir def benchmark(M=10000000, k=10): print "Benchmark - number of records =", M, ", k =", k for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]: t0 = clock() if M > 10000000 and method == algorithm_R: continue if M > 100000000 and method == algorithm_X: continue method(big_range(M), k) print " %20s time:%10.3f secs" % (method.__name__, clock() - t0) benchmark(M=100000, k=10) #test(algorithm_Z_naive, 10, 100, 100) print algorithm_R(ascii_lowercase, 3) print algorithm_X(ascii_lowercase, 3) print algorithm_Z(ascii_lowercase, 3) print algorithm_Z_naive(ascii_lowercase, 3) ```
 ```1 2 3 4 5 6 7 8 9 10 ``` ```Benchmark - number of records = 100000 , k = 10 algorithm_R time: 0.160 secs algorithm_X time: 0.040 secs algorithm_Z time: 0.000 secs algorithm_Z_naive time: 0.000 secs do_nothing time: 0.010 secs ['h', 'g', 'l'] ['r', 'b', 'q'] ['r', 'p', 'q'] ['o', 's', 'y'] ```