Apache Beam 无法识别自定义容器的参数,通常是因为创建作业时没有以 Beam 能够解析的流水线选项(pipeline options)形式正确传入这些参数。以下是一个在 Google Cloud Dataflow(DataflowRunner)上使用自定义容器镜像运行 Beam 作业的示例,解决方法是在作业创建配置中通过流水线选项显式指定容器镜像及所需的自定义参数。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
# Submit a Beam job to Google Cloud Dataflow that runs on a custom SDK
# container image and receives custom, user-defined arguments.
#
# Beam only parses flags that some PipelineOptions subclass registers.
# `--worker_harness_container_image_args` is not such an option, so values
# passed that way are never understood by the runner. Instead, custom values
# are declared on the options class below and passed as ordinary pipeline
# arguments. Code in the pipeline reads them via
# `options.view_as(MyContainerOptions)`.


class MyContainerOptions(PipelineOptions):
    """Custom pipeline options consumed by code running in the container."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # Explicit `dest` gives snake_case attribute names for hyphenated flags.
        parser.add_argument(
            '--my-arg',
            dest='my_arg',
            default=None,
            help='Example custom argument read by the pipeline code.')
        parser.add_argument(
            '--my-other-arg',
            dest='my_other_arg',
            default=None,
            help='Another example custom argument.')


job_name = 'my-job'
project_id = 'my-project'
staging_location = 'gs://my-bucket/staging'
temp_location = 'gs://my-bucket/temp'
region = 'us-central1'
# Placeholder: worker service accounts normally have the form
# <name>@<project>.iam.gserviceaccount.com.
service_account_email = 'my-service-account@my-project.iam.gserviceaccount.com'
worker_zone = 'us-central1-a'
network = 'my-vpc'
# Dataflow expects either a full subnetwork URL or the abbreviated
# "regions/<region>/subnetworks/<name>" path, not a bare subnetwork name.
subnetwork = 'regions/{}/subnetworks/my-subnet'.format(region)
runner = 'DataflowRunner'
# Kept only so existing references to `zone` still resolve. It is NOT passed
# to Beam: the deprecated --zone flag cannot be combined with --worker_zone.
zone = 'us-central1-a'
num_workers = 5
# Placeholder: use the fully qualified image path in your registry.
worker_harness_container_image = (
    'us-central1-docker.pkg.dev/my-project/my-repo/my-custom-container-image:latest')
# Custom arguments as plain argv tokens. They are parsed by MyContainerOptions
# (name kept for backward compatibility with code referencing it).
worker_harness_container_image_args = [
    '--my-arg', 'value1',
    '--my-other-arg', 'value2',
]

beam_options = [
    '--project={}'.format(project_id),
    '--job_name={}'.format(job_name),
    '--staging_location={}'.format(staging_location),
    '--temp_location={}'.format(temp_location),
    '--region={}'.format(region),
    '--service_account_email={}'.format(service_account_email),
    '--worker_zone={}'.format(worker_zone),
    '--network={}'.format(network),
    '--subnetwork={}'.format(subnetwork),
    '--runner={}'.format(runner),
    '--num_workers={}'.format(num_workers),
    # --sdk_container_image replaces the deprecated
    # --worker_harness_container_image flag (Beam 2.30.0+).
    '--sdk_container_image={}'.format(worker_harness_container_image),
    # Custom containers on Dataflow require Runner v2.
    '--experiments=use_runner_v2',
]
# Append the custom flags unchanged (flag, value, flag, value, ...), so that
# argparse pairs each flag with its value.
beam_options.extend(worker_harness_container_image_args)

options = PipelineOptions(beam_options)
# Typed view of the custom values, e.g. my_container_options.my_arg == 'value1'.
my_container_options = options.view_as(MyContainerOptions)

p = beam.Pipeline(options=options)
# Define the data-processing steps here, e.g.
#   p | beam.Create([...]) | beam.Map(...)
result = p.run()
# Call result.wait_until_finish() if this script should block until the job
# has completed.
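此示例在 Beam 作业创建配置中通过流水线选项指定了自定义容器镜像(worker_harness_container_image),并传入了 worker_harness_container_image_args 中列出的自定义参数。注意:Beam 只会解析已在某个 PipelineOptions 子类中注册过的选项,未注册的参数不会被识别。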