カテゴリー
AWS Lambda SQS(Simple Queue Service)

AWS DynamoDBを使用してLambdaの冪等性を実装する

経緯

SQSのトリガーとしてLamdaを使用する際にFIFOキューであってもLabmdaの冪等にすべき事をお勧めするということでしたので、DynamoDBを使用してLambdaに冪等性を実装しました。

SQSにかかわらず、Lambdaを使用する上で多くの場合冪等性を実装する必要がありそうです。

Amazon SQS FIFOキューは、処理の順序がメッセージグループ内のメッセージの順序に従うことを保証します。ただし、Lambdaトリガーとして使用した場合、1回の配信のみを保証するものではありません。サーバーレスアプリケーションで配信が重要なのは1回だけの場合は、関数をべき等にすることをお勧めします。これは、Amazon DynamoDBなどのスケーラブルで低レイテンシのコントロールデータベースを使用して、メッセージの一意の属性を追跡することで実現できます。

https://aws.amazon.com/jp/blogs/compute/new-for-aws-lambda-sqs-fifo-as-an-event-source/

Lambda 関数を冪等にするにはどうすればよいですか?

https://aws.amazon.com/jp/premiumsupport/knowledge-center/lambda-function-idempotent/

冪等性とは?については下記の記事が参考になるかと思います。

サーバーレスが気になる開発者に捧ぐ「べき等性」ことはじめ

https://aws.amazon.com/jp/builders-flash/202104/serverless-idempotency/?awsf.filter-name=*all

今回のゴール

今回は冪等性の実装するにあたりDynamoDBを使用します。

何も難しい事はありません。ただ、処理済みのtransaction_idの値が来た時はその後の処理を行わずに正常終了するようにします。

また、テストするにあたり、SQSのトリガー機能を利用して、SQSにメッセージが送信された時にLambdaが起動するようにします。

DynamoDBの作成

[テーブル名]:Idempotent-test
[パーティションキー]:transaction_id

Lambdaの作成

IAMポリシーの作成

LambdaからDynamoDBにアクセスするで下記のポリシー(Allow_DynamoDB_Idempotent_test_putitem)を作成しました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:PutItem",
                "dynamodb:DeleteItem"
            ],
            "Resource": "arn:aws:dynamodb:ap-northeast-1:(your account ID):table/Idempotent-test"
        }
    ]
}
Lambdaの作成

[関数名]:Idempotent-test
[ランタイム]:Python 3.9

作成されたロールに先ほど作成したポリシー( Allow_DynamoDB_Idempotent_test_putitem )をアタッチします。

Lambdaのコードは下記のようにしました。

今回はLambdaトリガーとしてSQSを設定してますので、エラーがあった時は例外を発生させて、メッセージが再処理されるようにしております。

import boto3
import json
import botocore.exceptions

TBL_NAME = 'Idempotent-test'

dynamodb = boto3.client('dynamodb')


def lock(transaction_id: str):
    try:
        dynamodb.put_item(
            TableName=TBL_NAME,
            Item={"transaction_id": {"S": transaction_id}},
            Expected={'transaction_id': {'Exists': False}}
        )
        return True
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False
        else:
            raise e
    except Exception as e:
        raise e


def lambda_handler(event, context):
    try:
        for record in event['Records']:
            transaction_id = record['attributes']['MessageDeduplicationId']
            print(transaction_id)

            if(lock(transaction_id)):
                try:
                    # メイン処理
                    print(record)
                    print('done!')

                except Exception as e:
                    dynamodb.delete_item(
                        TableName=TBL_NAME,
                        Key={"transaction_id": {"S": transaction_id}}
                    )
                    raise e
            else:
                print('nothing to do')
    except botocore.exceptions.ClientError as e:
        print('critical error1')
        print(e)
        raise e
    except Exception as e:
        print('critical error2')
        print(e)
        raise e

確認前の準備

動作確認をしていきます。

今回はSQSのメッセージを受信する想定ですのでSQSのキューを作成します。

SQSキューの作成

[タイプ]:FIFO
[名前]:test-Idempotent-queue.fifo
あとは全てデフォルトで設定しました。

このままではLambdaにSQSキューのアクセス権限がありませんので、作成されたSQSキューのアクセスポリシーを下記に変更します。

{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__owner_statement",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::(your account ID):root"
      },
      "Action": "SQS:*",
      "Resource": "arn:aws:sqs:ap-northeast-1:(your account ID):test-Idempotent-queue"
    },
    {
      "Sid": "2",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sqs:ReceiveMessage",
      "Resource": "arn:aws:lambda:ap-northeast-1:(your account ID):function:Idempotent-test"
    }
  ]
}

また、LambdaからSQSのメッセージが受信できるようにLambda関数のロールに”AWSLambdaSQSQueueExecutionRole”ポリシーをアタッチします。

確認

それでは確認していきます。

今回は、メッセージ除外IDが違う3種類のメッセージをキューに送信して、5分以上たった後、もう一度同じメッセージを送信しようと思います。

なぜ5分以上かというと、メッセージ重複除外ID(MessageDeduplicationId)がSQSに送信されてから5分以内に同じメッセージ重複除外IDがSQSに送信された時、SQSが自動で重複分を削除する仕様となっているからです。

SQSサービスで該当キューを選択して、「メッセージを送受信」ボタンをクリックします。

下記を入力してメッセージを送信します。

[メッセージ本文]:テスト
[メッセージグループID]:a
[メッセージ重複除外ID]:1

[メッセージ重複除外ID]を2~3に変更して繰り返し送信します。

CloudWatchログで確認すると、メッセージ重複除外ID1~3が正常に実行されておりました。(done!)

DynamoDBでも登録されていることが確認できました。

5分以上時間をおいて同様にメッセージ送信処理を実行します。

CloudWatchログで確認すると、メッセージ重複除外ID1~3がデータベース処理されずに正常に終了しておりました。(nothing to do)

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です