2023年1月6日 【Google Cloud】Workflows使ってみた~Part1~ BigQuery 検索する Popular tags 生成AI(Generative AI) Looker Studio BigQuery AlloyDB Google Workspace 事例紹介 Cloud SQL Category Google Cloud Author Y.Y SHARE Content みなさま、こんにちは。Y.Yです。 最近、業務で Google Cloud のWorkflowsを使用する機会があり、Workflowsでの処理の書き方についていろいろ調べ、試してみました。 そこで、実際に使用した構文や処理をいくつか記事にまとめてみようと思います! これからWorkflowsを使ってみたい人や、使っているけど書き方がよくわからない人などのお役に立てれば幸いです。 今回のテーマは、Part1とPart2の全2回です。 このPart 1 では、Workflowsに関する基本的な内容やBigQuery APIへの処理の呼び出しなどを紹介します! ・Workflowsとは ・Workflowsの構文 ・BigQueryとの連携 ・エラーハンドリング ・まとめ Workflowsとは Workflowsは、Google Cloud で提供されるワークフローツールです。 YAML または JSONで定義を記述することで、指定した順番にGoogle APIへの呼び出しやhttpリクエストを送信することができます。 例えば、Cloud FunctionsやDataflowなどの処理を呼び出したり、BigQueryへのクエリの実行など、様々な処理を実行できます。また、サーバレスでフルマネージドのサービスであり、安価で、無料枠もあるといった特徴もあります。 料金についての詳細は、こちらからご確認ください。 他のサービスと組み合わせれば、下記のようなことも実現できます。 ・Cloud Schedulerと組み合わせて、ワークフローの実行をスケジューリングする ・Eventarc APIを有効にして、イベントトリガーでワークフローを実行させる ・Cloud Loggingへエラーログを出力し、Cloud Monitoringでアラートを設定してエラー時に通知させる Workflowsの構文 次に、Workflowsの構文に関する基礎的な内容をご紹介します。 まず、Workflowsのソースコードには、以下の特徴があります。 ・YAML、またはJSON形式で記述されている(本記事ではYAML形式のコードを記載します) ・1個のメインワークフローと0個以上のサブワークフローで構成されている ・全てのワークフロー(メイン/サブ)は1個以上のステップが含まれている 【サンプル】 # コメントは「#」の後に記載します。 #メインワークフロー main: #ランタイム引数(実行時にワークフローに渡す引数)を変数inputに格納 params: [input] #ステップ steps: - STEP1: # 変数に値を割り当てる assign: - 変数1: 値 - 変数2: 値 ... - STEP2: # サブワークフローを呼び出す call: Subworkflow1 args: 引数1: サブワークフローに渡す値 引数2: サブワークフローに渡す値 ... result: 返り値を格納する変数 - STEP3: # その他の処理に進む ... #サブワークフロー Subworkflow1: params: [パラメータ1,パラメータ2...] steps: - step_1: ... メインワークフローは、「main」ブロックに記述される、その名の通りメインの処理です。 ワークフローが実行されるとメインワークフローの中の処理が実行されます。 サブワークフローは、メインワークフローや他のサブワークフローから呼び出される処理です。 ステップは、ワークフロー内で実行する処理(APIへのリクエストやhttpリクエストの送信など)の単位です。 ワークフローでは、ステップは上から順に実行されていきますが、特定のステップにジャンプさせたり(next) 、条件分岐(switch)やループ処理(for)をさせる構文もあります。 また、本記事の最後でも紹介しますが、エラー発生時の処理を定義する構文もあります。 BigQueryとの連携 次に、BigQueryへのデータの取り込みやクエリの実行を試してみます。 まず最初に、プロジェクトIDを変数に入れておきます。 main: steps: - 設定値割当: assign: - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} (1) ロード処理 Cloud Storageに格納した以下のタブ区切りファイルのデータをBigQueryのテーブルに取り込む、ロード処理を実行します。 下記が実行するコードです。 - ロード処理: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${projectId} body: configuration: load: sourceUris: "gs://{バケット名}/input-to-bq.txt" fieldDelimiter: \t destinationTable: projectId: ${projectId} datasetId: test_workflow tableId: test_tbl schema: fields: - name: name type: STRING - name: age type: INTEGER - name: mail type: STRING createDisposition: CREATE_IF_NEEDED writeDisposition: WRITE_TRUNCATE skipLeadingRows: 1 ・projectId:課金対象のプロジェクト(ここでは「設定値割当」ステップで設定した変数を指定しています。) ・sourceUris:取込ファイルのURI ・fieldDelimiter:取り込みファイルの区切り文字 ・destinationTable:宛先テーブル ・skipLeadingRows:取り込みファイルのスキップする先頭行数 ・createDisposition:テーブルが存在しない際に、新規作成するか否か(CREATE_IF_NEEDED / CREATE_NEVER) ※テーブルを新規作成させる場合は、schemaフィールドの指定が必要です。 ・writeDisposition:データの取り込み方式 (「WRITE_TRUNCATE」は宛先テーブルへの洗い替え処理を意味します。デフォルト値は「WRITE_APPEND」で、追加処理です。) ※本記事で使用していないフィールドについては、公式ドキュメントでご確認ください。 (以降の処理も同様) 処理を実行してみると、テーブルが作成され、ファイルのデータが登録されました。 (2) SQLクエリの実行 BigQueryにSQL文を実行します。今回は、(1)でデータを投入したテーブルに、SELECT文を実行します。 - クエリ実行: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${projectId} body: configuration: query: query: select * from test_workflow.test_tbl useLegacySql: false result: insertResult ・projectID:課金対象のプロジェクト ・query:実行するSQL ・useLegacySql:レガシーSQLを使用するか否か(標準SQLを使用する場合はfalseにします) また、上記では① googleapis.bigquery.v2.jobs.insertを使用しましたが、 ②googleapis.bigquery.v2.jobs.queryでもクエリは実行できます。 (argsの設定フィールドに差異があるため注意が必要です) 2つの違いの1つとして、①はクエリが完了/失敗/タイムアウトするまで待機するのに対し、②では待機する時間を「timeoutMS」フィールドで指定します(デフォルトは10000ミリ秒です)。 (3) クエリ結果の取得 (2)で実行したクエリの結果を取得します。さらに、その中からSELECT 文の取得データをログに出力してみます。 ※(2)の戻り値insertResult には、SELECT文の取得データは含まれません。 - クエリ結果を取得: call: googleapis.bigquery.v2.jobs.getQueryResults args: projectId: ${projectId} jobId: ${insertResult.jobReference.jobId} location: ${insertResult.jobReference.location} result: queryResult jobId と location は、(2)の返り値「insertResult」から取得しています。 上記のコードでは、変数queryResultにクエリ結果(ジョブ情報や、SELECT 文実行時には取得したデータなど)を格納しています。 SELECT文の取得データは、queryResultの中のrowsフィールドに格納されます。 上記から特定のデータを取得するには、下記のように指定します。 queryResult.rows["行番号"].f["列番号"].v (2)で実行したクエリでは、以下の赤字のデータが取得されるため、 下記のようにコードを書くと、 - ログ出力1: call: sys.log args: text: ${queryResult.rows} severity: INFO - ログ出力2: call: sys.log args: text: ${queryResult.rows[0].f[0].v + "さんの年齢は" + queryResult.rows[0].f[1].v + "才です。"} severity: INFO 以下のようにログに出力されます。 エラーハンドリング 最後に、エラー発生時のリトライ処理・エラーキャッチ処理を試してみます。 下記のコードは、ランタイム引数(input)からSQLを受け取り、実行する処理です。 main: params: [input] steps: - クエリ実行: try: steps: - クエリ実行ログ: call: sys.log args: text: "クエリを実行します。" severity: INFO - executeQuery: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} body: configuration: query: query: ${input.testQuery} useLegacySql: false retry: predicate: ${retry_decision} max_retries: 5 backoff: initial_delay: 30 max_delay: 300 multiplier: 2 except: as: e steps: - エラーログ: call: sys.log args: text: クエリの実行でエラーが発生しました。 severity: ERROR - エラー内容ログ出力: call: sys.log args: text: ${e} severity: ERROR - raiseError: raise: "Query execution error" retry_decision: params: [e] steps: - do_repeat: return: true エラー発生時にはretryブロックで指定した設定に基づき、tryブロック内の処理を再実行します。 リトライブロック内の各パラメーターは、 ・max_retries:最大リトライ回数 ・initial_delay:エラー発生から最初のリトライまでの待機時間 ・multiplier:前の待機時間と次の待機時間の比 ・max_delay:待機時間の上限 を指定します。 上記のコードでは、初回が30秒待機、最長300秒待機、待機時間は2倍に増えていくように設定しています。 そのため、エラーが発生すると ・30秒後に1回目のリトライ ・1回目の60秒後に2回目のリトライ ・2回目の120秒後に3回目のリトライ ・3回目の240秒後に4回目のリトライ ・4回目の300秒後に5回目のリトライ を実行します。 (エラーが発生せず、tryブロック内の処理が正常に実行されたら、それ以降はリトライされません。) リトライ条件は、サブワークフロー「retry_decision」で設定しています。 リトライさせる場合はtrueを返し、リトライさせない場合はfalseを返すように条件分岐させます。 上記のコードでは、任意のエラーでtrueを返す(リトライさせる)ようにしていますが、下記のようにエラー内容によって処理を分けることもできます。 retry_decision: params: [e] steps: - what_to_repeat: switch: # HTTPステータスコードが500の場合はリトライする - condition: ${e.code == 500} return: true - otherwise: return: false exceptブロックにはエラーをキャッチした際の処理を定義します。 「as: e」は、エラー内容を格納する変数を定義しています。 今回のコードでは、エラーが発生した旨を知らせる文言と変数e(エラー内容)をログに出力し、「raise」コマンドで実行中のワークフローをエラーで終了させます。 実際に、不正なクエリをランタイム引数に設定して、ワークフローを実行すると以下のようなログが出力されます。 【実行】 (ランタイム引数)”testQuery”: “SELECT not_exist_column FROM test_workflow.test_tbl;” 【ログ】 5回リトライされ、最後のリトライ後にexceptブロック内の処理が実行されていることが分かります。 また、9行目のエラー内容ログを開くと、下記のようにエラー理由が記されています。 "status":{"errorResult":{"location":"query","message":"Unrecognized name: not_exist_column at [1:8]","reason":"invalidQuery"}, "errors":[{"location":"query","message":"Unrecognized name: not_exist_column at [1:8]","reason":"invalidQuery"}],"state":"DONE"} まとめ 今回は、Workflowsに関する基本的な内容やBigQueryとの連携、エラーハンドリングについて書きました。 『Workflows使ってみた~Part2~』では、以下の内容を紹介いたします。 ・Cloud Storage上のファイルへの処理(コピー・削除・中身の取得) ・ワークフローから別のワークフローを呼び出す ・Cloud Functionsの処理を起動する 次回もぜひご覧ください! 当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。 Google Cloud に関してのご用命の際は「クラウド導入支援サービス for Google Cloud」へご連絡ください。 頂きましたご意見につきましては、今後のより良い商品開発・サービス改善に活かしていきたいと考えております。 よく分かった もっと知りたい 参考になった 使ってみたい よく分からなかった Author Y.Y 株式会社システムサポート名古屋支社BI事業部所属。 2020年新卒入社で、2021年の末頃からGoogle Cloudを使い始めました。趣味はジャグリング。 BigQuery 2023年1月6日 【Google Cloud】Workflows使ってみた~Part1~ Category Google Cloud 前の記事を読む 【Google Cloud】Vertex AI WorkbenchからBigQueryのデータを取得してみた 次の記事を読む 【Google Cloud】Workflows使ってみた~Part2~ Recommendation オススメ記事 2023年9月5日 Google Cloud 【Google Cloud】Looker Studio × Looker Studio Pro × Looker を徹底比較!機能・選び方を解説 2023年8月24日 Google Cloud 【Google Cloud】Migrate for Anthos and GKEでVMを移行してみた(1:概要編) 2022年10月10日 Google Cloud 【Google Cloud】AlloyDB と Cloud SQL を徹底比較してみた!!(第1回:AlloyDB の概要、性能検証編) BigQuery ML ワークショップ開催のお知らせ 生成AI導入支援パッケージ Discovery AI導入支援パッケージ Google Cloud ホワイトペーパー 新着記事 2024年9月2日 4koma 【4コマ漫画】SEひつじは定時退社の夢を見る ~ダウングレード~ 2024年8月29日 Google Cloud 【Google Cloud】Cloud NGFW Standard を試してみた 2024年8月29日 Google Cloud 【Google Cloud】Cloud Storage FUSE Read Cache を試してみた HOME Google Cloud 【Google Cloud】Workflows使ってみた~Part1~ ご意見・ご相談・料金のお見積もりなど、お気軽にお問い合わせください。 お問い合わせはこちら HOME Categories お知らせ イベント・セミナー Google Cloud Google Workspace モバイル インフラ 技術開発 ブログ 4koma Tags 生成AI(Generative AI) Looker Studio BigQuery AlloyDB Google Workspace 事例紹介 Cloud SQL STSエンジニアリングマガジン 「サイタル」