投稿日

Amazon Kinesis Data Streamsのデータストリームを、リソースベースポリシーで他のAWSアカウントと共有する方法

こんにちは、ソラコムの松下(ニックネーム: Max)です。

Amazon Kinesis Data Streams(以下、KDS) は、大量のデータをリアルタイムに収集、後続のAWSサービスへとつなげるマネージドのデータ処理サービスです。ログなど順序に意味のあるデータ(ストリーミングデータ)の扱いに向いてることから、IoTのデータ収集にも使われることの多いサービスです。例えば、機械学習による産業機器の予知保全ができる Amazon Monitron のデータエクスポート先に指定できたりします。

KDS はデータ生成元(プロデューサー)とデータ処理先(コンシューマー)を仲介するサービスで、このようなアーキテクチャーで利用します。

AWSクラウドを本格的に利用していると、用途の区別やコスト管理の面からAWSアカウントを分けて運用することもあります。この時、KDSに収集されたストリーミングデータを他のAWSアカウントで処理したいケースがあります。

このブログでは、Amazon Kinesis Data Streams のリソースベースポリシー (2023年11月のアップデート)を用いてデータストリームを他のAWSアカウントと共有し、AWS Lambda から参照(Lambda 関数のトリガーに指定)する手順を紹介します。ちなみにデータストリームとは、データが流れるパイプのようなものです。

アーキテクチャーと手順

アーキテクチャーは以下の通りです。

設定のポイント

設定のポイントを紹介します。

  • データ処理側の Lambda 関数の実行ロールには、AWSLambdaKinesisExecutionRole ポリシーが必要
  • データ生成元側の KDS のデータストリームのリソースベースポリシーには、Principalにデータ処理側のLambda 関数の実行ロールのIAMロールのARNを指定し、Allow Actionsには「kinesis:DescribeStream」「kinesis:DescribeStreamSummary」「kinesis:GetRecords」「kinesis:GetShardIterator」「kinesis:ListShards」の5つが設定されていること

以上2点に気を付けてください。

特に2つ目のリソースベースポリシーですが、マネジメントコンソール(Web)上のダイアログで指定する方法では「kinesis:DescribeStream」の指定が抜けています。JSONエディタで手動追加する必要があるので注意が必要です(2024年3月現在の情報、レポート済み)。

以下の公式ドキュメントも参考になります。

手順

  • データ生成元側 → データ処理側 → データ生成元側 → データ処理側と、何度かアカウントをまたいだ作業となります。対象を間違えないように気をつけましょう。
  • すべて同じリージョンである必要があります。異なるリージョンでは共有できません(例: データストリームがus-west-2で、Lambda関数がap-northeast-1 とはできない)。異なるリージョンにデータを送りたい場合は、あとがきのAmazon EventBridgeを用いたアーキテクチャーを検討してください。

1. データ生成元側

1-1. Amazon Kinesis で、データストリーム(以下、kds-sharing-example1)を作成する

具体的な作成方法はこちら(Step1: Create Data Stream)をご覧ください。

テストや少量のデータ量ならば、容量モードは「プロビジョンド」、プロビジョニングされたシャードは「1」で十分です。

データストリームは新規作成を前提として進めていますが、既存のデータストリームの流用もできます。

2. データ処理側

2-1. AWS Lambda で、Lambda 関数(以下、kds-reader1)を作成する

具体的な作成方法はこちら(Create a Lambda function with the console)をご覧ください。

コードソースは以下の通り(Python 3)です。event をAmazon CloudWatch Logsに出力するだけのシンプルな内容ですが、動作確認には十分役立ちます。コードソースを以下の内容に書き換えたら「Deploy」をクリックしてデプロイします。

def lambda_handler(event, context):
    print(event)
    return True

関数のテストのテンプレートには、Amazon Kinesis Data Streams のサンプルデータがあります。これを利用してテストが可能です。

2-2. kds-reader1の実行ロールに、AWSLambdaKinesisExecutionRole ポリシーをアタッチする

kds-reader1 の詳細を表示した後、「設定」→「アクセス権限」と進み、実行ロールに割り当てられているロール名をクリックし、ロールの設定を表示します(下図では kds-reader1-role-q6zcv9kq をクリックします)。

許可ポリシーの「許可を追加」→「ポリシーをアタッチ」と進み、アタッチするポリシー一覧を表示します。

「その他許可ポリシー」の一覧から AWSLambdaKinesisExecutionRole を選択した後、「許可を追加」をクリックします。

ポリシーの追加は完了です。
以下のように、許可ポリシー一覧に AWSLambdaKinesisExecutionRole が追加されていることを確認してください。

Lambda 関数の実行ロールは、Lambda 関数作成時に自動的に作成・アタッチされる IAMロールを利用する前提で進めています。既存のIAMロールの流用もできます。

2-3. IAMロールのARNをメモする

このIAMロール(この手順では kds-reader1-role-q6zcv9kq )のARNをメモしてください。データ生成元側で利用します。

3. データ生成元側

3-1. Amazon Kinesis で、kds-sharing-example1 のリソースベースポリシーを設定する

