Airflow支持对DAG进行访问控制。可以通过以下方式实现:
from airflow import models
from airflow.models import DAG, Variable, Connection
from airflow.models import DAG, DagBag, TaskInstance, TaskReschedule
from airflow.security import permissions, permissions as perms
from airflow.models.baseoperator import BaseOperator
from airflow.utils.db import provide_session
from datetime import datetime
# 创建一个“管理员”角色并授予权限
admin_role = models.Role(
role_name="admin",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
],
)
# 创建一个名为“admin”的用户并将其分配给“管理员”角色。
admin_user = models.User(
email='admin@example.com',
username='admin',
password='your_password',
roles=[admin_role],
)
# 创建一个“开发”角色并授予权限
developer_role = models.Role(
role_name="developer",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
],
)
# 创建一个名为“dev”的用户并将其分配给“开发人员”角色。
dev_user = models.User(
email='dev@example.com',
username='dev',
password='your_password',
roles=[developer_role],
)
access_control
方法实现访问控制。from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# 用于演示的任务,打印“Hello World!”
def my_task():
print("Hello World!")
# 创建一个DAG,命名为“my_dag”,调度每天上午9点00分。
dag = DAG(
"my_dag",
default_args={"owner": "me"},
schedule_interval="@daily 9:00",
)
# 添加一个名为“run_my_task”的任务,并将其设置为PythonOperator
run_my_task = PythonOperator(
task_id="run_my_task",
python_callable=my_task,
dag=dag,
)
# DAG需要管理员角色才能读取和编辑它
dag.access_control = {
"admin": {"can_read", "can_edit"},
"developer": {"can_read"},
}
定义access_control之后,只有具有适当权限的用户才能编辑和读取DAG。 此外,还可以在DAG和Operator级别配置访问控制,然后将它们分配给角色。