非同期ストリームを扱うときはRxとIx-Asyncを使い分ける

非同期ストリーム(=非同期シーケンス)を扱うならRx(System.Reactive)となりがちな昨今、Ix-Async(System.Interactive.Async)も道具箱に入れておいて適切に使い分けるのがいいのではという話。

Rxの解説において、IEnumerable<T>はpull型でIObservable<T>はpush型、あるいはIObservable<T>は時間軸上のシーケンスであると説明される*1。改めてdotnet/reactiveから表を引用して復習する。

Single return value Multiple return values
Pull/Synchronous/Interactive T IEnumerable<T>
Push/Asynchronous/Reactive Task<T> IObservable<T>

そして、ここには載っていないのがPull/Asynchronous/InteractiveなIAsyncEnumerable<T>であり、2018年現在はSystem.Interactive.AsyncというNuGetパッケージで配布されている(以前はIx-Asyncという名前だった)。現在はdotnet/reactive/Ix.NETホスティングされている。

IAsyncEnumerable<T>は非同期ストリームと呼ばれ、C# 8.0に関連シンタックスと合わせて組み込まれる予定で開発が進められている。

Champion "Async Streams" (including async disposable) (16.3, Core 3) · Issue #43 · dotnet/csharplang · GitHub
C# 8.0プレビュー

非同期ストリームについて、IEnumerable<Task<T>>IAsyncEnumerable<T>の違いなどはこちらが参考になる。

c# - Can I await an enumerable I create with a generator? - Stack Overflow

IAsyncEnumerable<T>の何が嬉しいのか

1. pull型の表現力

何よりも、問題の非同期ストリームがコンテキストにおいてpull型/push型どちらで認識されているのかが重要である。例えば一連のイベントは発生するもの=push型であるため一般にIObservable<T>で表現される。一方、gRPCのserver-side streaming RPCではサーバへのリクエストに対して連続的な応答が返ってくるのを取り出す=pull型でありIAsyncEnumerable<T>で表現されている(実際はIAsyncEnumerator<T>を継承したIAsyncStreamReader<T>なので、自分でイテレータを回すかIAsyncEnumerableに変換して読みだすことになる)。コンテキストにおけるpull型/push型の認識を無視してしまうと表現力に乏しくバグを埋め込みやすく保守性の悪いコードになってしまう可能性がある。

嬉しいことに、IObservable<T>IAsyncEnumerable<T>にはそれぞれToAsyncEnumerable()ToObservable()が存在し相互に変換できる。つまりpull型/push型の両モデルを接続することができる。例えば上記のserver-side streaming RPCで受け取った非同期ストリームをIObservable<T>に変換して消費できる。これにより例えば、クライアントのドメインモデル層ではIObservable<T>のイベントストリームとして扱う一方で、インフラストラクチャ層ではserver-side streaming RPCを利用してそこではpull型で扱うといったことができ、それぞれのコンテキストにおいて的確な表現が可能である。


2020/4/3追記

gRPCの最新ではSystems.Interactive.Asyncへの依存が削除されているため、本記事とは状況が少々異なる。新しいIAsyncStreamReader<T>IAsyncEnumerable<T>へ変換して扱うのが良さそう。

https://github.com/cactuaroid/GrpcWpfSample/blob/master/GrpcWpfSample.Client/Model/IAsyncStreamReaderExtensions.cs


2. 逐次処理との親和性

push型のIObservable<T>は消費側は反応するだけであるため並列処理との親和性が高く、pull型のIAsyncEnumerable<T>は消費側から1つずつ列挙するため逐次処理との親和性が高い。例えば、async lambda(async () => await ...という書き方のラムダ式)を受け取って逐次処理する拡張メソッドを実装するとRxでは直観的でないコードになる。一方でIx-Asyncでは素直なコードになる。なお、Ix-Asyncではこういったasync lambdaを引数に取る拡張メソッドが用意されているべきであり、いくつもIssueがある*2C# 8.0にて用意される見込みとして保留されている。


2020/4/3追記

こちらを参照してください。

cactuaroid.hatenablog.com


3. 学習コスト

正直、Rxは難しい。pushとはいえHotとColdやSubscribe()時の挙動といったpull的なことも意識しないといけないし、いつ何が行われるかイメージしづらくデバッグも難しいし、例外処理にC#標準のtry/catchではない書き方が複数あるなど、学習コストがかなり高い。これは大人数チーム・人の入れ替わりの多いチーム・外注委託の可能性がある場合などには深刻な問題である。一方でIAsyncEnumerable<T>IEnumerable<T>の延長で考えられるため、Rxに比べればかなり簡単である。async/awaitは非同期プログラミングにもはや必須であるが、それさえ分かっていれば導入障壁はさほど高くない。Rxがバッチリハマる場面以外について、pullモデルで設計するのは現実的な選択肢になりうる。

Chunky/Chatty問題

詳しくは以下に書かれているが、要はIAsyncEnumerable<T>は裏でいっぱいTaskを使うのでオーバーヘッドが大きく、IObservable<T>の方が軽いという話。パフォーマンスについて考慮が必要な場合には注意が必要である。ただし、C# 7.0でValueTaskが登場したおかげでオーバーヘッドを劇的に改善可能なようでC# 8.0以降でIAsyncEnumerable<T>が組み込まれた後は特に問題とはならないだろう。(というか逆に、ValueTaskのおかげで標準搭載の前提が整った*3

Is IAsyncEnumerator dead?