psycopg2 cannot be imported directly in an AWS Glue 3.0 job, because Glue jobs run in a managed environment where the library is not preinstalled. The workaround is to package psycopg2 yourself, publish it as a Lambda layer, and reference that layer from the job.
The steps are as follows:
First, generate a requirements.txt for your job code with pipreqs, so you know which dependencies need to be packaged:

pip install pipreqs
pipreqs /path/to/your/python/code --force
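If pipreqs lists other dependencies besides psycopg2, they can be installed into the same staging directory that the packaging script below creates (it prints the path), so they end up in the layer zip as well. A minimal sketch, with a hypothetical requirements.txt path and staging directory:

import os

# Hypothetical paths: the requirements.txt written by pipreqs and the
# temporary staging directory printed by the packaging script below.
requirements_file = "/path/to/your/python/code/requirements.txt"
staging_dir = "/tmp/layer-staging"

# Install the listed dependencies next to psycopg2-binary so they are
# zipped into the same Lambda layer.
os.system(f"pip3 install -r {requirements_file} -t {staging_dir}")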
Next, run the following script to install psycopg2-binary into a temporary directory, zip it up, and publish the zip as a Lambda layer version:

import os
import tempfile
import zipfile
import shutil
import boto3
from botocore.exceptions import ClientError

# Install psycopg2 into a temporary staging directory
tempdir = tempfile.mkdtemp()
print("Temp dir =", tempdir)
os.system("pip3 install psycopg2-binary -t " + tempdir)

# Create a zip file with the psycopg2 package
zip_file = tempfile.mkstemp(suffix='.zip')[1]
with zipfile.ZipFile(zip_file, mode='w') as package:
    for root, dirs, files in os.walk(tempdir):
        for file in files:
            filename = os.path.join(root, file)
            package.write(filename, os.path.relpath(filename, tempdir))

# Publish the zip as a Lambda layer version
layer_name = 'psycopg2-layer'
layer_description = 'psycopg2 library for Python'
layer_license = 'BSD-3-Clause'

client = boto3.client('lambda')
try:
    with open(zip_file, 'rb') as f:
        response = client.publish_layer_version(
            LayerName=layer_name,
            Content={'ZipFile': f.read()},
            Description=layer_description,
            CompatibleRuntimes=['python3.8'],
            LicenseInfo=layer_license,
        )
    print(response)
    print("Lambda layer created successfully!")
except ClientError as e:
    print(f"Failed to publish layer: {e}")
finally:
    # Clean up the zip and the staging directory
    os.remove(zip_file)
    shutil.rmtree(tempdir, ignore_errors=True)
Finally, look up the latest version of the layer and inject it into the job's --extra-py-files argument before launching the job:

import sys
import subprocess
import boto3

# Look up the latest version of the psycopg2 Lambda layer
client = boto3.client('lambda')
response = client.list_layer_versions(LayerName='psycopg2-layer')
latest_version = response['LayerVersions'][0]['Version']

# Replace REGION and ACCOUNT_ID with your own values
gluepysparkjob_params = {
    "--extra-py-files": f"arn:aws:lambda:REGION:ACCOUNT_ID:layer:psycopg2-layer:{latest_version}"
}


def main():
    try:
        spark_submit_args = ["gluepysparksubmit"]
        spark_submit_args.extend(sys.argv[1:])

        # If the caller already passed --extra-py-files, merge its value
        # with the layer reference instead of adding a second flag.
        for i, arg in enumerate(spark_submit_args):
            if arg == "--extra-py-files" and i + 1 < len(spark_submit_args):
                gluepysparkjob_params[arg] += "," + spark_submit_args[i + 1]
                del spark_submit_args[i:i + 2]
                break

        # Append every parameter that is not already present
        for param, value in gluepysparkjob_params.items():
            if param not in spark_submit_args:
                spark_submit_args.append(param)
                spark_submit_args.append(value)

        print(f"Running spark-submit with: {spark_submit_args}")
        # Start the spark-submit process with the modified arguments
        subprocess.Popen(spark_submit_args)
    except Exception as e:
        print(f"Error running main function: {e}")


if __name__ == "__main__":
    main()