这个错误意味着在使用字典列表的时候,Airflow无法将它们哈希为唯一的键来进行操作。要解决这个问题,可以使用Python内置的frozenset()函数将字典转换为不可变的类型,从而使其成为可哈希的对象。
以下是一些代码示例,演示如何在Airflow TaskGroup中使用frozenset()将字典列表变为可哈希对象:
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
from typing import List
@task
def extract_values(d: dict) -> List[int]:
values = []
keys = frozenset(d.keys())
if 'a' in keys:
values.append(d['a'])
if 'b' in keys:
values.append(d['b'])
if 'c' in keys:
values.append(d['c'])
return values
@dag(default_args=default_args, schedule_interval=None, start_date=datetime(2020, 1, 1))
def example_dag():
data = [{'a': 1, 'b': 2, 'c': 3}, {'a': 4, 'b': 5}, {'a': 6, 'c': 7}]
with TaskGroup('value_extraction') as tg:
for d in data:
extract_values(d)
example_dag = example_dag()
在这个代码示例中,我们使用frozenset()函数将每个字典的键转换为一个不可变的集合,然后在我们的extract_values()任务中使用这个集合来提取字典中的特定值。这将允许Airflow将字典列表的每个元素正确处理为可哈希对象。