notebook

都内でWEB系エンジニアやってます。

GoogleCloudのDataformでAPI経由の手動実行をやってみる

Google CloudのDataformがGAになり旧版のDataformも使えなくなるとのアナウンスがあったので、いくつか個人で作っていたプロジェクトを移行し始めた

DataformのパイプラインをAPI経由で実行している部分も移行したためそのときに調べたことなどのメモ

パイプラインの実行までの流れ

前提の説明

自分が作っているDataformプロジェクトは小さいプロジェクトが多く、処理の起点がWorkflowsなどGoogleCloudのサービスを使わずGitHub Actionsで済ませてしまう場合が多い

具体的にはActionsで次のようなフローでデータマートまでの生成を行っていた

  • データ取得
  • GCSへ配置
  • bq load
  • dataformの実行

これ以降はdataformの実行をAPI経由でするための色々を残していく

スケジュール実行

まずスケジュール実行のドキュメントを読んだ

Workflows と Cloud Scheduler を使用して実行をスケジュールする  |  Dataform  |  Google Cloud

cloud.google.com

コンパイル結果をカスタマイズして実行する場合は2フェーズ必要

コンパイルとコンパイル結果を用いた実行の2つ

  • compilationResults(コンパイル)

    • ここで、オーバーライド設定で設定できる項目をオーバーライドできるっぽい
    • なのでスキーマ、データベース、テーブルのオーバーライドができる
    • ということは環境分離ができる
    • 確認用にプロジェクト分けて、とかデータセット分けて、テーブル分けてとかそういう使い方ができる
  • workflowInvocations(コンパイル結果を用いた実行)

    • 1フェーズ目でコンパイルした結果の成果物を用いてパイプラインを実行する

サンプルはWorkflows + Schedulerでのワークフロー構成の実行だがたたいているのはREST APIなのでたたく側は何でも良い

REST APIをたたいてみる

ドキュメントと、REST APIのドキュメントもみながら

以下$TOKEN, $PROJECT_ID, $LOCATION, $REPOSITORY_IDは事前に環境変数をセットしている前提

TOKENは次のように生成した

TOKEN=$(gcloud auth print-access-token)

compilationResults

GIT_COMMITISH="main"
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/compilationResults" \
  -d @- <<EOS
{
  "gitCommitish": "${GIT_COMMITISH}"
}
EOS
  • 結果
{
  "name": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1",
  "gitCommitish": "master",
  "codeCompilationConfig": {
    "defaultDatabase": "project-111111",
    "defaultSchema": "github_projects_dataform",
    "assertionSchema": "github_projects_dataform_assertions",
    "defaultLocation": "asia-northeast1"
  },
  "dataformCoreVersion": "2.4.2",
  "resolvedGitCommitSha": "8588f2b317541a9a3a971909fe2436f833825d6c"
}

ただこのエンドポイント、それなりの確率でエラーが返ってくる…

  • エラー例
{
  "error": {
    "code": 400,
    "message": "Package installation timed out.",
    "status": "FAILED_PRECONDITION"
  }
}

インストール時にタイムアウト…

workflowInvocation

コンパイルしたSQLクエリを実行する

こちらは非同期らしい…

なので実行の発行、と実行が終わったか確認するためポーリングする

コンパイル時の実行結果nameの値をcompilationResultパラメータに指定する

curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations" \
  -d @- <<EOS
{
  "compilationResult": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1"
}
EOS
  • 結果
{
  "name": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f",
  "compilationResult": "projects/111111111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1",
  "state": "RUNNING",
  "invocationTiming": {
    "startTime": "2023-09-09T02:14:25.398717Z"
  }
}

RUNNINGで返ってきたので実行は開始したよう

実行結果の取得

curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f"

実行のIDと思われる値をつけて実行結果を確認する

基本的にURLのprojects以下の値が前段で実行するAPIの結果に載っているのでそれを使ってあげればよい

  • 成功時のレスポンス
{
  "name": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f",
  "compilationResult": "projects/111111111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1",
  "state": "SUCCEEDED",
  "invocationTiming": {
    "startTime": "2023-09-09T02:14:25.398717Z",
    "endTime": "2023-09-09T02:14:51.545026865Z"
  }
}
  • ステータスのリスト

https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.workflowInvocations#state

これ見てステータスによって判断すればポーリングは書ける

  • 雑なポーリングの例
