// hypothetical C# async sequencing
/// <summary>
/// A pull-based sequence whose elements are produced asynchronously.
/// Covariant in <typeparamref name="T"/>.
/// </summary>
interface IAsyncEnumerable<out T>
{
    /// <summary>
    /// Creates an enumerator over the sequence.
    /// </summary>
    /// <param name="token">
    /// Cancellation token handed to the enumerator; how it is honoured is up
    /// to the implementation.
    /// </param>
    // Variance modifiers (in/out) are only legal where a type parameter is
    // declared, so the return type is spelled IAsyncEnumerator<T>.
    IAsyncEnumerator<T> GetEnumerator(CancellationToken token);
}
/// <summary>
/// Cursor over an asynchronous sequence. Covariant in
/// <typeparamref name="T"/>.
/// </summary>
interface IAsyncEnumerator<out T>
{
    /// <summary>
    /// Asynchronously advances the cursor.
    /// </summary>
    /// <returns>
    /// A task yielding a flag; by analogy with <c>IEnumerator.MoveNext</c> it
    /// is presumably <c>true</c> while an element is available and
    /// <c>false</c> at the end of the sequence (confirm against implementers).
    /// </returns>
    Task<bool> MoveNext();

    /// <summary>The element at the cursor's current position.</summary>
    T Current { get; }
}
/// <summary>
/// A push-based sequence that delivers elements to an
/// <see cref="IAsyncObserver{T}"/>. Covariant in <typeparamref name="T"/>.
/// </summary>
interface IAsyncObservable<out T>
{
    /// <summary>
    /// Attaches an observer to the sequence.
    /// </summary>
    /// <param name="observer">Receiver of the notifications.</param>
    /// <returns>A handle whose disposal ends the subscription.</returns>
    // The parameter needs a name, and variance modifiers (in/out) are only
    // legal at the declaration of a type parameter, hence IAsyncObserver<T>.
    IDisposable Subscribe(IAsyncObserver<T> observer);
}
/// <summary>
/// Receiver of notifications from a push-based asynchronous sequence.
/// Contravariant in <typeparamref name="T"/>.
/// </summary>
interface IAsyncObserver<in T>
{
/// <summary>
/// Delivers one element. The returned task lets the observer signal when it
/// has finished handling the element; a producer can await it before
/// pushing the next one.
/// </summary>
Task OnNext(T element);
/// <summary>Signals that the sequence ended without an error.</summary>
void OnCompleted();
/// <summary>Signals that the sequence ended with <paramref name="error"/>.</summary>
void OnError(Exception error);
}
/// <summary>
/// Adapts a pull-based asynchronous sequence to a push-based one.
/// </summary>
/// <remarks>
/// Each subscription starts a background pump that enumerates
/// <paramref name="seq"/> and awaits <c>observer.OnNext</c> for every element
/// before fetching the next. Disposing the returned subscription cancels the
/// token given to the enumeration; after that no further notification,
/// including <c>OnError</c> and <c>OnCompleted</c>, is delivered.
/// </remarks>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="seq">The sequence to enumerate once per subscription.</param>
/// <returns>An observable that replays <paramref name="seq"/> to each subscriber.</returns>
static IAsyncObservable<T> ToAO<T>(this IAsyncEnumerable<T> seq)
{
    // T cannot be inferred from an untyped lambda, so it is supplied
    // explicitly (assumes AsyncObservable.Create is generic in the element
    // type, like Rx's Observable.Create).
    return AsyncObservable.Create<T>(observer => {
        var tc = new CancellationTokenSource();
        var token = tc.Token;

        // Fire-and-forget pump: the task is intentionally not awaited, so
        // anything thrown by OnError/OnCompleted themselves goes unobserved.
        Task.Run(async () => {
            try {
                if (token.IsCancellationRequested) {
                    return;
                }
                // Hypothetical syntax: asynchronous foreach that passes
                // `token` to seq.GetEnumerator and awaits each MoveNext.
                foreach (var item in seq with token) {
                    if (token.IsCancellationRequested) {
                        return;
                    }
                    await observer.OnNext(item);
                }
            }
            catch (Exception err) {
                // A failure that surfaces after the subscriber has gone away
                // (typically the cancellation itself) must not be reported.
                if (!token.IsCancellationRequested) {
                    observer.OnError(err);
                }
                return;
            }

            // Likewise, do not signal completion to a disposed subscription.
            if (!token.IsCancellationRequested) {
                observer.OnCompleted();
            }
        });

        return Disposable.Create(tc.Cancel);
    });
}
/// <summary>
/// Adapts a push-based asynchronous sequence to a pull-based one.
/// </summary>
/// <remarks>
/// Written as a hypothetical asynchronous iterator: <c>async(with token)</c>
/// exposes the enumerator's cancellation token as <c>token</c>, and
/// <c>yield value;</c> hands one element to the consumer.
/// Producer and consumer run in lock-step through two semaphores: the
/// producer's <c>OnNext</c> publishes an element and does not complete until
/// the consumer has yielded it and asked for the next one.
/// The sequence ends when the source completes. A source error, or
/// cancellation of <c>token</c>, is rethrown to the consumer.
/// </remarks>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="seq">The observable to subscribe to once per enumeration.</param>
static async(with token) IAsyncEnumerable<T> ToAE<T>(this IAsyncObservable<T> seq)
{
    // Single-slot hand-off buffer; written by the producer, read by the consumer.
    T value = default(T);
    // Completed by OnCompleted, faulted by OnError, cancelled by `token`.
    var tcs = new TaskCompletionSource<int>();
    // sconsumer: "an element is ready"; sproducer: "the element was consumed".
    // Both start at zero so the first wait on either side blocks.
    var sconsumer = new SemaphoreSlim(0);
    var sproducer = new SemaphoreSlim(0);

    // NOTE(review): assumes a delegate-based Subscribe(onNext, onCompleted,
    // onError) extension exists elsewhere; the interface itself only accepts
    // an IAsyncObserver<T>.
    using (seq.Subscribe(
        async item => {
            value = item;
            sconsumer.Release();
            await sproducer.WaitAsync();
        },
        () => tcs.TrySetResult(0),
        error => tcs.TrySetException(error)
    ))
    using (token.Register(() => tcs.TrySetCanceled()))
    {
        var completion = tcs.Task;

        // Wait for either the next element or the end of the sequence.
        while (completion != await Task.WhenAny(sconsumer.WaitAsync(), completion))
        {
            yield value;
            // Only now may the producer overwrite `value` with the next element.
            sproducer.Release();
        }

        // Surface a source error or cancellation instead of ending silently.
        await completion;
    }
}