2023年1月6日

【Google Cloud】Workflows使ってみた~Part1~


Content

みなさま、こんにちは。Y.Yです。
最近、業務で Google Cloud のWorkflowsを使用する機会があり、Workflowsでの処理の書き方についていろいろ調べ、試してみました。
そこで、実際に使用した構文や処理をいくつか記事にまとめてみようと思います!
これからWorkflowsを使ってみたい人や、使っているけど書き方がよくわからない人などのお役に立てれば幸いです。

今回のテーマは、Part1とPart2の全2回です。
このPart 1 では、Workflowsに関する基本的な内容やBigQuery APIへの処理の呼び出しなどを紹介します!

Workflowsとは

Workflowsは、Google Cloud で提供されるワークフローツールです。
YAML または JSONで定義を記述することで、指定した順番にGoogle APIへの呼び出しやhttpリクエストを送信することができます。
例えば、Cloud FunctionsやDataflowなどの処理を呼び出したり、BigQueryへのクエリの実行など、様々な処理を実行できます。また、サーバレスでフルマネージドのサービスであり、安価で、無料枠もあるといった特徴もあります。
料金についての詳細は、こちらからご確認ください。

他のサービスと組み合わせれば、下記のようなことも実現できます。
・Cloud Schedulerと組み合わせて、ワークフローの実行をスケジューリングする
・Eventarc APIを有効にして、イベントトリガーでワークフローを実行させる
・Cloud Loggingへエラーログを出力し、Cloud Monitoringでアラートを設定してエラー時に通知させる

Workflowsの構文

次に、Workflowsの構文に関する基礎的な内容をご紹介します。

まず、Workflowsのソースコードには、以下の特徴があります。
・YAML、またはJSON形式で記述されている(本記事ではYAML形式のコードを記載します)
・1個のメインワークフローと0個以上のサブワークフローで構成されている
・全てのワークフロー(メイン/サブ)は1個以上のステップが含まれている

【サンプル】

# コメントは「#」の後に記載します。

#メインワークフロー
main:
    #ランタイム引数(実行時にワークフローに渡す引数)を変数inputに格納
    params: [input]
    #ステップ
    steps:
        - STEP1:
            # 変数に値を割り当てる
            assign:
                - 変数1: 値
                - 変数2: 値
                ...
        - STEP2:
            # サブワークフローを呼び出す
            call: Subworkflow1
            args:
                引数1: サブワークフローに渡す値
                引数2: サブワークフローに渡す値
                ...
            result: 返り値を格納する変数
        - STEP3:
            # その他の処理に進む
            ...
#サブワークフロー
Subworkflow1:
    params: [パラメータ1,パラメータ2...]
    steps:
        - step_1:
        ...

メインワークフローは、「main」ブロックに記述される、その名の通りメインの処理です。
ワークフローが実行されるとメインワークフローの中の処理が実行されます。
サブワークフローは、メインワークフローや他のサブワークフローから呼び出される処理です。

ステップは、ワークフロー内で実行する処理(APIへのリクエストやhttpリクエストの送信など)の単位です。

ワークフローでは、ステップは上から順に実行されていきますが、特定のステップにジャンプさせたり(next)
、条件分岐(switch)やループ処理(for)をさせる構文もあります。
また、本記事の最後でも紹介しますが、エラー発生時の処理を定義する構文もあります。

BigQueryとの連携

次に、BigQueryへのデータの取り込みやクエリの実行を試してみます。

まず最初に、プロジェクトIDを変数に入れておきます。

main:
    steps:
    - 設定値割当:
        assign:
            - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
(1) ロード処理

Cloud Storageに格納した以下のタブ区切りファイルのデータをBigQueryのテーブルに取り込む、ロード処理を実行します。

