2023年2月16日 【Google Cloud】Workflowsをイベントトリガーで動かしてみた(Eventarc API) BigQuery 検索する Popular tags 生成AI(Generative AI) Looker Studio BigQuery AlloyDB Google Workspace 事例紹介 Cloud SQL Category Google Cloud Author Y.Y SHARE Content みなさま、こんにちは。Y.Yです。 今回はGoogle CloudのWorkflowsに関する記事です。 Eventarc APIを用いたWorkflowsのイベントトリガー機能を試してみたいと思います! Workflowsを使用している方、イベントトリガー機能の使用を考えている方の参考になれば幸いです。 ※Workflowsの基本的な内容や処理の紹介は、 ・【Google Cloud】Workflows使ってみた~Part1~ ・【Google Cloud】Workflows使ってみた~Part2~ を是非ご覧ください! ・Cloud Storage オブジェクトのファイナライズトリガー ・BigQueryのジョブ完了トリガー ・まとめ Cloud Storage オブジェクトのファイナライズトリガー まずは、Cloud Storageのバケットへのオブジェクトのファイナライズ(作成/上書き)をトリガーに起動する処理を試してみます! ファイルがアップロードされた際に、アップロードファイル内のデータをBigQueryに取り込む処理を作ります。 設定 ワークフローの作成画面でワークフロー名などの必要な情報を入力し、「新しいトリガーの追加」でEventarcを選択します。 Eventarc APIがまだ有効になっていない場合は下の画面が表示されます。 APIを有効にして設定に進みます。 ①トリガー名を入力し、トリガーのタイプは「自社」、イベントプロバイダは「Cloud Storage」を指定します。 ②イベントには、「google.cloud.storage.object.v1.finalized」を指定し、トリガー対象のバケットを指定します。 ③設定が完了したら、「トリガーを保存」をクリックします。 ※ファイナライズ以外にも様々なイベントをトリガーに指定できます。 ワークフローのコードには以下の内容を入力します。 main: params: [event] steps: - イベント発生ログ: call: sys.log args: text: '${"ファイルがアップロードされました。ファイル名:" + event.data.name}' severity: INFO - ロード処理: try: steps: - 処理開始ログ: call: sys.log args: text: "ロード処理を実行します。データセット:test_workflow, テーブル:test_tbl" severity: INFO - ロード: call: googleapis.bigquery.v2.jobs.insert args: projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} body: configuration: load: sourceUris: '${"gs://" + event.bucket + "/" + event.data.name}' fieldDelimiter: \t destinationTable: projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} datasetId: test_workflow tableId: test_tbl createDisposition: CREATE_NEVER writeDisposition: WRITE_APPEND skipLeadingRows: 1 except: as: e steps: - エラーログ: call: sys.log args: text: "テーブルへのロード処理でエラーが発生しました。" severity: ERROR - エラー内容ログ出力: call: sys.log args: text: ${e} severity: ERROR - raiseError: raise: "BigQuery Job Error" ワークフローのランタイム引数(上のコードのevent)には、下記のデータがイベント情報として渡されます。 (バケット名は [バケット名] に変換しています) { "bucket": "[バケット名]", "data": { "bucket": "[バケット名]", "contentType": "text/plain", "crc32c": "dPjWtg==", "etag": "CLXmv/Dx+vsCEAE=", "generation": "1671082084922165", "id": "[バケット名]/test_tbl.txt/1671082084922165", "kind": "storage#object", "md5Hash": "3mEijkySugTVJ2XJkqBxDw==", "mediaLink": "https://storage.googleapis.com/download/storage/v1/b/[バケット名]/o/test_tbl.txt?generation=1671082084922165&alt=media", "metageneration": "1", "name": "test_tbl.txt", "selfLink": "https://www.googleapis.com/storage/v1/b/[バケット名]/o/test_tbl.txt", "size": "73", "storageClass": "STANDARD", "timeCreated": "2022-12-15T05:28:04.930Z", "timeStorageClassUpdated": "2022-12-15T05:28:04.930Z", "updated": "2022-12-15T05:28:04.930Z" }, "datacontenttype": "application/json", "id": "6603729032775695", "source": "//storage.googleapis.com/projects/_/buckets/[バケット名]", "specversion": "1.0", "subject": "objects/test_tbl.txt", "time": "2022-12-15T05:28:04.930634Z", "type": "google.cloud.storage.object.v1.finalized" } 動作検証 ワークフローを作成し、実際に動かしてみます。 取り込みファイルと取込先のテーブルは以下です。 【取り込みファイル】 【取り込み先テーブル】 バケットにファイルをアップロードします。 作成したワークフローを確認してみると、正常に起動されていることが確認できました。 【ログ】 【取り込み先テーブル】 BigQueryのジョブ完了トリガー 次に、BigQueryのジョブ完了をトリガーに起動する処理を作成してみます。 設定・作成 まず、以下のコードのワークフローを作成します。 (今回は先にワークフローを作り、コマンドでトリガーを作ります) main: params: [event] steps: - イベント情報を変数に設定: assign: - query: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query} #実行したクエリ - jobId: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId} #ジョブID - jobLocation: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobName.location} #ジョブロケーション - user: ${event.data.protoPayload.authenticationInfo.principalEmail} #ジョブ実行ユーザ - createTime: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.createTime} #ジョブ作成日時 - startTime: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.startTime} #ジョブ開始日時 - endTime: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.endTime} #ジョブ終了日時 - state: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state} #ジョブ実行状態 - errorCode: ${default(map.get(event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error,"code"), "-")} #エラーコード(デフォルト値:-) - errorMessage: ${default(map.get(event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error,"message"), "-")} #エラーメッセージ(デフォルト値:-) - ジョブ情報ログ: call: sys.log args: text: '${"ジョブID:" + jobId + " ロケーション:" + jobLocation + "実行状態:" + state + " 開始日時:" + startTime + " 終了日時:" + endTime}' severity: INFO - クエリ: call: sys.log args: text: '${"実行されたクエリ:" + query}' severity: INFO - エラー確認: switch: - condition: ${errorMessage == "-"} next: end - condition: true next: エラーログ - エラーログ: call: sys.log args: text: '${"ジョブでエラーが発生しました。コード:" + errorCode + " メッセージ:" + errorMessage}' severity: ERROR 一部のジョブ情報をログに出力し、エラーが発生していたらエラー情報をログに出力する処理です。 実行時にランタイム引数に渡されるイベント情報は数が多いため省略します。 今回は一部の情報のみ変数に格納しました。 ワークフローを作ったら、トリガーを作成します。 Eventarcトリガーで使用するサービスアカウントに、IAMの「Eventarcイベント受信者」ロールを付与します。 以下のコマンドをCloud Shellターミナルで実行し、ワークフローにトリガーを作成します。 gcloud eventarc triggers create {Eventarcトリガー名] \ --location={Eventarcトリガーのロケーション} \ --destination-workflow={トリガー対象ワークフロー名} \ --destination-workflow-location={トリガー対象ワークフローのロケーション} \ --event-filters="type=google.cloud.audit.log.v1.written" \ --event-filters="serviceName=bigquery.googleapis.com" \ --event-filters="methodName=jobservice.jobcompleted" \ --service-account={Eventarcトリガーで使用するサービスアカウント} ワークフローを確認するとトリガーが作成されていることが分かります。 動作確認 トリガーが作成できたため、実際にBigQueryでジョブを実行し、ワークフローを起動させてみます。 まずは、BigQuery で正常なクエリを実行します。 SQL実行後にワークフローを確認すると、正常に起動されており、以下のログが出力されていることを確認できました。 次に、エラー(データ型不正)の起きるクエリを実行してみます。 ワークフローを確認すると、今回も正常に起動されており、以下のログが出力されていました。 まとめ 今回は、Cloud StorageへのファイルのファイナライズとBigQueryへのジョブ実行をトリガーに起動する処理を試してみました。 Cloud Functionsのように、イベント発生時に自動で起動してくれるのはかなり便利ですね。 みなさんも是非、Workflowsのイベントトリガー機能を試してみてくださいね! ご覧いただきありがとうございました。 当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。 お問い合わせは以下よりお願いいたします。 Google Cloud導入についてのお問い合わせはこちら 頂きましたご意見につきましては、今後のより良い商品開発・サービス改善に活かしていきたいと考えております。 よく分かった もっと知りたい 参考になった 使ってみたい よく分からなかった Author Y.Y 株式会社システムサポート名古屋支社BI事業部所属。 2020年新卒入社で、2021年の末頃からGoogle Cloudを使い始めました。趣味はジャグリング。 BigQuery 2023年2月16日 【Google Cloud】Workflowsをイベントトリガーで動かしてみた(Eventarc API) Category Google Cloud 前の記事を読む 【Google Cloud】集計方法の異なる累計をひとつのグラフで表示する【Looker】 次の記事を読む DroidKaigi 2022をモバイルアプリチームで見てきた! Recommendation オススメ記事 2023年9月5日 Google Cloud 【Google Cloud】Looker Studio × Looker Studio Pro × Looker を徹底比較!機能・選び方を解説 2023年8月24日 Google Cloud 【Google Cloud】Migrate for Anthos and GKEでVMを移行してみた(1:概要編) 2022年10月10日 Google Cloud 【Google Cloud】AlloyDB と Cloud SQL を徹底比較してみた!!(第1回:AlloyDB の概要、性能検証編) BigQuery ML ワークショップ開催のお知らせ 生成AI導入支援パッケージ Discovery AI導入支援パッケージ Google Cloud ホワイトペーパー 新着記事 2024年9月2日 4koma 【4コマ漫画】SEひつじは定時退社の夢を見る ~ダウングレード~ 2024年8月29日 Google Cloud 【Google Cloud】Cloud NGFW Standard を試してみた 2024年8月29日 Google Cloud 【Google Cloud】Cloud Storage FUSE Read Cache を試してみた HOME Google Cloud 【Google Cloud】Workflowsをイベントトリガーで動かしてみた(Eventarc API) ご意見・ご相談・料金のお見積もりなど、お気軽にお問い合わせください。 お問い合わせはこちら HOME Categories お知らせ イベント・セミナー Google Cloud Google Workspace モバイル インフラ 技術開発 ブログ 4koma Tags 生成AI(Generative AI) Looker Studio BigQuery AlloyDB Google Workspace 事例紹介 Cloud SQL STSエンジニアリングマガジン 「サイタル」