Web アプリにおいてサーバーからクライアント (ブラウザ) に向けてデータをプッシュしたいシーンがあるかと思います。WebSocket を使った GraphQL などが使われ、AWS のマネージドサービスとしては AppSync があります。一般的な例としては AppSync を通じて DynamoDB のデータを更新すると、AppSync にサブスクライブしているクライアントに対してイベントをリアルタイムに送信できたりします。今回は AppSync を経由せずに DynamoDB が更新された時に、AppSync にサブスクライブしているクライアントにサーバーからイベントをプッシュする方法を Amplify Gen2 で実装してみました。スマートにできなかったところも一部あったのですが、記録として残しておきたいと思います。

はじめに - サーバープッシュを利用するモチベーション

サーバーからクライアントにデータをプッシュしたいシーンとしては以下のようなものがあげられるでしょうか。

  • 非同期化した処理の結果をサーバーサイドからクライアントに送信するシーン (バッチ処理の処理結果を送信とか)
  • (クライアントとは別の) 外部要因によるデータの更新をクライアントにリアルタイムに送信するシーン (株価の変動をリアルタイムに送信とか)

古来より伝わる方法としては、クライアントからポーリングしたりするのですが、リアルタイムではなかったり、結果が得られるまで何度も問い合わせるので無駄も生じていました。サーバープッシュにより無駄なく、そしてほぼリアルタイムにクライアントに情報を伝達できるようになります。

今回検討したアーキテクチャ

今回は以下のアーキテクチャで実装してみました。クライアント (右側) は AppSync に対してサブスクライブしています。何らかの外部要因により DynamoDB が更新されると、更新された項目の情報が DynamoDB Streams に流れるので Lambda でその情報を元に AppSync に対して Mutation リクエストを送信します。AppSync はその更新情報を適切なクライアントにパブリッシュするようにします。

Architecture

上記のようなアーキテクチャにせずとも、AppSync 経由で DynamoDB を更新 (Mutate) することで、DynamoDB Stream や Lambda を使わずともサブスクライバに情報をプッシュすることももちろんできますが、今回は何らか別の要因で DynamoDB の項目が更新された場合を想定します。

例えば、IoT Core のルールを使うとデバイスからの情報を直接 DynamoDB に保存することができますので、それをトリガーにサーバーからクライアントに更新情報をプッシュするようなケースが考えられます。デバイスからの情報をニアリアルタイムにブラウザに表示するようなダッシュボードなどが一例になるでしょうか。

Example use case

というわけで上図を元に、以下のようなシナリオを考えてみます。

想定シナリオ

  1. ユーザーは IoT デバイスを登録する (DynamoDB にデバイス情報を保存)
  2. IoT デバイスは IoT Core に接続し、ライフサイクルイベントを使ってデバイスの接続 / 切断イベントをルールを通じて DynamoDB の項目を直接更新
  3. DynamoDB の更新を DynamoDB Stream 経由で Lambda で取得
  4. Lambda から AppSync に Mutate
  5. AppSync からクライアントに更新内容をプッシュ

※ 今回 IoT Core 部分の実装は本題から外れますので割愛します🙇‍♂️

Amplify Gen2 を使った実装

実装をハンズオンっぽくステップバイステップで紹介していきたいと思います。

(作成したものは一式 GitHub にもあります。)

ちなみに今回使用した主要なパッケージのバージョンは以下になります。

@aws-amplify/backend 1.0.3
@aws-amplify/backend-cli 1.0.4
@aws-amplify/ui-react 6.1.12
aws-amplify 6.3.6
aws-cdk-lib 2.145.0
react 18.3.1
vite 5.2.12

TOC

  1. Amplify Gen2 プロジェクトのセットアップ
  2. Amplify の構成情報などをロード
  3. 認証まわり
  4. データ (と API) まわり
  5. UI の修正 (デバイスの新規登録)
  6. UI の修正 (デバイス一覧の表示)
  7. Mutation の追加
  8. Lambda 関数の追加
  9. DynamoDB Streams とトリガーの設定
  10. AppSync の Subscription の設定
  11. UI の修正 (サブスクライブの設定)
  12. 動作確認…が、失敗
  13. 改めて動作確認
  14. 後片付け (サンドボックスの削除)

0. Amplify Gen2 プロジェクトのセットアップ

