在App Engine的YAML配置文件中,需要将dataflow_job_file参数指向正确的数据流作业文件路径。例如:
runtime: python38 entrypoint: gunicorn -w 4 -b :$PORT main:app
env_variables: GCP_PROJECT: my-project-id GOOGLE_APPLICATION_CREDENTIALS: /path/to/credentials.json DATAFLOW_JOB_FILE: /path/to/dataflow-job-file.py
handlers:
其中,DATAFLOW_JOB_FILE参数指向数据流作业文件的路径,必须保证路径正确有效。
示例代码中的dataflow-job-file.py文件应该像这样:
import apache_beam as beam
def run_pipeline(): # 构建和启动数据流管道 p = beam.Pipeline() lines = ['Hello', 'World!', 'Beam', 'Python'] (p | beam.Create(lines) | beam.Map(lambda x: x.lower()) | beam.Map(print)) p.run().wait_until_finish()
if name == 'main': run_pipeline()
这个示例仅仅是为了演示数据流作业的实时流式处理功能,实际使用中需要根据具体的业务需求编写相应的作业代码。