import datetime

from airflow.operators.postgres_operator import PostgresOperator
# Define the Airflow task that runs the SQL statement. execution_timeout caps
# the task's run time at 10 minutes; per the Airflow documentation, a task
# instance that exceeds it is failed by Airflow.
task = PostgresOperator(
    task_id='my_task',
    sql='SELECT ...',
    execution_timeout=datetime.timedelta(minutes=10),
)
import psycopg2
import datetime
# Connect to the database.
# NOTE(review): the connection values below look like placeholders; real
# credentials should come from configuration or the environment, not source.
conn = psycopg2.connect(
    host="database_host",
    database="database_name",
    user="username",
    password="password",
)

# Queries that have been running longer than this are terminated.
timeout_period = datetime.timedelta(minutes=10)

try:
    with conn.cursor() as cursor:
        # Fetch the backend pid of every active query older than the timeout.
        # The age is computed server-side with now(), so no client-side
        # timestamp is needed. Only pid is selected: with `SELECT *` the first
        # column is datid (the database OID), not the backend pid.
        cursor.execute(
            """
            SELECT pid
            FROM pg_stat_activity
            WHERE state = 'active' AND now() - query_start > %s
            """,
            [timeout_period],
        )
        rows = cursor.fetchall()

        # Terminate each backend returned by the query above.
        for (pid,) in rows:
            cursor.execute('SELECT pg_terminate_backend(%s)', [pid])
    conn.commit()
finally:
    # Always release the connection, even if one of the queries fails.
    conn.close()
上一篇:Airflow PostgresHook 中获取 upsert 行为时出现错误
下一篇:Airflow PostgresOperator:在使用 postgres_conn_id="redshift" 时任务出现异常