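Add a data generator to the test code and cast data types explicitly
When an Apache Beam pipeline test fails with an AssertionError, the cause is usually a mismatch between the types of the test data and the types the pipeline produces. To resolve this, add a data generator to the test code and cast the data to the expected types explicitly.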
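A minimal sketch of the idea in plain Python, independent of Beam, may help make it concrete. The helper names `generate_words` and `as_word_counts` are hypothetical and chosen only for illustration: the first yields every input item as a plain `str`, the second casts each pair to `(str, int)` so that two results are compared by value rather than by incidental type.

```python
from collections import Counter
from typing import Iterable, Iterator, List, Tuple


def generate_words(raw_words: Iterable[object]) -> Iterator[str]:
    """Hypothetical data generator: yield each input item as a plain str."""
    for item in raw_words:
        # bytes must be decoded; str(b'hello') would give "b'hello'".
        if isinstance(item, bytes):
            yield item.decode('utf-8')
        else:
            yield str(item)


def as_word_counts(
        pairs: Iterable[Tuple[object, object]]) -> List[Tuple[str, int]]:
    """Cast every pair to (str, int) and sort the result for comparison."""
    return sorted((str(word), int(count)) for word, count in pairs)


# Mixed input types, as might arrive from a file or another library.
raw = ['hello', b'world', 'hello']
actual = as_word_counts(Counter(generate_words(raw)).items())

# Expected values written with "wrong" types on purpose (list, float, str).
expected = as_word_counts([['hello', 2.0], ('world', '1')])

assert actual == expected
```

Without the casts, `('hello', 2)` and `['hello', 2.0]` would not compare equal, which is the kind of mismatch that makes an otherwise correct test fail.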
As an example, consider the following code, which builds a simple pipeline that counts the words in a text:
import apache_beam as beam

pipeline = beam.Pipeline()

words = ['hello', 'world',
         'hello', 'beam',
         'world', 'beam',
         'apache', 'beam',
         'hello', 'apache']

word_counts = (
    pipeline
    | beam.Create(words)
    | beam.Map(lambda word: (word, 1))
    | beam.CombinePerKey(sum)
)
# test code
import logging
import unittest


class WordCountTest(unittest.TestCase):

    def test_word_count_pipeline(self):
        test_data = [
            ('hello', 3),
            ('world', 2),
            ('beam', 3),
            ('apache', 2)
        ]
        result = (
            test_data
            | beam.Create()
            | beam.Map(lambda x: (x[0], x[1]))
            | word_counts
        )
        # check the output
        for r in result:
            logging.info(r)
            self.assertIn(r, test_data)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    unittest.main(argv=['first-arg-is-ignored'], exit=False)
Running the `WordCountTest` test reports the following AssertionError:
ERROR: test_word_count_pipeline (__main__.WordCountTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  ...
  File "/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py", line 612, in assert_that
    _assert_that(actual, matcher, self._pipeline_options, label)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py", line 634, in _assert_that
    matcher = _create_flatten_matcher(matcher)