要解决"AirFlow DatabricksSubmitRunOperator不接受笔记本参数"的问题,您可以尝试以下解决方法:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
dag = DAG('databricks_run_notebook', default_args=default_args, schedule_interval=None)
run_notebook_task = DatabricksRunNowOperator(
task_id='run_notebook',
dag=dag,
json={
'notebook_task': {
'notebook_path': '/path/to/notebook',
'base_parameters': {
'param1': 'value1',
'param2': 'value2'
}
}
}
)
json
参数传递笔记本参数。尽管DatabricksSubmitRunOperator不直接接受笔记本参数,但您可以在json
参数中指定笔记本路径和参数。以下是一个示例代码:from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
dag = DAG('databricks_submit_run', default_args=default_args, schedule_interval=None)
submit_run_task = DatabricksSubmitRunOperator(
task_id='submit_run',
dag=dag,
json={
'notebook_task': {
'notebook_path': '/path/to/notebook',
'base_parameters': {
'param1': 'value1',
'param2': 'value2'
}
}
}
)
请注意,以上示例代码是Airflow的基本示例,您可能需要根据您的实际需求进行适当的调整。
上一篇:Airflow daskexecutor 异常:"FileNotFoundError(2, '找不到文件或目录')" 在dask工作进程上
下一篇:airflow db migrate出现TypeError: SqlAlchemySessionInterface.__init__()缺少6个必需的位置参数。