Amazon Bedrock AgentCore は AI エージェントをインフラを気にすることなく、開発、実行、運用できるマネージドサービスです。AgentCore の中でも主となる、開発した AI エージェントを実行するための機能として AgentCore Runtime がありますが、エージェントの応答をストリーム形式で返すことができます。ストリームレスポンスといえば Lambda でもサポートされているので、今回は Lambda から AgentCore Runtime を呼び出し、エージェントの応答をストリームで受け取り、Lambda からもそのままストリームで応答させる、というのをやってみました。いろいろなユースケースに対応できるのではないかと思います。

作成したコードは GitHub で公開しています。

はじめに

なぜにストリームレスポンス?

ストリームレスポンスではない一般的なリクエスト - レスポンス (一括レスポンス) の場合、レスポンスされるデータがサーバー側で全て揃ってからクライアントにデータが送信されます。そのため全てのレスポンスデータが揃うまでに時間がかかる場合やレスポンスデータのサイズが大きい場合、リクエストしてからレスポンスが返ってくるまでの時間が長くなり、ユーザー体験としてよくないものになります。

一方、ストリームレスポンスの場合、一部のレスポンスデータが利用可能になったらクライアントに返送し始めることができるので、時間のかかる処理の完了を待つ必要がありません。クライアントがリクエストを開始してから最初のレスポンスが返ってくるまでの時間を短くすることができます。この「最初のレスポンス (バイト) までの時間」を一般的に TTFB (Time To First Byte) と言うそうで、ストリームレスポンスを利用することでこの TTFB を短くすることができ、ユーザー体験を損なわないようにすることができます。

インフラ的な観点ですと、一括レスポンスの場合は一般的にサーバー側でレスポンスするコンテンツを一時的にメモリに展開・保持することになるので、データサイズが大きいとサーバー側のメモリが相応に必要になります。一方、ストリームレスポンスの場合は (作り方次第なところもありますが) データソースからデータを読み出しながら、レスポンスを返すことで (メモリに全てのレスポンスデータを展開する必要がないので) サーバー側のメモリ必要量を抑えることができます。

LLM による推論では、完了まで多少なりとも時間がかかるので、全ての出力を待ってからクライアントにレスポンスを返すのではなく、すぐに少しずつ出力されたレスポンスを順次返すことでより良いユーザー体験が提供できます。(人間的には長大な回答をいきなり見せられるより、少しずつ回答が出てくる方が見やすい、理解しやすいなどのメリットもありそうです。)

AgentCore Runtime でのストリームレスポンス

AgentCore と言うよりは、各種エージェント SDK 側の対応状況に依存するかと思いますが、ストリームレスポンスを実装した AI エージェントを AgentCore でホスティングすることが可能です。AWS のドキュメントでも Strands Agents (AI エージェント SDK) によるストリームレスポンスのサンプルが提供されていますので今回はこちらを利用します。

Lambda でのストリームレスポンス

Lambda でも 2023年 4月よりストリームレスポンスに対応するようになりました。Lambda の Function URLs (関数 URL) という、Lambda が HTTP のエンドポイントを直接ホストする機能を利用している際にストリームレスポンスが利用できます。Lambda のマネージドランタイムとしては Node.js でストリームレスポンスをサポートしています。他の言語ではカスタムランタイムLambda Web Adapter を利用することができます。

余談ですが、Lambda のストリームレスポンスでは従来の制限である 6MB のレスポンスサイズを超えてレスポンスを返すことができます (別途料金もかかります)。気になる方は拙著ですが以下の記事もご覧ください。

今回はクライアントからのリクエストを Lambda で受け、Lambda から AgentCore のエージェントを呼び出し、AgentCore からストリームでレスポンスを返し、Lambda からもクライアントへストリームでレスポンスを返す、というのをやってみます。

アーキテクチャ

今回は以下のようなアーキテクチャで試してみました。