まずは Amplify の環境をセットアップします。前回記事にしましたのでそちらを参照ください。

サンドボックスが起動していない場合は、pnpm exec ampx sandbox で起動しておきます。
まだ、開発用の Web サーバが起動していない場合は pnpm run dev で起動しておきます。

1. Amplify の構成情報などをロード

はじめに src/main.tsx に以下のハイライト部分 (6, 7, 8, 10 行目) を追加します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import React from 'react'
import ReactDOM from 'react-dom/client'
import App from './App.tsx'
import './index.css'

import '@aws-amplify/ui-react/styles.css'
import { Amplify } from 'aws-amplify'
import outputs from '../amplify_outputs.json'

Amplify.configure(outputs);

ReactDOM.createRoot(document.getElementById('root')!).render(
  <React.StrictMode>
    <App />
  </React.StrictMode>,
)

参照しているパッケージをプロジェクトに追加します。

pnpm install aws-amplify @aws-amplify/ui-react

2. 認証まわり

AppSync への接続例では API キーを使ったものが多いので、Cognito のユーザプールを使った認証で接続してみたいと思います。

amplify/auth というディレクトリを作成し、amplify/auth/resource.ts というファイルを作成します。中身は以下のようにします。

1
2
3
4
5
6
7
import { defineAuth } from '@aws-amplify/backend';

export const auth = defineAuth({
  loginWith: {
    email: true,
  }
});

続いて、amplify/backend.ts を以下のようにします。

1
2
3
4
5
6
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';

defineBackend({
  auth,
});

サンドボックスが起動していれば、上記の記述で、AWS 側に Cognito のユーザープールが作成されます。

続いて、src/App.tsx を以下のように書き換えます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { Authenticator } from '@aws-amplify/ui-react';
import './App.css';

function App() {
  return (
    <Authenticator>
      {({ signOut, user }) => (
        <p>Demo App</p>
      )}
    </Authenticator>
  )
}

export default App

ブラウザを起動し、http://localhost:5173 に接続してみます。以下のように表示されたら、“Create Account” からユーザーを作成しておきます (E メールに確認コードが届きます)。

Added auth

アカウント作成後、ログインすると “Demo App” とだけ表示されると思います。

3. データ (と API) まわり

DynamoDB に登録するデータはシンプルに以下のようにしてみます。

テーブル名 : Device

属性名 説明
name 文字列 IoT デバイスの名前
isConnect 真偽値 デバイスが接続されているかどうか

amplify/data というディレクトリを作成し、amplify/data/resource.ts というファイルを作成します。まずはスキーマの定義だけにします。中身は以下のようにします。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import { a, defineData, type ClientSchema } from '@aws-amplify/backend';

const schema = a.schema({
  Device: a.model({
    name: a.string(),
    isConnect: a.boolean(),
  })
  .authorization( allow => [allow.owner()] ),
});

export type Schema = ClientSchema<typeof schema>;

export const data = defineData({
  schema,
  authorizationModes: {
    defaultAuthorizationMode: 'userPool',
  }
});

8行目にあるように、認証として allow.owner() を設定し、自身で登録したデバイスのみアクセスできるようにします。
16行目でデフォルトの認証モードとして Cognito のユーザープールによる認証を指定しています。
データへの権限について詳しくは公式ドキュメントの以下を見ていただくと良いかもしれません。

続いて、amplify/backend.ts に定義したデータについて追加していきます。

1
2
3
4
5
6
7
8
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';

defineBackend({
  auth,
  data,
});

ハイライトした 3、7行目を追加します。サンドボックスが動き出し、AWS 側には Data (DynamoDB テーブル) と API (AppSync GraphQL API) が作成されます。

4. UI の修正 (デバイスの新規登録)

UI を修正して、デバイスを新規登録するボタンとその処理を追加します。簡単のため、フォームなど使わず、ボタンを押したら固定値で登録してしまいます。(※ DynamoDB のパーティションキーには固有の id が自動で生成されて登録されます。)

src/App.tsx を以下のようにします。ハイライト行が追加箇所になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Authenticator } from '@aws-amplify/ui-react';
import { generateClient } from 'aws-amplify/data';
import { type Schema } from '../amplify/data/resource';
import './App.css';

const client = generateClient<Schema>({
  authMode: 'userPool',
});

