Files read and written by Airflow tasks can be stored in the cloud, for example in AWS S3 or Google Cloud Storage. A concrete setup looks like this:
S3 example (connection setup). First register an Airflow connection named s3_conn that holds the AWS credentials; one way is the Airflow 2.x CLI, with ACCESS_KEY and SECRET_KEY as placeholders:

airflow connections add s3_conn \
    --conn-type aws \
    --conn-login ACCESS_KEY \
    --conn-password SECRET_KEY
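Once the connection exists, tasks can reference it by ID instead of relying on credentials on the worker. A minimal sketch, assuming the Amazon provider package (apache-airflow-providers-amazon) is installed; the bucket, key, and local path are placeholders:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# The hook resolves credentials from the s3_conn connection defined above.
hook = S3Hook(aws_conn_id='s3_conn')

# Upload a local file to s3://my-airflow-bucket/example_s3.txt.
hook.load_file(
    filename='/tmp/example_s3.txt',   # hypothetical local path
    key='example_s3.txt',
    bucket_name='my-airflow-bucket',
    replace=True,
)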
GCS example (connection setup). Likewise, register a google_cloud_platform connection named gcs_conn that points at a service account key file (the path below is a placeholder; older Google provider versions expect the prefixed key extra__google_cloud_platform__key_path instead of key_path):

airflow connections add gcs_conn \
    --conn-type google_cloud_platform \
    --conn-extra '{"key_path": "/path/to/service-account.json"}'
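The same pattern works on the Google side through GCSHook; a sketch assuming the Google provider package (apache-airflow-providers-google) is installed, again with placeholder names:

from airflow.providers.google.cloud.hooks.gcs import GCSHook

# The hook authenticates with the service account behind gcs_conn.
hook = GCSHook(gcp_conn_id='gcs_conn')

# Upload a local file to gs://my-airflow-bucket/example_gcs.txt.
hook.upload(
    bucket_name='my-airflow-bucket',
    object_name='example_gcs.txt',
    filename='/tmp/example_gcs.txt',  # hypothetical local path
)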
S3 example (DAG). The DAG below shells out to the aws CLI with BashOperator, so the worker must have that CLI installed and configured with its own AWS credentials; this path does not go through s3_conn. An operator-based alternative that does use the connection is sketched after the GCS variant:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    'example_s3',
    schedule_interval='0 0 * * *',
    start_date=datetime(2022, 3, 1),
    catchup=False,
)

S3_BUCKET = 'my-airflow-bucket'
S3_KEY = 'example_s3.txt'
# Placeholder local paths; replace with real ones.
local_file_path = '/tmp/example_s3.txt'
local_download_path = '/tmp/example_s3_download.txt'

t1 = BashOperator(
    task_id='upload_to_s3',
    bash_command=f'aws s3 cp {local_file_path} s3://{S3_BUCKET}/{S3_KEY}',
    dag=dag,
)

t2 = BashOperator(
    task_id='download_from_s3',
    bash_command=f'aws s3 cp s3://{S3_BUCKET}/{S3_KEY} {local_download_path}',
    dag=dag,
)

# Download only after the upload has succeeded.
t1 >> t2
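To keep the upload inside Airflow and have it honor s3_conn, the Amazon provider ships a local-to-S3 transfer operator. A sketch that could be dropped into the DAG above, reusing the same placeholder variables:

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

# Uploads via the s3_conn connection instead of the aws CLI on the worker.
upload = LocalFilesystemToS3Operator(
    task_id='upload_to_s3_via_operator',
    filename=local_file_path,   # same placeholder path as above
    dest_key=S3_KEY,
    dest_bucket=S3_BUCKET,
    aws_conn_id='s3_conn',
    replace=True,
    dag=dag,
)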
GCS example (DAG). The same pattern with gsutil; as with the aws CLI, gsutil authenticates with the worker's own gcloud setup rather than gcs_conn:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    'example_gcs',
    description='Example DAG to upload and download files from GCS',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False,
)

GCS_BUCKET = 'my-airflow-bucket'
GCS_KEY = 'example_gcs.txt'
# Placeholder local paths; replace with real ones.
local_file_path = '/tmp/example_gcs.txt'
local_download_path = '/tmp/example_gcs_download.txt'

t1 = BashOperator(
    task_id='upload_to_gcs',
    bash_command=f'gsutil cp {local_file_path} gs://{GCS_BUCKET}/{GCS_KEY}',
    dag=dag,
)

t2 = BashOperator(
    task_id='download_from_gcs',
    bash_command=f'gsutil cp gs://{GCS_BUCKET}/{GCS_KEY} {local_download_path}',
    dag=dag,
)

# Download only after the upload has succeeded.
t1 >> t2
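As on the S3 side, the Google provider has an equivalent transfer operator that authenticates through gcs_conn; a sketch under the same assumptions, reusing the variables from the DAG above:

from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

# Uploads via the gcs_conn connection instead of gsutil on the worker.
upload = LocalFilesystemToGCSOperator(
    task_id='upload_to_gcs_via_operator',
    src=local_file_path,   # same placeholder path as above
    dst=GCS_KEY,
    bucket=GCS_BUCKET,
    gcp_conn_id='gcs_conn',
    dag=dag,
)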