在Airflow中使用EMRAddStep时,如果HadoopJarStep参数具有以.json结尾的参数时无法添加EMR步骤,可以尝试通过以下方法解决:
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
# 定义EMR步骤
emr_step = {
'Name': 'Example Step',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'hadoop', 'jar', 'example.jar',
'--input', 's3://example/input',
'--output', 's3://example/output',
'--param', '{"key": "value"}'
]
}
}
# 将以.json结尾的参数转换为字符串形式
emr_step['HadoopJarStep']['Args'][-1] = '{"key": "value"}'
# 添加EMR步骤
add_step_task = EmrAddStepsOperator(
task_id='add_step_task',
job_flow_id='{{ task_instance.xcom_pull(task_ids="create_cluster_task", key="return_value") }}',
steps=[emr_step],
aws_conn_id='aws_default',
dag=dag
)
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
# 创建EMR集群
create_cluster_task = EmrCreateJobFlowOperator(
task_id='create_cluster_task',
job_flow_overrides=job_flow_overrides,
aws_conn_id='aws_default',
dag=dag
)
# 上传参数文件到S3
upload_param_task = S3Hook(aws_conn_id='aws_default').load_string(
'{"key": "value"}',
key='example.json',
bucket_name='example-bucket'
)
# 定义EMR步骤
emr_step = {
'Name': 'Example Step',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'hadoop', 'jar', 'example.jar',
'--input', 's3://example/input',
'--output', 's3://example/output',
'--param', 's3://example-bucket/example.json'
]
}
}
# 添加EMR步骤
add_step_task = EmrAddStepsOperator(
task_id='add_step_task',
job_flow_id='{{ task_instance.xcom_pull(task_ids="create_cluster_task", key="return_value") }}',
steps=[emr_step],
aws_conn_id='aws_default',
dag=dag
)
通过以上方法,您应该能够成功添加带有以.json结尾的参数的EMR步骤。
上一篇:Airflow在任务执行时间过长时使用SIGKILL终止任务。
下一篇:Airflow在使用MSSQL作为后端数据库时出现死锁问题,但在使用PostgreSQL作为后端数据库时同样的DAG工作正常。