在Amazon EMR中,创建集群时可以使用数据传输出。以下是使用AWS SDK for Python(Boto3)创建Amazon EMR集群并使用数据传输出的示例代码:
import boto3
# 创建EMR客户端
emr_client = boto3.client('emr', region_name='your_region')
# 设置输入和输出路径
input_path = 's3://your_input_bucket/input_path'
output_path = 's3://your_output_bucket/output_path'
# 创建集群配置
cluster_config = {
'Name': 'YourClusterName',
'LogUri': 's3://your_log_bucket/logs',
'ReleaseLabel': 'emr-6.4.0',
'Instances': {
'InstanceGroups': [
{
'Name': 'MasterInstanceGroup',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
},
{
'Name': 'CoreInstanceGroup',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2,
}
],
'Ec2KeyName': 'your_key_pair_name',
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
'Ec2SubnetId': 'your_subnet_id'
},
'Applications': [
{'Name': 'Hadoop'},
{'Name': 'Hive'},
{'Name': 'Spark'},
{'Name': 'Pig'}
],
'BootstrapActions': [
{
'Name': 'Install Dependencies',
'ScriptBootstrapAction': {
'Path': 's3://your_bootstrap_script_bucket/bootstrap_script.sh'
}
}
],
'Steps': [
{
'Name': 'Example Step',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--class',
'your_main_class',
'--master',
'yarn',
'--deploy-mode',
'cluster',
'--executor-memory',
'4g',
's3://your_jar_bucket/your_application.jar',
'--input',
input_path,
'--output',
output_path
]
}
}
],
'VisibleToAllUsers': True
}
# 创建集群
response = emr_client.run_job_flow(**cluster_config)
# 输出集群ID
cluster_id = response['JobFlowId']
print(f"Cluster created with ID: {cluster_id}")
在上述代码中,需要替换以下参数:
your_region:您的AWS区域(例如,us-west-2)。your_input_bucket/input_path:输入数据的S3桶和路径。your_output_bucket/output_path:输出数据的S3桶和路径。YourClusterName:集群的名称。s3://your_log_bucket/logs:存储集群日志的S3桶和路径。emr-6.4.0:EMR版本。m5.xlarge:实例类型。your_key_pair_name:用于登录集群的EC2密钥对的名称。your_subnet_id:集群所在的VPC子网的ID。s3://your_bootstrap_script_bucket/bootstrap_script.sh:启动脚本的S3桶和路径。your_main_class:要在集群上运行的主类。s3://your_jar_bucket/your_application.jar:您的应用程序JAR文件的S3桶和路径。此代码将创建一个Amazon EMR集群,并在集群上运行一个示例步骤,该步骤使用Spark提交一个应用程序,并将输入和输出数据传输到S3桶。