Google CloudのDataformがGAになり旧版のDataformも使えなくなるとのアナウンスがあったので、いくつか個人で作っていたプロジェクトを移行し始めた
DataformのパイプラインをAPI経由で実行している部分も移行したためそのときに調べたことなどのメモ
パイプラインの実行までの流れ
前提の説明
自分が作っているDataformプロジェクトは小さいプロジェクトが多く、処理の起点がWorkflowsなどGoogleCloudのサービスを使わずGitHub Actionsで済ませてしまう場合が多い
具体的にはActionsで次のようなフローでデータマートまでの生成を行っていた
- データ取得
- GCSへ配置
- bq load
- dataformの実行
これ以降はdataformの実行
をAPI経由でするための色々を残していく
スケジュール実行
まずスケジュール実行のドキュメントを読んだ
Workflows と Cloud Scheduler を使用して実行をスケジュールする | Dataform | Google Cloud
コンパイル結果をカスタマイズして実行する場合は2フェーズ必要
コンパイルとコンパイル結果を用いた実行の2つ
compilationResults(コンパイル)
- ここで、オーバーライド設定で設定できる項目をオーバーライドできるっぽい
- なのでスキーマ、データベース、テーブルのオーバーライドができる
- ということは環境分離ができる
- 確認用にプロジェクト分けて、とかデータセット分けて、テーブル分けてとかそういう使い方ができる
workflowInvocations(コンパイル結果を用いた実行)
- 1フェーズ目でコンパイルした結果の成果物を用いてパイプラインを実行する
サンプルはWorkflows + Schedulerでのワークフロー構成の実行だがたたいているのはREST APIなのでたたく側は何でも良い
REST APIをたたいてみる
ドキュメントと、REST APIのドキュメントもみながら
以下$TOKEN
, $PROJECT_ID
, $LOCATION
, $REPOSITORY_ID
は事前に環境変数をセットしている前提
TOKENは次のように生成した
TOKEN=$(gcloud auth print-access-token)
compilationResults
GIT_COMMITISH="main" curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/compilationResults" \ -d @- <<EOS { "gitCommitish": "${GIT_COMMITISH}" } EOS
- 結果
{ "name": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1", "gitCommitish": "master", "codeCompilationConfig": { "defaultDatabase": "project-111111", "defaultSchema": "github_projects_dataform", "assertionSchema": "github_projects_dataform_assertions", "defaultLocation": "asia-northeast1" }, "dataformCoreVersion": "2.4.2", "resolvedGitCommitSha": "8588f2b317541a9a3a971909fe2436f833825d6c" }
ただこのエンドポイント、それなりの確率でエラーが返ってくる…
- エラー例
{ "error": { "code": 400, "message": "Package installation timed out.", "status": "FAILED_PRECONDITION" } }
インストール時にタイムアウト…
workflowInvocation
コンパイルしたSQLクエリを実行する
こちらは非同期らしい…
なので実行の発行、と実行が終わったか確認するためポーリングする
コンパイル時の実行結果name
の値をcompilationResult
パラメータに指定する
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations" \ -d @- <<EOS { "compilationResult": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1" } EOS
- 結果
{ "name": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f", "compilationResult": "projects/111111111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1", "state": "RUNNING", "invocationTiming": { "startTime": "2023-09-09T02:14:25.398717Z" } }
RUNNINGで返ってきたので実行は開始したよう
実行結果の取得
curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f"
実行のIDと思われる値をつけて実行結果を確認する
基本的にURLのprojects
以下の値が前段で実行するAPIの結果に載っているのでそれを使ってあげればよい
- 成功時のレスポンス
{ "name": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f", "compilationResult": "projects/111111111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1", "state": "SUCCEEDED", "invocationTiming": { "startTime": "2023-09-09T02:14:25.398717Z", "endTime": "2023-09-09T02:14:51.545026865Z" } }
- ステータスのリスト
これ見てステータスによって判断すればポーリングは書ける
- 雑なポーリングの例
TOKEN=$(gcloud auth print-access-token) LOCATION="asia-northeast1" REPOSITORY_ID="sample" GIT_COMMITISH="main" compile() { compilation=$(curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${GOOGLE_PROJECT}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/compilationResults" \ -d @- <<EOS { "gitCommitish": "${GIT_COMMITISH}" } EOS ) echo ${compilation} | jq -r '.name' } invocation() { compilation_result=$1 invocation=$( curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${GOOGLE_PROJECT}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations" \ -d @- <<EOS { "compilationResult": "${compilation_result}" } EOS ) echo ${invocation} | jq -r '.name' } polling() { invocation_name=$1 invocation=$(curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/${invocation_name}") status=$(echo ${invocation} | jq -r '.state') case "$status" in "FAILED" ) echo "FAILED Invocation." exit 1 ;; "SUCCEEDED" ) echo "SUCCESS Invocation." ;; "RUNNING" ) echo "RUNNING. retry wait 10 seconds..." sleep 10 polling ${invocation_name} esac } compiled_name=$(compile) invocation=$(invocation ${compiled_name}) polling ${invocation}
最初pollingに渡す引数invocation
は実行リクエスト発行時のレスポンス(結果JSON)のname
の値を入れる
自分の場合だいたい2回~3回目くらいで成功レスポンスが返ってくるくらいの感じだった
query
実行内容の詳細を表示したい場合は
実行結果の取得APIの末尾に
/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f:query
というように:query
をつけてあげると詳細情報を閲覧できる
curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f:query"
- 結果
{ "workflowInvocationActions": [ { "target": { "database": "project-111111", "schema": "github_projects_dataform", "name": "flattened_items" }, "canonicalTarget": { "database": "project-111111", "schema": "github_projects_dataform", "name": "flattened_items" }, "state": "SUCCEEDED", "invocationTiming": { "startTime": "2023-09-09T02:14:32.150109726Z", "endTime": "2023-09-09T02:14:35.801821363Z" }, "bigqueryAction": { "sqlScript": " BEGIN\n CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_dataform` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n THEN\n RAISE USING MESSAGE = @@error.message;\n END IF;\nEND;\n BEGIN\n DECLARE dataform_table_type DEFAULT (\n SELECT ANY_VALUE(table_type)\n FROM `project-111111.github_projects_dataform.INFORMATION_SCHEMA.TABLES`\n WHERE table_name = 'flattened_items'\n);\n IF dataform_table_type IS NOT NULL AND dataform_table_type != 'VIEW' THEN\n IF dataform_table_type = 'BASE TABLE' THEN\n DROP TABLE IF EXISTS `project-111111.github_projects_dataform.flattened_items`;\n ELSEIF dataform_table_type = \"VIEW\" THEN\n DROP VIEW IF EXISTS `project-111111.github_projects_dataform.flattened_items`;\n ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_dataform.flattened_items`;\n END IF;\nEND IF;\n BEGIN\n \n CREATE OR REPLACE VIEW `project-111111.github_projects_dataform.flattened_items`\n OPTIONS(description='''flat化されたItems''')\n AS (\n \n\nWITH nodes AS (\n SELECT\n *\n FROM\n `project-111111.github_projects_dataform.latest_nodes`\n),\nexpanded AS (\n SELECT\n nodes.*,\n JSON_VALUE(field.users.nodes [0].name) AS fv_assignees,\n JSON_VALUE(field.repository.name) AS fv_repository,\n JSON_VALUE(field.labels.nodes) AS fv_labels,\n /* 使ってない */\n JSON_VALUE(field.text) AS fv_text,\n JSON_VALUE(field.name) AS fv_name,\n JSON_VALUE(field.number) AS fv_number,\n JSON_VALUE(field.date) AS fv_date,\n JSON_VALUE(field.title) AS fv_title,\n JSON_VALUE(field.startDate) AS fv_start_date,\n JSON_VALUE(field.milestone.title) AS fv_milestone,\n JSON_VALUE(field.field.name) AS column_name\n FROM\n nodes,\n UNNEST(nodes.fields) AS field\n),\nconverged AS (\n SELECT\n expanded.id,\n expanded.dt,\n MAX(expanded.updated_at) AS updated_at,\n MAX(expanded.created_at) AS created_at,\n MAX(expanded.is_archived) AS is_archived,\n MAX(expanded.content_type) AS content_type,\n MAX(expanded.c_title) AS c_title,\n MAX(expanded.c_number) AS c_number,\n MAX(expanded.c_url) AS c_url,\n MAX(expanded.c_closed) AS c_closed,\n MAX(expanded.c_closed_at) AS c_closed_at,\n MAX(expanded.c_created_at) AS c_created_at,\n MAX(expanded.c_repository) AS c_repository,\n MAX(expanded.c_milestone) AS c_milestone,\n MAX(expanded.c_assignee) AS c_assignee,\n MAX(expanded.c_merged) AS c_merged,\n MAX(expanded.c_merged_at) AS c_merged_at,\n MAX(\n IF (\n expanded.column_name = \"Title\",\n expanded.fv_text,\n NULL\n )\n ) AS f_title,\n MAX(\n IF (\n expanded.column_name = \"Milestone\",\n expanded.fv_milestone,\n NULL\n )\n ) AS f_milestone,\n MAX(\n IF (\n expanded.column_name = \"Month\",\n expanded.fv_date,\n NULL\n )\n ) AS month,\n MAX(\n IF (\n expanded.column_name = \"Status\",\n expanded.fv_name,\n NULL\n )\n ) AS f_status,\n MAX(\n IF (\n expanded.column_name = \"Iteration\",\n expanded.fv_title,\n NULL\n )\n ) AS f_iteration,\n MAX(\n IF (\n expanded.column_name = \"Point\",\n SAFE_CAST(expanded.fv_number AS NUMERIC),\n NULL\n )\n ) AS f_point\n FROM\n expanded\n GROUP BY\n expanded.id,\n expanded.dt\n)\nSELECT\n converged.id,\n converged.dt,\n COALESCE(converged.f_title, converged.c_title) AS title,\n converged.content_type,\n converged.c_number AS number,\n converged.c_url AS url,\n converged.c_closed AS closed,\n converged.c_closed_at AS closed_at,\n COALESCE(converged.c_created_at, converged.created_at) AS created_at,\n converged.updated_at,\n converged.c_repository AS repository,\n converged.c_milestone AS milestone,\n converged.c_assignee AS assignee,\n converged.c_merged AS merged,\n converged.c_merged_at AS merged_at,\n converged.month,\n converged.f_status AS status,\n converged.f_iteration AS iteration,\n converged.f_point AS point\nFROM\n converged\n\n );\n \n END;\n END;" } }, { "target": { "database": "project-111111", "schema": "github_projects_dataform", "name": "latest_nodes" }, "canonicalTarget": { "database": "project-111111", "schema": "github_projects_dataform", "name": "latest_nodes" }, "state": "SUCCEEDED", "invocationTiming": { "startTime": "2023-09-09T02:14:26.898848683Z", "endTime": "2023-09-09T02:14:30.653608214Z" }, "bigqueryAction": { "sqlScript": " BEGIN\n CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_dataform` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n THEN\n RAISE USING MESSAGE = @@error.message;\n END IF;\nEND;\n BEGIN\n DECLARE dataform_table_type DEFAULT (\n SELECT ANY_VALUE(table_type)\n FROM `project-111111.github_projects_dataform.INFORMATION_SCHEMA.TABLES`\n WHERE table_name = 'latest_nodes'\n);\n IF dataform_table_type IS NOT NULL AND dataform_table_type != 'VIEW' THEN\n IF dataform_table_type = 'BASE TABLE' THEN\n DROP TABLE IF EXISTS `project-111111.github_projects_dataform.latest_nodes`;\n ELSEIF dataform_table_type = \"VIEW\" THEN\n DROP VIEW IF EXISTS `project-111111.github_projects_dataform.latest_nodes`;\n ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_dataform.latest_nodes`;\n END IF;\nEND IF;\n BEGIN\n \n CREATE OR REPLACE VIEW `project-111111.github_projects_dataform.latest_nodes`\n OPTIONS(description='''nodeのリスト最新レコード''')\n AS (\n \n\nWITH nodes AS (\n SELECT\n JSON_VALUE(node.id) AS id,\n JSON_QUERY_ARRAY(node.fieldValues.nodes) AS fields,\n JSON_VALUE(node.updatedAt) AS updated_at,\n JSON_VALUE(node.createdAt) AS created_at,\n JSON_VALUE(node.isArchived) AS is_archived,\n JSON_VALUE(node.type) AS content_type,\n JSON_VALUE(node.content.title) AS c_title,\n JSON_VALUE(node.content.number) AS c_number,\n JSON_VALUE(node.content.url) AS c_url,\n JSON_VALUE(node.content.closed) AS c_closed,\n JSON_VALUE(node.content.closedAt) AS c_closed_at,\n JSON_VALUE(node.content.createdAt) AS c_created_at,\n JSON_VALUE(node.content.repository.name) AS c_repository,\n JSON_VALUE(node.content.milestone.title) AS c_milestone,\n JSON_VALUE(node.content.assignees.nodes [0].name) AS c_assignee,\n JSON_VALUE(node.content.merged) AS c_merged,\n JSON_VALUE(node.content.mergedAt) AS c_merged_at,\n dt\n FROM\n `project-111111.github_projects_datalake.raw_items`,\n UNNEST(JSON_QUERY_ARRAY(response.data.node.items.nodes)) AS node\n)\nSELECT\n nodes.*,\n ROW_NUMBER() OVER(\n PARTITION BY id\n ORDER BY\n dt DESC\n ) AS rn\nFROM\n nodes QUALIFY rn = 1\n\n );\n \n END;\n END;" } }, { "target": { "database": "project-111111", "schema": "github_projects_datamart", "name": "dateseries" }, "canonicalTarget": { "database": "project-111111", "schema": "github_projects_datamart", "name": "dateseries" }, "state": "SUCCEEDED", "invocationTiming": { "startTime": "2023-09-09T02:14:37.174215530Z", "endTime": "2023-09-09T02:14:51.539589265Z" }, "bigqueryAction": { "sqlScript": " BEGIN\n CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_datamart` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n THEN\n RAISE USING MESSAGE = @@error.message;\n END IF;\nEND;\n BEGIN\n DECLARE dataform_table_type DEFAULT (\n SELECT ANY_VALUE(table_type)\n FROM `project-111111.github_projects_datamart.INFORMATION_SCHEMA.TABLES`\n WHERE table_name = 'dateseries'\n);\n IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN\n IF dataform_table_type = 'BASE TABLE' THEN\n DROP TABLE IF EXISTS `project-111111.github_projects_datamart.dateseries`;\n ELSEIF dataform_table_type = \"VIEW\" THEN\n DROP VIEW IF EXISTS `project-111111.github_projects_datamart.dateseries`;\n ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_datamart.dateseries`;\n END IF;\nEND IF;\n BEGIN\n \n CREATE OR REPLACE TABLE `project-111111.github_projects_datamart.dateseries`\n \n \n OPTIONS(description='''月ごとのバーンアップ/ダウン用のStats''')\n AS (\n \n\nWITH date_list AS (\n SELECT\n d\n FROM\n UNNEST(\n GENERATE_DATE_ARRAY(\n '2017-02-01',\n CURRENT_DATE(\"Asia/Tokyo\"),\n INTERVAL 1 DAY\n )\n ) AS d\n),\npoints AS (\n SELECT\n dl.d,\n SUM(\n IF(\n DATE(dl.d) \u003c PARSE_DATE(\"%Y-%m-%d\", items.month)\n AND PARSE_DATE(\"%Y-%m-%d\", items.month) = DATE_TRUNC(dl.d, MONTH),\n items.point,\n 0\n )\n ) AS before_sprint_point,\n SUM(\n IF(\n DATE(dl.d) \u003e= DATE(\n PARSE_TIMESTAMP(\"%Y-%m-%dT%H:%M:%SZ\", items.created_at)\n )\n AND PARSE_DATE(\"%Y-%m-%d\", items.month) = DATE_TRUNC(dl.d, MONTH),\n items.point,\n 0\n )\n ) AS in_sprint_point,\n SUM(\n IF(\n DATE(dl.d) \u003e= DATE(\n PARSE_TIMESTAMP(\"%Y-%m-%dT%H:%M:%SZ\", items.closed_at)\n )\n AND PARSE_DATE(\"%Y-%m-%d\", items.month) = DATE_TRUNC(dl.d, MONTH),\n items.point,\n 0\n )\n ) AS actual_point\n FROM\n date_list dl,\n `project-111111.github_projects_dataform.flattened_items` items\n GROUP BY\n dl.d\n)\nSELECT\n dl.d,\n DATE_TRUNC(dl.d, MONTH) AS month,\n p.in_sprint_point,\n p.before_sprint_point,\n p.before_sprint_point + p.in_sprint_point AS plan_point,\n p.actual_point\nFROM\n date_list dl\n LEFT JOIN points p ON(p.d = dl.d)\n\n );\n \n END;\n END;" } }, { "target": { "database": "project-111111", "schema": "github_projects_datamart", "name": "items" }, "canonicalTarget": { "database": "project-111111", "schema": "github_projects_datamart", "name": "items" }, "state": "SUCCEEDED", "invocationTiming": { "startTime": "2023-09-09T02:14:37.287655251Z", "endTime": "2023-09-09T02:14:50.427789907Z" }, "bigqueryAction": { "sqlScript": " BEGIN\n CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_datamart` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n THEN\n RAISE USING MESSAGE = @@error.message;\n END IF;\nEND;\n BEGIN\n DECLARE dataform_table_type DEFAULT (\n SELECT ANY_VALUE(table_type)\n FROM `project-111111.github_projects_datamart.INFORMATION_SCHEMA.TABLES`\n WHERE table_name = 'items'\n);\n IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN\n IF dataform_table_type = 'BASE TABLE' THEN\n DROP TABLE IF EXISTS `project-111111.github_projects_datamart.items`;\n ELSEIF dataform_table_type = \"VIEW\" THEN\n DROP VIEW IF EXISTS `project-111111.github_projects_datamart.items`;\n ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_datamart.items`;\n END IF;\nEND IF;\n BEGIN\n \n CREATE OR REPLACE TABLE `project-111111.github_projects_datamart.items`\n \n \n OPTIONS(description='''issue, pull request, cardのリスト''')\n AS (\n \n\nSELECT\n items.id,\n items.dt,\n items.title,\n items.content_type,\n items.url,\n items.created_at,\n items.updated_at,\n items.closed_at,\n items.merged_at,\n items.repository,\n items.month,\n items.iteration,\n items.point,\n items.status\nFROM\n `project-111111.github_projects_dataform.flattened_items` items\n\n );\n \n END;\n END;" } } ] }
こんな感じ
このレベルの情報であれば管理画面見に行ったほうが良さそうとは思った
コマンドのまとめ
ここまでの流れをまとめると
- コンパイル
- 実行
- 実行結果の確認
で最低限の実行は可能
# コンパイル curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/compilationResults" \ -d @- <<EOS { "gitCommitish": "${GIT_COMMITISH}" } EOS # Invocation(実行) curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations" \ -d @- <<EOS { "compilationResult": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1" } EOS # 実行結果の取得 curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f"
実行時パラメータを渡す
Dataform API でコンパイルのオーバーライドを構成する | Google Cloud
たとえば「普段は前日起算の今月」のレポート、手動で動作せるとき(再入れ込みなど)は期間指定したいといった場合
独自変数を扱いたい場合は上記ドキュメントのようにコンパイル時に変数を渡し、コンパイル結果を変えて実行するという手段が必要
これ見る感じ一連の処理を実行しないといけないので面倒といえば面倒、2コマンドcurlで実行すれば良いだけって話でもあるが…
ドキュメントではWorkflowsにまとめて実行するようにしていて、1つWorkflow作ったら使いまわせそうではある
ただ、この辺を使いこなせれば、テーブルやデータセットも分離しつつコンパイル変数も渡すことで環境もしくは変数によって挙動を変えられさまざまなパターンでの実行が可能になる
Dataform の設定を構成する | Google Cloud
ここで実際のSQLクエリでどのように変数をもとに制御するかなどが書いてある
まぁ三項演算子と同様ととらえて良さそう
例としてデフォルトの場合は今月、そうじゃない場合は指定月を検索するような処理を書いてみた
- sqlxファイル
config { type: "table" } WITH data AS( SELECT * FROM UNNEST(ARRAY<STRUCT<m DATE, item STRING, store STRING, sales INT64>> [ ("2023-09-01", "hoge", "A", 100), ("2023-08-01", "hoge", "A", 200), ("2023-08-01", "fuga", "A", 100), ("2023-08-01", "hoge", "B", 100), ("2023-09-01", "fuga", "B", 100), ("2023-09-01", "hoge", "B", 200) ]) ) SELECT * FROM data WHERE m = DATE(${when(dataform.projectConfig.vars.targetMonth === "CURRENT", "FORMAT_DATE('%Y-%m-01', CURRENT_DATE)", `"${dataform.projectConfig.vars.targetMonth}"`)})
ドキュメントの通り
最後の行が変数によって挙動が変わるようになっている
- dataform.json
"vars": { "targetMonth": "CURRENT" }
デフォルトはCURRENT
DataformのSQLXは条件などの指定をJavaScriptで書いているのでコンパイル結果のSQLにクオートを出力させるようにしてあげないと実行時に怒られるので注意が必要
まぁ当然ではあるがこの辺はちょっとだるい
- API実行時パラメータ(compile時)
"gitCommitish": "${GIT_COMMITISH}", "codeCompilationConfig": { "vars": { "targetMonth": "2023-08-01" } }
この手順でAPIから実行したら指定したパラメータでSQLを実行できた
これは前々からやりたかったやつ!!!
内部の仕組みを想像してみると面白い
以前は旧版でこのようなことをやりたくてIssue起票したが「やりません」とバッサリ切られてしまっていた
たしかに実行だけのエンドポイントでは仕組み的に変数を渡すことは厳しそう
しかし、compilationResultとInvocationと2つに分けて実行すれば可能
コンパイル時に変数渡せれば生成するSQLも可変にできるよねという話と解釈した
2回APIたたかないといけないのが手間ではあるが納得行く感じ
変数の他にもオーバーライドの必要性があるデータセットやテーブルなども含めコントロール可能(コンパイルのオーバーライドのドキュメントに詳しく載っている)
まとめ
- GoogleCloudのDataformでREST APIを使ってSQLのコンパイルと実行をしてみた
- コンパイル~実行完了までの流れをポーリングして確認できるようにした
- SQLクエリ中に動的にパラメータを与えられるようにした
- よくあるやりたいパターンだと思うのでできるようになってよかった
まだGoogleCloud版の方は触り始めたばかりなので他にも色々ためしてみたい