First, make sure that both Airflow and Django are installed. Then follow the steps below:
$ django-admin startproject myproject
$ cd myproject
$ python manage.py startapp myapp
# myproject/settings.py
import os

# Directory that holds the Airflow DAG files
AIRFLOW_DAG_DIR = os.path.join(BASE_DIR, 'dags')
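LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'airflow': {
            'level': 'DEBUG',
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            # FileTaskHandler needs a base_log_folder argument; the 'logs'
            # directory is an assumed location, and older Airflow releases
            # may also require filename_template.
            'base_log_folder': os.path.join(BASE_DIR, 'logs'),
        },
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['airflow'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
    'formatters': {
        'airflow': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(process)d - %(message)s',
        },
    },
}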
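# LOGGING_CONFIG must be the dotted path of a callable that applies LOGGING.
# 'myproject.logging' is not such a callable, so keep Django's default.
LOGGING_CONFIG = 'logging.config.dictConfig'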
STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'dags'),
]
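The LOGGING dictionary follows the standard logging.config.dictConfig schema, so the handler, formatter, and logger wiring can be checked without Airflow installed. The following is a minimal sketch under stated assumptions, not the configuration above: it swaps the Airflow handler for a standard-library StreamHandler that writes to an in-memory buffer, and it shortens the format string. The names log_buffer and build_logging_config are hypothetical.

```python
import io
import logging
import logging.config

# Hypothetical in-memory sink, standing in for Airflow's file handler
log_buffer = io.StringIO()


def build_logging_config(stream):
    """Return a dictConfig-style dict with the same layout as LOGGING."""
    return {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow': {
                'format': '%(name)s - %(levelname)s - %(message)s',
            },
        },
        'handlers': {
            'airflow': {
                'level': 'DEBUG',
                'class': 'logging.StreamHandler',
                'formatter': 'airflow',
                # Extra keys are passed to the handler constructor
                'stream': stream,
            },
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['airflow'],
                'level': 'DEBUG',
                'propagate': False,
            },
        },
    }


logging.config.dictConfig(build_logging_config(log_buffer))
logging.getLogger('airflow.task').info('custom message from Django')
print(log_buffer.getvalue().strip())
# prints: airflow.task - INFO - custom message from Django
```

Any Django module can obtain the same logger with logging.getLogger('airflow.task'); only the handler entry decides where the records end up.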
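# myproject/urls.py
from django.urls import path
from django.views.generic import RedirectView

urlpatterns = [
    ...
    # Airflow's web UI is a separate Flask application and ships no Django
    # URLconf, so include('airflow.urls') cannot work. Redirect to the Airflow
    # webserver instead (assumed here to run on its default port, 8080).
    path('airflow/', RedirectView.as_view(url='http://localhost:8080/')),
    ...
]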
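# myproject/wsgi.py
import os

from django.core.wsgi import get_wsgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
application = get_wsgi_application()

# Load the DAGs from the project-level dags directory.
# wsgi.py lives in myproject/myproject/, so go up one level to reach it.
from airflow.models import DagBag
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
dag_bag = DagBag(os.path.join(PROJECT_ROOT, 'dags'), include_examples=False)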
# dags/hello_world.py
from datetime import datetime

from airflow import DAG
# In Airflow 2, airflow.operators.python_operator is a deprecated alias of this module
from airflow.operators.python import PythonOperator

def hello_world():
    print("Hello, World!")

dag = DAG('hello_world', description='Hello World DAG', schedule_interval='@once', start_date=datetime(2022, 1, 1))
task = PythonOperator(task_id='hello_world_task', python_callable=hello_world, dag=dag)
$ airflow scheduler
$ airflow webserver
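You can now view and manage Airflow's DAGs and tasks in the Airflow web UI, which airflow webserver serves on port 8080 by default. Note that Airflow does not register its DAGs with Django's admin site, and the scheduler only finds the DAG file if Airflow's dags_folder setting points at the project's dags directory. In addition, Django code can write custom log messages through the airflow.task logger configured in settings.py.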
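If Django code needs to start a DAG run itself, one option is Airflow's stable REST API. The following is a minimal sketch, assuming Airflow 2.x with the REST API reachable under /api/v1 and a basic-auth API backend enabled. The helper name build_trigger_request, the host, and the admin/admin credentials are placeholders for illustration. The sketch only builds the request and does not send it.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, username, password, conf=None):
    """Build (but do not send) a request asking Airflow to start a DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({'conf': conf or {}}).encode('utf-8')
    # HTTP basic auth: base64 of "username:password"
    token = base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('ascii')
    return urllib.request.Request(
        url,
        data=body,
        method='POST',
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Basic {token}',
        },
    )


# Placeholder host and credentials; replace them with real values
request = build_trigger_request('http://localhost:8080', 'hello_world', 'admin', 'admin')
print(request.get_method(), request.full_url)
# prints: POST http://localhost:8080/api/v1/dags/hello_world/dagRuns
```

Passing the request to urllib.request.urlopen inside a Django view would send it. Keeping the builder separate from the network call makes it easy to unit-test without a running Airflow instance.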