下面是一个简单的Airflow任务示例,展示了如何使用Airflow的Operator来操作Oracle数据库:
from datetime import datetime
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
# 定义DAG参数
dag_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
# 定义DAG
dag = DAG(
dag_id='airflow_oracle_operator_example',
default_args=dag_args,
schedule_interval=None
)
# 定义OracleOperator任务
oracle_task = OracleOperator(
task_id='oracle_task',
sql='SELECT * FROM your_table',
oracle_conn_id='your_oracle_conn',
dag=dag
)
# 设置任务依赖关系
oracle_task
在上述示例中,我们首先导入了OracleOperator
类,该类是Airflow中用于执行Oracle数据库操作的Operator。
然后,我们定义了一个DAG,并传入了一些基本参数,如owner
和start_date
。
接下来,我们创建了一个OracleOperator
任务,指定了任务的task_id
,要执行的SQL查询,以及用于连接Oracle数据库的连接ID。
最后,我们设置了任务之间的依赖关系,将oracle_task
添加到DAG中。
请注意,上述示例中的your_table
和your_oracle_conn
需要根据实际情况进行替换。