2024年7月18日

【Goolge Cloud】Application IntegrationでBigQueryにインサート


Content
Google Cloud研究開発チームのわいです!
 

データ処理フローを作りたいけどコードを書くのは難しい…と感じています。
そこで、Google Cloudに新しく登場した「Application Integration」を使ってみました。

Application Integrationとは

2023年7月にリリースされた新しいサービスで、 Google Cloudサービスやサードパーティー製SaaSとのデータ連携やデータ処理をローコードで実装できます。
参照:Application Integration の概要  |  アプリケーションの統合  |  Google Cloud
 

料金体系は、統合の実行数や処理されたデータ量による従量課金制です。
無料枠もあるので、大量のデータを扱わなければ安価に利用できます。
参照:Application Integration の料金  |  Google Cloud
 

本記事では、Application Integrationで簡単な統合フローを作成してみます。
私自身初心者で苦戦した点も多かったので、詳しめに説明できればと思います。

やりたいこと

  • 店舗別売上情報を記載したCSVを用意する
  • 上記CSVをCloud Storageの特定バケットにアップロードし、BigQueryの売上テーブルにデータをインサートする
  • 処理中にエラーが発生したら、エラーが発生したプロジェクト、実行IDなどをメールで通知する

 

今回は以下のようなケースを想定しています。

  • 追加でCSVをCloud Storageにアップロードする場合、CSVを別名にする(既存のCSVを上書きしない)
  • アップロードするCSVの項目は固定とする

 

完成イメージはこちらです。最終的にこの形になるよう、1つずつご説明します。

事前準備

今回の実装で利用するサービスはこちらです。

  • Cloud Storage
  • BigQuery
  • Pub/Sub
  • Application Integration

では、Application Integrationで統合フローを作成するための事前準備をしていきます。

※本記事は、Google CloudプロジェクトやGoogleアカウントがあることを前提として進めていきます。

 

Cloud Storage

ファイルをアップロードするためのバケットを作成します。

バケット名は「sample_datasource」としました。

参照:バケットを作成する  |  Cloud Storage  |  Google Cloud

 

 

CSVファイル

Cloud StorageにアップロードするサンプルデータのCSVファイルを作成します。

今回は、店舗別売上情報のサンプルデータを「テストデータ02.csv」という名前で作成しました。

 

 

BigQuery

データをインサートするためのデータセットとテーブルを作成します。

データセット名は「sample_dataset」、テーブル名は「sample_table」としました。

CSVファイルの項目だけではレコードがいつインサートされたかわからないので、登録日時の列を追加し、デフォルト値を「Current_DateTime(‘Asia/Tokyo’)」にしました。

参照:テーブルの作成と使用  |  BigQuery  |  Google Cloud

 

 

Pub/Sub

Cloud Storageの変更を通知するためのPub/Subトピックを作成します。

トピックIDを入力し、ほかの項目はデフォルトままで大丈夫です。

サブスクリプションはトピックを作成すると自動で作成されるため、新規作成は不要です。

 

このPub/SubトピックでCloud Storageの変更を受け取れるよう、通知を設定します。

Cloud Shellで以下のコマンドを実行します。
(バケット名、トピック名をご自身の環境に合わせて適宜変更してください。)

gcloud storage buckets notifications create gs://sample_datasource --topic=sample_topic

※このコマンドで指定したバケット内のオブジェクトの変更を検知します。

参照:Cloud Storage の Pub/Sub 通知を構成する  |  Google Cloud

※WARNINGと表示されていますが、問題ありません。
ここでは --topic=sample_topic で既に作成したトピックを指定しているため表示されています。
(存在しないトピック名を指定すると、その名前でトピックが作成されます。)

 

 

サービスアカウント

Application Integrationで利用するサービスアカウントを作成し、適切な権限を付与します。

今回の場合、以下の権限が必要です。

  • Application Integrationの起動元
  • BigQueryジョブユーザー
  • BigQuery読み取りセッションユーザー
  • Pub/Subパブリッシャー
  • ストレージ管理者

 

以上で事前準備は完了です。

Application Integrationで統合フロー作成

