这个问题的原因可能是因为本地环境和 GCP Dataflow 环境之间的差异导致的。解决这个问题的方法是使用 Apache Beam 的 DirectRunner 来运行 Beam 管道的单元测试,以确保在本地和 GCP Dataflow 环境中的执行相同。
以下是使用 DirectRunner 运行 Beam 管道的示例:
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
class MyPipeline(beam.Pipeline):
def __init__(self):
super(MyPipeline, self).__init__(runner=beam.runners.DirectRunner())
class MyTest(unittest.TestCase):
def test_beam_pipeline(self):
with TestPipeline() as p:
data = ['foo', 'bar', 'baz']
result = (
p
| beam.Create(data)
| beam.Map(lambda x: x.upper())
| beam.Map(print)
)
expected_result = data
self.assertEqual(result, expected_result)
if __name__ == '__main__':
unittest.main()
这个示例代码使用 DirectRunner 来运行测试管道,然后使用 TestPipeline 类来创建一个模拟的管道。在测试管道中,我们创建了一个字符串列表,然后将它们全部转换为大写并进行打印。最后,我们通过使用 assertEqual 函数来比较预期结果和管道的实际结果。
在这个示例中,我们可以看到,在使用 DirectRunner 运行管道时,它在本地和 GCP Dataflow 环境中的执行是相同的。