下記が実行するコードです。

    - ロード処理:
        call: googleapis.bigquery.v2.jobs.insert
        args:
            projectId: ${projectId}
            body:
                configuration:
                    load:
                        sourceUris: "gs://{バケット名}/input-to-bq.txt"
                        fieldDelimiter: \t
                        destinationTable:
                            projectId: ${projectId}
                            datasetId: test_workflow
                            tableId: test_tbl
                        schema:
                            fields:
                            - name: name
                              type: STRING
                            - name: age
                              type: INTEGER
                            - name: mail
                              type: STRING
                        createDisposition: CREATE_IF_NEEDED
                        writeDisposition: WRITE_TRUNCATE
                        skipLeadingRows: 1

・projectId:課金対象のプロジェクト(ここでは「設定値割当」ステップで設定した変数を指定しています。)
・sourceUris:取込ファイルのURI
・fieldDelimiter:取り込みファイルの区切り文字
・destinationTable:宛先テーブル
・skipLeadingRows:取り込みファイルのスキップする先頭行数
・createDisposition:テーブルが存在しない際に、新規作成するか否か(CREATE_IF_NEEDED / CREATE_NEVER)
※テーブルを新規作成させる場合は、schemaフィールドの指定が必要です。
・writeDisposition:データの取り込み方式
(「WRITE_TRUNCATE」は宛先テーブルへの洗い替え処理を意味します。デフォルト値は「WRITE_APPEND」で、追加処理です。)

※本記事で使用していないフィールドについては、公式ドキュメントでご確認ください。
(以降の処理も同様)

処理を実行してみると、テーブルが作成され、ファイルのデータが登録されました。

 

(2) SQLクエリの実行

BigQueryにSQL文を実行します。今回は、(1)でデータを投入したテーブルに、SELECT文を実行します。

    - クエリ実行:
        call: googleapis.bigquery.v2.jobs.insert
        args:
            projectId: ${projectId}
            body:
                configuration:
                    query:
                        query: select * from test_workflow.test_tbl
                        useLegacySql: false
        result: insertResult

・projectID:課金対象のプロジェクト
・query:実行するSQL
・useLegacySql:レガシーSQLを使用するか否か(標準SQLを使用する場合はfalseにします)

また、上記では①
googleapis.bigquery.v2.jobs.insertを使用しましたが、
googleapis.bigquery.v2.jobs.queryでもクエリは実行できます。
(argsの設定フィールドに差異があるため注意が必要です)
2つの違いの1つとして、①はクエリが完了/失敗/タイムアウトするまで待機するのに対し、②では待機する時間を「timeoutMS」フィールドで指定します(デフォルトは10000ミリ秒です)。

(3) クエリ結果の取得

(2)で実行したクエリの結果を取得します。さらに、その中からSELECT 文の取得データをログに出力してみます。
※(2)の戻り値insertResult には、SELECT文の取得データは含まれません。

    - クエリ結果を取得:
        call: googleapis.bigquery.v2.jobs.getQueryResults
        args:
            projectId: ${projectId}
            jobId: ${insertResult.jobReference.jobId}
            location: ${insertResult.jobReference.location}
        result: queryResult

jobId と location は、(2)の返り値「insertResult」から取得しています。
上記のコードでは、変数queryResultにクエリ結果(ジョブ情報や、SELECT 文実行時には取得したデータなど)を格納しています。

SELECT文の取得データは、queryResultの中のrowsフィールドに格納されます。
上記から特定のデータを取得するには、下記のように指定します。

queryResult.rows["行番号"].f["列番号"].v

(2)で実行したクエリでは、以下の赤字のデータが取得されるため、

下記のようにコードを書くと、

    - ログ出力1:
        call: sys.log
        args:
            text: ${queryResult.rows}
            severity: INFO
    - ログ出力2:
        call: sys.log
        args:
            text: ${queryResult.rows[0].f[0].v + "さんの年齢は" + queryResult.rows[0].f[1].v + "才です。"}
            severity: INFO

以下のようにログに出力されます。

 

エラーハンドリング

最後に、エラー発生時のリトライ処理・エラーキャッチ処理を試してみます。

下記のコードは、ランタイム引数(input)からSQLを受け取り、実行する処理です。