では、Application Integrationで統合フローを作っていきます。

 

新しい統合フローの作成

Google CloudコンソールでApplication Integration サービスの画面を開きます。
(日本語では「アプリケーションの統合」と表示されています。)

「CREATE INTEGRATION」を押下すると以下の画面が表示されるので、統合の名前、説明、リージョンを設定します。

 

CSVファイルのアップロードにより統合フローを開始

まず、一連のタスクを開始するためのトリガーを作成します。

 

TRIGGERSのリストからCloud Pub/Subを選択し、配置します。
配置すると、統合フロー内で利用できる変数が自動生成され、画面左側に表示されます。

トリガーもタスクも、配置するとそれぞれで利用できる変数が自動生成されます。

 

 

画面右側には構成パネルが表示されるので、必要な項目を設定していきます。

Pub/Sub topic、Service accountに、事前準備で作成したトピックとサービスアカウントをそれぞれ設定します。

 

処理の実施条件で利用する値を取得

続いて、Data Mappingタスクを利用し、Pub/Subから通知されるCloud Storageのイベントタイプを取得します。

この処理を行う理由は、統合フローの想定外の実行を防ぐためです。
GCSでファイルを削除した場合(OBJECT_DELETE)は後続処理に進まないようにするため、イベントタイプがOBJECT_FINALIZEのときだけ後続処理に進む条件を設定します。

このタスクではイベントタイプの取得だけ行い、後ほどその値を利用して条件を作成します。

参照:Cloud Storage の Pub/Sub 通知  |  Google Cloud

 

TASKSのリストからData Mappingを選択します。

構成パネルのLabelやNotesを分かりやすく修正します。
「OPEN DATA MAPPING EDITOR」ボタンを押下し、マッピングする変数を設定していきます。

 

Input

マッピングしたい変数のInputを設定していきます。

取得したいイベントタイプは、Pub/SubトリガーのアウトプットであるCloudPubSubMessageに含まれています。
この「attributes」フィールド内の「eventType」の値を取得し、Inputとします。

画面左側の変数から、 attributes をInputにドラッグします。
次に、「+」を押下してプルダウンからGET_PROPERTY関数を選択します。

GET_PROPERTY関数の引数を設定します。

「Variable or Value」を押下すると以下のようなパネルが表示されるので、Valueを選択して取得したいプロパティ名eventTypeを入力し、「SAVE」を押下します。

参照:データ マッピング タスク関数  |  アプリケーションの統合  |  Google Cloud

 

Output

次は、取り出したeventTypeの値を入れる変数を作成します。

Outputの「create a new one」を押下し、変数名やデータ型を設定します。
「CREATE」を押下して完了です。

完成するとこのようなイメージです。

 

データをインサートするBigQueryとの接続を作成

次は、BigQueryコネクタを作成していきます。

Application Integrationでの「コネクタ」は、特定のデータソースへの接続を提供するものです。
今回はBigQueryへの接続を提供するBigQueryコネクタを使用します。

対して「接続」は、データソースに実際にアクセスするために必要なインスタンスと考えることができます。
BigQuery接続を作成すると、テーブルへの行のインサート、読み取り、更新、削除ができ、生成された出力を統合フローで使用できます。

参照:コネクタと接続  |  インテグレーション コネクタ  |  Google Cloud

 

TASKSのリストからConnectorsを選択します。

「CONFIGURE CONNECTOR」ボタンを押下し、接続を構成していきます。

Select connectionの接続で、「接続を作成」を押下します。

 

続いて画面右側に表示されるCreate Connectionで、以下の項目(画像赤枠内)を設定していきます。

Connector プルダウンからBigQueryを選択します。
Connetion Name 接続名を入力します。
Description 必要に応じて説明を入力します。
Enable Cloud Logging ログを確認したい場合はオンにします。(デフォルトはオフです。)
Service Account 必要なロールを持つアカウントを選択します。
Project ID 接続したいBigQueryデータセットが存在するプロジェクトIDを入力します。
Dataset ID 接続したいBigQueryデータセット名を入力します。

もし画像のグレー部分のような表示があれば、「GRANT」を押下します。
(サービスアカウントに必要な権限を付与することを許可しています。)

  

