这个问题的原因是在sensor中设置了reschedule模式,会导致重复的日志。可以通过在dag中添加一个日志filter,来避免重复的日志。
例如,在dag中添加如下代码:
import logging
from airflow.utils.log.logging_mixin import LoggingMixin
class DeduplicationFilter(logging.Filter, LoggingMixin):
def filter(self, record):
last_log = getattr(self, 'last_log', None)
if last_log:
if record.msg == last_log.msg \
and record.levelno == last_log.levelno \
and record.args == last_log.args:
return False
setattr(self, 'last_log', record)
return True
然后在dag中的default_args
中添加log_filter=DeduplicationFilter()
,即可解决该问题,示例如下:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 12, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'log_filter': DeduplicationFilter()
}