在AWS中实现分布式锁的一种常见方法是使用DynamoDB作为锁存储的后端。以下是一个使用DynamoDB实现分布式锁的示例代码:
import boto3
import time
class DistributedLock:
def __init__(self, lock_name, table_name):
self.lock_name = lock_name
self.table_name = table_name
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def acquire_lock(self, timeout=10):
lock_acquired = False
expiration_time = time.time() + timeout
while time.time() < expiration_time and not lock_acquired:
response = self.table.put_item(
Item={
'lock_name': self.lock_name,
'expiration_time': int(time.time()) + 10 # 设置锁的过期时间为10秒
},
ConditionExpression='attribute_not_exists(lock_name)' # 只有当锁不存在时才写入
)
if response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200:
lock_acquired = True
else:
time.sleep(0.1) # 等待一段时间后重试
return lock_acquired
def release_lock(self):
self.table.delete_item(
Key={
'lock_name': self.lock_name
}
)
# 使用示例
lock = DistributedLock('my_lock', 'my_table')
if lock.acquire_lock():
try:
# 在这里执行需要加锁的代码
print('Lock acquired!')
finally:
lock.release_lock()
else:
print('Failed to acquire lock!')
上述示例代码使用DynamoDB的put_item
方法来创建一个锁,使用ConditionExpression
来确保只有当锁不存在时才创建。锁的过期时间可以自行设置,示例中设置为10秒。在获取锁时,如果锁已存在,则等待一段时间后重试,直到超时时间到达或成功获取锁。释放锁时,使用DynamoDB的delete_item
方法删除锁。
注意,上述示例中的table_name
参数需要替换为您自己的DynamoDB表名。
上一篇:AWS中的ETL转换