ほかの項目はデフォルトのまま画面に従い「次へ」、「CREATE」を押下し、2~3分待つと接続が作成されます。

 

再び「CONFIGURE CONNECTOR」ボタンを押下し、コネクタを構成していきます。

Region BigQuery接続を作成したリージョンを選択します。今回はasia-northeast1です。
接続 先ほど作成した接続をプルダウンから選択します。

Typeではアクションを選択し、「次へ」を押下します。

Set entities/actionsではプルダウンからInsertLoadJobを選択し、「次へ」、「完了」を押下します。

 

コネクタが作成されると、ジョブのパラメータとして利用できる変数(connectorInputPayload)と、実行後にアクションの結果が格納される変数(connectorOutputPayload)が表示されます。

 

アップロードしたCSVファイルのgsutil URIを取得

続いても、Data Mappingタスクを新しく作成します。

ここではCloud StorageにアップロードしたCSVファイルのgsutil URIを取得し、変数に格納する処理を行います。
ここで作成した変数はこの後、SourceURIsパラメータの設定値として利用します。

 

gsutil URIは以下のような形式です。

gs://BUCKET_NAME/OBJECT_NAME

今のところ対象ファイルのgsutil URIをそのまま取得する手段はなさそうなので、eventType同様にCloudPubSubMessageのattributesフィールドのbucketIdとobjectIdを利用して作っていきます。

BUCKET_NAMEOBJECT_NAME部分にCloudPubSubMessageから取得したbucketIdとobjectIdを設定し、「gs://」と「/」を文字列としてつなげてgsutil URIを作成する処理です。

 

Input

以下画像のような状態になれば完了です。

文字列「gs://」と「/」、関数で取得したbucketIdとobjectIdを関数で結合することでgsutil URIを取得しています。

 

Output

Input部分で作ったgsutil URIを格納する変数を設定します。

この変数は、Vriable TypeでInput to Integrationを選択します。

Input to Integrationに設定することで、この変数が実行されたトリガーへの入力として提供されるようになります。
この変数を後続タスクへの入力値として利用するため、この設定を行います。

参照:変数  |  アプリケーションの統合  |  Google Cloud

 

これで、対象のバケットにアップロードしたファイルのバケットIDとオブジェクトIDを取得してgsutil URIを作成し、その値を変数SourceURIsに格納する処理ができました。

 

 

BigQueryへのインサート処理のパラメータを設定

次は、InsertLoadJobのパラメータを設定するためのData Mappingタスクを新しく作ります。

InsertLoadJobのパラメータはたくさんあるので、柔軟にジョブを構成することが可能です。
今回は必要なパラメータに絞って設定していきます。

参照:BigQuery  |  インテグレーション コネクタ  |  Google Cloud

 

まず、データのインサート先テーブルDestinationTable)のパラメータを設定します。

Input

ここでのInputには、パラメータとして設定したい値を持つ変数を利用します。

「Variable or Value」、「+ Add new variable」を押下し、新しい変数を作成します。

Name 変数名を入力します。
Default Value DestProjectId.DestDatasetId.DestTable 形式でテーブルを指定します。

「CREATE」を押下すると、作成した変数がInputに設定されます。

 

Output

Outputには、用意された入力パラメータを設定します。

変数connectorInputPayload内のDestinationTableをOutputにドラッグするだけで完了です。

 

同様の手順で、さらに以下のパラメータを設定していきます。

SourceURIs Cloud Storageオブジェクトのgsutil URIを設定します。
今回は、先ほど作成した変数SourceURIsをInputに指定します。
SourceFormat 以下の6種類から指定できます。小文字だと動かないので、必ず大文字で指定してください。今回は「CSV」です。

  •  AVRO
  • NEWLINE_DELIMITED_JSON
  • DATASTORE_BACKUP
  • PARQUET
  • ORC
  • CSV
CSVSkipLeadingRows CSVファイルの先頭でスキップする行数を指定します。
今回はヘッダー行だけスキップするので1です。
DestinationTableSchema 宛先テーブルのスキーマをJSONリストで指定します。

