要在Airflow中实现彩色日志记录,可以按照以下步骤进行操作:
安装依赖库:
pip install colorlog
创建一个自定义的日志处理器(ColoredStreamHandler)来格式化和输出彩色日志:
import logging
from colorlog import ColoredFormatter
class ColoredStreamHandler(logging.StreamHandler):
    """Stream handler that renders each record in a color chosen by its level.

    Records are formatted as ``LEVEL:logger-name:message`` by a
    ``ColoredFormatter``, which substitutes ``%(log_color)s`` according to
    the level-to-color mapping configured below.
    """

    def __init__(self, stream=None):
        """Create the handler and install the colored formatter.

        Args:
            stream: Optional stream to write to. It is forwarded unchanged to
                ``logging.StreamHandler``; ``None`` keeps that class's
                default stream, which matches the previous no-argument
                behaviour.
        """
        super().__init__(stream)
        self.setFormatter(ColoredFormatter(
            "%(log_color)s%(levelname)s:%(name)s:%(message)s",
            # Level name -> color specification applied via %(log_color)s.
            log_colors={
                'DEBUG': 'green',
                'INFO': 'blue',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red,bg_white',
            },
            secondary_log_colors={},
            style='%',
        ))
在Airflow的DAG文件中,先将ColoredStreamHandler添加到根日志记录器(例如 logging.getLogger().addHandler(ColoredStreamHandler())),再定义DAG和任务:
import datetime
import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'airflow',
    # ``datetime`` is imported as a module in this file, so the class has to
    # be reached as ``datetime.datetime``; the bare call raised NameError.
    'start_date': datetime.datetime(2022, 1, 1),
}

dag = DAG('example_dag', default_args=default_args)
def my_task():
    """Log one sample message at each of the five standard severity levels.

    The messages are emitted with the ``logging`` module-level helpers, which
    log on the root logger, so they are rendered by whichever handlers are
    attached to it. Takes no arguments and returns ``None``.
    """
    logging.debug('This is a debug message')
    logging.info('This is an info message')
    logging.warning('This is a warning message')
    logging.error('This is an error message')
    logging.critical('This is a critical message')
# Wrap ``my_task`` in a PythonOperator and attach it to ``dag``.
task = PythonOperator(task_id='my_task', python_callable=my_task, dag=dag)
# Set the DAG's ``dagrun_timeout`` to 60 minutes. One assignment is enough:
# the repeated copies only re-set the same value, and the last one assigned
# the bare ``datetime`` name instead of a timedelta.
task.dag.dagrun_timeout = datetime.timedelta(minutes=60)