import boto3
TABLE_NAME = 'event_status_table'
def lambda_handler(event, context): event_id = event['id'] dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(TABLE_NAME)
response = table.get_item(Key={'event_id': event_id})
if 'Item' in response:
# Already processed, skip
return {'status': 'skipped'}
# Process the event
# ...
# Save the status to the database
table.put_item(Item={'event_id': event_id})
return {'status': 'processed'}
这种方法需要使用异步触发器(例如EventBridge)来触发Lambda函数。
{ "StartAt": "ProcessEvent", "States": { "ProcessEvent": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:my-lambda-function", "InputPath": "$", "Parameters": { "id": "$$.Execution.Name", # Use the execution name as the idempotency identifier "event": "$.detail" }, "End": true, "Retry": [ { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 1.0 } ] } } }
Lambda函数可以使用传入的id参数检查该事件是否已经被处理过。如果是,则可以跳过处理并立即返回结果。这种方法需要使用异步触发器(例如EventBridge)来触发Step Function。