Architecture

  • Lambda@Edge では Lambda の関数 URL で POST リクエストを受け付けるために、リクエスト Body の SHA256 を計算し、HTTP ヘッダに追加します
  • Lambda では関数 URL を有効にし、 “認証タイプ” を AWS_IAM に、“呼び出しモード” を RESPONSE_STREAM にしています
  • CloudFront - Lambda 間は OAC (Origin Access Control) を設定し、インターネットから直接 関数 URL にリクエストできないようにします
  • AgentCore ではインバウンドの設定としてプロトコルを HTTP、認証を IAM にします。また、ストリームレスポンスで返すコードを記述します
  • Lambda から AgentCore Runtime の呼び出しは AWS SDK を利用して IAM 認証 (SigV4) でアクセスします

Lambda を介す意味ある?

AgentCore Runtime は IAM 認証 (SigV4) か、JWT Bearer トークンによる認証を利用して呼び出すことできるので、クライアント (ブラウザ含む) から Cognito Identity Pool を使って IAM 認証したり、アクセストークンを使って JWT で認証することができるので、必ずしも Lambda が必要というわけではありません。。。

Lambda を介すことで実現できるユースケース

Lambda をプロキシ的に配置することでさらに実現できるユースケースを考えてみます。

ケース 1. AWS アカウント ID を隠蔽したい時

AgentCore Runtime を呼び出すには IAM 認証を使って SDK から呼び出す方法と、JWT を用いて エンドポイント URL を呼び出す方法があります。SDK のパラメータ、エンドポイント URL のパス、どちらにも AgentCore Runtime の ARN を指定する必要があるため、ブラウザなどから AgentCore Runtime を呼び出す場合は ARN に含まれるアカウント ID が露呈してしまうことになります。AWS からも「アカウント ID は機密情報ではない」とされていますが、エージェントを不特定多数に公開する場合は、アカウント ID は見えないようにしておいた方が良いでしょう。

手前に Lambda を Proxy 的に配置することで AgentCore Runtime の ARN をクライアントから見えないようにすることができます。

  • AWS SDK のパラメータ
    SDK から AgentCore Runtime を呼び出すには InvokeAgentRuntime という API を使います。JavaScript / TypeScript の場合、以下のようになり、AgentCore Runtime の ARN にアカウント ID が含まれます。

    1
    2
    3
    4
    5
    6
    
    const req = new InvokeAgentRuntimeCommand({
      agentRuntimeArn: AGENT_ARN,    // AgentCore Runtime の ARN を指定
      //   :
      // (snip)
      //   :
    });
    
  • AgentCore Runtime のエンドポイント URL
    エンドポイント URL はドキュメント によると、以下のようになっており、URL エンコードされたエージェントの ARN ({{ENCODED_AGENT_ARN}}) がパス部分に入ります。ARN なのですなわちアカウント ID がそのまま URL に含まれてしまいます。

    https://bedrock-agentcore.{{REGION}}.amazonaws.com/runtimes/{{ENCODED_AGENT_ARN}}/invocations?qualifier=DEFAULT
    

ケース 2. AgentCore Runtime のレスポンス形式を変更したい時

AgentCore Runtime というよりは各種エージェントの実装によりますが、ストリームレスポンスは代表的には SSE (Server-Sent Events) としてデータが送られてきます。詳細は後でも紹介しますが、Lambda 側で AgentCore Runtime からのレスポンスに変更やフィルタをかけたり、独自フォーマットにして送信したい場合に対応できます。

ケース 3. AWS WAF を入れたい時

Lambda というよりは CloudFront を入れることによる恩恵ですが、AgentCore Runtime のエンドポイント URL を使って呼び出す場合、CloudFront - Lambda (関数 URL) - AgentCore とすることで CloudFront に AWS WAF を設定することができます。大人の事情でどうにもこうにも WAF を導入しなければならないケースに対応できそうです。

ケース 4. 独自の認証方式を利用したい時

AgentCore Runtime を呼び出す際の認証として、IAM 認証 (SigV4) もしくは、OAuth 2.0 などの JWT を利用する必要がありますが、Lambda で独自の認証手段 (例: ID / パスワード認証) を実装して提供するといったケースにも対応できそうです (Lambda から AgentCore Runtime を呼び出す場合は IAM 認証 (SigV4) にする、など)。

注意点

注意点として、AgentCore Runtime のストリーミングによる接続の最大時間は 60分に制限されています。一方、Lambda の実行時間の制限はおなじみの 15分までになります。Lambda の実行時間の制限が先に来る点に注意です。

