这个问题可能是由于多个 Lambda 实例以不同的速度读取和处理 SQS 消息队列中的消息所导致的。因此,我们需要使用可重入锁来确保只有一个实例能够读取和处理消息。
示例代码:
import threading
import boto3
lock = threading.Lock()
sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/myqueue'
def lambda_handler(event, context):
messages = []
with lock:
try:
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
VisibilityTimeout=60
)
if 'Messages' in response:
messages = response['Messages']
except Exception as e:
print('Error: {}'.format(str(e)))
# Process messages
for message in messages:
# Your processing code goes here
print(message['Body'])
# Delete message
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
在上面的代码中,我们使用了 Python 中的 threading.Lock() 函数来创建了一个可重入锁,用来确保只有一个 Lambda 实例能够访问 SQS 消息队列。在 with lock: 语句块中,我们使用了 SQS 的 receive_message() 方法来读取最多 10 条消息。然后在 for 循环中,我们遍历了每个消息并进行处理,最后调用了 SQS 的 delete_message() 方法来删除已处理的消息。
使用可重入锁是一种避免 AWS SQS 与 Lambda 之间的竞争条件的方法,可以确保只有一个实例处理消息。