function App() {

  // とりあえず固定値でデバイスを新規登録 (複数登録する場合は `name` を都度変更してください)
  // (DynamoDB のパーティションキーには固有の `id` が自動で生成されて登録される
  const createDevice = async () => {
    await client.models.Device.create({
      name: "device-A-1",
      isConnect: false,
    });
  };

  return (
    <Authenticator>
      {({ signOut, user }) => (
        <>
          <p>Demo App</p>
          <button onClick={createDevice}>New Device</button>
        </>
      )}
    </Authenticator>
  )
}

export default App
  • 6-8行目 : Data にアクセスするためのクライアントを生成しています
  • 15行目 : client.models にアクセスすると定義した Device にアクセスできます。create してあげるだけでデータを DynamoDB テーブルに新規追加できます

http://localhost:5173 に接続すると、“New Device” というボタンが表示されるので押してみましょう。

New device button

AppSync の GraphQL API を通じて、DynamoDB の Device- で始まるテーブルにデータが追加されているはずです。

Added new device

属性名の順番がわかりにくいですが、自ら定義した nameisConnect の他に以下の属性が自動的に追加されています。

  • id (パーティションキー)
  • __typename
  • owner
  • createdAt
  • updatedAt

5. UI の修正 (デバイス一覧の表示)

もう少し UI を修正して、ブラウザでの読み込み時に登録したデバイスの一覧を表示するようにします。

src/App.tsx を以下のようにします。ハイライト行が追加箇所になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { useState, useEffect } from 'react'
import { Authenticator } from '@aws-amplify/ui-react';
import { Table, TableCell, TableBody, TableHead, TableRow } from '@aws-amplify/ui-react';
import { generateClient } from 'aws-amplify/data';
import { type Schema } from '../amplify/data/resource';
import './App.css';

const client = generateClient<Schema>({
  authMode: 'userPool',
});

function App() {

  const [devices, setDevices] = useState<Schema['Device']['type'][]>([]);

  const fetchDevices = async () => {
    const { data: devices, errors } = await client.models.Device.list();
    setDevices(devices);
  };

  useEffect(() => {
    fetchDevices();
  }, []);

  // とりあえず固定値でデバイスを新規登録 (複数登録する場合は `name` を都度変更してください)
  // (DynamoDB のパーティションキーには固有の `id` が自動で生成されて登録される
  const createDevice = async () => {
    await client.models.Device.create({
      name: "device-A-1",
      isConnect: false,
    });

    // 新規にデバイスを追加した場合も一覧を再取得
    fetchDevices();
  };

  return (
    <Authenticator>
      {({ signOut, user }) => (
        <>
          <p>Demo App</p>
          <p>
            <button onClick={createDevice}>New Device</button>
          </p>
          <Table style={{backgroundColor: "white"}}>
            <TableHead>
              <TableRow>
                <TableCell as="th">ID</TableCell>
                <TableCell as="th">Name</TableCell>
                <TableCell as="th">isConnect</TableCell>
              </TableRow>
            </TableHead>
            <TableBody>
              {devices.map((device) => (
                <TableRow>
                  <TableCell>{device.id}</TableCell>
                  <TableCell>{device.name}</TableCell>
                  <TableCell>{device.isConnect.toString()}</TableCell>
                </TableRow>
              ))}
            </TableBody>
          </Table>
        </>
      )}
    </Authenticator>
  )
}

export default App
  • 17行目 : 一覧を取得する場合は list() を呼べば取得できます。今の時点ではわかりにくいですが、owner のみアクセスできる権限にしていますので、ユーザーが登録した項目のみ、すなわち DynamoDB テーブルの owner が一致する項目のみ取得されます。

ブラウザを再読み込みすると、以下のように表示されます。

List devices

6. Mutation の追加

ここまでで大体基本的な機能が整いました。ここから本題となる Mutation や Subscription を追加していきます。まずは Mutation。

この Mutation は DynamoDB Streams をトリガーに呼び出される Lambda から実行されます。

amplify/data/resource.ts を以下のようにします。ハイライト行が追加箇所になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import { a, defineData, type ClientSchema } from '@aws-amplify/backend';

