2023年1月6日
【Google Cloud】Workflows使ってみた~Part2~
-
- Category Google Cloud
今回は、Google Cloud の Workflows に関する記事のPart2です!
今回も様々な処理を試していきます。
Workflows をこれから使ってみたい方や、最近使い始めた方の参考になれば幸いです。
(Part1では、Workflows に関する基本的な内容や BigQuery との連携について記載しました。
まだご覧になっていない方は、こちらも是非ご覧ください!)
Cloud Storageとの連携
最初に、変数に値を設定します。
main: params: [input] steps: - 設定値割当: assign: - sourceBucket: バケット名を入力 - destinationBucket: ${sourceBucket} - sourceObject: sample-file.txt - destinationObject: ${"copied-" + sourceObject}
(1)ファイルコピー
バケットに格納したファイルを、指定した先にコピーします。
(今回は同じバケットにファイル名を変更してコピーします。)
- copy(api): call: googleapis.storage.v1.objects.copy args: destinationBucket: ${destinationBucket} #コピー先バケット destinationObject: ${destinationObject} #コピー先オブジェクト sourceBucket: ${sourceBucket} #コピー元バケット sourceObject: ${sourceObject} #コピー元オブジェクト body:
argsの中身は、コピー先のバケット名とオブジェクト名、コピー元のバケット名とオブジェクト名を指定しています。
今回は、最初のステップで設定した変数をそれぞれ指定しました。
※上のコードでは必要最小限のフィールドのみ指定しています。
その他の設定フィールドについては、公式ドキュメントをご確認ください。(以降の処理も同様)
ワークフローを実行してみると、正常にコピーオブジェクトが作成されました。
【実行前】

【実行後】

(2)ファイル削除
上でコピーしたファイルを削除します。
- delete(api): call: googleapis.storage.v1.objects.delete args: bucket: ${destinationBucket} object: ${destinationObject}
argsでは、削除対象のオブジェクトのバケット名とオブジェクト名を指定します。
ワークフローを実行すると、先ほどコピーされたオブジェクトが削除されました。

(3)ファイル内のテキストを取得
Cloud Storageに格納した下記のファイルからテキストを取得し、ログに出力します。

実行するには、下記のコードを記述します。
- get(API): call: googleapis.storage.v1.objects.get args: bucket: ${sourceBucket} object: ${sourceObject} query: alt: media result: getResultApi - ログ出力: call: sys.log args: text: ${getResultApi} severity: INFO
argsには、対象オブジェクトの格納バケット名とオブジェクト名を指定します。
オブジェクトデータを取得するには、queryに「alt: media」と指定します。
(オブジェクトメタデータを取得する場合は、「alt: json」と指定します。)
resultには、戻り値を格納する変数を定義します。
実行すると、以下のようにログに出力されます。

(4)おまけ
(1)~(3)の処理を、下記のように HTTPリクエストを送信する形で書き直すこともできます。
main: steps: - 設定値割当: assign: - sourceBucket: バケット名を入力 #コピー元バケット - destinationBucket: ${sourceBucket} #コピー先バケット - sourceObject: sample-file.txt #コピー元オブジェクト - destinationObject: ${"copied-" + sourceObject} #コピー先オブジェクト - copy(http): call: http.post args: url: '${"https://storage.googleapis.com/storage/v1/b/"+sourceBucket+"/o/"+sourceObject+"/copyTo/b/"+destinationBucket+"/o/"+destinationObject}' auth: type: OAuth2 - delete(http): call: http.delete args: url: '${"https://storage.googleapis.com/storage/v1/b/"+destinationBucket+"/o/"+destinationObject}' auth: type: OAuth2 - get(HTTP): call: http.get args: url: '${"https://storage.googleapis.com/download/storage/v1/b/"+sourceBucket+"/o/"+sourceObject}' auth: type: OAuth2 query: alt: media result: getResultHttp - ログ出力: call: sys.log args: text: ${getResultHttp["body"]} severity: INFO
別のワークフローを起動する
まず、呼び出す側のワークフローを書きます。
main: steps: - 設定値割当: assign: # Project - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} # Workflow - workflowLocation: asia-northeast1 #起動するワークフローのロケーション - workflowName: triggered-workflow #起動するワークフロー名 - Workflow起動ログ: call: sys.log args: text: '${"ワークフローを起動します。フロー名:" + workflowName}' severity: INFO - Workflow起動処理: call: http.post args: url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + projectId + "/locations/" + workflowLocation + "/workflows/" + workflowName + "/executions"} body: argument: "{\"parameter1\": \"trigger-test1\",\"parameter2\": \"trigger-test2\"}" auth: type: OAuth2
bodyのargumentに、引数として parameter1 と parameter2 を設定しています。
これらの値は、呼び出される側のワークフローでランタイム引数に格納されます。
次に、呼び出される側のワークフローを書きます。
main: params: [input] steps: - トリガー元からの引数をログに出力: call: sys.log args: text: '${"引数1:" + input.parameter1 + ", 引数2:" + input.parameter2}' severity: INFO
ランタイム引数に格納された値をログに出力する処理です。
呼び出し元のワークフローを実行してみます。

呼び出し先のワークフローを確認してみると、起動されていることを確認できました。

Cloud Funtionsの関数を呼び出す
関数に引数を渡し、戻り値を受け取ります。
まず、以下の処理(Pythonコード)をCloud Functionsにデプロイします。
2つの引数のうち、大きい方の値とメッセージを返すだけのシンプルな処理です。
from flask import jsonify def main(request): param_dict = request.get_json() parameter1 = param_dict["parameter1"] parameter2 = param_dict["parameter2"] if parameter1 > parameter2: bigger_value = parameter1 else: bigger_value = parameter2 msg = "関数の呼び出しに成功しました。" return jsonify({'message': msg, 'biggerValue': bigger_value})
次に、下記のコードのワークフローを作成します。
main: params: [input] steps: - 設定値割当: assign: # Project - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} # Cloud Functions - functionName: workflow-test - functionLocation: asia-northeast1 - value1: ${input.value1} - value2: ${input.value2} - function実行: call: http.post args: url: ${"https://" + functionLocation + "-" + projectId + ".cloudfunctions.net/" + functionName} body: parameter1: ${value1} parameter2: ${value2} auth: type: OIDC result: functionResult - ログ出力(メッセージ): call: sys.log args: text: ${functionResult.body.message} severity: INFO - ログ出力(大きい値): call: sys.log args: text: ${functionResult.body.biggerValue} severity: INFO
ランタイム引数に設定する2つの値をCloud Functions関数に渡し、戻り値をログに出力する処理です。
httpリクエストのbodyで、Cloud Functions関数に渡す引数(parameter1, parameter2)を設定しています。
ランタイム引数を設定して、ワークフローを実行してみます。

Cloud Functions関数の呼び出しに成功し、受け取った値(メッセージと大きい方の値)がログに出力されました。

まとめ
本記事で紹介した内容以外にも、様々な処理を呼び出すことができます。
是非みなさんもWorkflowsを使ってみてくださいね!
当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。
Google Cloud に関してのご用命の際は「クラウド導入支援サービス for Google Cloud」へご連絡ください。
2023年1月6日 【Google Cloud】Workflows使ってみた~Part2~
ご意見・ご相談・料金のお見積もりなど、
お気軽にお問い合わせください。