Agent Core まわりの実装

ではここから実装について順を追って紹介していきます。今回作成したもの (CDK + AgentCore まわり) は GitHub で公開しています。(本投稿では読みやすくするため、コードは一部シンプルにしています。)

まずは AgentCore まわりです。

  1. エージェントの実装
  2. エージェントの IAM 実行ロール
  3. ECR リポジトリの作成
  4. AgentCore Runtime へのデプロイ準備 (ツールインストールと初期設定)
  5. AgentCore Runtime へのデプロイ

1. エージェントの実装

はじめに Strands Agents を使ってエージェントを作っていきます。uv で環境を整えます。

uv init my-agent
cd my-agent

uv add strands-agents bedrock-agentcore
# 私が試した時は strands-agents は `>=1.4.0`、bedrock-agentcore は `>=0.1.2` でした

source .venv/bin/activate

AgentCore のストリームレスポンスについてのドキュメントにあるサンプルをベースにします。
モデルの ID やリージョンは適宜変更してください。

  • main.py
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    import os
    from bedrock_agentcore import RequestContext
    from bedrock_agentcore.runtime import BedrockAgentCoreApp
    from strands import Agent
    from strands.models import BedrockModel
    
    REGION=os.getenv('AWS_REGION', 'us-east-1')
    
    model = BedrockModel(model_id="anthropic.claude-3-5-sonnet-20240620-v1:0", region_name=REGION)
    agent = Agent(model=model)
    
    app = BedrockAgentCoreApp()
    
    @app.entrypoint
    async def invoke(payload: dict, context: RequestContext):
        """Handler for agent invocation"""
        user_message = payload.get('prompt')
    
        stream = agent.stream_async(user_message)
        async for event in stream:
            if 'event' in event:
                yield event
    
    if __name__ == '__main__':
        app.run()
    
    • 19-22 行目がストリームでレスポンスしているところになります。モデルの出力を得るには event キーがあるものを取得できれば良いので event キーが存在する場合だけ送信するようにしています。

ローカルでテストするには以下のようにします。

uv run main.py

localhost:8080 でリッスンするエンドポイントが起動するので curl で呼び出します。

curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello world!"}'

2. エージェントの IAM 実行ロール

AgentCore Runtime に設定する実行ロールはドキュメントに掲載されているポリシーを利用します。

3. ECR リポジトリの作成

AgentCore Runtime は前述のコードをコンテナとして実行します。そのコンテナのためのリポジトリを ECR で用意します。

aws ecr create-repository --repository-name ${REPOSITORY_NAME}

4. AgentCore Runtime へのデプロイ準備 (ツールインストールと初期設定)

作成したエージェントを AgentCore にデプロイするには、CloudFormation、CDK ではまだできませんので、専用のツールを利用するのが簡単です。

ツールをインストールします。

uv add --dev bedrock-agentcore-starter-toolkit
uv sync

インストールした agentcore configure コマンドを実行し、初期設定をします。

ACCOUNT=123456789012
REGION=us-east-1
EXECUTION_ROLE_NAME=...  # 実行ロール名
REPOSITORY_NAME=...      # ECR リポジトリ名
AGENT_NAME=...           # エージェント名
ENTRYPOINT=main.py       # 作成したエージェントのファイル名