TOKEN=$(gcloud auth print-access-token)
LOCATION="asia-northeast1"
REPOSITORY_ID="sample"
GIT_COMMITISH="main"

compile() {
  compilation=$(curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
    "https://dataform.googleapis.com/v1beta1/projects/${GOOGLE_PROJECT}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/compilationResults" \
    -d @- <<EOS
{
  "gitCommitish": "${GIT_COMMITISH}"
}
EOS
)

  echo ${compilation} | jq -r '.name'
}

invocation() {
  compilation_result=$1

  invocation=$(
    curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
      "https://dataform.googleapis.com/v1beta1/projects/${GOOGLE_PROJECT}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations" \
      -d @- <<EOS
{
  "compilationResult": "${compilation_result}"
}
EOS
)

  echo ${invocation} | jq -r '.name'
}

polling() {
  invocation_name=$1

  invocation=$(curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
    "https://dataform.googleapis.com/v1beta1/${invocation_name}")


  status=$(echo ${invocation} | jq -r '.state')

  case "$status" in
    "FAILED" )
      echo "FAILED Invocation."
      exit 1
      ;;
    "SUCCEEDED" )
      echo "SUCCESS Invocation."
      ;;
    "RUNNING" )
      echo "RUNNING. retry wait 10 seconds..."
      sleep 10
      polling ${invocation_name}
  esac
}

compiled_name=$(compile)
invocation=$(invocation ${compiled_name})
polling ${invocation}

最初pollingに渡す引数invocationは実行リクエスト発行時のレスポンス(結果JSON)のnameの値を入れる

自分の場合だいたい2回~3回目くらいで成功レスポンスが返ってくるくらいの感じだった

query

実行内容の詳細を表示したい場合は

実行結果の取得APIの末尾に

/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f:query

というように:queryをつけてあげると詳細情報を閲覧できる

curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
   "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f:query"
  • 結果
{
  "workflowInvocationActions": [
    {
      "target": {
        "database": "project-111111",
        "schema": "github_projects_dataform",
        "name": "flattened_items"
      },
      "canonicalTarget": {
        "database": "project-111111",
        "schema": "github_projects_dataform",
        "name": "flattened_items"
      },
      "state": "SUCCEEDED",
      "invocationTiming": {
        "startTime": "2023-09-09T02:14:32.150109726Z",
        "endTime": "2023-09-09T02:14:35.801821363Z"
      },
      "bigqueryAction": {
        "sqlScript": "    BEGIN\n  CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_dataform` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n  IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n  THEN\n    RAISE USING MESSAGE = @@error.message;\n  END IF;\nEND;\n    BEGIN\n      DECLARE dataform_table_type DEFAULT (\n  SELECT ANY_VALUE(table_type)\n  FROM `project-111111.github_projects_dataform.INFORMATION_SCHEMA.TABLES`\n  WHERE table_name = 'flattened_items'\n);\n      IF dataform_table_type IS NOT NULL AND dataform_table_type != 'VIEW' THEN\n  IF dataform_table_type = 'BASE TABLE' THEN\n    DROP TABLE IF EXISTS `project-111111.github_projects_dataform.flattened_items`;\n  ELSEIF dataform_table_type = \"VIEW\" THEN\n    DROP VIEW IF EXISTS `project-111111.github_projects_dataform.flattened_items`;\n  ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n    DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_dataform.flattened_items`;\n  END IF;\nEND IF;\n      BEGIN\n        \n            CREATE OR REPLACE VIEW `project-111111.github_projects_dataform.flattened_items`\n    OPTIONS(description='''flat化されたItems''')\n    AS (\n      \n\nWITH nodes AS (\n  SELECT\n    *\n  FROM\n    `project-111111.github_projects_dataform.latest_nodes`\n),\nexpanded AS (\n  SELECT\n    nodes.*,\n    JSON_VALUE(field.users.nodes [0].name) AS fv_assignees,\n    JSON_VALUE(field.repository.name) AS fv_repository,\n    JSON_VALUE(field.labels.nodes) AS fv_labels,\n    /* 使ってない */\n    JSON_VALUE(field.text) AS fv_text,\n    JSON_VALUE(field.name) AS fv_name,\n    JSON_VALUE(field.number) AS fv_number,\n    JSON_VALUE(field.date) AS fv_date,\n    JSON_VALUE(field.title) AS fv_title,\n    JSON_VALUE(field.startDate) AS fv_start_date,\n    JSON_VALUE(field.milestone.title) AS fv_milestone,\n    JSON_VALUE(field.field.name) AS column_name\n  FROM\n    nodes,\n    UNNEST(nodes.fields) AS field\n),\nconverged AS (\n  SELECT\n    expanded.id,\n    expanded.dt,\n    MAX(expanded.updated_at) AS updated_at,\n    MAX(expanded.created_at) AS created_at,\n    MAX(expanded.is_archived) AS is_archived,\n    MAX(expanded.content_type) AS content_type,\n    MAX(expanded.c_title) AS c_title,\n    MAX(expanded.c_number) AS c_number,\n    MAX(expanded.c_url) AS c_url,\n    MAX(expanded.c_closed) AS c_closed,\n    MAX(expanded.c_closed_at) AS c_closed_at,\n    MAX(expanded.c_created_at) AS c_created_at,\n    MAX(expanded.c_repository) AS c_repository,\n    MAX(expanded.c_milestone) AS c_milestone,\n    MAX(expanded.c_assignee) AS c_assignee,\n    MAX(expanded.c_merged) AS c_merged,\n    MAX(expanded.c_merged_at) AS c_merged_at,\n    MAX(\n      IF (\n        expanded.column_name = \"Title\",\n        expanded.fv_text,\n        NULL\n      )\n    ) AS f_title,\n    MAX(\n      IF (\n        expanded.column_name = \"Milestone\",\n        expanded.fv_milestone,\n        NULL\n      )\n    ) AS f_milestone,\n    MAX(\n      IF (\n        expanded.column_name = \"Month\",\n        expanded.fv_date,\n        NULL\n      )\n    ) AS month,\n    MAX(\n      IF (\n        expanded.column_name = \"Status\",\n        expanded.fv_name,\n        NULL\n      )\n    ) AS f_status,\n    MAX(\n      IF (\n        expanded.column_name = \"Iteration\",\n        expanded.fv_title,\n        NULL\n      )\n    ) AS f_iteration,\n    MAX(\n      IF (\n        expanded.column_name = \"Point\",\n        SAFE_CAST(expanded.fv_number AS NUMERIC),\n        NULL\n      )\n    ) AS f_point\n  FROM\n    expanded\n  GROUP BY\n    expanded.id,\n    expanded.dt\n)\nSELECT\n  converged.id,\n  converged.dt,\n  COALESCE(converged.f_title, converged.c_title) AS title,\n  converged.content_type,\n  converged.c_number AS number,\n  converged.c_url AS url,\n  converged.c_closed AS closed,\n  converged.c_closed_at AS closed_at,\n  COALESCE(converged.c_created_at, converged.created_at) AS created_at,\n  converged.updated_at,\n  converged.c_repository AS repository,\n  converged.c_milestone AS milestone,\n  converged.c_assignee AS assignee,\n  converged.c_merged AS merged,\n  converged.c_merged_at AS merged_at,\n  converged.month,\n  converged.f_status AS status,\n  converged.f_iteration AS iteration,\n  converged.f_point AS point\nFROM\n  converged\n\n    );\n        \n      END;\n    END;"
      }
    },
    {
      "target": {
        "database": "project-111111",
        "schema": "github_projects_dataform",
        "name": "latest_nodes"
      },
      "canonicalTarget": {
        "database": "project-111111",
        "schema": "github_projects_dataform",
        "name": "latest_nodes"
      },
      "state": "SUCCEEDED",
      "invocationTiming": {
        "startTime": "2023-09-09T02:14:26.898848683Z",
        "endTime": "2023-09-09T02:14:30.653608214Z"
      },
      "bigqueryAction": {
        "sqlScript": "    BEGIN\n  CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_dataform` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n  IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n  THEN\n    RAISE USING MESSAGE = @@error.message;\n  END IF;\nEND;\n    BEGIN\n      DECLARE dataform_table_type DEFAULT (\n  SELECT ANY_VALUE(table_type)\n  FROM `project-111111.github_projects_dataform.INFORMATION_SCHEMA.TABLES`\n  WHERE table_name = 'latest_nodes'\n);\n      IF dataform_table_type IS NOT NULL AND dataform_table_type != 'VIEW' THEN\n  IF dataform_table_type = 'BASE TABLE' THEN\n    DROP TABLE IF EXISTS `project-111111.github_projects_dataform.latest_nodes`;\n  ELSEIF dataform_table_type = \"VIEW\" THEN\n    DROP VIEW IF EXISTS `project-111111.github_projects_dataform.latest_nodes`;\n  ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n    DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_dataform.latest_nodes`;\n  END IF;\nEND IF;\n      BEGIN\n        \n            CREATE OR REPLACE VIEW `project-111111.github_projects_dataform.latest_nodes`\n    OPTIONS(description='''nodeのリスト最新レコード''')\n    AS (\n      \n\nWITH nodes AS (\n  SELECT\n    JSON_VALUE(node.id) AS id,\n    JSON_QUERY_ARRAY(node.fieldValues.nodes) AS fields,\n    JSON_VALUE(node.updatedAt) AS updated_at,\n    JSON_VALUE(node.createdAt) AS created_at,\n    JSON_VALUE(node.isArchived) AS is_archived,\n    JSON_VALUE(node.type) AS content_type,\n    JSON_VALUE(node.content.title) AS c_title,\n    JSON_VALUE(node.content.number) AS c_number,\n    JSON_VALUE(node.content.url) AS c_url,\n    JSON_VALUE(node.content.closed) AS c_closed,\n    JSON_VALUE(node.content.closedAt) AS c_closed_at,\n    JSON_VALUE(node.content.createdAt) AS c_created_at,\n    JSON_VALUE(node.content.repository.name) AS c_repository,\n    JSON_VALUE(node.content.milestone.title) AS c_milestone,\n    JSON_VALUE(node.content.assignees.nodes [0].name) AS c_assignee,\n    JSON_VALUE(node.content.merged) AS c_merged,\n    JSON_VALUE(node.content.mergedAt) AS c_merged_at,\n    dt\n  FROM\n    `project-111111.github_projects_datalake.raw_items`,\n    UNNEST(JSON_QUERY_ARRAY(response.data.node.items.nodes)) AS node\n)\nSELECT\n  nodes.*,\n  ROW_NUMBER() OVER(\n    PARTITION BY id\n    ORDER BY\n      dt DESC\n  ) AS rn\nFROM\n  nodes QUALIFY rn = 1\n\n    );\n        \n      END;\n    END;"
      }
    },
    {
      "target": {
        "database": "project-111111",
        "schema": "github_projects_datamart",
        "name": "dateseries"
      },
      "canonicalTarget": {
        "database": "project-111111",
        "schema": "github_projects_datamart",
        "name": "dateseries"
      },
      "state": "SUCCEEDED",
      "invocationTiming": {
        "startTime": "2023-09-09T02:14:37.174215530Z",
        "endTime": "2023-09-09T02:14:51.539589265Z"
      },
      "bigqueryAction": {
        "sqlScript": "    BEGIN\n  CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_datamart` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n  IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n  THEN\n    RAISE USING MESSAGE = @@error.message;\n  END IF;\nEND;\n    BEGIN\n      DECLARE dataform_table_type DEFAULT (\n  SELECT ANY_VALUE(table_type)\n  FROM `project-111111.github_projects_datamart.INFORMATION_SCHEMA.TABLES`\n  WHERE table_name = 'dateseries'\n);\n      IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN\n  IF dataform_table_type = 'BASE TABLE' THEN\n    DROP TABLE IF EXISTS `project-111111.github_projects_datamart.dateseries`;\n  ELSEIF dataform_table_type = \"VIEW\" THEN\n    DROP VIEW IF EXISTS `project-111111.github_projects_datamart.dateseries`;\n  ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n    DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_datamart.dateseries`;\n  END IF;\nEND IF;\n      BEGIN\n        \n            CREATE OR REPLACE TABLE `project-111111.github_projects_datamart.dateseries`\n    \n    \n    OPTIONS(description='''月ごとのバーンアップ/ダウン用のStats''')\n    AS (\n      \n\nWITH date_list AS (\n  SELECT\n    d\n  FROM\n    UNNEST(\n      GENERATE_DATE_ARRAY(\n        '2017-02-01',\n        CURRENT_DATE(\"Asia/Tokyo\"),\n        INTERVAL 1 DAY\n      )\n    ) AS d\n),\npoints AS (\n  SELECT\n    dl.d,\n    SUM(\n      IF(\n        DATE(dl.d) \u003c PARSE_DATE(\"%Y-%m-%d\", items.month)\n        AND PARSE_DATE(\"%Y-%m-%d\", items.month) = DATE_TRUNC(dl.d, MONTH),\n        items.point,\n        0\n      )\n    ) AS before_sprint_point,\n    SUM(\n      IF(\n        DATE(dl.d) \u003e= DATE(\n          PARSE_TIMESTAMP(\"%Y-%m-%dT%H:%M:%SZ\", items.created_at)\n        )\n        AND PARSE_DATE(\"%Y-%m-%d\", items.month) = DATE_TRUNC(dl.d, MONTH),\n        items.point,\n        0\n      )\n    ) AS in_sprint_point,\n    SUM(\n      IF(\n        DATE(dl.d) \u003e= DATE(\n          PARSE_TIMESTAMP(\"%Y-%m-%dT%H:%M:%SZ\", items.closed_at)\n        )\n        AND PARSE_DATE(\"%Y-%m-%d\", items.month) = DATE_TRUNC(dl.d, MONTH),\n        items.point,\n        0\n      )\n    ) AS actual_point\n  FROM\n    date_list dl,\n    `project-111111.github_projects_dataform.flattened_items` items\n  GROUP BY\n    dl.d\n)\nSELECT\n  dl.d,\n  DATE_TRUNC(dl.d, MONTH) AS month,\n  p.in_sprint_point,\n  p.before_sprint_point,\n  p.before_sprint_point + p.in_sprint_point AS plan_point,\n  p.actual_point\nFROM\n  date_list dl\n  LEFT JOIN points p ON(p.d = dl.d)\n\n    );\n        \n      END;\n    END;"
      }
    },
    {
      "target": {
        "database": "project-111111",
        "schema": "github_projects_datamart",
        "name": "items"
      },
      "canonicalTarget": {
        "database": "project-111111",
        "schema": "github_projects_datamart",
        "name": "items"
      },
      "state": "SUCCEEDED",
      "invocationTiming": {
        "startTime": "2023-09-09T02:14:37.287655251Z",
        "endTime": "2023-09-09T02:14:50.427789907Z"
      },
      "bigqueryAction": {
        "sqlScript": "    BEGIN\n  CREATE SCHEMA IF NOT EXISTS `project-111111.github_projects_datamart` OPTIONS(location=\"asia-northeast1\");\nEXCEPTION WHEN ERROR THEN\n  IF NOT CONTAINS_SUBSTR(@@error.message, \"already exists: dataset\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"too many dataset metadata update operations\") AND\n    NOT CONTAINS_SUBSTR(@@error.message, \"User does not have bigquery.datasets.create permission\")\n  THEN\n    RAISE USING MESSAGE = @@error.message;\n  END IF;\nEND;\n    BEGIN\n      DECLARE dataform_table_type DEFAULT (\n  SELECT ANY_VALUE(table_type)\n  FROM `project-111111.github_projects_datamart.INFORMATION_SCHEMA.TABLES`\n  WHERE table_name = 'items'\n);\n      IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN\n  IF dataform_table_type = 'BASE TABLE' THEN\n    DROP TABLE IF EXISTS `project-111111.github_projects_datamart.items`;\n  ELSEIF dataform_table_type = \"VIEW\" THEN\n    DROP VIEW IF EXISTS `project-111111.github_projects_datamart.items`;\n  ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN\n    DROP MATERIALIZED VIEW IF EXISTS `project-111111.github_projects_datamart.items`;\n  END IF;\nEND IF;\n      BEGIN\n        \n            CREATE OR REPLACE TABLE `project-111111.github_projects_datamart.items`\n    \n    \n    OPTIONS(description='''issue, pull request, cardのリスト''')\n    AS (\n      \n\nSELECT\n  items.id,\n  items.dt,\n  items.title,\n  items.content_type,\n  items.url,\n  items.created_at,\n  items.updated_at,\n  items.closed_at,\n  items.merged_at,\n  items.repository,\n  items.month,\n  items.iteration,\n  items.point,\n  items.status\nFROM\n  `project-111111.github_projects_dataform.flattened_items` items\n\n    );\n        \n      END;\n    END;"
      }
    }
  ]
}

