codepad
[
create a new paste
]
login
|
about
Language:
C
C++
D
Haskell
Lua
OCaml
PHP
Perl
Plain Text
Python
Ruby
Scheme
Tcl
// Hypothetical C# async sequencing.
//
// NOTE: this is a language-design sketch, not compilable C#. Three constructs
// below are invented syntax and are intentionally kept as written:
//   * `foreach (var item in seq with token)`  - enumerate an async sequence, supplying a token
//   * `static async(with token) ...`          - async iterator with an ambient `token`
//   * `yield value;`                          - produce an element from that iterator
// `AsyncObservable.Create`, `Disposable.Create` and the three-delegate
// `Subscribe(onNext, onCompleted, onError)` overload are helpers that are not
// defined in this snippet - presumably Rx-style factories; confirm before use.

/// <summary>Pull-based asynchronous sequence.</summary>
interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator(CancellationToken token);
}

/// <summary>Cursor over an <see cref="IAsyncEnumerable{T}"/>.</summary>
interface IAsyncEnumerator<out T>
{
    Task<bool> MoveNext();
    T Current { get; }
}

/// <summary>Push-based asynchronous sequence whose observers can apply back-pressure.</summary>
interface IAsyncObservable<out T>
{
    IDisposable Subscribe(IAsyncObserver<T> observer);
}

/// <summary>
/// Receiver of pushed elements. <see cref="OnNext"/> returns a task, so the
/// producer can await the observer before pushing the next element.
/// </summary>
interface IAsyncObserver<in T>
{
    Task OnNext(T element);
    void OnCompleted();
    void OnError(Exception error);
}

/// <summary>
/// Adapts a pull-based sequence to a push-based one. Each subscription starts a
/// background task that enumerates <paramref name="seq"/> and awaits the
/// observer's OnNext for every item before fetching the next.
/// </summary>
/// <returns>
/// An observable; the disposable created per subscription cancels the token
/// that the enumeration loop checks.
/// </returns>
static IAsyncObservable<T> ToAO<T>(this IAsyncEnumerable<T> seq)
{
    return AsyncObservable.Create(observer =>
    {
        var tc = new CancellationTokenSource();
        var token = tc.Token;

        // Fire-and-forget: the outcome is reported through the observer, not the task.
        Task.Run(async () =>
        {
            Exception e = null;
            try
            {
                if (token.IsCancellationRequested) return;
                foreach (var item in seq with token)
                {
                    // A cancelled subscription ends silently: neither OnCompleted nor OnError.
                    if (token.IsCancellationRequested) return;
                    await observer.OnNext(item);
                }
            }
            catch (Exception err)
            {
                e = err;
            }

            // Notify outside the try block so an exception thrown by OnCompleted
            // is not caught and re-routed into OnError.
            if (e != null)
            {
                observer.OnError(e);
            }
            else
            {
                observer.OnCompleted();
            }
        });

        return Disposable.Create(tc.Cancel);
    });
}

/// <summary>
/// Releases <paramref name="toSignal"/> once, then asynchronously waits on
/// <paramref name="toWaitOn"/>. Replaces the non-existent
/// WaitHandle.SignalAndWaitAsync (SemaphoreSlim is not a WaitHandle).
/// </summary>
static async Task SignalAndWaitAsync(SemaphoreSlim toSignal, SemaphoreSlim toWaitOn)
{
    toSignal.Release();
    await toWaitOn.WaitAsync();
}

/// <summary>
/// Adapts a push-based sequence to a pull-based one. Producer and consumer hand
/// a single slot (<c>value</c>) back and forth using two semaphores, so the
/// producer's OnNext does not complete until the consumer asks for the next item.
/// </summary>
/// <remarks>
/// When the source reports an error, or the ambient token is cancelled, the
/// completion task is awaited at the end so the exception surfaces to the
/// consumer instead of being dropped.
/// </remarks>
static async(with token) IAsyncEnumerable<T> ToAE<T>(this IAsyncObservable<T> seq)
{
    // Single hand-off slot written by the producer and read by the consumer.
    T value = default(T);

    // Completed by OnCompleted / OnError / cancellation. Continuations run
    // asynchronously so the consumer does not resume inline on the producer's call.
    var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Both start at 0: they are used purely as signals.
    var sconsumer = new SemaphoreSlim(0); // released when `value` holds a fresh item
    var sproducer = new SemaphoreSlim(0); // released when the consumer wants the next item

    // Dispose the subscription when enumeration ends, however it ends.
    using (seq.Subscribe(
        async item =>
        {
            value = item;
            await SignalAndWaitAsync(sconsumer, sproducer);
        },
        () => tcs.TrySetResult(0),
        error => tcs.TrySetException(error)))
    using (token.Register(() => tcs.TrySetCanceled()))
    {
        var completion = tcs.Task;

        // Wait for the first item, unless the source finishes first.
        if (completion != await Task.WhenAny(sconsumer.WaitAsync(), completion))
        {
            do
            {
                yield value;
            }
            // Let the producer continue, then wait for the next item or the end.
            while (completion != await Task.WhenAny(
                SignalAndWaitAsync(sproducer, sconsumer),
                completion));
        }

        // Observe the final state: rethrows the source's error or the cancellation.
        await completion;
    }
}
Private
[
?
]
Run code
Submit