AWS Glue 是一款处理大规模数据的 ETL(Extract, Transform and Load)服务,提供了分布式运行和自动调度等功能。在运行 AWS Glue 任务时,可以配置任务使用的 worker 数量,以加快数据处理速度。当然,更好地利用多核 CPU 也是必不可少的。
在 AWS Glue 中,任务的 worker 是 AWS 托管的 EC2 实例。为了更好地利用 worker 的多核 CPU,AWS Glue 引入了线程池的概念。处理任务的 worker 在任务运行期间创建线程池,然后可以将每个线程分配给不同的任务。在 AWS Glue 中,线程池的大小可以配置,这有助于提高总任务的并发性。
下面是一个使用线程池的 AWS Glue 任务示例。我们可以指定并行执行任务的线程数,并通过配置 worker 类型来控制 worker 的数量和类型:
import sys
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'num_executors'])
num_executors = int(args['num_executors'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
input_path = "s3://mybucket/myinputpath/"
output_path = "s3://mybucket/myoutputpath/"
df = spark.read.format("csv").option("header","true").load(input_path)
df = df.withColumn("new_col", lit(1))
# 通过线程池并行执行 DataFrame 的计算
df.write.format("parquet").mode("overwrite").option("compression", "snappy") \
.option("numPartitions", num_executors*4) \
.option("maxRecordsPerFile", 10000000) \
.save(output_path)
在这个示例中,我们使用了 Spark 的函数来创建一个包含新列的