Airflow中的S3KeySensor是一种传感器,用于监测S3存储桶中指定键值的对象是否存在。默认情况下,S3KeySensor只检查S3存储桶中指定键值的对象是否存在,而不会检查该对象是否完整加载。
如果需要检查对象是否完整加载,可以在DAG定义中指定' min_bytes '参数。该参数定义了指定键值的对象必须具备的最小文件大小。如果对象的文件大小小于这个值,S3KeySensor将继续等待,并在达到最大尝试次数后失败。
下面是一个检查文件是否完整加载的示例代码:
from airflow import DAG from airflow.contrib.sensors.aws_s3_key_sensor import S3KeySensor from datetime import datetime, timedelta
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2019, 1, 1), 'retries': 3, 'retry_delay': timedelta(minutes=5), }
with DAG('s3_file_sensor', default_args=default_args, schedule_interval=timedelta(days=1)) as dag: sensor_task = S3KeySensor(task_id='s3_file_sensor', bucket_key='s3://mybucket/myfile.txt', wildcard_match=True, min_bytes=1024, poke_interval=60 * 5, timeout=60 * 60 * 24, mode='s3', aws_conn_id='aws_default')