notebook

都内でWEB系エンジニアやってます。

GCPのCloud Workflowsを試す

ちょうど気になっていたので使ってみる

Workflows

ワークフロー  |  Google Cloud

cloud.google.com

サーバレスでHTTPベースのAPIサービスを組み合わせてワークフローを定義できる

外部APIも対応しているので色々できることはありそう

目次

こんな感じで試していく

  • まず最小のサンプルを動かす
  • デプロイ時にサービスアカウントをひもづける
  • workflowsの標準ライブラリ
    • sys.get_env:環境変数の取得
    • sys.log:ログの送信
    • sys.now
  • conditional switch
  • Arrayによる反復処理
  • ワークフロー実行時に引数を渡す
  • サブワークフローを使う
  • エラーハンドリング

まず最小のサンプルを動かす

  • sample.workflows.yml
- getMessage:
    call: http.get
    args:
      url: https://jsonplaceholder.typicode.com/todos/1
    result: todo1
- returnValue:
    return: ${todo1.body}
  • デプロイ
$ gcloud workflows deploy sample --source=sample.workflow.yml
  • 実行
$ gcloud workflows run sample
WARNING: The default location(us-central1) was used since the location flag was not specified.
Waiting for execution [424cd37e-73f2-423a-aaaa-b953a797bafb] to complete...done.
argument: 'null'
endTime: '2021-03-25T19:25:05.905174478Z'
name: projects/111111111111/locations/us-central1/workflows/sample/executions/424cd37e-73f2-423a-aaaa-b953a797bafb
result: '{"completed":false,"id":1,"title":"delectus aut autem","userId":1}'
startTime: '2021-03-25T19:25:05.556254961Z'
state: SUCCEEDED
workflowRevisionId: 000003-5de

resultで最終的な結果が確認できる

stateでワークフロー自体のステータスが確認できる

GUIで見てみると

f:id:swfz:20210406194537p:plain

流れが可視化されているのが分かる

2021-03-19時点の状態だと、GUIで編集するとリアルタイムで可視化の箇所も変更されるのでわかりやすい

GUIで編集したい感あるがデバッグ等含めるとCUIでデプロイ→実行→ログなどが必要な場合のみGUIでログ確認の方がやりやすかった

実行時間が長いワークフローなどに関してはexecute + executions describeの組み合わせで実行命令の発行と結果の確認という形で分けることもできる

デプロイ時にサービスアカウントをひもづける

ワークフローの実行時のサービスアカウントを設定する

$ gcloud workflows deploy sample --source=sample.workflow.yml --service-account=sample-project@sample-project-111111.iam.gserviceaccount.com

どのサービスアカウントがひも付いているかはGUI上の詳細から確認できる

gcloud workflows describeでも確認できる

デフォルトは次のような形式のサービスアカウントが割り当てられる

projects/sample-project-111111/serviceAccounts/111111111111-compute@developer.gserviceaccount.com

ワークフロー内でコネクタを使って他のリソースにアクセスしたりする場合には意識する必要がある

Workflowsの標準ライブラリ

Standard library overview  |  ワークフロー  |  Google Cloud

cloud.google.com

便利関数やエラータイプなどワークフローを組むときに使うであろうライブラリの一覧が載っている

http,sys,text,json,encode,decodeなど

これ見つけるのにもなんか時間かかった、どこからたどれば良いんだ…

sys.get_env 環境変数の取得

Function: sys.get_env  |  ワークフロー  |  Google Cloud

cloud.google.com

PROJECT_IDやPROJECT_NUMBERなどを取得できる

コネクタで使用する際にPROJECTに関する情報が必要になることがあるのでそういう場合に使う

- assign_value:
    assign:
      - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}

sys.log ログの送信

Cloud Logging へのログの送信  |  ワークフロー  |  Google Cloud

cloud.google.com

ログが見たい場合は自分で送る必要がある

  • log-output.workflow.yml
main:
  steps:
    - getMessage:
        call: http.get
        args:
          url: https://jsonplaceholder.typicode.com/todos/1
        result: todo1
    - log_raw:
        call: sys.log
        args:
          text: ${todo1}
          severity: INFO
    - log_encoded:
        call: sys.log
        args:
          text: ${json.encode_to_string(todo1)}
          severity: WARNING
    - log_with_text:
        call: sys.log
        args:
          text: ${"ERROR " + json.encode_to_string(todo1)}
          severity: CRITICAL
    - returnValue:
        return: ${todo1}

severityにはINFOWARNINGCRITICALの3つが指定できる