main:
    params: [input]
    steps:
    - クエリ実行:
        try:
            steps:
                - クエリ実行ログ:
                    call: sys.log
                    args:
                      text: "クエリを実行します。"
                      severity: INFO
                - executeQuery:
                    call: googleapis.bigquery.v2.jobs.insert
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            configuration:
                                query:
                                    query: ${input.testQuery}
                                    useLegacySql: false
        retry:
            predicate: ${retry_decision}
            max_retries: 5
            backoff:
                initial_delay: 30
                max_delay: 300
                multiplier: 2
        except:
            as: e
            steps:
                - エラーログ:
                    call: sys.log
                    args:
                      text: クエリの実行でエラーが発生しました。
                      severity: ERROR
                - エラー内容ログ出力:
                    call: sys.log
                    args:
                      text: ${e}
                      severity: ERROR
                - raiseError:
                    raise: "Query execution error"
retry_decision:
  params: [e]
  steps:
    - do_repeat:
        return: true

エラー発生時にはretryブロックで指定した設定に基づき、tryブロック内の処理を再実行します。

リトライブロック内の各パラメーターは、
・max_retries:最大リトライ回数
・initial_delay:エラー発生から最初のリトライまでの待機時間
・multiplier:前の待機時間と次の待機時間の比
・max_delay:待機時間の上限
を指定します。

上記のコードでは、初回が30秒待機、最長300秒待機、待機時間は2倍に増えていくように設定しています。
そのため、エラーが発生すると
・30秒後に1回目のリトライ
・1回目の60秒後に2回目のリトライ
・2回目の120秒後に3回目のリトライ
・3回目の240秒後に4回目のリトライ
・4回目の300秒後に5回目のリトライ
を実行します。
(エラーが発生せず、tryブロック内の処理が正常に実行されたら、それ以降はリトライされません。)

リトライ条件は、サブワークフロー「retry_decision」で設定しています。
リトライさせる場合はtrueを返し、リトライさせない場合はfalseを返すように条件分岐させます。

上記のコードでは、任意のエラーでtrueを返す(リトライさせる)ようにしていますが、下記のようにエラー内容によって処理を分けることもできます。

retry_decision:
  params: [e]
  steps:
    - what_to_repeat:
        switch:
          # HTTPステータスコードが500の場合はリトライする
          - condition: ${e.code == 500}
            return: true
    - otherwise:
        return: false

exceptブロックにはエラーをキャッチした際の処理を定義します。
「as: e」は、エラー内容を格納する変数を定義しています。
今回のコードでは、エラーが発生した旨を知らせる文言と変数e(エラー内容)をログに出力し、「raise」コマンドで実行中のワークフローをエラーで終了させます。

実際に、不正なクエリをランタイム引数に設定して、ワークフローを実行すると以下のようなログが出力されます。

【実行】
(ランタイム引数)”testQuery”: “SELECT not_exist_column FROM test_workflow.test_tbl;”

【ログ】

5回リトライされ、最後のリトライ後にexceptブロック内の処理が実行されていることが分かります。
また、9行目のエラー内容ログを開くと、下記のようにエラー理由が記されています。

"status":{"errorResult":{"location":"query","message":"Unrecognized name: not_exist_column at [1:8]","reason":"invalidQuery"},
"errors":[{"location":"query","message":"Unrecognized name: not_exist_column at [1:8]","reason":"invalidQuery"}],"state":"DONE"}

まとめ

今回は、Workflowsに関する基本的な内容やBigQueryとの連携、エラーハンドリングについて書きました。

『Workflows使ってみた~Part2~』では、以下の内容を紹介いたします。
・Cloud Storage上のファイルへの処理(コピー・削除・中身の取得)
・ワークフローから別のワークフローを呼び出す
・Cloud Functionsの処理を起動する

次回もぜひご覧ください!

当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。
Google Cloud に関してのご用命の際は「クラウド導入支援サービス for Google Cloud」へご連絡ください。

2023年1月6日 【Google Cloud】Workflows使ってみた~Part1~

Category Google Cloud

ご意見・ご相談・料金のお見積もりなど、
お気軽にお問い合わせください。

お問い合わせはこちら