This is likely because only one task instance of your DAG is running at a time (for example, under the SequentialExecutor or with a low concurrency limit), so only a single KafkaConsumer is ever active. To avoid this, let Airflow run several independent consumer tasks concurrently: multiple KafkaConsumer instances that share the same consumer group can then consume the same Kafka topic in parallel, with Kafka splitting the topic's partitions between them.
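The parallelism itself comes from Kafka's consumer-group protocol, not from Airflow: consumers that share a group.id are each assigned a disjoint subset of the topic's partitions. A minimal standalone sketch with confluent-kafka (the library the Airflow Kafka provider is built on; the broker address is an assumption):

from confluent_kafka import Consumer

# Two processes running this same code with the same group.id will each
# be assigned a disjoint subset of my_topic's partitions.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # assumption: local broker
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['my_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(f'consumed: {msg.value()}')
finally:
    consumer.close()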
Below is an example DAG that produces to and consumes from Kafka using the ConsumeFromTopicOperator and ProduceToTopicOperator from the apache-airflow-providers-apache-kafka provider (it assumes that provider is installed and that a kafka_default connection is configured). Two consumer tasks run in parallel, each with its own KafkaConsumer reading the same topic:
from datetime import timedelta

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.utils.dates import days_ago


def producer_function():
    # Yield (key, value) pairs; each pair becomes one message on the topic.
    yield (None, b'test_message')


def consumer_function(message):
    # Applied to every message fetched by a consumer task.
    print(f'consumed: {message.value()}')


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'kafka_dag',
    default_args=default_args,
    description='DAG to demonstrate parallel processing of Kafka topics in Airflow',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

task_1 = ConsumeFromTopicOperator(
    task_id='consume_from_kafka_1',
    topics=['my_topic'],
    apply_function=consumer_function,
    kafka_config_id='kafka_default',  # connection holding group.id / auto.offset.reset
    max_messages=10,                  # read at most 10 messages so the task terminates
    dag=dag,
)

task_2 = ConsumeFromTopicOperator(
    task_id='consume_from_kafka_2',
    topics=['my_topic'],
    apply_function=consumer_function,
    kafka_config_id='kafka_default',
    max_messages=10,
    dag=dag,
)

task_3 = ProduceToTopicOperator(
    task_id='produce_to_kafka',
    topic='my_topic',
    producer_function=producer_function,
    kafka_config_id='kafka_default',
    dag=dag,
)

# Produce first; the two consumer tasks have no dependency on each other,
# so the scheduler is free to run them at the same time.
task_1.set_upstream(task_3)
task_2.set_upstream(task_3)
In this example, task_3 first produces a test message to my_topic; task_1 and task_2 then consume from that topic as members of the same consumer group, so Kafka balances the topic's partitions between the two consumers. Because neither consumer depends on the other, Airflow can run both task instances at the same time, provided the executor supports parallelism (e.g. LocalExecutor or CeleryExecutor rather than SequentialExecutor) and the DAG's concurrency settings allow it. Note that the topic needs at least two partitions for both consumers in the same group to receive messages concurrently.
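The consumer-group settings live in the kafka_default connection rather than on the operators: the connection's extras are passed straight through to confluent-kafka as its config dict. A minimal sketch that creates the connection programmatically (the broker address is an assumption; you can equally configure this in the Airflow UI):

import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id='kafka_default',
    conn_type='kafka',
    extra=json.dumps({
        'bootstrap.servers': 'localhost:9092',  # assumption: local broker
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest',
    }),
)

session = settings.Session()
session.add(conn)
session.commit()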