// resumable functions and Rx
/// Wires an imperative, resumable producer to an Rx-style consumer.
///
/// A fresh Subject<T> is created, handed to `consumer` so it can subscribe,
/// and then handed to `producer`, which pushes values into it. The consumer
/// is invoked before the producer starts, so it is attached before the
/// first value is emitted.
///
/// @param producer Resumable function that receives the subject and emits
///                 values into it.
/// @param consumer Callback that receives the same subject viewed as an
///                 IObservable<T>& (assumes Subject<T> is usable as
///                 IObservable<T> -- confirm against its declaration).
///
/// An exception escaping `producer` is caught and forwarded to the subject
/// through on_error instead of propagating to the caller.
template <class T>
void ImperativeGenerator(
    function<task(Subject<T>&)> producer,
    function<void(IObservable<T>&)> consumer)
{
    // NOTE(review): `producer` and `consumer` are copied into this closure,
    // which is destroyed when ImperativeGenerator returns. If the resumable
    // frame refers back to the closure rather than owning copies, they would
    // dangle after the first suspension -- confirm the capture semantics of
    // resumable lambdas.
    auto run = [=]() -> task resumable {
        Subject<T> sub;
        // Attach the consumer first so that it observes every emitted value.
        consumer(sub);
        try {
            await producer(sub);
        } catch (const exception& e) {
            // Report producer failures through the observable error channel.
            sub.on_error(e);
        }
    };
    // Task intentionally discarded (fire-and-forget).
    // NOTE(review): assumes dropping the task neither cancels nor destroys
    // the running resumable function -- confirm against task's contract.
    run();
}
// Example: read lines from cin asynchronously and write each one to cout.
ImperativeGenerator<string>(
    // Producer: pushes each line obtained from getline_async into the subject.
    [](Subject<string>& sub) -> task resumable {
        // NOTE(review): the stream state is tested before the read, so a
        // read that fails (e.g. at EOF) may still reach on_next with
        // whatever getline_async yielded -- confirm its failure behaviour.
        while (cin) {
            auto line = await getline_async(cin);
            sub.on_next(line);
        }
    },
    // Consumer: subscribes to the observable and writes every received line.
    [](IObservable<string>& source) {
        source.subscribe([](const string& line) {
            cout << line;
        });
    });