const schema = a.schema({
  Device: a.model({
    name: a.string(),
    isConnect: a.boolean(),
  })
  .authorization( allow => [allow.owner()] ),

  updateConnectionStatus: a.mutation()
    .arguments({
      id: a.id().required(),
      isConnect: a.boolean(),
    })
    .returns(a.ref('Device'))
    .authorization( allow => [allow.authenticated()] )
    .handler(
      a.handler.custom({
        dataSource: a.ref('Device'),
        entry: './updateConnectionStatus.js',
      })
    ),
});

export type Schema = ClientSchema<typeof schema>;

export const data = defineData({
  schema,
  authorizationModes: {
    defaultAuthorizationMode: 'userPool',
  }
});

updateConnectionStatus という Mutation を追加していきます。

続いて、20行目にあるとおり、リゾルバを追加していきます。amplify/data/updateConnectionStatus.js というファイルを作成し、中身を以下のようにします。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import { util } from "@aws-appsync/utils";
import * as ddb from "@aws-appsync/utils/dynamodb";

export function request(ctx) {
  console.log('resolver(updateConnectionStatus)#request', ctx);
  const { id } = ctx.args;
  return ddb.get({ key: { id }});
}

export function response(ctx) {
  console.log('resolver(updateConnectionStatus)#response', ctx);

  const { error, result } = ctx;
  if (error) {
    return util.appendError(error.message, error.type, result)
  }
  return ctx.result
}

5、11行目にあるとおり、console.log を使ってログを出力することができます (※ AppSync のログ出力が有効になっている必要があります)

AWS 側では AppSync の GraphQL API に Mutation が追加されます (API 選択後、“スキーマ” を開いて type Mutation {... を探すと追加されています)。

7. Lambda 関数の追加

DynamoDB Streams をトリガーに起動する Lambda 関数を追加していきます。

amplify/function/ddb-streams-trigger というディレクトリを作成し、amplify/function/ddb-streams-trigger/resource.ts というファイルを作成します。中身は以下のようにします。

1
2
3
4
5
6
import { defineFunction } from '@aws-amplify/backend';

export const ddbStreamsTriggerFunction = defineFunction({
  name: 'ddb-streams-trigger',
  entry: './handler.ts',
});

続いて Lambda 関数の実態を作成します。amplify/function/ddb-streams-trigger/handler.ts というファイルを作成し、中身を以下のようにします。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import { default as fetch, Request } from 'node-fetch';
import { HttpRequest } from '@smithy/protocol-http';
import { SignatureV4 } from '@smithy/signature-v4';
import { Sha256 } from '@aws-crypto/sha256-universal';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
import type { DynamoDBStreamHandler } from 'aws-lambda';

const ENDPOINT = process.env.ENDPOINT!;
const AWS_REGION = process.env.AWS_REGION!;

// Mutation クエリ
const query = `mutation updateConnectionStatus($id: ID!, $isConnect: Boolean) {
  updateConnectionStatus(id: $id, isConnect: $isConnect){
    id
    owner
    name
    isConnect
    createdAt
    updatedAt
  }
}`;

const appSyncEndpoint = new URL(ENDPOINT);

export const handler: DynamoDBStreamHandler = async (event) => {
  console.debug(JSON.stringify(event));

  // SigV4 での Signer 生成
  const sigV4Signer = new SignatureV4({
    credentials: defaultProvider(),
    region: AWS_REGION,
    service: 'appsync',
    sha256: Sha256,
  });

  // 失敗した項目だけを Streams に報告するための配列
  // https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];

  for (const record of event.Records) {
    if (record.eventName !== 'MODIFY') {
      continue;
    }

    // DynamoDB Streams から更新された情報を取得
    const newImage = record.dynamodb?.NewImage;
    const variables = {
      id: newImage?.id.S,
      isConnect: newImage?.isConnect.BOOL,
    };

    // SigV4 署名リクエストを作成
    const httpRequest = new HttpRequest({
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        host: appSyncEndpoint.host,
      },
      hostname: appSyncEndpoint.host,
      body: JSON.stringify({ query, variables }),
      path: appSyncEndpoint.pathname,
    });
    const signedRequest = await sigV4Signer.sign(httpRequest);

    // AppSync のエンドポイントへリクエスト
    const request = new Request(ENDPOINT, signedRequest);

    try {
      const response = await fetch(request);
      const body = await response.json();

      if (body.errors){
        console.error(body);
        batchItemFailures.push(record.dynamodb!.SequenceNumber!);
      }
    }
    catch (error) {
      console.error(error);
      batchItemFailures.push(record.dynamodb!.SequenceNumber!);
    }
  }

  return { batchItemFailures };
};

