在代码中使用typehints指定类型来解决类型错误。具体做法如下所示:
首先,在导入所需要的库之后,使用typehints模块中的Type来定义一个自定义类型。例如:
from apache_beam.typehints import Type
class TableRef(Type): pass
接下来,在处理数据流之前,在函数的参数中添加一个注释来指定参数类型。例如:
from apache_beam import ParDo from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms import DoFn
def my_function(table_reference: TableRef) -> None: pass
class MyDoFn(DoFn): def process(self, element, *args, **kwargs): my_function(element['tableRef'])
最后,在定义管道options时,使用type_check_strict参数,将其设置为True来启用类型检查。例如:
options = PipelineOptions() options.view_as(TypeOptions).type_check_strict = True
with beam.Pipeline(options=options) as p: # 此处省略数据处理过程
这样,我们就成功地解决了“TypeError for ApacheBeam Dataflow job: 'unable to determinisitcally encode