今回は全部で5つのパラメータを設定しました。

 

CSVSkipLeadingRowsは、CSVファイルのヘッダー行を取り除くために使いました。

DestinationTableSchemaの書き方には苦戦しました。。。

ここにはCSVファイルにある項目のみ記載します。
今回の場合、デフォルト値をインサートする登録日時の項目はBigQueryテーブルにはありますが、CSVファイルにないため記載しません。
登録日時の項目をDestinationTableSchemaに記載すると、テーブルの登録日時列にはnullが入ってしまいました。

以下の形式で、デフォルト値以外の各項目のカラム名、データ型等を指定してください。
スキーマを表示する
{
"fields": [
{
"name": "ID",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "AREA_CODE",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "AREA_NAME",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "MISE_CODE",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "MISE_NAME",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "TANTO_CODE",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "TANTO_NAME",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "AMOUNT",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "CUSTOMER",
"type": "STRING",
"mode": "NULLABLE"
}
]
}

参照:REST Resource: tables  |  BigQuery  |  Google Cloud

 

完成したイメージはこちらです。

 

各タスクを接続

今まで作成したトリガーやタスクをつなげていきます。
Cloud Storageのイベントタイプによって処理を分岐する条件もここで記載します。

以下の順番に並べ、ドラッグしてつなげるだけでOKです。

 

新しいCSVファイルのアップロード時のみ後続処理に進む

上の画像赤枠部分のエッジを押下し、条件を追加します。

1つ目のData Mappingタスクで取得した変数 GCS_eventType が OBJECT_FINALIZE なら後続処理に進む条件を作成します。
変数を「$」で囲うことで、条件式で利用できます。

 

完成すると以下のイメージになります。

これで、Cloud StorageのファイルのデータをBigQueryのテーブルにインサートする統合フローができました。

 

エラー処理フロー作成

ここからは、統合フロー内でエラーが発生した場合にメールを送信する処理を作っていきます。

 

インサート処理でのエラー発生を検知・フローを開始

TRIGGERSのリストからError Catcherを選択し、配置します。

Error CatcherトリガーにはError catcher IDが振られています。

今回はBigQueryのインサート時にエラーが発生した場合にこの処理を開始したいので、先ほど作成したBigQueryタスクのError handlingを設定します。

Pub/Subトリガーは非同期モードで統合を実行するため、Strategy for asynchronous executionsでRetry intervalとMaximum retry countを設定します。

参照:エラーとエラー処理  |  アプリケーションの統合  |  Google Cloud

 

Select error catcherで先ほど配置したError catcherのIDを選択したら、トリガーの設定は完了です。

 

メールへの記載内容を取得

エラー通知メールにエラーが発生したプロジェクトID実行ID統合名を記載するため、これらの値を取得するData Mappingタスクを作成します。

 

まずプロジェクトIDです。

Input

「Variable or Value」を押下し、FunctionからGET_PROJECT_ID関数を選択して「SAVE」を押下します。

 

Output

「create a new one」を押下し、GET_PROJECT_ID関数で取得したプロジェクトIDを入れる変数を作成します。

変数名やデータ型を設定して「CREATE」を押下したら完了です。

 

同様の手順で、実行IDと統合名のマッピングを作成します。

実行IDはGET_EXCUTION_ID関数、統合名はGET_INTEGRATION_NAME関数で取得できます。

 

完成したイメージはこちらです。

 

エラー通知メールを送信

エラー発生を通知するメール送信処理を設定していきます。

TASKSのリストからSend Emailを選択し、配置します。

 

構成パネルで送信先、件名、本文を設定します。

送信先メールアドレスは、カンマ区切りで複数設定することも可能です。
画像のように作成した変数を設定することも、直接入力することもできます。

件名や本文では、「$Project_ID$」のように「$」で囲うことで変数を利用できます。

 

あとはトリガーと各タスクを接続したら統合フローの完成です。

記事冒頭の完成イメージ通りになりました。

 

統合フローの実行

作成した統合は、テスト実行もしくは公開して実行ができます。

 

テスト実行で動作確認

意図通りに動くかどうか、作成した統合をテスト実行してみます。