EXECUTION_ROLE_ARN=arn:aws:iam::${ACCOUNT}:role/${EXECUTION_ROLE_NAME}
ECR_REPOSITORY_URI=${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${REPOSITORY_NAME}

agentcore configure \
  --name ${AGENT_NAME} \
  --entrypoint ${ENTRYPOINT} \
  --authorizer-config 'null' \
  --region ${REGION} \
  --execution-role ${EXECUTION_ROLE_ARN} \
  --ecr ${ECR_REPOSITORY_URI}

上記を実行すると、ライブラリの依存ファイルの指定を求められます。今回は uv を使ってきたので pyproject.toml を指定します。pip を使ってきた場合は requirements.txt を指定します。どちらのファイルもオプション --requirements-file であらかじめ指定することもできます。

🔍 Detected dependency file: pyproject.toml
Press Enter to use this file, or type a different path (use Tab for autocomplete):
Path or Press Enter to use detected dependency file:

agentcore configure コマンドを実行するとフォルダ内に .bedrock_agentcore.yaml というファイルや、AgentCore Runtime はコンテナとしてデプロイされるので Dockerfile などが作成されます。

5. AgentCore Runtime へのデプロイ

デプロイは agentcore launch というコマンドを使います。CodeBuild のビルドプロジェクトが自動的に作成され、コンテナとしてビルド、ECR へのプッシュを経て、AgentCore Runtime にデプロイされます。

agentcore launch

完了すると以下のように表示されます。

╭───────────────────────────────── CodeBuild Deployment Complete ─────────────────────────────────╮
│ CodeBuild ARM64 Deployment Successful!                                                          │
│                                                                                                 │
│ Agent Name: XXXXX                                                                               │
│ CodeBuild ID: bedrock-agentcore-XXXXX-builder:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx              │
│ Agent ARN: arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/XXXXX-xxxxxxxxxx            │
│ ECR URI: 123456789012.dkr.ecr.us-east-1.amazonaws.com/xxxxx:latest                              │
│                                                                                                 │
│ ARM64 container deployed to Bedrock AgentCore.                                                  │
│                                                                                                 │
│ You can now check the status of your Bedrock AgentCore endpoint with:                           │
│ agentcore status                                                                                │
│                                                                                                 │
│ You can now invoke your Bedrock AgentCore endpoint with:                                        │
│ agentcore invoke '{"prompt": "Hello"}'                                                          │
│                                                                                                 │
│ 📋 Agent logs available at:                                                                     │
│    /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT                                     │
│    /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT/runtime-logs                        │
│                                                                                                 │
│ 💡 Tail logs with:                                                                              │
│    aws logs tail /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT --follow              │
│    aws logs tail /aws/bedrock-agentcore/runtimes/XXXXX-xxxxxxxxxx-DEFAULT --since 1h            │
╰─────────────────────────────────────────────────────────────────────────────────────────────────╯

出力にもあるとおり、以下のコマンドでデプロイした AgentCore Runtime を呼び出してみることができます。

agentcore invoke '{"prompt": "Hello"}'

Lambda の実装

続いて Lambda の実装です。今回のテーマ的にはここからが本題です。
今回は、AgentCore からの応答を Lambda からもそのままストリームで応答するのと、Lambda で何らか変換処理をしてからストリームで応答する 2種類それぞれ試してみました。

(Lambda やその他の CloudFront、Lambda@Edge は CDK でデプロイできるものを GitHub に公開しています。)

Lambda の実行ロール

Lambda から AgentCore Runtime を呼び出すために、以下のポリシーを付与します。Resource はデプロイした AgentCore Runtime の ARN です。(他にマネージドポリシー AWSLambdaBasicExecutionRole もアタッチします。)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "bedrock-agentcore:InvokeAgentRuntime",
      "Resource": [
        "arn:aws:bedrock-agentcore:${REGION}:${ACCOUNT}:runtime/XXXXX-xxxxxxxxxx",
        "arn:aws:bedrock-agentcore:${REGION}:${ACCOUNT}:runtime/XXXXX-xxxxxxxxxx/runtime-endpoint/*"
      ]
    }
  ]
}

Lambda のコード (1) - AgentCore からの応答をそのまま返す場合

Lambda のストリームレスポンスの実装方法については Lambda のドキュメントも参照してもらえればと思いますが、AgentCore Runtime からのストリームレスポンスをそのままクライアントに返す場合は以下のようにします。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import { promisify } from 'util';
import { pipeline as pipelineAsync, Readable, Writable } from 'stream';
import { randomUUID } from 'crypto';
import { LambdaFunctionURLEvent, Context, StreamifyHandler, } from 'aws-lambda';
import { BedrockAgentCoreClient, InvokeAgentRuntimeCommand, } from '@aws-sdk/client-bedrock-agentcore';

// AgentCore Runtime の ARN
const AGENT_ARN = process.env.AGENT_ARN;

const pipeline = promisify(pipelineAsync);

const agentCore = new BedrockAgentCoreClient();