kds-sharing-example1 の詳細を表示した後、「データストリーム共有」→「ポリシーの作成」と進みます。

ポリシー詳細で、ビジュアルエディタを選択し、「データストリーム共有スループット読み取りアクセス」にチェックを付け、「プリンシパルを指定する」には、先ほどメモをしたIAMロールのARNを入力して、「ポリシーの作成」をクリックします。

プリンシパルにはIAMロールのARNを指定しましょう。IAMロールのARN以外(例えばLambda 関数のARNや、データストリームのARN)を指定した場合はエラーとなり、ポリシーが作成できません。

リソースベースのポリシーが表示されたら「編集」をクリックして、JSONエディタを表示します。ここでは、以下のようにActionの一覧へ "kinesis:DescribeStream" を追加します。最後に「変更を保存」をクリックします。

ビジュアルエディタの段階ですでに同権限が追加されていたら、上記の編集は不要です。

リソースベースポリシーの設定は完了です。
以下のように、PrincipalにIAMロールのARNが、Actionに5つの権限がついていることを確認してください。

3-2. データストリームのARNをメモする

kds-sharing-example1 のARNをメモしてください。データ処理側で利用します。

4. データ処理側

4-1. AWS Lambdaで、kds-reader1 のトリガーを設定する

kds-reader1 の詳細を表示した後、「設定」→「トリガー」→「トリガーを追加」と進みます。

トリガーの設定では「ソースを選択」に Kinesis を指定します。続いて表示された内容の中で「Kinesis stream」に kds-sharing-example1 の ARN を設定します。他の項目はそのままに「追加」をクリックします

テキストボックスにフォーカスが移動した際に “項目がありません” と表示されますが、気にせず ARN を入力してください。

「追加」をクリックしたときに API エラーが発生した時の確認ポイント

「追加」クリック時に API エラーが発生した場合は、次の2点を確認してください。(1) データストリーム側のリソースベースポリシーの権限。特に kinesis:DescribeStream が入っていることを確認してください。(2) Lambda 関数の実行ロールの権限。特にAWSLambdaKinesisExecutionRole ポリシーがアタッチされてることを確認してください。

トリガーの設定は完了です。
以下のように、kds-reader1 のトリガー(入力ソース)に Kinesis が追加されていることが確認できます。

確認方法

確認方法は、データ生成元側でデータストリーム(本手順ならば kds-sharing-example1)へデータを送信して、データ処理側の Amazon CloudWatch Logs の出力を確認します。

データ生成元側の AWS CloudShell で、AWS CLI でデータストリームにデータを送信します。

aws --cli-binary-format raw-in-base64-out kinesis put-record --stream-name kds-sharing-example1 --partition-key DUMMY1 --data '{"this_is": "test record"}'

このように返ってくれば、データ送信成功です。

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49649683864473953433843496278632352517587667908684677122"
}

データ処理側のCloudWatch Logsの kds-reader1 のログで、以下のようなログが確認できれば設定成功です。

{'Records': [{'kinesis': {'kinesisSchemaVersion': '1.0', 'partitionKey': 'DUMMY1', 'sequenceNumber': '49649683864473953433843496278632352517587667908684677122', 'data': 'eyJ0aGlzX2lzIjogInRlc3QgcmVjb3JkIn0=', 'approximateArrivalTimestamp': 1709100911.667}, 'eventSource': 'aws:kinesis', 'eventVersion': '1.0', 'eventID': 'shardId-000000000000:49649683864473953433843496278632352517587667908684677122', 'eventName': 'aws:kinesis:record', 'invokeIdentityArn': 'arn:aws:iam::888800008888:role/service-role/kds-reader1-role-q6zcv9kq', 'awsRegion': 'us-east-1', 'eventSourceARN': 'arn:aws:kinesis:us-east-1:999900009999:stream/kds-sharing-example1'}]}

あとがき ~ Amazon EventBridgeを用いたアーキテクチャー

今回は Amazon Kinesis Data Streams のリソースベースポリシーを用いたデータストリームの共有を紹介しました。これにより、例えば冒頭で紹介した Amazon Monitron のデータを他の AWS アカウントで利用できるようになるため、AWS アカウントの運用に自由度がでてくるのではないでしょうか。

Amazon Kinesis Data Streams のデータストリームを他の AWS アカウントで利用する方法には、他にはAmazon EventBridge のイベントバスを通じて送信するアーキテクチャーも考えられます。

このメリットはノーコード・マネージドサービスで構成できる点にあるでしょう。また、異なるリージョン間でも使えます。Amazon EventBridgeの利用料は発生しますが、その価値はあるアーキテクチャーだと言えそうです。

このアーキテクチャーを作る際に参考となるURLを紹介します。

単一の解が無い事を悩ましく思うかは賛否ありそうですが、保有スキルや実現したい運用形態に見合った構成が取れるのは、組み立てて創る「ビルディングブロック」の醍醐味だと個人的には思っています。

この構成に限らず、今後出てくるであろう新機能と共にその時に見合った構成に挑戦してみるのも良いのではないでしょうか。

― ソラコム松下(Max)