sql_alchemy_conn = sqlite:///my_database.db
import sqlite3

from airflow import models
from airflow.utils.db import provide_session
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy.engine import Engine
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Turn on foreign-key enforcement for each new SQLite connection.

    Registered as a ``connect`` listener on ``Engine``. Connections that are
    not ``sqlite3.Connection`` instances are left untouched.

    Args:
        dbapi_connection: The raw DBAPI connection handed to the listener.
        connection_record: Second argument passed by the listener hook;
            unused here.
    """
    if not isinstance(dbapi_connection, sqlite3.Connection):
        return
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # Release the cursor even when the PRAGMA statement fails.
        cursor.close()
@provide_session
def _set_sqlite_pragma_nulls(executor=None, session=None):
    """Issue ``PRAGMA foreign_keys=on``, retrying once after a rollback.

    Args:
        executor: Object whose ``execute`` method runs the statement. When
            omitted, ``session`` is used instead, so the function can be
            called without arguments (as the module-level call below does).
        session: Session injected by ``provide_session`` when the caller
            does not pass one; it is rolled back before the retry.

    Raises:
        sqlalchemy.exc.OperationalError: If the retry fails as well.
    """
    if executor is None:
        executor = session
    try:
        # NOTE(review): the return value is discarded; presumably this call
        # only makes sure an engine exists. Confirm that
        # ``Connection._get_engine`` is available in the Airflow version used.
        models.Connection._get_engine()
        executor.execute("PRAGMA foreign_keys=on")
    except exc.OperationalError:
        # SQLite has no separate server process acting as the parent of these
        # threads. If several threads get here at the same time, a connection
        # object can be leaked and the statement can then fail. Against a
        # local database file this is merely messy, but against a
        # network-accessed SQLite server it leaks the connection.
        session.rollback()
        executor.execute("PRAGMA foreign_keys=on")


_set_sqlite_pragma_nulls()
$ rm airflow.db
$ airflow initdb
注意:这会清除 Airflow 元数据库中的所有现有数据,包括 DAG 记录、任务实例等。