// POST メソッドでクライアントから送られてくるデータの型 (独自の型定義)
//   runtimeSessionId: AgentCore Runtime の呼び出し時に設定できるセッション ID
//   prompt: プロンプト
type UserRequest = {
  runtimeSessionId?: string;
  prompt: string;
}

export const handler: StreamifyHandler = awslambda.streamifyResponse(
  async (event: LambdaFunctionURLEvent, responseStream: NodeJS.WritableStream, context: Context) => {

    // クライアントから送信されたデータを抽出 (セッション ID とプロンプト)
    const userRequest: UserRequest = extractUserRequest(event);

    // SSE (Server-Sent Events) で応答する
    const httpResponseMetadata = {
      statusCode: 200,
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
      },
    };
    responseStream = awslambda.HttpResponseStream.from(responseStream as Writable, httpResponseMetadata);

    try {
      // AgentCore Runtime 呼び出しの入力パラメータ
      const req = new InvokeAgentRuntimeCommand({
        agentRuntimeArn: AGENT_ARN,
        runtimeSessionId: userRequest.runtimeSessionId,
        payload: new TextEncoder().encode(JSON.stringify({
          'prompt': userRequest.prompt,
        })) ,
        qualifier: 'DEFAULT',
      });

      // AgentCore Runtime の呼び出し
      const agentResponse = await agentCore.send(req);

      // 入力ストリーム (AgentCore Runtime からのレスポンス) から出力ストリーム (クライアントへのレスポンス) へデータを送信
      await pipeline(agentResponse.response as Readable, responseStream);
    }
    catch (e) {
      responseStream.write('Error!');
      if (e instanceof Error) {
        responseStream.write(' - ' + e.message);
      }
      responseStream.end();
    }
  }
);

// クライアントからのリクエストを抽出する関数
const extractUserRequest = (event: LambdaFunctionURLEvent): UserRequest => {
  let userRequest: UserRequest;
  if (event.isBase64Encoded && 'body' in event) {
    // POST された時
    userRequest = JSON.parse(Buffer.from(event.body!, 'base64').toString('utf-8'));
  }
  else if ('queryStringParameters' in event && 'prompt' in event.queryStringParameters!){
    // クエリパラメータに設定されている時
    userRequest = (({ prompt, runtimeSessionId }) => ({ prompt, runtimeSessionId }))(event.queryStringParameters as UserRequest);
  }
  else {
    userRequest = { prompt: 'hello world' };
  }

  // runtimeSessionId が指定されていないときは新たに生成
  if (!('runtimeSessionId' in userRequest) || !userRequest.runtimeSessionId) {
    userRequest.runtimeSessionId = randomUUID();
  }

  return userRequest;
};
  • 22-23行目 : ストリームレスポンスを返すために、いつものハンドラー部分は awslambda.streamifyResponse() という謎の関数を実行し、引数に渡す関数内でリクエストを処理するコードを記述します
  • 29-36行目 : Sever-Sent Events のまま応答を返すので、準じた HTTP ヘッダーを設定します
  • 40-50行目 : SDK を使って AgentCore Runtime の InvokeAgentRuntime API を呼び出します
  • 10行目 と 53行目 : pipeline() を使って、入力ストリーム (AgentCore Runtime からのレスポンス) から出力ストリーム (クライアントへのレスポンス) へそのままデータを流していきます

肝となるのは 53行目の pipeline() という関数で、これは 10行目 util.promisify(stream.pipeline) で得られる関数になり、AWS ドキュメントでもこの関数の利用が推奨されています。引数は、第 1引数に入力ストリームとして AgentCore Runtime のレスポンスを指定し、第 2引数に出力ストリームとしてクライアントへのレスポンス (responseStream) を指定することで、AgentCore Runtime からの出力を Lambda を介してそのままクライアントにストリームで返すことができます。

Lambda の設定

Lambda では “関数 URL” を有効にします。その際、“認証タイプ” は AWS_IAM に、“呼び出しモード” は RESPONSE_STREAM を選択します。

その他のリソース

CloudFront

  • 今回は Lambda の関数 URL の前段に CloudFront を配置しています。また、CloudFront を経由しない関数 URL へのアクセスを防ぐために OAC (Origin Access Control) を構成します。詳細はドキュメントも参照ください。

