Amazon Bedrock AgentCore は AI エージェントをインフラを気にすることなく、開発、実行、運用できるマネージドサービスです。AgentCore の中でも主となる、開発した AI エージェントを実行するための機能として AgentCore Runtime がありますが、エージェントの応答をストリーム形式で返すことができます。ストリームレスポンスといえば Lambda でもサポートされているので、今回は Lambda から AgentCore Runtime を呼び出し、エージェントの応答をストリームで受け取り、Lambda からもそのままストリームで応答させる、というのをやってみました。いろいろなユースケースに対応できるのではないかと思います。
作成したコードは GitHub で公開しています。
はじめに
なぜにストリームレスポンス?
ストリームレスポンスではない一般的なリクエスト - レスポンス (一括レスポンス) の場合、レスポンスされるデータがサーバー側で全て揃ってからクライアントにデータが送信されます。そのため全てのレスポンスデータが揃うまでに時間がかかる場合やレスポンスデータのサイズが大きい場合、リクエストしてからレスポンスが返ってくるまでの時間が長くなり、ユーザー体験としてよくないものになります。
一方、ストリームレスポンスの場合、一部のレスポンスデータが利用可能になったらクライアントに返送し始めることができるので、時間のかかる処理の完了を待つ必要がありません。クライアントがリクエストを開始してから最初のレスポンスが返ってくるまでの時間を短くすることができます。この「最初のレスポンス (バイト) までの時間」を一般的に TTFB (Time To First Byte) と言うそうで、ストリームレスポンスを利用することでこの TTFB を短くすることができ、ユーザー体験を損なわないようにすることができます。
インフラ的な観点ですと、一括レスポンスの場合は一般的にサーバー側でレスポンスするコンテンツを一時的にメモリに展開・保持することになるので、データサイズが大きいとサーバー側のメモリが相応に必要になります。一方、ストリームレスポンスの場合は (作り方次第なところもありますが) データソースからデータを読み出しながら、レスポンスを返すことで (メモリに全てのレスポンスデータを展開する必要がないので) サーバー側のメモリ必要量を抑えることができます。
LLM による推論では、完了まで多少なりとも時間がかかるので、全ての出力を待ってからクライアントにレスポンスを返すのではなく、すぐに少しずつ出力されたレスポンスを順次返すことでより良いユーザー体験が提供できます。(人間的には長大な回答をいきなり見せられるより、少しずつ回答が出てくる方が見やすい、理解しやすいなどのメリットもありそうです。)
AgentCore Runtime でのストリームレスポンス
AgentCore と言うよりは、各種エージェント SDK 側の対応状況に依存するかと思いますが、ストリームレスポンスを実装した AI エージェントを AgentCore でホスティングすることが可能です。AWS のドキュメントでも Strands Agents (AI エージェント SDK) によるストリームレスポンスのサンプルが提供されていますので今回はこちらを利用します。
- Stream agent responses: https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/response-streaming.html
Lambda でのストリームレスポンス
Lambda でも 2023年 4月よりストリームレスポンスに対応するようになりました。Lambda の Function URLs (関数 URL) という、Lambda が HTTP のエンドポイントを直接ホストする機能を利用している際にストリームレスポンスが利用できます。Lambda のマネージドランタイムとしては Node.js でストリームレスポンスをサポートしています。他の言語ではカスタムランタイムや Lambda Web Adapter を利用することができます。
余談ですが、Lambda のストリームレスポンスでは従来の制限である 6MB のレスポンスサイズを超えてレスポンスを返すことができます (別途料金もかかります)。気になる方は拙著ですが以下の記事もご覧ください。
今回はクライアントからのリクエストを Lambda で受け、Lambda から AgentCore のエージェントを呼び出し、AgentCore からストリームでレスポンスを返し、Lambda からもクライアントへストリームでレスポンスを返す、というのをやってみます。
アーキテクチャ
今回は以下のようなアーキテクチャで試してみました。
- Lambda@Edge では Lambda の関数 URL で POST リクエストを受け付けるために、リクエスト Body の SHA256 を計算し、HTTP ヘッダに追加します
- Lambda では関数 URL を有効にし、 “認証タイプ” を
AWS_IAM
に、“呼び出しモード” をRESPONSE_STREAM
にしています - CloudFront - Lambda 間は OAC (Origin Access Control) を設定し、インターネットから直接 関数 URL にリクエストできないようにします
- AgentCore ではインバウンドの設定としてプロトコルを
HTTP
、認証をIAM
にします。また、ストリームレスポンスで返すコードを記述します - Lambda から AgentCore Runtime の呼び出しは AWS SDK を利用して IAM 認証 (SigV4) でアクセスします
Lambda を介す意味ある?
AgentCore Runtime は IAM 認証 (SigV4) か、JWT Bearer トークンによる認証を利用して呼び出すことできるので、クライアント (ブラウザ含む) から Cognito Identity Pool を使って IAM 認証したり、アクセストークンを使って JWT で認証することができるので、必ずしも Lambda が必要というわけではありません。。。
Lambda を介すことで実現できるユースケース
Lambda をプロキシ的に配置することでさらに実現できるユースケースを考えてみます。
ケース 1. AWS アカウント ID を隠蔽したい時
AgentCore Runtime を呼び出すには IAM 認証を使って SDK から呼び出す方法と、JWT を用いて エンドポイント URL を呼び出す方法があります。SDK のパラメータ、エンドポイント URL のパス、どちらにも AgentCore Runtime の ARN を指定する必要があるため、ブラウザなどから AgentCore Runtime を呼び出す場合は ARN に含まれるアカウント ID が露呈してしまうことになります。AWS からも「アカウント ID は機密情報ではない」とされていますが、エージェントを不特定多数に公開する場合は、アカウント ID は見えないようにしておいた方が良いでしょう。
手前に Lambda を Proxy 的に配置することで AgentCore Runtime の ARN をクライアントから見えないようにすることができます。
-
AWS SDK のパラメータ
SDK から AgentCore Runtime を呼び出すにはInvokeAgentRuntime
という API を使います。JavaScript / TypeScript の場合、以下のようになり、AgentCore Runtime の ARN にアカウント ID が含まれます。1 2 3 4 5 6
const req = new InvokeAgentRuntimeCommand({ agentRuntimeArn: AGENT_ARN, // AgentCore Runtime の ARN を指定 // : // (snip) // : });
-
AgentCore Runtime のエンドポイント URL
エンドポイント URL はドキュメント によると、以下のようになっており、URL エンコードされたエージェントの ARN ({{ENCODED_AGENT_ARN}}
) がパス部分に入ります。ARN なのですなわちアカウント ID がそのまま URL に含まれてしまいます。https://bedrock-agentcore.{{REGION}}.amazonaws.com/runtimes/{{ENCODED_AGENT_ARN}}/invocations?qualifier=DEFAULT
ケース 2. AgentCore Runtime のレスポンス形式を変更したい時
AgentCore Runtime というよりは各種エージェントの実装によりますが、ストリームレスポンスは代表的には SSE (Server-Sent Events) としてデータが送られてきます。詳細は後でも紹介しますが、Lambda 側で AgentCore Runtime からのレスポンスに変更やフィルタをかけたり、独自フォーマットにして送信したい場合に対応できます。
ケース 3. AWS WAF を入れたい時
Lambda というよりは CloudFront を入れることによる恩恵ですが、AgentCore Runtime のエンドポイント URL を使って呼び出す場合、CloudFront - Lambda (関数 URL) - AgentCore とすることで CloudFront に AWS WAF を設定することができます。大人の事情でどうにもこうにも WAF を導入しなければならないケースに対応できそうです。
ケース 4. 独自の認証方式を利用したい時
AgentCore Runtime を呼び出す際の認証として、IAM 認証 (SigV4) もしくは、OAuth 2.0 などの JWT を利用する必要がありますが、Lambda で独自の認証手段 (例: ID / パスワード認証) を実装して提供するといったケースにも対応できそうです (Lambda から AgentCore Runtime を呼び出す場合は IAM 認証 (SigV4) にする、など)。
注意点
注意点として、AgentCore Runtime のストリーミングによる接続の最大時間は 60分に制限されています。一方、Lambda の実行時間の制限はおなじみの 15分までになります。Lambda の実行時間の制限が先に来る点に注意です。
Agent Core まわりの実装
ではここから実装について順を追って紹介していきます。今回作成したもの (CDK + AgentCore まわり) は GitHub で公開しています。(本投稿では読みやすくするため、コードは一部シンプルにしています。)
まずは AgentCore まわりです。
- エージェントの実装
- エージェントの IAM 実行ロール
- ECR リポジトリの作成
- AgentCore Runtime へのデプロイ準備 (ツールインストールと初期設定)
- AgentCore Runtime へのデプロイ
1. エージェントの実装
はじめに Strands Agents を使ってエージェントを作っていきます。uv で環境を整えます。
uv init my-agent
cd my-agent
uv add strands-agents bedrock-agentcore
# 私が試した時は strands-agents は `>=1.4.0`、bedrock-agentcore は `>=0.1.2` でした
source .venv/bin/activate
AgentCore のストリームレスポンスについてのドキュメントにあるサンプルをベースにします。
モデルの ID やリージョンは適宜変更してください。
main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
import os from bedrock_agentcore import RequestContext from bedrock_agentcore.runtime import BedrockAgentCoreApp from strands import Agent from strands.models import BedrockModel REGION=os.getenv('AWS_REGION', 'us-east-1') model = BedrockModel(model_id="anthropic.claude-3-5-sonnet-20240620-v1:0", region_name=REGION) agent = Agent(model=model) app = BedrockAgentCoreApp() @app.entrypoint async def invoke(payload: dict, context: RequestContext): """Handler for agent invocation""" user_message = payload.get('prompt') stream = agent.stream_async(user_message) async for event in stream: if 'event' in event: yield event if __name__ == '__main__': app.run()
- 19-22 行目がストリームでレスポンスしているところになります。モデルの出力を得るには
event
キーがあるものを取得できれば良いのでevent
キーが存在する場合だけ送信するようにしています。
- 19-22 行目がストリームでレスポンスしているところになります。モデルの出力を得るには
ローカルでテストするには以下のようにします。
uv run main.py
localhost:8080
でリッスンするエンドポイントが起動するので curl で呼び出します。
curl -X POST http://localhost:8080/invocations \
-H "Content-Type: application/json" \
-d '{"prompt": "Hello world!"}'
2. エージェントの IAM 実行ロール
AgentCore Runtime に設定する実行ロールはドキュメントに掲載されているポリシーを利用します。
3. ECR リポジトリの作成
AgentCore Runtime は前述のコードをコンテナとして実行します。そのコンテナのためのリポジトリを ECR で用意します。
aws ecr create-repository --repository-name ${REPOSITORY_NAME}
4. AgentCore Runtime へのデプロイ準備 (ツールインストールと初期設定)
作成したエージェントを AgentCore にデプロイするには、CloudFormation、CDK ではまだできませんので、専用のツールを利用するのが簡単です。
ツールをインストールします。
uv add --dev bedrock-agentcore-starter-toolkit
uv sync
インストールした agentcore configure
コマンドを実行し、初期設定をします。
ACCOUNT=123456789012
REGION=us-east-1
EXECUTION_ROLE_NAME=... # 実行ロール名
REPOSITORY_NAME=... # ECR リポジトリ名
AGENT_NAME=... # エージェント名
ENTRYPOINT=main.py # 作成したエージェントのファイル名
EXECUTION_ROLE_ARN=arn:aws:iam::${ACCOUNT}:role/${EXECUTION_ROLE_NAME}
ECR_REPOSITORY_URI=${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${REPOSITORY_NAME}
agentcore configure \
--name ${AGENT_NAME} \
--entrypoint ${ENTRYPOINT} \
--authorizer-config 'null' \
--region ${REGION} \
--execution-role ${EXECUTION_ROLE_ARN} \
--ecr ${ECR_REPOSITORY_URI}
上記を実行すると、ライブラリの依存ファイルの指定を求められます。今回は uv を使ってきたので pyproject.toml
を指定します。pip を使ってきた場合は requirements.txt
を指定します。どちらのファイルもオプション --requirements-file
であらかじめ指定することもできます。
🔍 Detected dependency file: pyproject.toml
Press Enter to use this file, or type a different path (use Tab for autocomplete):
Path or Press Enter to use detected dependency file:
agentcore configure
コマンドを実行するとフォルダ内に .bedrock_agentcore.yaml
というファイルや、AgentCore Runtime はコンテナとしてデプロイされるので Dockerfile
などが作成されます。
5. AgentCore Runtime へのデプロイ
デプロイは agentcore launch
というコマンドを使います。CodeBuild のビルドプロジェクトが自動的に作成され、コンテナとしてビルド、ECR へのプッシュを経て、AgentCore Runtime にデプロイされます。
agentcore launch
完了すると以下のように表示されます。
╭───────────────────────────────── CodeBuild Deployment Complete ─────────────────────────────────╮
│ CodeBuild ARM64 Deployment Successful! │
│ │
│ Agent Name: XXXXX │
│ CodeBuild ID: bedrock-agentcore-XXXXX-builder:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx │
│ Agent ARN: arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/XXXXX-xxxxxxxxxx │
│ ECR URI: 123456789012.dkr.ecr.us-east-1.amazonaws.com/xxxxx:latest │
│ │
│ ARM64 container deployed to Bedrock AgentCore. │
│ │
│ You can now check the status of your Bedrock AgentCore endpoint with: │
│ agentcore status │
│ │
│ You can now invoke your Bedrock AgentCore endpoint with: │
│ agentcore invoke '{"prompt": "Hello"}' │
│ │
│ 📋 Agent logs available at: │
│ /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT │
│ /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT/runtime-logs │
│ │
│ 💡 Tail logs with: │
│ aws logs tail /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT --follow │
│ aws logs tail /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT --since 1h │
╰─────────────────────────────────────────────────────────────────────────────────────────────────╯
出力にもあるとおり、以下のコマンドでデプロイした AgentCore Runtime を呼び出してみることができます。
agentcore invoke '{"prompt": "Hello"}'
Lambda の実装
続いて Lambda の実装です。今回のテーマ的にはここからが本題です。
今回は、AgentCore からの応答を Lambda からもそのままストリームで応答するのと、Lambda で何らか変換処理をしてからストリームで応答する 2種類それぞれ試してみました。
(Lambda やその他の CloudFront、Lambda@Edge は CDK でデプロイできるものを GitHub に公開しています。)
Lambda の実行ロール
Lambda から AgentCore Runtime を呼び出すために、以下のポリシーを付与します。Resource
はデプロイした AgentCore Runtime の ARN です。(他にマネージドポリシー AWSLambdaBasicExecutionRole
もアタッチします。)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "bedrock-agentcore:InvokeAgentRuntime",
"Resource": [
"arn:aws:bedrock-agentcore:${REGION}:${ACCOUNT}:runtime/XXXXX-xxxxxxxxxx",
"arn:aws:bedrock-agentcore:${REGION}:${ACCOUNT}:runtime/XXXXX-xxxxxxxxxx/runtime-endpoint/*"
]
}
]
}
Lambda のコード (1) - AgentCore からの応答をそのまま返す場合
Lambda のストリームレスポンスの実装方法については Lambda のドキュメントも参照してもらえればと思いますが、AgentCore Runtime からのストリームレスポンスをそのままクライアントに返す場合は以下のようにします。
|
|
- 22-23行目 : ストリームレスポンスを返すために、いつものハンドラー部分は
awslambda.streamifyResponse()
という謎の関数を実行し、引数に渡す関数内でリクエストを処理するコードを記述します - 29-36行目 : Sever-Sent Events のまま応答を返すので、準じた HTTP ヘッダーを設定します
- 40-50行目 : SDK を使って AgentCore Runtime の
InvokeAgentRuntime
API を呼び出します - 10行目 と 53行目 :
pipeline()
を使って、入力ストリーム (AgentCore Runtime からのレスポンス) から出力ストリーム (クライアントへのレスポンス) へそのままデータを流していきます
肝となるのは 53行目の pipeline()
という関数で、これは 10行目 util.promisify(stream.pipeline)
で得られる関数になり、AWS ドキュメントでもこの関数の利用が推奨されています。引数は、第 1引数に入力ストリームとして AgentCore Runtime のレスポンスを指定し、第 2引数に出力ストリームとしてクライアントへのレスポンス (responseStream
) を指定することで、AgentCore Runtime からの出力を Lambda を介してそのままクライアントにストリームで返すことができます。
Lambda の設定
Lambda では “関数 URL” を有効にします。その際、“認証タイプ” は AWS_IAM に、“呼び出しモード” は RESPONSE_STREAM を選択します。
その他のリソース
CloudFront
- 今回は Lambda の関数 URL の前段に CloudFront を配置しています。また、CloudFront を経由しない関数 URL へのアクセスを防ぐために OAC (Origin Access Control) を構成します。詳細はドキュメントも参照ください。
Lambda@Edge
- Lambda 関数 URL に POST (または PUT) する場合は、ドキュメントにもあるとおり、リクエストボディの SHA256 を計算してそのハッシュ値を
x-amz-content-sha265
ヘッダーに設定する必要があります。 - ドキュメントにはサンプルコードとしてハッシュ計算をしている箇所も含まれているのですが、OAC を構成する場合は Lambda@Edge での SigV4 署名は不要になるので、そのまま流用するには冗長です。クラスメソッドさんの記事に、よりシンプルにした関数コードがありますので今回はこちらを利用させていただきました。
GitHub の方にはリソースを CDK でデプロイできるようにしたコードがありますので、詳細な設定などはそちらを参照くださいませ。
動作確認
リソースが全てデプロイできたら、curl でテストしてみます。
curl -X POST https://dxxxxxxxxxxxxx.cloudfront.net/ -d '{"prompt":"hello"}'
出力は以下のようになります。回答が短いレスポンスだとわかりにくいですが、長めのプロンプトを渡すとレスポンスが少しずつ返ってくるのがわかるかと思います。
(エージェント側で {"event":...}
というイベントだけ応答するように絞っています。エージェントの実装の 21行目部分)
data: {"event": {"messageStart": {"role": "assistant"}}}
data: {"event": {"contentBlockDelta": {"delta": {"text": "Hi"}, "contentBlockIndex": 0}}}
data: {"event": {"contentBlockDelta": {"delta": {"text": "! How can"}, "contentBlockIndex": 0}}}
data: {"event": {"contentBlockDelta": {"delta": {"text": " I help you today?"}, "contentBlockIndex": 0}}}
data: {"event": {"contentBlockStop": {"contentBlockIndex": 0}}}
data: {"event": {"messageStop": {"stopReason": "end_turn"}}}
data: {"event": {"metadata": {"usage": {"inputTokens": 8, "outputTokens": 12, "totalTokens": 20}, "metrics": {"latencyMs": 754}}}}
event.contentBlockDelta.delta.text
に差分のテキストが出力されていますね。
Lambda のコード (2) - AgentCore からの応答に何らか処理をしたい場合
AgentCore Runtime からのレスポンスに何かしら処理をしたい場合、Node.js の stream.pipeline() のドキュメント を見ると、pipeline()
の引数に変換処理をするための関数を設定可能だとわかります。
以下のようなイメージになります。
await pipeline(
<入力ストリーム>,
<何らかの処理をする関数1>,
<何らかの処理をする関数2>,
:
<出力ストリーム>,
);
<何らかの処理をする関数x>
のところでデータを整形したり、フィルタしたりすることができます。今回は AgentCore Runtime からのレスポンスが data: {...(JSON)...}
という形式で送られてくるので、そこからモデルが生成した応答テキスト部分だけを抽出してクライアントにストリームで返す、というのをやってみます。また、出力の最後に、セッション ID (runtimeSessionId
) も返すようにしてみます。
前述の Lambda のコードのパイプラインを実行している部分 await pipeline(agentResponse.response as Readable, responseStream);
の引数を以下のように書き換えます。
|
|
pipeline()
の引数として入力と出力の間に transform
という関数を追加しました。関数の定義は以下のようにしました。
|
|
この変換用の関数は function*
という形式でジェネレータ関数というもので定義します。
- 4行目 : 入力ストリーム (
source
) からチャンクとしてデータが渡されます。 - 20-25行目 : 生成 AI モデルが応答したテキストの差分は
contentBlockDelta
というキーを持つイベントのtext
キーに設定されているので抽出します - 37行目 : 抽出したテキストを
yield
で出力していきます
AgentCore Runtime からのストリームレスポンスの形式は Bedrock の ConversationStream に準じていそうでした (AgentCore はまだベータ版なので今後変わる可能性もあります)。
上記のサンプルでは AgentCore Runtime からのレスポンスの型定義や処理中断制御のための AbortController など一部省略していますので完全なコードは GitHub を参照ください。
動作確認
HTTP レスポンスヘッダの Content-Type
も text/event-stream
から text/plain; charset=utf-8
に変更して curl から呼び出してみます。
curl -X POST https://dxxxxxxxxxxxxx.cloudfront.net/ -d '{"prompt":"こんにちは。あなたは何ができますか?"}'
出力は以下のようになりました。結果を貼り付けるだけではわかりにくいですが、ストリームレスポンスとして少しずつ結果が返ってきたかと思います。
こんにちは。私はさまざまな分野のお手伝いができます。例えば:
1. 質問への回答
2. 情報の提供
3. 文章の作成・編集
4. 翻訳(複数の言語に対応)
5. 数学や科学の問題解決
6. プログラミングのアドバイス
7. 分析やアイデアの提案
また、対話を通じて学習し、より良い回答ができるよう努めています。ただし、以下のことはできません:
- インターネットへのアクセス
- 音声や画像の生成
- リアルタイムの情報提供
- 金融取引や予約など
ご質問や課題がありましたら、できる限りお手伝いさせていただきます。
{"runtimeSessionId":"56844334-8b1d-4525-9fd1-bbf6b1d71498"}
まとめ
AgentCore Runtime のストリームレスポンスをプロキシするような形で、Lambda を入れて構築してみました。Lambda では Function URLs (関数 URL) とストリームレスポンスを利用することで、AgentCore Runtime からのストリームレスポンスをそのままクライアントに返すことが可能ですし、あるいは Lambda 側で何らかの変換処理をかけながらクライアントにデータを返すことが可能です。
また、冒頭であげた以下のユースケースにも対応できるかと思います。
- AgentCore Runtime 呼び出し時のアカウント ID を隠蔽したい場合
- AgentCore Runtime からのレスポンスに変換処理をかけたい場合、独自のストリーム形式でクライアントに返したい場合
- AgentCore Runtime の呼び出しに際して WAF を導入したい場合
- AgentCore Runtime 呼び出し時に独自の認証方法を利用したい場合
最後に・・・
この投稿は個人的なものであり、所属組織を代表するものではありません。ご了承ください。