2026年3月26日

ADKエージェントを自由にデプロイしたい!Firestoreでセッション管理を自作してみた


Content

こんにちは!みっちーです!


これまでのADK(Agent Development Kit)に関する記事では、主にGoogle CloudのAgent Engineを活用してきました。Agent Engineを使っている間は、標準のVertexAiSessionServiceを利用するだけで、会話履歴の保存や復元をGoogleにお任せすることができ、非常に便利でした。


ですが、カスタマイズ性の向上やコスト最適化のために、エージェントを自前のCloud Run環境などに独自デプロイしたい場面も出てきます。この場合、Agent Engineのマネージドなセッションサービスが使えなくなるため、自分たちで永続化の仕組みを用意しなければなりません。


ADKのデフォルト設定であるInMemorySessionServiceでは、コンテナが再起動するたびに「記憶」が消えてしまいます。そこで今回は、汎用的に使えるFirestoreを用いた自作の「FirestoreSessionService」の実装方法を紹介します!

前提条件

本記事の実装を進める前に、以下の環境が整っていることを確認してください。


  • ADKを用いたエージェント環境が構築済みであること。

  • Google Cloudプロジェクトで Firestore が有効化されていること。

  • バックエンドが FastAPI 等の非同期フレームワークで動作していること。

今回の課題:マネージド機能を使わずに「記憶」を維持する

Agent Engineを使わない「独自デプロイ」において、以下のステップで永続化を実現します。


    カスタムサービスの開発: ADKの基底クラスを継承し、バックエンドにFirestoreを採用したセッションサービスを作ります。


    ステートレス環境への対応: コンテナがいつ再起動しても、Firestoreから過去の会話履歴(events)や状態(state)を復元できるようにします。


    非同期処理の最適化: 独自デプロイでよく使われる FastAPI 等のイベントループと衝突しない、モダンな非同期実装(async/await)を適用します。

実装手順

1. FirestoreSessionService の実装

ADKの BaseSessionService を継承して、Firestoreへの読み書きを行うクラスを作成します。この実装により、会話履歴だけでなく、エージェントが保持する状態(state)も保存されます。

# app/services/firestore_session.py

import os
from typing import Optional, Any
from google.cloud import firestore
from google.adk.sessions.base_session_service import BaseSessionService
from google.adk.sessions.session import Session
from google.adk.events.event import Event

class FirestoreSessionService(BaseSessionService):
def __init__(self, collection_name: str = "agent_sessions"):
# Firestoreの非同期クライアントを初期化
self.db = firestore.AsyncClient()
self.collection_name = collection_name

def _get_doc_id(self, app_name: str, user_id: str, session_id: str) -> str:
# アプリ・ユーザー・セッションを組み合わせて一意のドキュメントIDを生成
return f"{app_name}::{user_id}::{session_id}"

async def create_session(self, *, app_name: str, user_id: str, session_id: str, **kwargs) -> Session:
doc_id = self._get_doc_id(app_name, user_id, session_id)
doc_ref = self.db.collection(self.collection_name).document(doc_id)

# 既存のセッションがあればFirestoreから取得
doc = await doc_ref.get()
if doc.exists:
return await self.get_session(app_name=app_name, user_id=user_id, session_id=session_id)

# 新規セッションを作成して保存
session = Session(app_name=app_name, user_id=user_id, id=session_id)
await doc_ref.set(session.model_dump())
return session

async def append_event(self, session: Session, event: Event) -> Event:
# 親クラスのメソッドで session オブジェクトを更新(履歴や状態の追加)
await super().append_event(session=session, event=event)

# 更新されたセッション全体をFirestoreに保存
doc_id = self._get_doc_id(session.app_name, session.user_id, session.id)
await self.db.collection(self.collection_name).document(doc_id).set(session.model_dump())
return event

async def get_session(self, *, app_name: str, user_id: str, session_id: str, **kwargs) -> Optional[Session]:
doc_id = self._get_doc_id(app_name, user_id, session_id)
doc = await self.db.collection(self.collection_name).document(doc_id).get()
if doc.exists:
return Session.model_validate(doc.to_dict())
return None

2. エージェント実行環境への組み込み

作成した FirestoreSessionServiceRunner の初期化時に渡します。これにより、エージェントが InMemorySessionService の代わりに Firestore をバックエンドとして利用し始めます。

# app/agent_main.py

from google.adk.runners import Runner
from .services.firestore_session import FirestoreSessionService
from .my_agent import root_agent

# 自作サービスをインスタンス化
session_service = FirestoreSessionService()

# Runnerに登録
runner = Runner(
app_name="my_custom_agent",
agent=root_agent,
session_service=session_service,
)

3. 重要な注意点:「Event loop is closed」への対処

独自環境(特に FastAPI 等)で非同期 Firestore クライアントを使う場合、遭遇しやすいのが Event loop is closed エラーです。これは asyncio.run() などの呼び出しが新しいループを作ってすぐに閉じてしまうことで、Firestoreの非同期処理が途切れるために起こります。

解決策はシンプルです。 全体のフローをすべて async def に統一し、フレームワークが提供するメインのイベントループを最初から最後まで共有するようにします。

# app/main.py (APIサーバ)

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
# エージェント実行も async/await で一貫させる
response = await run_agent_async(
message=request.message,
user_id=request.user_id,
session_id=request.session_id
)
return {"reply": response}

async def run_agent_async(message: str, user_id: str, session_id: str):
# Runner.run_async を使い、FastAPIのイベントループ上で直接実行
async for event in runner.run_async(
user_id=user_id,
session_id=session_id,
new_message=message
):
# ...回答の抽出処理...
pass

動作確認:再起動しても会話を続けられるエージェント

実装が完了したら、実際に会話をしてみましょう。Firestoreコンソールを確認すると、agent_sessions コレクションに会話ログや現在の状態がJSON形式で保存されていくのが分かります。



Cloud Runへのデプロイ後にコンテナが入れ替わっても、同じ chatId でアクセスすれば「前回の続き」として会話を継続できるようになります。これで、Agent EngineというAIエージェントのためのマネージドサービスの外でも、エージェントは高度な文脈理解を維持できるようになりました!

まとめ

今回は、ADKのマネージド機能に頼らず、自前でセッション永続化を実装する方法を紹介しました。Agent Engineは非常に強力ですが、Firestoreによる自作サービスを組み合わせることで、特定の環境に縛られない、より柔軟なエージェント開発が可能になります。


皆さんもぜひ、自分たちのインフラに最適化したメモリ実装に挑戦してみてください!

2026年3月26日 ADKエージェントを自由にデプロイしたい!Firestoreでセッション管理を自作してみた

Category Google Cloud

ご意見・ご相談・料金のお見積もりなど、
お気軽にお問い合わせください。

お問い合わせはこちら