Lambda@Edge

  • Lambda 関数 URL に POST (または PUT) する場合は、ドキュメントにもあるとおり、リクエストボディの SHA256 を計算してそのハッシュ値を x-amz-content-sha265 ヘッダーに設定する必要があります。
  • ドキュメントにはサンプルコードとしてハッシュ計算をしている箇所も含まれているのですが、OAC を構成する場合は Lambda@Edge での SigV4 署名は不要になるので、そのまま流用するには冗長です。クラスメソッドさんの記事に、よりシンプルにした関数コードがありますので今回はこちらを利用させていただきました。

GitHub の方にはリソースを CDK でデプロイできるようにしたコードがありますので、詳細な設定などはそちらを参照くださいませ。

動作確認

リソースが全てデプロイできたら、curl でテストしてみます。

curl -X POST https://dxxxxxxxxxxxxx.cloudfront.net/ -d '{"prompt":"hello"}'

出力は以下のようになります。回答が短いレスポンスだとわかりにくいですが、長めのプロンプトを渡すとレスポンスが少しずつ返ってくるのがわかるかと思います。
(エージェント側で {"event":...} というイベントだけ応答するように絞っています。エージェントの実装の 21行目部分)

data: {"event": {"messageStart": {"role": "assistant"}}}

data: {"event": {"contentBlockDelta": {"delta": {"text": "Hi"}, "contentBlockIndex": 0}}}

data: {"event": {"contentBlockDelta": {"delta": {"text": "! How can"}, "contentBlockIndex": 0}}}

data: {"event": {"contentBlockDelta": {"delta": {"text": " I help you today?"}, "contentBlockIndex": 0}}}

data: {"event": {"contentBlockStop": {"contentBlockIndex": 0}}}

data: {"event": {"messageStop": {"stopReason": "end_turn"}}}

data: {"event": {"metadata": {"usage": {"inputTokens": 8, "outputTokens": 12, "totalTokens": 20}, "metrics": {"latencyMs": 754}}}}

event.contentBlockDelta.delta.text に差分のテキストが出力されていますね。

Lambda のコード (2) - AgentCore からの応答に何らか処理をしたい場合

AgentCore Runtime からのレスポンスに何かしら処理をしたい場合、Node.js の stream.pipeline() のドキュメント を見ると、pipeline() の引数に変換処理をするための関数を設定可能だとわかります。

以下のようなイメージになります。

await pipeline(
  <入力ストリーム>,
  <何らかの処理をする関数1>,
  <何らかの処理をする関数2>,
      :
  <出力ストリーム>,
);

<何らかの処理をする関数x> のところでデータを整形したり、フィルタしたりすることができます。今回は AgentCore Runtime からのレスポンスが data: {...(JSON)...} という形式で送られてくるので、そこからモデルが生成した応答テキスト部分だけを抽出してクライアントにストリームで返す、というのをやってみます。また、出力の最後に、セッション ID (runtimeSessionId) も返すようにしてみます。

前述の Lambda のコードのパイプラインを実行している部分 await pipeline(agentResponse.response as Readable, responseStream); の引数を以下のように書き換えます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 前述のコード
// 入力ストリーム (AgentCore Runtime からのレスポンス) から出力ストリーム (クライアントへのレスポンス) へデータを送信
// await pipeline(agentResponse.response as Readable, responseStream);

// 以下に書き換え

await pipeline(
  agentResponse.response as Readable, // 入力ストリーム (AgentCore Runtime からのレスポンス)
  transform,                          // 応答テキスト抽出用関数
  responseStream,                     // 出力ストリーム (クライアントへのレスポンス)
  { end: false, }                     // ストリームを終了させないようにする (出力の最後にセッション ID を追加出力するため)
);

// 最後に AgentCore Runtime を呼び出した際に返ってくるセッション ID を出力
responseStream.write('\n' + JSON.stringify({ 'runtimeSessionId': agentResponse.runtimeSessionId }));

// pipeline() で `end: false` をした場合は、必ず明示的に終了させる
responseStream.end();

