[ create a new paste ] login | about

Link: http://codepad.org/3NCRtugH    [ raw code | output | fork ]

ploffie - Python, pasted on Feb 13:
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 31 13:34:25 2014

@author:Paul Hofstra

http://www.cs.umd.edu/~samir/498/vitter.pdf
Random Sampling with a Reservoir
by JEFFREY SCOTT VITTER
ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985.

http://pastebin.com/14LTxtPF
"""
from __future__ import division

from itertools import islice, count, izip
from random import randrange, random
from math import sqrt
from operator import mul
from time import clock
from collections import defaultdict    

def f(s, k, t):
    """probabillity function for skip - formula 3.2 from Vitter
       t: number of processed records 
       k: reservoir size
       s: number of records to skip
       t >= k
    """
    if s < k:
        mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1)
        return reduce(mul, ((t - k + i) / (t + 1 + i) 
            for i in xrange(start,s+1)), mult)
    else:
        return reduce(mul, ((t - i) / (t + s - i) 
            for i in xrange(k)), k / (t + s + 1))

def algorithm_R(seq, k):
    'take a random sample with k items fom seq'
    it = iter(seq)
    reservoir = list(islice(it, k))
    for t, item in izip(count(k), it):
        j = randrange(0, t + 1) 
        if j < k:
            reservoir[j] = item
    return reservoir

def algorithm_X(seq, k):
    'take a random sample with k items fom seq'
    it = iter(seq)
    reservoir = list(islice(it, k))
    t = k
    num = 0
    while 1:
        V = random()
        S, t, num = (0, t + 1, num + 1)
        quot = num / t
        while quot > V:
            S, t, num = (S + 1, t + 1, num + 1)
            quot *= num / t
        try:
            next_item = islice(it, S, S + 1).next() # skip S records
            reservoir[randrange(0, k)] = next_item
        except StopIteration:
            break        
    return reservoir
    
def algorithm_Z(seq, k, T=22):
    """take a random sample with k items fom seq
    """
    it = iter(seq)
    reservoir = list(islice(it, k))
    t = k
    tresh = T * k
    num = 0
    while t <= tresh:
        V = random()
        S, t, num = (0, t + 1, num + 1)
        quot = num / t
        while quot > V:
            S, t, num = (S + 1, t + 1, num + 1)
            quot *= num / t
        try:
            next_item = islice(it, S, S + 1).next()
            if next_item:
                reservoir[randrange(0, k)] = next_item
        except StopIteration:
            break
    W = random() ** (-1/k)
    term = t - k + 1
    while 1:      # items in sequence
        while 1:  # rejection sampling loop
            U = random()
            X = t * (W - 1)
            S = int(X)
            # first try if h(S) / cg(S) > random
            lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k)
            rhs = (((t + X) / (term + S)) * term) / t
            if lhs <= rhs:
                W = rhs / lhs
                break
            # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random
            y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X)
            denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1)
            for numer in xrange(t + S, numer_lim - 1, -1):
                y *= numer / denom
                denom -= 1
            W = random() ** (-1/k)            
            if y ** (1 / k) <= (t + X) / t:
                break
        try:
            next_item = islice(it, S, S + 1).next() # skip S records
            reservoir[randrange(0, k)] = next_item
            t += S + 1
            term += S + 1
        except StopIteration:
            break        
    return reservoir

def algorithm_Z_naive(seq, k, T=5):
    it = iter(seq)
    reservoir = list(islice(it, k))
    t = k
    # do first k*T steps with algorithm R
    if T > 0:
        for t, item in islice(izip(count(k), it), k*T):
            j = randrange(0, t + 1)
            if j < k:
                reservoir[j] = item
        t += 1
    while 1:      # loop over records in sequence
        while 1:  # rejection sampling loop
            X = t * (random() ** (-1/k) - 1) # use inverse of G 
            S = int(X)  
            # first try if h(S) / cg(X) >= random
            cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k
            rg = random() * cgs
            hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1)
            if rg <= hs:
                break
            # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random
            if rg <= f(S, k, t):
                break
        try:
            next_item = islice(it, S, S + 1).next() # skip S records
            reservoir[randrange(0, k)] = next_item
            t += S + 1
        except StopIteration:
            break        
    return reservoir
    
def big_range(M):
    'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647'
    return islice(count(), M)
    
def test_moment(k, counts):
    'Test if the distribution in counts (i, n) is uniform'
    mini = min(i for i,n in counts.iteritems())
    maxi = max(i for i,n in counts.iteritems())
    exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1)
    N = sum(n for i, n in counts.iteritems())
    t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N
    s = 1 / sqrt(N)
    form = "actual:%6.3f expected:%6.3f tolerance:%6.3f"
    assert abs(t - exp) < s, form % (t, exp, s)
    
def test(method, k, M, time=10):
    """check if all numbers have equal chance to be chosen
       The records are [0, 1, 2, ....]    
       method: method for Reservoir Sampling
       k: reservoir size
       M; total number of records
       time: length of the test in seconds
       Test if moments 1-4 of resulting distribution is uniform
       Make sure that T parameter is set low enough to test algo Z, otherwise
          only the startup methods are tested
    """
    counts = defaultdict(int)
    t0 = clock()
    for i in xrange(100):
        x = method(big_range(M), k)
        for xi in x:
            counts[xi] += 1
    N = int(time * 100 / t0 + 0.5)
    for _ in xrange(N):
        x = method(big_range(M), k)
        for xi in x:
            counts[xi] += 1
    print "Test", method.__name__, 
    test_moment(1, counts)
    test_moment(2, counts)
    test_moment(3, counts)
    test_moment(4, counts)  
    print "OK"
    
def do_nothing(seq, k):
    'pass through the data and do nothing'
    it = iter(seq)
    reservoir = list(islice(it, k))
    for record in it:
        pass
    return reservoir

def benchmark(M=10000000, k=10):
    print "Benchmark - number of records =", M, ", k =", k
    for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]:
        t0 = clock()
        if M > 10000000 and method == algorithm_R:
            continue
        if M > 100000000 and method == algorithm_X:
            continue
        method(big_range(M), k)        
        print " %20s time:%10.3f secs" % (method.__name__, clock() - t0)

benchmark(M=100000, k=10)
#test(algorithm_Z_naive, 10, 100, 100)
        


Output:
1
2
3
4
5
6
Benchmark - number of records = 100000 , k = 10
          algorithm_R time:     0.160 secs
          algorithm_X time:     0.030 secs
          algorithm_Z time:     0.010 secs
    algorithm_Z_naive time:     0.000 secs
           do_nothing time:     0.000 secs


Create a new paste based on this one


Comments: