要解决AWS Lambda函数在Airflow中触发长时间运行时的350秒超时问题,可以使用以下解决方法:
dagrun_timeout属性来实现。from datetime import timedelta
from airflow import DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
'dagrun_timeout': timedelta(hours=2) # 设置较大的超时时间
}
dag = DAG('example_dag', default_args=default_args, schedule_interval='0 0 * * *')
首先,创建一个Step Functions状态机定义,该定义包含调用Lambda函数的步骤,并将超时时间设置为所需的时间。例如,以下是一个简单的状态机定义:
{
"Comment": "A Hello World example of the Amazon States Language using a Pass state",
"StartAt": "InvokeLambda",
"States": {
"InvokeLambda": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
"TimeoutSeconds": 600, # 设置超时时间为600秒
"End": true
}
}
}
然后,使用boto3或AWS Step Functions控制台将该状态机定义上传到AWS。
最后,在Airflow DAG中使用BashOperator来触发Step Functions状态机,例如:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_dag', default_args=default_args, schedule_interval='0 0 * * *')
invoke_step_functions = BashOperator(
task_id='invoke_step_functions',
bash_command='aws stepfunctions start-execution --state-machine-arn "STATE_MACHINE_ARN"',
dag=dag
)
在上述示例中,BashOperator用于执行AWS CLI命令来触发Step Functions状态机。确保将STATE_MACHINE_ARN替换为实际的Step Functions状态机ARN。
通过使用Step Functions来管理长时间运行的Lambda函数,可以避免Airflow的默认超时时间限制,并且可以更好地控制和监视Lambda函数的执行。