AppSync のクライアントを使えばもっと楽に書けると思いますが、今回はあえて汎用的なライブラリを使って SigV4 署名したリクエストで AppSync に接続します。

  • 29-34行目 : SigV4 署名を行うための Signer を生成しています
  • 53-63行目 : リクエストに SigV4 署名を付加しています

詳しく知りたい方は、公式ドキュメントでも紹介されています。Amplify Gen1 のものですが、Lambda から AppSync へのリクエストに関して Amplify は登場しないので汎用的に参考になると思います。

そして、Lambda 内で参照しているライブラリ類をプロジェクトに追加します。

pnpm install \
  @aws-crypto/sha256-universal \
  @aws-sdk/credential-provider-node \
  @smithy/protocol-http \
  @smithy/signature-v4 \
  node-fetch

pnpm install -D \
  @types/aws-lambda \
  aws-lambda

さらに amplify/backend.ts で Lambda 関数について定義を追加します。ハイライトされた行が追加箇所になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';
import { ddbStreamsTriggerFunction } from './function/ddb-streams-trigger/resource';

defineBackend({
  auth,
  data,
  ddbStreamsTriggerFunction,
});

サンドボックスが動き出し、Lambda 関数が AWS 側に作成されます。

8. DynamoDB Streams とトリガーの設定

Lambda 関数はただ作成しただけなので、DynamoDB Streams の有効化、Lambda 関数のトリガーとして設定していきます。公式ドキュメントにも DynamoDB Stream を使ったサンプルが公開されているので参考にします。

まず、CDK を扱えるようにプロジェクトに追加します。

pnpm install -D aws-cdk-lib

