[ create a new paste ] login | about

Link: http://codepad.org/y1LnjLl0    [ raw code | fork ]

D, pasted on Jan 22:
import std.stdio: writeln;
import std.conv: text;
import std.random: uniform, Xorshift;
import std.algorithm: min, max;
import std.parallelism: task;
import core.thread: Thread;
import core.sync.mutex: Mutex;
import core.time: dur;
import core.atomic: atomicLoad, atomicOp, cas;

__gshared uint transfersCount;

final class Buckets(size_t nBuckets) {
    alias TBucketValue = uint;
    alias Bucket = TBucketValue;

    private shared Bucket[nBuckets] buckets;
    private bool running;
    Mutex mtx;

    public this() {
        this.running = true;
        this.mtx = new Mutex();
        foreach (ref b; buckets)
            b = uniform(0, 100);
    }

    public TBucketValue opIndex(in size_t index) const pure nothrow {
        return buckets[index];
    }

    public void transfer(in size_t from, in size_t to,
                         TBucketValue amount) {
        //if (from == to) // Needed?
        //    return;
        mtx.lock();

        while (true) {
            auto v1 = atomicLoad(buckets[from]);
            if (amount > v1)
                amount = v1;
            if (cas(&buckets[from], v1, v1 - amount)) {
                atomicOp!"+="(buckets[to], amount);
                break;
            }
            // Else retry.
        }

        transfersCount++;
        scope(exit)
            mtx.unlock();
    }

    @property size_t length() const pure nothrow {
        return this.buckets.length;
    }

    void toString(in void delegate(const(char)[]) sink) {
        TBucketValue total = 0;
        mtx.lock();
        foreach (ref b; buckets)
            total += b;

        scope(exit)
            mtx.unlock();

        sink(text(buckets));
        sink(" ");
        sink(text(total));
    }
}

void randomize(size_t N)(Buckets!N data) {
    immutable maxi = data.length - 1;
    auto rng = Xorshift(1);

    while (data.running) {
        immutable i = uniform(0, maxi, rng);
        immutable j = uniform(0, maxi, rng);
        immutable amount = uniform(0, 20, rng);
        data.transfer(i, j, amount);
    }
}

void equalize(size_t N)(Buckets!N data) {
    immutable maxi = data.length - 1;
    auto rng = Xorshift(1);

    while (data.running) {
        immutable i = uniform(0, maxi, rng);
        immutable j = uniform(0, maxi, rng);
        immutable a = data[i];
        immutable b = data[j];
        if (a > b)
            data.transfer(i, j, (a - b) / 2);
        else
            data.transfer(j, i, (b - a) / 2);
    }
}

void display(size_t N)(Buckets!N data) {
    foreach (immutable _; 0 .. 10) {
        writeln(transfersCount, " ", data);
        transfersCount = 0;
        Thread.sleep(dur!"msecs"(1000));
    }
    data.running = false;
}

void main() {
    writeln("N. transfers, buckets, buckets sum:");
    auto data = new Buckets!20();
    task!randomize(data).executeInNewThread();
    task!equalize(data).executeInNewThread();
    task!display(data).executeInNewThread();
}


Create a new paste based on this one


Comments: