2023年1月6日
【Google Cloud】Workflows使ってみた~Part1~
-
- Category Google Cloud
最近、業務で Google Cloud のWorkflowsを使用する機会があり、Workflowsでの処理の書き方についていろいろ調べ、試してみました。
そこで、実際に使用した構文や処理をいくつか記事にまとめてみようと思います!
これからWorkflowsを使ってみたい人や、使っているけど書き方がよくわからない人などのお役に立てれば幸いです。
今回のテーマは、Part1とPart2の全2回です。
このPart 1 では、Workflowsに関する基本的な内容やBigQuery APIへの処理の呼び出しなどを紹介します!
Workflowsとは
YAML または JSONで定義を記述することで、指定した順番にGoogle APIへの呼び出しやhttpリクエストを送信することができます。
例えば、Cloud FunctionsやDataflowなどの処理を呼び出したり、BigQueryへのクエリの実行など、様々な処理を実行できます。また、サーバレスでフルマネージドのサービスであり、安価で、無料枠もあるといった特徴もあります。
料金についての詳細は、こちらからご確認ください。
他のサービスと組み合わせれば、下記のようなことも実現できます。
・Cloud Schedulerと組み合わせて、ワークフローの実行をスケジューリングする
・Eventarc APIを有効にして、イベントトリガーでワークフローを実行させる
・Cloud Loggingへエラーログを出力し、Cloud Monitoringでアラートを設定してエラー時に通知させる
Workflowsの構文
まず、Workflowsのソースコードには、以下の特徴があります。
・YAML、またはJSON形式で記述されている(本記事ではYAML形式のコードを記載します)
・1個のメインワークフローと0個以上のサブワークフローで構成されている
・全てのワークフロー(メイン/サブ)は1個以上のステップが含まれている
【サンプル】
# コメントは「#」の後に記載します。 #メインワークフロー main: #ランタイム引数(実行時にワークフローに渡す引数)を変数inputに格納 params: [input] #ステップ steps: - STEP1: # 変数に値を割り当てる assign: - 変数1: 値 - 変数2: 値 ... - STEP2: # サブワークフローを呼び出す call: Subworkflow1 args: 引数1: サブワークフローに渡す値 引数2: サブワークフローに渡す値 ... result: 返り値を格納する変数 - STEP3: # その他の処理に進む ... #サブワークフロー Subworkflow1: params: [パラメータ1,パラメータ2...] steps: - step_1: ...
メインワークフローは、「main」ブロックに記述される、その名の通りメインの処理です。
ワークフローが実行されるとメインワークフローの中の処理が実行されます。
サブワークフローは、メインワークフローや他のサブワークフローから呼び出される処理です。
ステップは、ワークフロー内で実行する処理(APIへのリクエストやhttpリクエストの送信など)の単位です。
ワークフローでは、ステップは上から順に実行されていきますが、特定のステップにジャンプさせたり(next)
、条件分岐(switch)やループ処理(for)をさせる構文もあります。
また、本記事の最後でも紹介しますが、エラー発生時の処理を定義する構文もあります。
BigQueryとの連携
まず最初に、プロジェクトIDを変数に入れておきます。
main: steps: - 設定値割当: assign: - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
(1) ロード処理
Cloud Storageに格納した以下のタブ区切りファイルのデータをBigQueryのテーブルに取り込む、ロード処理を実行します。

下記が実行するコードです。
- ロード処理: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${projectId} body: configuration: load: sourceUris: "gs://{バケット名}/input-to-bq.txt" fieldDelimiter: \t destinationTable: projectId: ${projectId} datasetId: test_workflow tableId: test_tbl schema: fields: - name: name type: STRING - name: age type: INTEGER - name: mail type: STRING createDisposition: CREATE_IF_NEEDED writeDisposition: WRITE_TRUNCATE skipLeadingRows: 1
・projectId:課金対象のプロジェクト(ここでは「設定値割当」ステップで設定した変数を指定しています。)
・sourceUris:取込ファイルのURI
・fieldDelimiter:取り込みファイルの区切り文字
・destinationTable:宛先テーブル
・skipLeadingRows:取り込みファイルのスキップする先頭行数
・createDisposition:テーブルが存在しない際に、新規作成するか否か(CREATE_IF_NEEDED / CREATE_NEVER)
※テーブルを新規作成させる場合は、schemaフィールドの指定が必要です。
・writeDisposition:データの取り込み方式
(「WRITE_TRUNCATE」は宛先テーブルへの洗い替え処理を意味します。デフォルト値は「WRITE_APPEND」で、追加処理です。)
※本記事で使用していないフィールドについては、公式ドキュメントでご確認ください。
(以降の処理も同様)
処理を実行してみると、テーブルが作成され、ファイルのデータが登録されました。

