在 Airflow DAG 中,可以使用 EmrCreateJobFlowOperator 和 EmrAddStepsOperator 操作符创建 EMR 集群并向其添加步骤。EmrCreateJobFlowOperator 通过 job_flow_overrides 参数接收 EMR 集群的各种配置选项(即 EMR RunJobFlow API 的请求参数),其中包括 AutoTerminationPolicy。示例代码如下:
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.models import Variable
# Name of the Airflow connection that holds the AWS credentials used below.
# It is looked up in the Airflow Variable "aws_connection_id" (which can be
# set from the UI, the CLI, or an environment variable). No default is
# supplied, so that Variable must exist.
# NOTE(review): this lookup runs at module import, i.e. on every parse of
# this DAG file by the scheduler — consider whether that is acceptable.
aws_conn_id = Variable.get(key='aws_connection_id')
# Define the DAG.
# schedule_interval=None: no automatic schedule, so runs must be triggered
# explicitly. The default_args mapping (owner and start_date) is handed to
# every operator attached to this DAG unless the operator overrides it.
# NOTE(review): datetime(2021, 1, 1) is naive; Airflow interprets it in its
# configured default timezone. `schedule_interval` is deprecated in newer
# Airflow releases in favour of `schedule` — confirm the target version.
dag = DAG(
    dag_id='emr_example',
    schedule_interval=None,
    default_args=dict(
        owner='airflow',
        start_date=datetime(2021, 1, 1),
    ),
)
# Define the EMR create job flow operator
create_job_flow = EmrCreateJobFlowOperator(
task_id='create_job_flow',
aws_conn_id=aws_conn_id,
emr_conn_id=aws_conn_id,
job_flow_overrides={
'Name': 'example_emr_cluster',
'ReleaseLabel': 'emr-6.2.0',
'LogUri': 's3://example-bucket/emr-logs',
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': 'Worker nodes',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2
}
],
'Ec2KeyName': 'example_keypair',
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'Ec2SubnetId': 'subnet-0123456789abcdef0',
'EmrManagedMasterSecurityGroup': 'sg-0123456789abcdef0',
'EmrManagedSlaveSecurity