Airflow 中的 Worker 和 Scheduler 都是相互独立的进程,通过消息队列进行通信。如果需要在这两个进程之间共享数据,需要使用某些持久化的存储方式,比如数据库或者共享文件系统。
对于数据库方案,可以在 Airflow 的配置文件中指定数据库的 URL,并启用 SharedExecutor 模式。这样 Worker 和 Scheduler 都会连接到同一个数据库,就可以在数据库中存储共享的数据。例如可以使用数据库表中存储的临时任务输出数据进行共享。以下是一个使用 SQLAlchemy 作为数据库连接的示例:
from sqlalchemy import create_engine
from airflow import settings
from airflow.models import Connection
engine = create_engine(settings.SQL_ALCHEMY_CONN)
# 创建数据表用于存储共享数据
conn = Connection(
conn_id='shared_data',
uri='postgresql://localhost/airflow',
)
conn.extra = '{"shared_database_table":"shared_table"}'
sess = settings.Session()
sess.add(conn)
sess.commit()
sess.close()
# 在 worker 或者 scheduler 进程中使用共享数据
from airflow.utils.db import provide_session
@provide_session
def load_shared_data(session=None):
conn = session.query(Connection).filter(Connection.conn_id == 'shared_data').first()
table_name = conn.extra_dejson.get('shared_database_table')
sql = f'SELECT * FROM {table_name}'
result = session.execute(sql).fetchall()
return result
对于共享文件系统方案,可以在配置中指定一个共享文件系统路径,例如:
[core]
...
shared_file_system = /path/to/shared_dir
...
然后在 Worker 或者 Scheduler 中使用该路径即可访问共享数据:
# 在 worker 或者 scheduler 进程中使用共享数据
import os
SHARED_PATH = '/path/to/shared_dir'
def load_shared_data():
file_path = os.path.join(SHARED_PATH, 'shared_data.txt')
with open(file_path, 'r') as f:
data = f