(2) SQLクエリの実行
BigQueryにSQL文を実行します。今回は、(1)でデータを投入したテーブルに、SELECT文を実行します。
- クエリ実行: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${projectId} body: configuration: query: query: select * from test_workflow.test_tbl useLegacySql: false result: insertResult
・projectID:課金対象のプロジェクト
・query:実行するSQL
・useLegacySql:レガシーSQLを使用するか否か(標準SQLを使用する場合はfalseにします)
また、上記では①
googleapis.bigquery.v2.jobs.insertを使用しましたが、
②googleapis.bigquery.v2.jobs.queryでもクエリは実行できます。
(argsの設定フィールドに差異があるため注意が必要です)
2つの違いの1つとして、①はクエリが完了/失敗/タイムアウトするまで待機するのに対し、②では待機する時間を「timeoutMS」フィールドで指定します(デフォルトは10000ミリ秒です)。
(3) クエリ結果の取得
(2)で実行したクエリの結果を取得します。さらに、その中からSELECT 文の取得データをログに出力してみます。
※(2)の戻り値insertResult には、SELECT文の取得データは含まれません。
- クエリ結果を取得: call: googleapis.bigquery.v2.jobs.getQueryResults args: projectId: ${projectId} jobId: ${insertResult.jobReference.jobId} location: ${insertResult.jobReference.location} result: queryResult
jobId と location は、(2)の返り値「insertResult」から取得しています。
上記のコードでは、変数queryResultにクエリ結果(ジョブ情報や、SELECT 文実行時には取得したデータなど)を格納しています。
SELECT文の取得データは、queryResultの中のrowsフィールドに格納されます。
上記から特定のデータを取得するには、下記のように指定します。
queryResult.rows["行番号"].f["列番号"].v
(2)で実行したクエリでは、以下の赤字のデータが取得されるため、

下記のようにコードを書くと、
- ログ出力1: call: sys.log args: text: ${queryResult.rows} severity: INFO - ログ出力2: call: sys.log args: text: ${queryResult.rows[0].f[0].v + "さんの年齢は" + queryResult.rows[0].f[1].v + "才です。"} severity: INFO
以下のようにログに出力されます。

エラーハンドリング
下記のコードは、ランタイム引数(input)からSQLを受け取り、実行する処理です。
main: params: [input] steps: - クエリ実行: try: steps: - クエリ実行ログ: call: sys.log args: text: "クエリを実行します。" severity: INFO - executeQuery: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} body: configuration: query: query: ${input.testQuery} useLegacySql: false retry: predicate: ${retry_decision} max_retries: 5 backoff: initial_delay: 30 max_delay: 300 multiplier: 2 except: as: e steps: - エラーログ: call: sys.log args: text: クエリの実行でエラーが発生しました。 severity: ERROR - エラー内容ログ出力: call: sys.log args: text: ${e} severity: ERROR - raiseError: raise: "Query execution error" retry_decision: params: [e] steps: - do_repeat: return: true
エラー発生時にはretryブロックで指定した設定に基づき、tryブロック内の処理を再実行します。
リトライブロック内の各パラメーターは、
・max_retries:最大リトライ回数
・initial_delay:エラー発生から最初のリトライまでの待機時間
・multiplier:前の待機時間と次の待機時間の比
・max_delay:待機時間の上限
を指定します。
上記のコードでは、初回が30秒待機、最長300秒待機、待機時間は2倍に増えていくように設定しています。
そのため、エラーが発生すると
・30秒後に1回目のリトライ
・1回目の60秒後に2回目のリトライ
・2回目の120秒後に3回目のリトライ
・3回目の240秒後に4回目のリトライ
・4回目の300秒後に5回目のリトライ
を実行します。
(エラーが発生せず、tryブロック内の処理が正常に実行されたら、それ以降はリトライされません。)
リトライ条件は、サブワークフロー「retry_decision」で設定しています。
リトライさせる場合はtrueを返し、リトライさせない場合はfalseを返すように条件分岐させます。
上記のコードでは、任意のエラーでtrueを返す(リトライさせる)ようにしていますが、下記のようにエラー内容によって処理を分けることもできます。
retry_decision: params: [e] steps: - what_to_repeat: switch: # HTTPステータスコードが500の場合はリトライする - condition: ${e.code == 500} return: true - otherwise: return: false
exceptブロックにはエラーをキャッチした際の処理を定義します。
「as: e」は、エラー内容を格納する変数を定義しています。
今回のコードでは、エラーが発生した旨を知らせる文言と変数e(エラー内容)をログに出力し、「raise」コマンドで実行中のワークフローをエラーで終了させます。
実際に、不正なクエリをランタイム引数に設定して、ワークフローを実行すると以下のようなログが出力されます。
【実行】
(ランタイム引数)”testQuery”: “SELECT not_exist_column FROM test_workflow.test_tbl;”

【ログ】

5回リトライされ、最後のリトライ後にexceptブロック内の処理が実行されていることが分かります。
また、9行目のエラー内容ログを開くと、下記のようにエラー理由が記されています。
"status":{"errorResult":{"location":"query","message":"Unrecognized name: not_exist_column at [1:8]","reason":"invalidQuery"}, "errors":[{"location":"query","message":"Unrecognized name: not_exist_column at [1:8]","reason":"invalidQuery"}],"state":"DONE"}
まとめ
『Workflows使ってみた~Part2~』では、以下の内容を紹介いたします。
・Cloud Storage上のファイルへの処理(コピー・削除・中身の取得)
・ワークフローから別のワークフローを呼び出す
・Cloud Functionsの処理を起動する
次回もぜひご覧ください!
当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。
Google Cloud に関してのご用命の際は「クラウド導入支援サービス for Google Cloud」へご連絡ください。
頂きましたご意見につきましては、
今後のより良い商品開発・サービス改善に活かしていきたいと考えております。