こんな感じ

このレベルの情報であれば管理画面見に行ったほうが良さそうとは思った

コマンドのまとめ

ここまでの流れをまとめると

  • コンパイル
  • 実行
  • 実行結果の確認

で最低限の実行は可能

# コンパイル
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/compilationResults" \
  -d @- <<EOS
{
  "gitCommitish": "${GIT_COMMITISH}"
}
EOS

# Invocation(実行)
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations" \
  -d @- <<EOS
{
  "compilationResult": "projects/project-111111/locations/asia-northeast1/repositories/github-projects/compilationResults/8b7ea6a7-1934-4d15-9e6c-bb49c6a86dc1"
}
EOS

# 実行結果の取得
curl -X GET -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  "https://dataform.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/${LOCATION}/repositories/${REPOSITORY_ID}/workflowInvocations/1694225665-b6a53ba5-b625-40e4-bb10-5295e9bc6f1f"

実行時パラメータを渡す

Dataform API でコンパイルのオーバーライドを構成する  |  Google Cloud

cloud.google.com

たとえば「普段は前日起算の今月」のレポート、手動で動作せるとき(再入れ込みなど)は期間指定したいといった場合

独自変数を扱いたい場合は上記ドキュメントのようにコンパイル時に変数を渡し、コンパイル結果を変えて実行するという手段が必要

