import boto3
# Check whether the SNS topic has a confirmed SQS subscription.
sns = boto3.client('sns')

# list_subscriptions returns its results in pages; walk every page so that
# subscriptions beyond the first page are not silently missed.
subscriptions = []
for page in sns.get_paginator('list_subscriptions').paginate():
    subscriptions.extend(page['Subscriptions'])

for subscription in subscriptions:
    # The check below treats a SubscriptionArn equal to the literal string
    # 'PendingConfirmation' as "not yet confirmed".
    if (
        subscription['Protocol'] == 'sqs'
        and subscription['TopicArn'] == 'YOUR_TOPIC_ARN'
        and subscription['SubscriptionArn'] != 'PendingConfirmation'
    ):
        print('Subscription is verified and active')
import boto3
# Fetch and print the access policy currently attached to the SNS topic.
sns = boto3.client('sns')

topic_attributes = sns.get_topic_attributes(TopicArn='YOUR_TOPIC_ARN')['Attributes']
policy = topic_attributes['Policy']
print(policy)
如果策略未设置正确的访问权限,可以使用以下代码为其设置访问策略:
import json

import boto3
from botocore.exceptions import ClientError

# Replace the access policy attached to an SNS topic.
sns = boto3.client('sns')

# Single place to fill in the topic ARN; it is used both as the policy
# resource/condition and as the target of the update call.
TOPIC_ARN = 'YOUR_TOPIC_ARN'

# NOTE(review): this statement allows sns:Publish for any AWS principal,
# limited only by the aws:SourceArn condition, and that condition is set to
# the topic's own ARN. Confirm this is the intended source before applying.
policy = {
    "Version": "2008-10-17",
    "Id": "example",
    "Statement": [
        {
            "Sid": "example-statement-ID",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "sns:Publish",
            "Resource": TOPIC_ARN,
            "Condition": {
                "ArnEquals": {"aws:SourceArn": TOPIC_ARN},
            },
        },
    ],
}

try:
    # The Policy attribute must be passed as a JSON string, not a dict.
    sns.set_topic_attributes(
        TopicArn=TOPIC_ARN,
        AttributeName='Policy',
        AttributeValue=json.dumps(policy),
    )
except ClientError as e:
    print('Error: ', e)
import boto3
# Look up the URL of an existing SQS queue by name.
sqs = boto3.client('sqs')

try:
    queue_url = sqs.get_queue_url(QueueName='YOUR_QUEUE_NAME')['QueueUrl']
except sqs.exceptions.QueueDoesNotExist:
    # Only a missing queue is reported here; other failures (credentials,
    # permissions, networking) propagate instead of being mislabelled.
    print('Queue does not exist')
else:
    print(queue_url)
如果队列不存在,可以使用以下代码创建一个新队列:
import boto3
# Create a new SQS queue and print its URL.
sqs = boto3.client('sqs')

response = sqs.create_queue(QueueName='YOUR_QUEUE_NAME')

queue_url = response['QueueUrl']
print(queue_url)