見ての通りcall: sys.logで必要項目を渡すことでワークフローの詳細ページのログのタブで参照できるようになる

実際にログを見るとこんな感じで出力される

f:id:swfz:20210406194543p:plain

ドキュメントにはdictを出力する場合json.encode_to_stringを使用してくださいと書いてあったが特に指定せずとも見えるっぽい

sys.now

  • now.workflow.yml
main:
  steps:
    - assign_value:
        assign:
          - now: ${sys.now()}
    - return_value:
        return: ${now}
result: '1616751871.7443395'

使い所としては

コネクタのサンプル  |  ワークフロー  |  Google Cloud

のCloudTasksへ投げるときのスケジュール設定に書いてあった

            scheduleTime:
              seconds: int(sys.now() + 60*60*24)

サンプルだと24時間後に実行してくださいねという設定をするために使用している

conditional switch

ワークフローでの実行順序の制御  |  Google Cloud

cloud.google.com

conditionの式の内容によって次に実行するステップを指定する

直接stepsを記述して特定条件のときにこういうフローで処理しますという書き方もできる(埋め込みステップ)

サンプル ワークフロー  |  Google Cloud

cloud.google.com

これはドキュメントのサンプル実行すればなんとなく分かるのでスキップ

Arrayによる反復処理

Syntax reference  |  ワークフロー  |  Google Cloud

cloud.google.com

条件付きジャンプ、assign、len関数を組み合わせて配列の反復処理を行える

サンプルコードだとcheck_conditionステップでarrayの要素数をiと比較し分岐

iterateステップでassignを用いてiをインクリメントして再代入しcheck_conditionへジャンプ

という流れで反復処理を実現している

少し工夫すればfiltermapのような処理が行えるかと思ったがassignreturnの式の中で配列を生成するような方法が見つけられなかったため現状だとできなそう

assignreturnの式の中で配列、辞書を生成するパターン以外であれば(数値の足し上げや文字列の追加など)switchと合わせてつかえば可能

  • iterate-sum.workflow.yml
main:
  steps:
    - define:
        assign:
          - numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
          - sum: 0
          - i: 0

    - check_condition:
        switch:
          - condition: ${len(numbers) > i}
            next: iterate
        next: exit_loop

    - iterate:
        switch:
          - condition: ${numbers[i] % 2 == 0}
            steps:
              - append:
                  assign:
                    - sum: ${sum + numbers[i]}
        next: iterate-increment

    - iterate-increment:
        assign:
          - i: ${i+1}
        next: check_condition

    - exit_loop:
        return:
            result: ${sum}

    - return_value:
        return: ${sum}

失敗事例

直接map,filter関数を呼んだらLambda式で使う:の前後にダブルクオートを挟まれて文字列として扱われてしまった

- define:
    assign:
      - numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- return_value:
    return: ${filter(lambda x: x % 2 == 0, numbers)}
result: '{"${filter(lambda x":"x % 2 == 0, numbers)}"}'

他にも色々試してみたがうまくいく方法が見つけられず…

ワークフロー実行時に引数を渡す

Syntax reference  |  ワークフロー  |  Google Cloud

cloud.google.com

最小であれば次のドキュメント通りにやってみるとわかりやすい

実行リクエストでのランタイム引数渡し  |  ワークフロー  |  Google Cloud

cloud.google.com

paramsで受け取った引数に名前をつける

あとは式の中で変数を使うのと同等に扱える

サブワークフローを使う

一連のステップをまとめてワークフー内から呼び出せるようにする

ドキュメントにもあるがプログラミングでいう関数と同様な考え方で書ける

サブワークフローの作成と使用  |  Google Cloud

cloud.google.comcloud.google.com

  • subworkflow.workflow.yml
main:
  steps:
    - call_subworkflow1:
        call: sum
        args:
          n1: 1
          n2: 1
        result: total1

    - call_subworkflow2:
        call: sum
        args:
          n1: 2
          n2: 2
        result: total2


    - return_value:
        return: ${total1 + total2}

sum:
  params: [n1, n2]
  steps:
    - return_value:
        return: ${n1 + n2}
result: '6'

mainワークフローのみからしか呼べないということはなく、サブワークフローからさらにサブワークフローを呼ぶことも可能

エラーハンドリング

ドキュメントはこの辺

Syntax reference  |  ワークフロー  |  Google Cloud

cloud.google.com

あえて存在しないURLにアクセスするワークフローを書いてみる

シングルステップのエラーハンドリング

  • exception-singlestep.workflow.yml