これ見る感じ一連の処理を実行しないといけないので面倒といえば面倒、2コマンドcurlで実行すれば良いだけって話でもあるが…

ドキュメントではWorkflowsにまとめて実行するようにしていて、1つWorkflow作ったら使いまわせそうではある

ただ、この辺を使いこなせれば、テーブルやデータセットも分離しつつコンパイル変数も渡すことで環境もしくは変数によって挙動を変えられさまざまなパターンでの実行が可能になる

Dataform の設定を構成する  |  Google Cloud

cloud.google.com

ここで実際のSQLクエリでどのように変数をもとに制御するかなどが書いてある

まぁ三項演算子と同様ととらえて良さそう

例としてデフォルトの場合は今月、そうじゃない場合は指定月を検索するような処理を書いてみた

  • sqlxファイル
config {
  type: "table"
}

WITH
  data AS(
  SELECT
    *
  FROM
    UNNEST(ARRAY<STRUCT<m DATE, item STRING, store STRING, sales INT64>> [
       ("2023-09-01", "hoge", "A", 100),
       ("2023-08-01", "hoge", "A", 200),
       ("2023-08-01", "fuga", "A", 100),
       ("2023-08-01", "hoge", "B", 100),
       ("2023-09-01", "fuga", "B", 100),
       ("2023-09-01", "hoge", "B", 200)
    ])
  )
