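在使用Airflow连接Postgres数据库时，可能会遇到找不到Postgres相关模块的问题（例如报错 `ModuleNotFoundError: No module named 'psycopg2'`）。这时，需要安装psycopg2依赖包来解决；如果缺少的是 `airflow.providers.postgres` 模块，则还需要安装 `apache-airflow-providers-postgres`。运行以下命令安装psycopg2：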
pip install psycopg2-binary
如果已经安装了psycopg2但问题仍然存在，请尝试重新安装。先运行以下命令卸载psycopg2：
pip uninstall psycopg2
然后重新安装psycopg2-binary。安装完成后，应当可以在DAG文件中正常导入Postgres的Hook和Operator：
# Hook: programmatic access to a Postgres connection configured in Airflow.
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Operator: runs SQL against Postgres as a DAG task.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Defaults handed to the DAG below through its ``default_args`` parameter.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 6, 1),
}

# Example DAG, scheduled with the '@once' preset.
dag = DAG(
    'postgres_example',
    default_args=default_args,
    description='Example DAG for Postgres',
    schedule_interval='@once',
)

# Hook tied to the Airflow connection whose id is 'postgres_default'.
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
def get_table_data():
    """Fetch every row of ``my_table`` through the module-level ``pg_hook``.

    Returns:
        The result of ``cursor.fetchall()`` for ``SELECT * FROM my_table``.
    """
    conn = pg_hook.get_conn()
    try:
        # With psycopg2, ``with conn`` only scopes the transaction (commit on
        # success, rollback on error); it does NOT close the connection, so
        # the connection is closed explicitly in ``finally`` below.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute('SELECT * FROM my_table')
                return cursor.fetchall()
    finally:
        # Always release the connection, even if the query raised.
        conn.close()
# Task that calls get_table_data when it runs.
# PythonOperator is provided by ``airflow.operators.python`` and must be
# imported at the top of the DAG file; without that import this statement
# raises NameError when the DAG file is parsed.
t1 = PythonOperator(
    task_id='get_table_data',
    python_callable=get_table_data,
    dag=dag,
)
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

# Defaults handed to the DAG below through its ``default_args`` parameter.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 6, 1),
}

# Second example DAG, also scheduled with the '@once' preset.
dag = DAG(
    'postgres_sql_example',
    default_args=default_args,
    description='Example DAG for Postgres SQL',
    schedule_interval='@once',
)
t1 = PostgresOperator