from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.contrib.sensors.file_sensor import FileSensor import os
dag = DAG( 'example_file_sensor', default_args=default_args, schedule_interval='@daily', catchup=False )
def print_hello(): return 'Hello world!'
t1 = BashOperator( task_id='print_hello', bash_command='echo "{{ task_instance_key_str }}" && echo "Hello World"', dag=dag, )
t2 = FileSensor( task_id='watch_input_directory', filepath=os.path.join(os.environ['HOME'], 'input_directory'), fs_conn_id='fs_default', poke_interval=10, dag=dag, )
t3 = PythonOperator( task_id='print_hello_again', python_callable=print_hello, dag=dag, )
t1 >> t2 >> t3