SELECT * FROM data
WHERE m = DATE(${when(dataform.projectConfig.vars.targetMonth === "CURRENT", "FORMAT_DATE('%Y-%m-01', CURRENT_DATE)", `"${dataform.projectConfig.vars.targetMonth}"`)})

ドキュメントの通り

最後の行が変数によって挙動が変わるようになっている

  • dataform.json
  "vars": {
    "targetMonth": "CURRENT"
  }

デフォルトはCURRENT

DataformのSQLXは条件などの指定をJavaScriptで書いているのでコンパイル結果のSQLにクオートを出力させるようにしてあげないと実行時に怒られるので注意が必要

まぁ当然ではあるがこの辺はちょっとだるい

  • API実行時パラメータ(compile時)
  "gitCommitish": "${GIT_COMMITISH}",
  "codeCompilationConfig": {
    "vars": {
      "targetMonth": "2023-08-01"
    }
  }

この手順でAPIから実行したら指定したパラメータでSQLを実行できた

これは前々からやりたかったやつ!!!

内部の仕組みを想像してみると面白い

以前は旧版でこのようなことをやりたくてIssue起票したが「やりません」とバッサリ切られてしまっていた

たしかに実行だけのエンドポイントでは仕組み的に変数を渡すことは厳しそう

しかし、compilationResultとInvocationと2つに分けて実行すれば可能

コンパイル時に変数渡せれば生成するSQLも可変にできるよねという話と解釈した

2回APIたたかないといけないのが手間ではあるが納得行く感じ

変数の他にもオーバーライドの必要性があるデータセットやテーブルなども含めコントロール可能(コンパイルのオーバーライドのドキュメントに詳しく載っている)

まとめ

  • GoogleCloudのDataformでREST APIを使ってSQLのコンパイルと実行をしてみた
  • コンパイル~実行完了までの流れをポーリングして確認できるようにした
  • SQLクエリ中に動的にパラメータを与えられるようにした
    • よくあるやりたいパターンだと思うのでできるようになってよかった

まだGoogleCloud版の方は触り始めたばかりなので他にも色々ためしてみたい