ちょうど気になっていたので使ってみる
Workflows
サーバレスでHTTPベースのAPIサービスを組み合わせてワークフローを定義できる
外部APIも対応しているので色々できることはありそう
目次
こんな感じで試していく
- まず最小のサンプルを動かす
- デプロイ時にサービスアカウントをひもづける
- workflowsの標準ライブラリ
- sys.get_env:環境変数の取得
- sys.log:ログの送信
- sys.now
- conditional switch
- Arrayによる反復処理
- ワークフロー実行時に引数を渡す
- サブワークフローを使う
- エラーハンドリング
まず最小のサンプルを動かす
- sample.workflows.yml
- getMessage: call: http.get args: url: https://jsonplaceholder.typicode.com/todos/1 result: todo1 - returnValue: return: ${todo1.body}
- デプロイ
$ gcloud workflows deploy sample --source=sample.workflow.yml
- 実行
$ gcloud workflows run sample WARNING: The default location(us-central1) was used since the location flag was not specified. Waiting for execution [424cd37e-73f2-423a-aaaa-b953a797bafb] to complete...done. argument: 'null' endTime: '2021-03-25T19:25:05.905174478Z' name: projects/111111111111/locations/us-central1/workflows/sample/executions/424cd37e-73f2-423a-aaaa-b953a797bafb result: '{"completed":false,"id":1,"title":"delectus aut autem","userId":1}' startTime: '2021-03-25T19:25:05.556254961Z' state: SUCCEEDED workflowRevisionId: 000003-5de
result
で最終的な結果が確認できる
state
でワークフロー自体のステータスが確認できる
GUIで見てみると
流れが可視化されているのが分かる
2021-03-19時点の状態だと、GUIで編集するとリアルタイムで可視化
の箇所も変更されるのでわかりやすい
GUIで編集したい感あるがデバッグ等含めるとCUIでデプロイ→実行→ログなどが必要な場合のみGUIでログ確認の方がやりやすかった
実行時間が長いワークフローなどに関してはexecute
+ executions describe
の組み合わせで実行命令の発行と結果の確認という形で分けることもできる
デプロイ時にサービスアカウントをひもづける
ワークフローの実行時のサービスアカウントを設定する
$ gcloud workflows deploy sample --source=sample.workflow.yml --service-account=sample-project@sample-project-111111.iam.gserviceaccount.com
どのサービスアカウントがひも付いているかはGUI上の詳細
から確認できる
gcloud workflows describe
でも確認できる
デフォルトは次のような形式のサービスアカウントが割り当てられる
projects/sample-project-111111/serviceAccounts/111111111111-compute@developer.gserviceaccount.com
ワークフロー内でコネクタを使って他のリソースにアクセスしたりする場合には意識する必要がある
Workflowsの標準ライブラリ
Standard library overview | ワークフロー | Google Cloud
便利関数やエラータイプなどワークフローを組むときに使うであろうライブラリの一覧が載っている
http
,sys
,text
,json
,encode
,decode
など
これ見つけるのにもなんか時間かかった、どこからたどれば良いんだ…
sys.get_env 環境変数の取得
Function: sys.get_env | ワークフロー | Google Cloud
PROJECT_IDやPROJECT_NUMBERなどを取得できる
コネクタで使用する際にPROJECTに関する情報が必要になることがあるのでそういう場合に使う
- 例
- assign_value: assign: - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
sys.log ログの送信
Cloud Logging へのログの送信 | ワークフロー | Google Cloud
ログが見たい場合は自分で送る必要がある
- log-output.workflow.yml
main: steps: - getMessage: call: http.get args: url: https://jsonplaceholder.typicode.com/todos/1 result: todo1 - log_raw: call: sys.log args: text: ${todo1} severity: INFO - log_encoded: call: sys.log args: text: ${json.encode_to_string(todo1)} severity: WARNING - log_with_text: call: sys.log args: text: ${"ERROR " + json.encode_to_string(todo1)} severity: CRITICAL - returnValue: return: ${todo1}
severityにはINFO
、WARNING
、CRITICAL
の3つが指定できる
見ての通りcall: sys.log
で必要項目を渡すことでワークフローの詳細
ページのログ
のタブで参照できるようになる
実際にログを見るとこんな感じで出力される
ドキュメントにはdictを出力する場合json.encode_to_string
を使用してくださいと書いてあったが特に指定せずとも見えるっぽい
sys.now
- now.workflow.yml
main: steps: - assign_value: assign: - now: ${sys.now()} - return_value: return: ${now}
result: '1616751871.7443395'
使い所としては
コネクタのサンプル | ワークフロー | Google Cloud
のCloudTasksへ投げるときのスケジュール設定に書いてあった
scheduleTime: seconds: int(sys.now() + 60*60*24)
サンプルだと24時間後に実行してくださいねという設定をするために使用している
conditional switch
ワークフローでの実行順序の制御 | Google Cloud
conditionの式の内容によって次に実行するステップを指定する
直接steps
を記述して特定条件のときにこういうフローで処理しますという書き方もできる(埋め込みステップ)
これはドキュメントのサンプル実行すればなんとなく分かるのでスキップ
Arrayによる反復処理
Syntax reference | ワークフロー | Google Cloud
条件付きジャンプ、assign、len関数を組み合わせて配列の反復処理を行える
サンプルコードだとcheck_condition
ステップでarray
の要素数をi
と比較し分岐
iterate
ステップでassign
を用いてi
をインクリメントして再代入しcheck_condition
へジャンプ
という流れで反復処理を実現している
少し工夫すればfilter
やmap
のような処理が行えるかと思ったがassign
やreturn
の式の中で配列を生成するような方法が見つけられなかったため現状だとできなそう
assign
やreturn
の式の中で配列、辞書を生成するパターン以外であれば(数値の足し上げや文字列の追加など)switchと合わせてつかえば可能
- iterate-sum.workflow.yml
main: steps: - define: assign: - numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - sum: 0 - i: 0 - check_condition: switch: - condition: ${len(numbers) > i} next: iterate next: exit_loop - iterate: switch: - condition: ${numbers[i] % 2 == 0} steps: - append: assign: - sum: ${sum + numbers[i]} next: iterate-increment - iterate-increment: assign: - i: ${i+1} next: check_condition - exit_loop: return: result: ${sum} - return_value: return: ${sum}
失敗事例
直接map
,filter
関数を呼んだらLambda式で使う:
の前後にダブルクオートを挟まれて文字列として扱われてしまった
- define: assign: - numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - return_value: return: ${filter(lambda x: x % 2 == 0, numbers)}
result: '{"${filter(lambda x":"x % 2 == 0, numbers)}"}'
他にも色々試してみたがうまくいく方法が見つけられず…
ワークフロー実行時に引数を渡す
Syntax reference | ワークフロー | Google Cloud
最小であれば次のドキュメント通りにやってみるとわかりやすい
実行リクエストでのランタイム引数渡し | ワークフロー | Google Cloud
params
で受け取った引数に名前をつける
あとは式の中で変数を使うのと同等に扱える
サブワークフローを使う
一連のステップをまとめてワークフー内から呼び出せるようにする
ドキュメントにもあるがプログラミングでいう関数と同様な考え方で書ける
cloud.google.comcloud.google.com
- subworkflow.workflow.yml
main: steps: - call_subworkflow1: call: sum args: n1: 1 n2: 1 result: total1 - call_subworkflow2: call: sum args: n1: 2 n2: 2 result: total2 - return_value: return: ${total1 + total2} sum: params: [n1, n2] steps: - return_value: return: ${n1 + n2}
result: '6'
mainワークフローのみからしか呼べないということはなく、サブワークフローからさらにサブワークフローを呼ぶことも可能
エラーハンドリング
ドキュメントはこの辺
Syntax reference | ワークフロー | Google Cloud
あえて存在しないURLにアクセスするワークフローを書いてみる
シングルステップのエラーハンドリング
- exception-singlestep.workflow.yml
main: steps: - request: try: call: http.get args: url: https://jsonplaceholder.typicode.com/todos/aa result: response except: as: e steps: - log_error: call: sys.log args: text: ${json.encode_to_string(e)} severity: CRITICAL - raise_error: raise: ${e} - return_value: return: ${response}
ドキュメントではレスポンスコードによって返却する値を変えているがサンプルでは「エラーが起きたら必ずログに残してraiseする」だけにしてみた
実際にWorkflowsを使うことを考えた場合
例外により途中で処理が止まってしまってもログに残らなかったのでログから検知できるようにするため
ログにエラー内容が出力された
複数ステップまたいだエラーハンドリング
複数ステップをまたいだハンドリングも行えるようなので雑に全体をtryで囲ってどこで例外が発生したか検知させたい
しかし次のようにmain直下、step直下にtryは置けない
- sample.workflow.yml
main: try: steps: .....
ERROR: (gcloud.workflows.deploy) [INVALID_ARGUMENT] main.yaml:2:3: parse error: in workflow 'main': missing 'steps' try: ^ main.yaml:2:3: parse error: in workflow 'main': unexpected entries: 'try', 'except' try: ^
ので全ステップで例外を捕捉したい場合は1つステップをかませてあげればよい
main: steps: - multi_request: try: steps: - request1: call: http.get args: url: https://jsonplaceholder.typicode.com/todos/1 result: response1 - request2: call: http.get args: url: https://jsonplaceholder.typicode.com/todos/aa result: response2 - return_value: return: ${response2} except: as: e steps: - log_error: call: sys.log args: text: ${json.encode_to_string(e)} severity: CRITICAL - raise_error: raise: ${e}
こんな感じ
サンプルだと複数ステップ間に依存がないので微妙だがやりたいことは実現できた
サンプル
他にも色々サンプルがあるのでやはり公式を見に行くのが早い
所感
色々やったらめちゃくちゃ感想がでてきたw
標準ライブラリで使える関数が少ない
- どうしても自由度に欠ける
- APIたたいて返ってきた値をちょっとだけ手加えて次のAPIにわたすみたいなのはよくありそうなのでそれが現状できない(方法を見つけられていない)
やはりシェルコマンドもしくはPythonのメソッド使いたい
- コンセプトと違うかもしれないが使えるのと使えないので便利さぜんぜん違う
- 日付系の処理とかもFunction作って投げるのは大仰感ある
sys.now()
はあるんだからもう少しよしなにやらしてくれーと感じた- Pythonのメソッドでも良いけど何が使えて何が使えないのかは知りたいのでリストがほしい(これも見つけられなかった)
- とりあえず試してみた感じ
- 使える
- len
- string
- 使えない
- compile
- datetime
- eval
- slice
- array.append
- re
- 使える
ドキュメントがあまり整備されてないイメージ
- ナビゲーションからたどり着けなかったり
- 検索したら出てくるとか結構あった
- どうでも良いけどキャメルケースだったりスネークケースだったりごっちゃになっていてやりづらい…
- せめて統一してくれーという感じ、それともルールがある?
サブワークフロー使いやすい
- サブワークフローからさらにサブワークフローを呼ぶみたいなことができる
- この辺は普通のプログラミングでメソッド呼ぶのと同様な感覚で書ける
運用に入ったら開発たいへんそう
- 連携するAPIすべてを用意しないと確認できない
あくまでマイクロサービスの文脈で各サービス間の関係をとりなすためのものっていうイメージを受けた
リトライ処理などは試していないが結構しっかりしていそう
個人的にはBigQueryのコネクタとか来ないかなーなんて思っていて、SQL渡して実行みたいなワークフローが実現できるのではないかと思っている
まぁその役割はComposerやDataformで十分でしょという話もあるが…
サーバレスでこのあたりまかなえるなら結構優秀なのではと思ったので期待を持ちつつwatchしておこうと思う
次回以降で実際にいくつかのAPIを組み合わせて処理をさせるようなワークフローを組んでみる
今回のサンプルの定義ファイルは以下においた