import std.stdio: writeln;
import std.conv: text;
import std.random: uniform, Xorshift;
import std.algorithm: min, max;
import std.parallelism: task;
import core.thread: Thread;
import core.sync.mutex: Mutex;
import core.time: dur;
import core.atomic: atomicLoad, atomicOp, cas;
/// Number of transfers performed since the counter was last reset.
/// Incremented by `Buckets.transfer` while the instance mutex is held.
__gshared uint transfersCount;

/**
 * A fixed-size array of `nBuckets` unsigned counters whose values can be
 * moved between slots without changing their sum.
 *
 * Every mutation (`transfer`) and every snapshot (`toString`) is performed
 * while holding `mtx`.
 */
final class Buckets(size_t nBuckets) {
    alias TBucketValue = uint;
    alias Bucket = TBucketValue;

    private shared Bucket[nBuckets] buckets;

    /// Cleared by the display routine to ask the worker loops to stop.
    private bool running;

    /// Guards `transfer` and `toString`.
    Mutex mtx;

    /// Creates the mutex and fills every bucket with a random value in [0, 100).
    public this() {
        this.running = true;
        this.mtx = new Mutex();
        foreach (ref b; buckets)
            b = uniform(0, 100);
    }

    /// Returns the current value of bucket `index` (read without taking `mtx`).
    public TBucketValue opIndex(in size_t index) const pure nothrow {
        return buckets[index];
    }

    /**
     * Moves up to `amount` from bucket `from` to bucket `to`.
     *
     * The amount is clamped to the current content of the source bucket, so
     * a bucket never underflows and the overall sum is preserved.
     * `from == to` needs no special case: the value is subtracted and then
     * added back to the same slot.
     *
     * Params:
     *   from   = index of the bucket to take from
     *   to     = index of the bucket to add to
     *   amount = requested quantity; may be reduced to what `from` holds
     */
    public void transfer(in size_t from, in size_t to,
                         TBucketValue amount) {
        mtx.lock();
        // Register the unlock right after locking so the mutex is released
        // on every exit path, including when something below throws.
        scope(exit)
            mtx.unlock();

        while (true) {
            auto v1 = atomicLoad(buckets[from]);
            if (amount > v1)
                amount = v1;
            if (cas(&buckets[from], v1, v1 - amount)) {
                atomicOp!"+="(buckets[to], amount);
                break;
            }
            // Else the source changed under us: retry with a fresh value.
        }
        transfersCount++;
    }

    /// Number of buckets (always `nBuckets`).
    @property size_t length() const pure nothrow {
        return this.buckets.length;
    }

    /**
     * Writes the bucket array, a space, and the sum of all buckets to `sink`.
     *
     * The mutex is held for the whole call so the printed array and the
     * printed sum describe the same state.
     */
    void toString(in void delegate(const(char)[]) sink) {
        mtx.lock();
        scope(exit)
            mtx.unlock();

        TBucketValue total = 0;
        foreach (ref b; buckets)
            total += b;

        sink(text(buckets));
        sink(" ");
        sink(text(total));
    }
}
/**
 * Worker loop: repeatedly moves a random amount in [0, 20) between two
 * randomly chosen buckets until `data.running` becomes false.
 *
 * Indices are drawn from the half-open range [0, data.length), so every
 * bucket, including the last one, can be selected.
 * The generator is seeded with the constant 1.
 */
void randomize(size_t N)(Buckets!N data) {
    immutable nBuckets = data.length;
    auto rng = Xorshift(1);
    while (data.running) {
        // uniform() excludes its upper bound by default, so pass the length
        // itself rather than length - 1.
        immutable i = uniform(0, nBuckets, rng);
        immutable j = uniform(0, nBuckets, rng);
        immutable amount = uniform(0, 20, rng);
        data.transfer(i, j, amount);
    }
}
/**
 * Worker loop: repeatedly picks two random buckets and moves half of their
 * difference from the fuller one to the emptier one, until `data.running`
 * becomes false.
 *
 * Indices are drawn from the half-open range [0, data.length), so every
 * bucket, including the last one, can be selected.
 * The two values are read before `transfer` is called; `transfer` clamps
 * the amount to what the source bucket holds at that point.
 */
void equalize(size_t N)(Buckets!N data) {
    immutable nBuckets = data.length;
    auto rng = Xorshift(1);
    while (data.running) {
        // uniform() excludes its upper bound by default, so pass the length
        // itself rather than length - 1.
        immutable i = uniform(0, nBuckets, rng);
        immutable j = uniform(0, nBuckets, rng);
        immutable a = data[i];
        immutable b = data[j];
        if (a > b)
            data.transfer(i, j, (a - b) / 2);
        else
            data.transfer(j, i, (b - a) / 2);
    }
}
/**
 * Reporter loop: ten times, prints the transfer counter followed by the
 * bucket state, resets the counter, and sleeps for one second.
 * Afterwards clears `data.running` so the worker loops terminate.
 */
void display(size_t N)(Buckets!N data) {
    enum reportCount = 10;
    for (int tick = 0; tick < reportCount; ++tick) {
        writeln(transfersCount, " ", data);
        transfersCount = 0;
        Thread.sleep(1000.dur!"msecs");
    }

    // Signal randomize/equalize to leave their loops.
    data.running = false;
}
/// Prints the column header, creates 20 buckets, and starts the randomize,
/// equalize and display routines, each in its own new thread.
void main() {
    enum bucketCount = 20;

    writeln("N. transfers, buckets, buckets sum:");
    auto data = new Buckets!bucketCount();

    auto shuffler = task!randomize(data);
    shuffler.executeInNewThread();

    auto leveller = task!equalize(data);
    leveller.executeInNewThread();

    auto reporter = task!display(data);
    reporter.executeInNewThread();
}