main:
  steps:
    - request:
        try:
          call: http.get
          args:
            url: https://jsonplaceholder.typicode.com/todos/aa
          result: response
        except:
          as: e
          steps:
            - log_error:
                call: sys.log
                args:
                  text: ${json.encode_to_string(e)}
                  severity: CRITICAL
            - raise_error:
                raise: ${e}

    - return_value:
        return: ${response}

ドキュメントではレスポンスコードによって返却する値を変えているがサンプルでは「エラーが起きたら必ずログに残してraiseする」だけにしてみた

実際にWorkflowsを使うことを考えた場合

例外により途中で処理が止まってしまってもログに残らなかったのでログから検知できるようにするため

f:id:swfz:20210406194550p:plain

ログにエラー内容が出力された

複数ステップまたいだエラーハンドリング

複数ステップをまたいだハンドリングも行えるようなので雑に全体をtryで囲ってどこで例外が発生したか検知させたい

しかし次のようにmain直下、step直下にtryは置けない

  • sample.workflow.yml
main:
  try:
    steps:
    .....
ERROR: (gcloud.workflows.deploy) [INVALID_ARGUMENT] main.yaml:2:3: parse error: in workflow 'main': missing 'steps'
  try:
  ^

main.yaml:2:3: parse error: in workflow 'main': unexpected entries: 'try', 'except'
  try:
  ^

ので全ステップで例外を捕捉したい場合は1つステップをかませてあげればよい

main:
  steps:
    - multi_request:
        try:
          steps:
            - request1:
                call: http.get
                args:
                  url: https://jsonplaceholder.typicode.com/todos/1
                result: response1
            - request2:
                call: http.get
                args:
                  url: https://jsonplaceholder.typicode.com/todos/aa
                result: response2
            - return_value:
                return: ${response2}

        except:
          as: e
          steps:
            - log_error:
                call: sys.log
                args:
                  text: ${json.encode_to_string(e)}
                  severity: CRITICAL
            - raise_error:
                raise: ${e}

こんな感じ

サンプルだと複数ステップ間に依存がないので微妙だがやりたいことは実現できた

サンプル

他にも色々サンプルがあるのでやはり公式を見に行くのが早い

サンプル ワークフロー  |  Google Cloud

cloud.google.com

所感

色々やったらめちゃくちゃ感想がでてきたw

  • 標準ライブラリで使える関数が少ない

    • どうしても自由度に欠ける
    • APIたたいて返ってきた値をちょっとだけ手加えて次のAPIにわたすみたいなのはよくありそうなのでそれが現状できない(方法を見つけられていない)
  • やはりシェルコマンドもしくはPythonのメソッド使いたい

    • コンセプトと違うかもしれないが使えるのと使えないので便利さぜんぜん違う
    • 日付系の処理とかもFunction作って投げるのは大仰感ある
    • sys.now()はあるんだからもう少しよしなにやらしてくれーと感じた
    • Pythonのメソッドでも良いけど何が使えて何が使えないのかは知りたいのでリストがほしい(これも見つけられなかった)
    • とりあえず試してみた感じ
      • 使える
        • len
        • string
      • 使えない
        • compile
        • datetime
        • eval
        • slice
        • array.append
        • re
  • ドキュメントがあまり整備されてないイメージ

    • ナビゲーションからたどり着けなかったり
    • 検索したら出てくるとか結構あった
    • どうでも良いけどキャメルケースだったりスネークケースだったりごっちゃになっていてやりづらい…
    • せめて統一してくれーという感じ、それともルールがある?
  • サブワークフロー使いやすい

    • サブワークフローからさらにサブワークフローを呼ぶみたいなことができる
    • この辺は普通のプログラミングでメソッド呼ぶのと同様な感覚で書ける
  • 運用に入ったら開発たいへんそう

    • 連携するAPIすべてを用意しないと確認できない

あくまでマイクロサービスの文脈で各サービス間の関係をとりなすためのものっていうイメージを受けた

リトライ処理などは試していないが結構しっかりしていそう

個人的にはBigQueryのコネクタとか来ないかなーなんて思っていて、SQL渡して実行みたいなワークフローが実現できるのではないかと思っている

まぁその役割はComposerやDataformで十分でしょという話もあるが…

サーバレスでこのあたりまかなえるなら結構優秀なのではと思ったので期待を持ちつつwatchしておこうと思う

次回以降で実際にいくつかのAPIを組み合わせて処理をさせるようなワークフローを組んでみる

今回のサンプルの定義ファイルは以下においた

swfz/cloud-workflows-samples

github.com