[ create a new paste ] login | about

Link: http://codepad.org/dWFxBBR1    [ raw code | fork ]

aaronla - C, pasted on Oct 24:
// hypothetical C# async sequencing 

interface IAsyncEnumerable<out T>
{
  IAsyncEnumerator<out T> GetEnumerator(CancellationToken token);
}
interface IAsyncEnumerator<out T>
{
  Task<bool> MoveNext();
  T Current { get };
}

interface IAsyncObservable<out T>
{
  IDisposable Subscribe(IAsyncObserver<in T>);
}
interface IAsyncObserver<in T>
{
  Task OnNext(T element);
  void OnCompleted();
  void OnError(Exception error);
}

static IAsyncObservable<T> ToAO(this IAsyncEnumerable<T> seq) 
{
  return AsyncObservable.Create(observer => {
    var tc = new CancellationTokenSource();
    var token = tc.Token;
    Task.Run(async () => {
      Exception e = null;
      try {
        if (token.IsCancellationRequested) 
          return;
        foreach (var item in seq with token) {
          if (token.IsCancellationRequested) 
            return;
          await observer.OnNext(item);
        }
      }
      catch (Exception err) {
        e = err;
      }
      if (e != null) {
        observer.OnError(e);
      }
      else {
        observer.OnCompleted();
      }
    });
    return Disposable.Create(tc.Cancel);
  });
}

static async(with token) IAsyncEnumerable<T> ToAE(this IAsyncObservable<T> seq) 
{
  T value;
  var tcs = new TaskCompletionSource<int>();
  
  var sconsumer = new SemaphoreSlim();
  var sproducer = new SemaphoreSlim();
  var disposable = seq.Subscribe(
    item => {
      value = item;
      await WaitHandle.SignalAndWaitAsync(sconsumer, sproducer);
    },
    () => tcs.TrySetValue(0), 
    tcs.TrySetException
  );
  using (token.Register(() => tcs.TrySetCancelled()))
  {
    var completion = tcs.Task;

    if (completion != await Task.WhenAny(sconsumer.WaitAsync(), competion))
    {
      do {
        yield value;
      } while(completion != await Task.WhenAny(
        WaitHandle.SignalAndWaitAsync(sproducer, sconsumer),
        completion
      ));
    }
  }
}
        

      


Create a new paste based on this one


Comments: