# Template macros exposed to tasks on this DAG (git location and branch).
_git_macros = {
    'git_home': '/opt/bitnami/git',
    'git_repo': 'https://github.com/myuser/myrepo.git',
    'git_branch': 'master',
}

# Daily DAG; tasks can reference the git_* macros in templated fields.
dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    user_defined_macros=_git_macros,
)
# Shell command that brings the local clone up to date with the remote
# branch; {{ ... }} placeholders are filled from the DAG's macros.
_sync_cmd = (
    'cd {{ git_home }} && git fetch && git checkout {{ git_branch }}'
    ' && git pull origin {{ git_branch }}'
)

# Task that syncs the git repository before downstream work runs.
sync_operator = BashOperator(
    task_id='sync_git_repo',
    bash_command=_sync_cmd,
    dag=dag,
)
打开 airflow.cfg,在 [core] 部分中设置以下值:dag_processing_store_serialized_dags = True
worker_log_server_port = 8793 #(或其他可用的端口)
airflow worker --log-server-port 8793 #(或其他端口)
import json
import time
import urllib.request

from airflow.models import DagBag
from airflow.utils.log.logging_mixin import LoggingMixin
# Module-level logger obtained via Airflow's LoggingMixin.
log = LoggingMixin().log
def check_for_new_dags(interval=30):
    """
    Poll the worker log-server status endpoint forever and reload the DAG
    file whenever a git sync is reported.

    Intended to run as a long-lived background task; it never returns.

    :param interval: Seconds to wait between checks (default: 30)
    """
    while True:
        try:
            # Read the status JSON straight from the response instead of
            # downloading it to disk (urlretrieve left an orphan
            # status.json file behind and never cleaned it up).
            with urllib.request.urlopen('http://localhost:8793/status') as resp:
                status = json.load(resp)
            if status.get('gitSync'):
                dagbag = DagBag(dag_folder='/path/to/dags')
                # process_file returns the DAG objects found in the file;
                # an empty result means the reload failed.
                if dagbag.process_file('my_dag.py'):
                    log.info('DAG "my_dag" updated successfully')
                else:
                    log.warning('DAG "my_dag" failed to update')
        except Exception as e:
            # Best-effort poller: log and keep going.
            log.warning(f'Error checking for new DAGs: {e}')
        # Sleep OUTSIDE the try block: the original slept inside it, so any
        # exception skipped the delay and spun the loop at full speed.
        time.sleep(interval)
airflow webserver --port 8080
airflow scheduler
这些步骤将确