# -*- coding: utf-8 -*-
"""
Created on Fri Jan 31 13:34:25 2014
@author:Paul Hofstra
http://www.cs.umd.edu/~samir/498/vitter.pdf
Random Sampling with a Reservoir
by JEFFREY SCOTT VITTER
ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985.
http://pastebin.com/14LTxtPF
"""
from __future__ import division
from itertools import islice, count, izip
from random import randrange, random
from math import sqrt
from operator import mul
from time import clock
from collections import defaultdict
from string import ascii_lowercase
def f(s, k, t):
"""probabillity function for skip - formula 3.2 from Vitter
t: number of processed records
k: reservoir size
s: number of records to skip
t >= k
"""
if s < k:
mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1)
return reduce(mul, ((t - k + i) / (t + 1 + i)
for i in xrange(start,s+1)), mult)
else:
return reduce(mul, ((t - i) / (t + s - i)
for i in xrange(k)), k / (t + s + 1))
def algorithm_R(seq, k):
'take a random sample with k items fom seq'
it = iter(seq)
reservoir = list(islice(it, k))
for t, item in izip(count(k), it):
j = randrange(0, t + 1)
if j < k:
reservoir[j] = item
return reservoir
def algorithm_X(seq, k):
'take a random sample with k items fom seq'
it = iter(seq)
reservoir = list(islice(it, k))
t = k
num = 0
while 1:
V = random()
S, t, num = (0, t + 1, num + 1)
quot = num / t
while quot > V:
S, t, num = (S + 1, t + 1, num + 1)
quot *= num / t
try:
next_item = islice(it, S, S + 1).next() # skip S records
reservoir[randrange(0, k)] = next_item
except StopIteration:
break
return reservoir
def algorithm_Z(seq, k, T=22):
"""take a random sample with k items fom seq
"""
it = iter(seq)
reservoir = list(islice(it, k))
t = k
tresh = T * k
num = 0
while t <= tresh:
V = random()
S, t, num = (0, t + 1, num + 1)
quot = num / t
while quot > V:
S, t, num = (S + 1, t + 1, num + 1)
quot *= num / t
try:
next_item = islice(it, S, S + 1).next()
if next_item:
reservoir[randrange(0, k)] = next_item
except StopIteration:
break
W = random() ** (-1/k)
term = t - k + 1
while 1: # items in sequence
while 1: # rejection sampling loop
U = random()
X = t * (W - 1)
S = int(X)
# first try if h(S) / cg(S) > random
lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k)
rhs = (((t + X) / (term + S)) * term) / t
if lhs <= rhs:
W = rhs / lhs
break
# h(S) / cg(S) <= random - now test if f(S) / cg(S) > random
y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X)
denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1)
for numer in xrange(t + S, numer_lim - 1, -1):
y *= numer / denom
denom -= 1
W = random() ** (-1/k)
if y ** (1 / k) <= (t + X) / t:
break
try:
next_item = islice(it, S, S + 1).next() # skip S records
reservoir[randrange(0, k)] = next_item
t += S + 1
term += S + 1
except StopIteration:
break
return reservoir
def algorithm_Z_naive(seq, k, T=5):
it = iter(seq)
reservoir = list(islice(it, k))
t = k
# do first k*T steps with algorithm R
if T > 0:
for t, item in islice(izip(count(k), it), k*T):
j = randrange(0, t + 1)
if j < k:
reservoir[j] = item
t += 1
while 1: # loop over records in sequence
while 1: # rejection sampling loop
X = t * (random() ** (-1/k) - 1) # use inverse of G
S = int(X)
# first try if h(S) / cg(X) >= random
cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k
rg = random() * cgs
hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1)
if rg <= hs:
break
# h(S) / cg(X) < random - now test if f(S) / cg(X) >= random
if rg <= f(S, k, t):
break
try:
next_item = islice(it, S, S + 1).next() # skip S records
reservoir[randrange(0, k)] = next_item
t += S + 1
except StopIteration:
break
return reservoir
def big_range(M):
'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647'
return islice(count(), M)
def test_moment(k, counts):
'Test if the distribution in counts (i, n) is uniform'
mini = min(i for i,n in counts.iteritems())
maxi = max(i for i,n in counts.iteritems())
exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1)
N = sum(n for i, n in counts.iteritems())
t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N
s = 1 / sqrt(N)
form = "actual:%6.3f expected:%6.3f tolerance:%6.3f"
assert abs(t - exp) < s, form % (t, exp, s)
def test(method, k, M, time=10):
"""check if all numbers have equal chance to be chosen
The records are [0, 1, 2, ....]
method: method for Reservoir Sampling
k: reservoir size
M; total number of records
time: length of the test in seconds
Test if moments 1-4 of resulting distribution is uniform
Make sure that T parameter is set low enough to test algo Z, otherwise
only the startup methods are tested
"""
counts = defaultdict(int)
t0 = clock()
for i in xrange(100):
x = method(big_range(M), k)
for xi in x:
counts[xi] += 1
N = int(time * 100 / t0 + 0.5)
for _ in xrange(N):
x = method(big_range(M), k)
for xi in x:
counts[xi] += 1
print "Test", method.__name__,
test_moment(1, counts)
test_moment(2, counts)
test_moment(3, counts)
test_moment(4, counts)
print "OK"
def do_nothing(seq, k):
'pass through the data and do nothing'
it = iter(seq)
reservoir = list(islice(it, k))
for record in it:
pass
return reservoir
def benchmark(M=10000000, k=10):
print "Benchmark - number of records =", M, ", k =", k
for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]:
t0 = clock()
if M > 10000000 and method == algorithm_R:
continue
if M > 100000000 and method == algorithm_X:
continue
method(big_range(M), k)
print " %20s time:%10.3f secs" % (method.__name__, clock() - t0)
benchmark(M=100000, k=10)
#test(algorithm_Z_naive, 10, 100, 100)
print algorithm_R(ascii_lowercase, 3)
print algorithm_X(ascii_lowercase, 3)
print algorithm_Z(ascii_lowercase, 3)
print algorithm_Z_naive(ascii_lowercase, 3)