pipeline() の引数として入力と出力の間に transform という関数を追加しました。関数の定義は以下のようにしました。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const transform = async function* (source: AsyncIterable<string>) {
  (source as stream.Readable).setEncoding('utf8');

  for await (const chunk of source) {

    let responseMessage = '';

    chunk.split('\n').forEach((line: string) => {

      if (line.length === 0) {
        return;
      }

      const eventData = JSON.parse(line.replace(/^data: /, ''));

      // ストリームレスポンスの形式は Bedrock の ConversationStream に準じてそう
      // https://docs.aws.amazon.com/ja_jp/bedrock/latest/userguide/conversation-inference-call.html#conversation-inference-call-response-converse-stream
      if ('messageStart' in eventData.event) {
        console.info('Message Start');
      }
      else if ('contentBlockDelta' in eventData.event) {
        if ('text' in eventData.event.contentBlockDelta.delta) {
          responseMessage += eventData.event.contentBlockDelta.delta.text;
        }
      }
      else if ('contentBlockStop' in eventData.event) {
        console.info('Content Block Stop');
      }
      else if ('messageStop' in eventData.event) {
        console.info(`Message Stop (stop reason: ${eventData.event.messageStop.stopReason})`);
      }
      else if ('metadata' in eventData.event){
        console.info(eventData.event);
      }
    });

    yield responseMessage;
  }
};

この変換用の関数は function* という形式でジェネレータ関数というもので定義します。

  • 4行目 : 入力ストリーム (source) からチャンクとしてデータが渡されます。
  • 20-25行目 : 生成 AI モデルが応答したテキストの差分は contentBlockDelta というキーを持つイベントの text キーに設定されているので抽出します
  • 37行目 : 抽出したテキストを yield で出力していきます

AgentCore Runtime からのストリームレスポンスの形式は Bedrock の ConversationStream に準じていそうでした (AgentCore はまだベータ版なので今後変わる可能性もあります)。

上記のサンプルでは AgentCore Runtime からのレスポンスの型定義や処理中断制御のための AbortController など一部省略していますので完全なコードは GitHub を参照ください。

動作確認

HTTP レスポンスヘッダの Content-Typetext/event-stream から text/plain; charset=utf-8 に変更して curl から呼び出してみます。

curl -X POST https://dxxxxxxxxxxxxx.cloudfront.net/ -d '{"prompt":"こんにちは。あなたは何ができますか?"}'

出力は以下のようになりました。結果を貼り付けるだけではわかりにくいですが、ストリームレスポンスとして少しずつ結果が返ってきたかと思います。

こんにちは。私はさまざまな分野のお手伝いができます。例えば:

1. 質問への回答
2. 情報の提供
3. 文章の作成・編集
4. 翻訳(複数の言語に対応)
5. 数学や科学の問題解決
6. プログラミングのアドバイス
7. 分析やアイデアの提案

また、対話を通じて学習し、より良い回答ができるよう努めています。ただし、以下のことはできません:

- インターネットへのアクセス
- 音声や画像の生成
- リアルタイムの情報提供
- 金融取引や予約など

ご質問や課題がありましたら、できる限りお手伝いさせていただきます。
{"runtimeSessionId":"56844334-8b1d-4525-9fd1-bbf6b1d71498"}

まとめ

AgentCore Runtime のストリームレスポンスをプロキシするような形で、Lambda を入れて構築してみました。Lambda では Function URLs (関数 URL) とストリームレスポンスを利用することで、AgentCore Runtime からのストリームレスポンスをそのままクライアントに返すことが可能ですし、あるいは Lambda 側で何らかの変換処理をかけながらクライアントにデータを返すことが可能です。

また、冒頭であげた以下のユースケースにも対応できるかと思います。

  • AgentCore Runtime 呼び出し時のアカウント ID を隠蔽したい場合
  • AgentCore Runtime からのレスポンスに変換処理をかけたい場合、独自のストリーム形式でクライアントに返したい場合
  • AgentCore Runtime の呼び出しに際して WAF を導入したい場合
  • AgentCore Runtime 呼び出し時に独自の認証方法を利用したい場合

最後に・・・

この投稿は個人的なものであり、所属組織を代表するものではありません。ご了承ください。