amplify/backend.ts を以下のようにします。ハイライトされた行が追加箇所になります。なお、7行目は変更している箇所で、defineBackend({ から const backend = defineBackend({ と、変数に代入するように変更しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';
import { ddbStreamsTriggerFunction } from './function/ddb-streams-trigger/resource';
import * as cdk from 'aws-cdk-lib';

const backend = defineBackend({
  auth,
  data,
  ddbStreamsTriggerFunction,
});

// DynamoDB Streams の表示タイプを設定
// (https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)
backend.data.resources.cfnResources.amplifyDynamoDbTables.Device.streamSpecification = {
  streamViewType: cdk.aws_dynamodb.StreamViewType.NEW_IMAGE,
};

// Lambda のトリガーとなるイベントソースとして DynamoDB Streams を設定
const eventSource = new cdk.aws_lambda_event_sources.DynamoEventSource(backend.data.resources.tables['Device'], {
  startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
});
backend.ddbStreamsTriggerFunction.resources.lambda.addEventSource(eventSource);

// Lambda に AppSync GraphQL API を実行する権限を与える
backend.data.resources.graphqlApi.grant(
  backend.ddbStreamsTriggerFunction.resources.lambda,
  cdk.aws_appsync.IamResource.all(),
  'appsync:GraphQL'
);

// Lambda 関数の環境変数に AppSync のエンドポイント URL を設定
backend.ddbStreamsTriggerFunction.resources.cfnResources.cfnFunction.environment = {
  variables: {
    ENDPOINT: backend.data.resources.cfnResources.cfnGraphqlApi.attrGraphQlUrl,
  }
};

backend.XXX.resources のオブジェクトから CDK の L1 や L2 の Construct にアクセスできるので、CDK の記法を使っていろいろと設定していきます。

DynamoDB Streams を Lambda 関数のトリガーとして設定しました。今の状態で、DynamoDB の項目を更新したりすると Lambda 関数が実行され、AppSync を Mutate するところまでできています。

9. AppSync の Subscription の設定

AppSync 側で Mutate されたら、それをサブスクライブすることで、ニアリアルタイムにクライアントにデータをプッシュすることができます。ここでは Subscription の設定をしていきます。

amplify/data/resource.ts の中身を以下のようにします。ハイライトされた行が追加箇所になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import { a, defineData, type ClientSchema } from '@aws-amplify/backend';

const schema = a.schema({
  Device: a.model({
    name: a.string(),
    isConnect: a.boolean(),
  })
  .authorization( allow => [allow.owner()] ),

  updateConnectionStatus: a.mutation()
    .arguments({
      id: a.id().required(),
      isConnect: a.boolean(),
    })
    .returns(a.ref('Device'))
    .authorization( allow => [allow.authenticated()] )
    .handler(
      a.handler.custom({
        dataSource: a.ref('Device'),
        entry: './updateConnectionStatus.js',
      })
    ),

  receiveUpdatedStatus: a.subscription()
    .for(a.ref('updateConnectionStatus'))
    .authorization( allow => [allow.authenticated()] )
    .handler(
      a.handler.custom({
        entry: './receiveUpdatedStatus.js',
      })
    ),
});

export type Schema = ClientSchema<typeof schema>;

export const data = defineData({
  schema,
  authorizationModes: {
    defaultAuthorizationMode: 'userPool',
  }
});

receiveUpdatedStatus という名前で Subscription を追加します。

続いてリゾルバを作成します。amplify/data/receiveUpdatedStatus.js というファイルを作成し、中身は以下のようにします。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import { util, extensions } from "@aws-appsync/utils"

export function request(ctx) {
  console.log('resolver(receiveUpdatedStatus)#request', ctx);
  return { payload: null };
}

export const response = (ctx) => {
  console.log('resolver(receiveUpdatedStatus)#response', ctx);
  const filter = {
    owner: {
      eq: ctx.identity.sub
    }
  };

  extensions.setSubscriptionFilter(util.transform.toSubscriptionFilter(filter));

  return ctx.result;
};
  • 5行目 : Amplify のドキュメント (Add custom real-time subscriptions) によると request() では null ペイロードを返さなければならないようなので、それに倣います
  • 10-14行目 : サーバーサイドでフィルタを設定しています。今回はユーザが自分で登録したデバイスだけ参照できるように、DynamoDB テーブルの ownew がユーザ名 (sub) と一致する場合のみ、Subscriber にデータを送信するようにします。リゾルバから参照するコンテキスト (ctx) のリファレンスは公式ドキュメントを参照ください

AWS 側では AppSync の GraphQL API に Mutation が追加されます (API 選択後、“スキーマ” を開いて type Subscription {... を探すと追加されています)。

10. UI の修正 (サブスクライブの設定)

UI を修正し、クライアントの読み込み時に、AppSync にサブスクライブするようにします。

src/App.tsx を以下のようにします。ハイライトされた行が追加箇所になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import { useState, useEffect } from 'react'
import { Authenticator } from '@aws-amplify/ui-react';
import { Table, TableCell, TableBody, TableHead, TableRow } from '@aws-amplify/ui-react';
import { generateClient } from 'aws-amplify/data';
import { type Schema } from '../amplify/data/resource';
import './App.css';

const client = generateClient<Schema>({
  authMode: 'userPool',
});

function App() {

  const [devices, setDevices] = useState<Schema['Device']['type'][]>([]);

  const fetchDevices = async () => {
    const { data: devices, errors } = await client.models.Device.list();
    setDevices(devices);
  };

  useEffect(() => {
    fetchDevices();
  }, []);

  // サブスクライブ
  useEffect(() => {
    const sub = client.subscriptions.receiveUpdatedStatus().subscribe({
      next: (updatedDevice) => {
        setDevices((currentDevices) => currentDevices.map((device) => device.id === updatedDevice.id ? updatedDevice : device));
      },
    });
    return () => sub.unsubscribe();
  }, []);

  // とりあえず固定値でデバイスを新規登録 (複数登録する場合は `name` を都度変更してください)
  // (DynamoDB のパーティションキーには固有の `id` が自動で生成されて登録される
  const createDevice = async () => {
    await client.models.Device.create({
      name: "device-A-1",
      isConnect: false,
    });

    // 新規にデバイスを追加した場合も一覧を再取得
    fetchDevices();
  };

  return (
    <Authenticator>
      {({ signOut, user }) => (
        <>
          <p>Demo App</p>
          <p>
            <button onClick={createDevice}>New Device</button>
          </p>
          <Table style={{backgroundColor: "white"}}>
            <TableHead>
              <TableRow>
                <TableCell as="th">ID</TableCell>
                <TableCell as="th">Name</TableCell>
                <TableCell as="th">isConnect</TableCell>
              </TableRow>
            </TableHead>
            <TableBody>
              {devices.map((device) => (
                <TableRow>
                  <TableCell>{device.id}</TableCell>
                  <TableCell>{device.name}</TableCell>
                  <TableCell>{device.isConnect.toString()}</TableCell>
                </TableRow>
              ))}
            </TableBody>
          </Table>
        </>
      )}
    </Authenticator>
  )
}

export default App
  • 27-31行目 : サブスクライブしている箇所。サーバーサイドでフィルタを入れているので、サブスクライブの際、引数にユーザ名等を指定する必要はありません。(クライアント側で引数変更して他人の更新をサブスクライブできちゃっても困りますよね…)

11. 動作確認…が、失敗

ここまでで一通りの実装ができたので、動作確認ということで、AWS のマネジメントコンソールから DynamoDB の項目の isConnecttrue に更新してみます。…が、ブラウザ側に更新が届きません。。。

調べてみると、Lambda から AppSync を呼び出した際に以下のようなエラーが出力されていました。どうやら認証がうまくできていなさそうです。

{
  data: { updateConnectionStatus: null },
  errors: [
    {
      path: [Array],
      data: null,
      errorType: 'Unauthorized',
      errorInfo: null,
      locations: [Array],
      message: 'Not Authorized to access updateConnectionStatus on type Mutation'
    }
  ]
}

Lambda の IAM Role は問題なかったので、AppSync のスキーマを確認してみたところ、IAM 認証による Mutate が許可されていませんでした。
(自動で生成された createDevice などには @aws_iam が付いているのに、自前で作成した updateConnectionStatus には付いていない…)

Not permitted to mutate by lambda

amplify/data/resource.ts の 16行目のところで IAM 認証が設定できれば良さそうなので、ドキュメント (Customize your auth rules) を参考に試してみました。

10
11
12
13
14
15
16
17
18
19
20
21
22
  updateConnectionStatus: a.mutation()
    .arguments({
      id: a.id().required(),
      isConnect: a.boolean(),
    })
    .returns(a.ref('Device'))
    .authorization( allow => [allow.authenticated()] )
    .handler(
      a.handler.custom({
        dataSource: a.ref('Device'),
        entry: './updateConnectionStatus.js',
      })
    ),

allow.authenticated('identityPool')allow.resource(ddbStreamsTriggerFunction) など試してみましたが、カスタムのハンドラのためサポートされていないとのエラーが出てしまいます。

identityPool-based auth (allow.guest() and allow.authenticated('identityPool')) is not supported with a.handler.custom

GitHub でも Issue として上がっていました。

CDK からも操作しにくそうな感じでした。仕方がないので不本意ではありますが、ここを上手く書くのが今回の本題ではないので、AppSync のマネジメントコンソールからスキーマを手で変更しました (@aws_iam を追記)。

Add @aws-iam auth type to mutation

12. 改めて動作確認

改めて、AWS のマネジメントコンソールから DynamoDB の項目の isConnect を更新してみます (DynamoDB で項目を保存したとしても値に変更がなければ DynamoDB Streams に流れない点にご注意ください)。

クライアント側の表示がサーバーからのプッシュにより更新されれば成功です!🙌

今回はユーザー自身の項目のみ一覧に出力するようにしたので、別のユーザーアカウントを作成して試してみてください。他人のデータは一覧、サーバープッシュによる更新とも取得できないようになっているはずです。

13. 後片付け (サンドボックスの削除)

サンドボックスは AWS クラウド側では CloudFormation スタックとして存在しています。ローカル側でサンドボックスのプロセス (ampx) を終了させても、CloudFormation スタックが消えるわけではありません。スタックを削除するには以下のコマンドを実行します。

pnpm exec ampx sandbox delete

確認が入るので “y” を押します。

? Are you sure you want to delete all the resources in your sandbox environment (This can't be undone)? (y/N)

まとめ

Amplify Gen2 を使って、外的要因で更新された DynamoDB の更新イベントを AppSync のサブスクライブで受信するデモをステップバイステップで紹介しました (一部手作業が入ってしまいかっこ悪さが否めません…)。
ニアリアルタイムイベントの受信や、非同期処理の結果受信などのユースケースを検討されている方の参考になれば幸いです。

最後に・・・

この投稿は個人的なものであり、所属組織を代表するものではありません。ご了承ください。