notebook

都内でWEB系エンジニアやってます。

rxjsでpollingする

rxjs難しいですね

時間軸を変えたりキャッシュをしたり定期的に何かしたりなど。。。結構な数のオペレータがあるので組み合わせ次第で色々できるでしょう

今回は管理画面上のレポート(csvなど)をダウンロードするときの処理をrxjsでスマート()にしてみたいと思います

要件としては生成処理が重めのCSVを落としたい、といった感じ

普通に実装したらリクエストをwebサーバが受けて計算したりフォーマットしたりしてCSVを生成して返すって感じですね

今担当のサービスは性質上webサーバ(rails)のスペックは高くないがバックエンド(resque)のスペックは高いのでそちらで処理させて(rails)側のサーバにはなるべく負担をかけない方法をとりたいです

なのでサーバではCSV生成のリクエストだけを受けてバックエンドで非同期にCSVを生成してCSVが生成されるまでフロント側でポーリングしてCSVができるのを待つ

といった方法で実装しました

流れはこんな感じ(plantuml使ってみたかっただけ)

f:id:swfz:20180130025222p:plain

環境

  • angular5.1
  • Rails5.1

実装

結局実装はこんな感じになりました(フロント側だけ)

  private initialize() {
    // サーバのレスポンス想定
    this.job = {
      id: '6e1bbc60-a142-4eea-8170-52128b7a3e79'
    };
    // サーバのレスポンス想定
    this.httpResponses = [
      { status: 'progress' },
      { status: 'progress' },
      { status: 'progress' },
      { url: 'http://example.com/aaa.csv' }
    ];
  }

  download() {
    this.initialize();
    return this.kickJob().pipe(concatMap(job => this.polling(job.id)));
  }

  private kickJob(): Observable<any> {
    // 本来はサーバへ通信
    return Observable.of(this.job);
  }

  private getStatus(id): Observable<any> {
    // 本来はサーバへ通信
    return Observable.of(this.httpResponses.shift());
  }

  private polling(id: string): Observable<any> {
    return Observable.interval(2000).pipe(
      concatMap(() => this.getStatus(id)),
      concatMap(this.convertResponse),
      tap(this.downloadFile),
      takeWhile(this.inProgress),
      map(res => res.status)
    );
  }

  private convertResponse(res): Observable<any> {
    if (res.url) {
      return Observable.of(Object.assign(res, { status: 'progress' }), {
        status: 'done'
      });
    }
    return Observable.of(res);
  }

  private inProgress(res) {
    return res.status === 'progress';
  }

  private downloadFile(res) {
    if (res['url']) {
      // location.href = res.url;
      console.log('download now!');
    }
  }

実際にサーバに通信するところはサンプルなので想定の値を書いてあります

ダウンロードの箇所もコメントアウトしてあります

これだけでした

使っているオペレータを見ていきます

  • concatMap

値は直列に流すため順番が前後することはない(mergeMap,switchMapと違う)

値を受け取って別のObservableを流したりすることができる

ポーリングのリクエストでファイルが生成されたらurlに値が入るようにしたのでそれを判断基準にしてファイルが生成されたら次に流すObservableを変えてあげる

なのでURLが作られた段階で後続で処理を終わらせられるように適切なObservableに書き換える{status: 'done'}

  • takeWhile

callbackが返す真偽値が偽になるまで値を次に流す

polling中は{status: 'progress'}なので値が流れる。

それ以外の値が流れてきたらcompleteを流して終了する

  • tap

lettable(pipable)operatorの場合はtap

doと同じ

副作用のある操作などをtapで行い受けたストリームを次へそのまま流す、といった使い方ができるよう

サンプルだとログとるとかそういうのがあるみたいです

今回はurlが含まれていればダウンロード(location.hrefをCSVのURLにする)する処理をはさみました

きっかけ

https://stackoverflow.com/questions/46728922/long-polling-in-angular-4

最初↑をみて簡単そうだなと思って実装してみたがどうも要件とあっていない

takeWhileでlast valueが流れない

https://stackoverflow.com/questions/44641246/rxjs-takewhile-but-include-the-last-value

takeWhileで偽を返した際に流れてきた値を流してほしい→それを使って処理したい

といった感じだったのでtakeWhileだけだと要件を満たせなかったので結局色々繋げることになりました

まとめ

rxjsはangularで使ってるとはいってもだいたいがhttp clientのレスポンスが自動的にObservableで返ってくるので一応使ってる、といった感じだったので今回色々調べられて勉強になった

オペレータ全て覚えるのは難しいかもしれないが使いこなせれば相当な武器になると思うので少しづつ勉強していきたい