まずは、Cloud Storageの指定のバケットに「テストデータ02.csv」(事前準備で作成したCSVファイル)を格納します。

「TEST」を押下し、表示されるポップアップでインプットの値を指定します。

今回の統合フローでは、トリガーから出力されるCloudPubSubMessageがインプットとなります。

そのためテスト実行する際に、
条件として利用するeventType、Data MappingタスクのインプットとなるbucketIdとobjectIdを指定する必要があります。

以下のような形式で指定し、「Test Integration」を押下すると、統合フローが開始されます。

{
"attributes": {
"bucketId": "sample_datasource",
"eventType": "OBJECT_FINALIZE",
"objectId": "テストデータ02.csv"
}
}

 

この形式で指定するのは、Pub/SubトリガーからのアウトプットであるCloudPubSubMessageの形式に合わせるためです。

 

テスト実行の結果確認

正常終了の場合

正常終了した場合、以下のポップアップ画面が表示されます。

BigQueryのテーブルを確認すると、テストデータ02.csvのデータがデフォルト値(インサート時の現在日時)つきでインサートされています。

 

異常終了の場合

エラーを発生させ、異常終了時にメールが届くことを確認します。
今回は、BigQueryコネクタで利用している接続をInactiveにして実行することでエラーを発生させます。

BigQueryタスクの構成パネルで「Integration Connectors」リンクを押下すると、インテグレーションコネクタサービスの画面に遷移します。
作成した接続の一覧から利用する接続を選択し、「SUSPEND」を押下します。

少し待つと、StatusがInactiveになります。

Inactiveの状態で実行するとエラーが発生し、以下のポップアップ画面が表示されます。

エラー通知メールも無事届きました。

 

 

本番と同じ状態で実行・結果確認

「PUBLISH」を押下して統合フローを公開すると、その時点の統合バージョンがデプロイされ、アクティブ状態になります。

Cloud Storageへのファイルアップロードにより統合フローがトリガーされ、処理が行われることを確認します。

 

正常終了の場合

違う名前のCSVファイル(テストデータ03.csv)をCloud Storageにアップロードしてみます。

<

問題無くインサートされています。

 

異常終了の場合

接続がInactiveの状態でCloud Storageにファイルをアップロードすると、無事エラー通知メールが届きました。

 

テスト実行でも公開後の実行でも、想定通りに動作することが確認できました。

お疲れさまでした。

使ってみた感想

今回初めてApplication Integrationを使ってみて、
何をどう設定すれば自分が作りたい処理ができるのか分かるまでが難しく感じました。

ただ、操作自体はドラッグアンドドロップや少しの入力だけなので、
実装自体はとても簡単にできました。

 

現時点の印象はこんな感じです。

  • Application Integrationの各タスクの設定の意味や設定方法が分かるまでが難しい。
  • それが分かれば、処理の流れ通りにタスクを並べて必要な設定をするだけ。
  • 簡単なデータ処理ならApplication Integrationを使うと短時間で簡単に実装できる。

 

個人的にですが、Application Integrationを使う上で意識すると良いと思うことは以下の通りです。

  • 各タスクのパラメータは何か、必須のパラメータは何か確認する。
    ⇒パラメータを確認すると、そのタスクで何ができるかイメージしやすくなりました。
  • 各パラメータの設定は、該当タスクの前のData Mappingで行う。
    ⇒これが分かってから、実装がスムーズになりました。
  • 各パラメータの正しい指定方法(書き方)は、利用する各サービスのドキュメントから探す。
    ⇒Application Integrationのページだけでは分からないことも多かったです。

 

今は複雑な処理は実装できていませんが、
今後の実務でApplication Integrationの利点をもっと生かした実装ができればと思います。

 

ここまでお読みいただきありがとうございました。
Application Integrationを触ってみたい方や、初めて使う方のお役に立てれば嬉しいです。

2024年7月18日 【Goolge Cloud】Application IntegrationでBigQueryにインサート

Category Google Cloud

ご意見・ご相談・料金のお見積もりなど、
お気軽にお問い合わせください。

お問い合わせはこちら