可以尝试通过为 SIGTERM 注册自定义信号处理函数来解决此问题:当进程收到默认的停止信号 SIGTERM 时,向其所有子进程转发 SIGINT 信号。以下是示例代码:
from google.cloud.storage import Client
from airflow.contrib.operators import GoogleSheetsToGCSOperator
import psutil
import signal
def set_signal_handler():
    """Install a SIGTERM handler that forwards SIGINT to all child processes.

    After this call, a SIGTERM delivered to the current process sends SIGINT
    to every descendant process found via ``psutil``. The handler itself does
    not terminate the current process.

    ``signal.signal`` may only be called from the main thread; it raises
    ``ValueError`` otherwise.
    """

    def kill_process(signum, frame):
        """Send SIGINT to every descendant of the current process."""
        # psutil.Process() with no argument refers to the current process,
        # so no separate PID lookup is needed.
        parent = psutil.Process()
        for child in parent.children(recursive=True):
            try:
                child.send_signal(signal.SIGINT)
            except psutil.NoSuchProcess:
                # The child exited between enumeration and signalling;
                # there is nothing left to interrupt.
                continue

    signal.signal(signal.SIGTERM, kill_process)
def airflow_task():
    """Run the Google Sheets to GCS transfer with the SIGTERM handler installed.

    Steps, in order:
      1. Install the SIGTERM handler via ``set_signal_handler``.
      2. Upload an empty string to ``gs://my-gcs-bucket/output.csv``.
      3. Build a ``GoogleSheetsToGCSOperator`` and execute it.

    All identifiers (bucket, object, spreadsheet, connection ids) are
    hard-coded example values.
    """
    set_signal_handler()

    gcs_bucket = 'my-gcs-bucket'
    gcs_object = 'output.csv'
    spreadsheet_id = 'my-spreadsheet-id'
    sheet_name = 'Sheet1'
    range_name = 'A1:E'
    gcs_conn_id = 'google_cloud_storage_default'
    sheets_conn_id = 'google_sheets_default'

    # Write an empty object to the destination before the operator runs.
    client = Client()
    bucket = client.get_bucket(gcs_bucket)
    blob = bucket.blob(gcs_object)
    blob.upload_from_string('')

    # NOTE(review): these keyword names are kept as written; confirm them
    # against the signature of the GoogleSheetsToGCSOperator that is actually
    # installed, since they may not match it.
    sheets_to_gcs = GoogleSheetsToGCSOperator(
        task_id='sheets_to_gcs',
        spreadsheet_id=spreadsheet_id,
        sheet_name=sheet_name,
        range_name=range_name,
        gcs_bucket=gcs_bucket,
        gcs_object=gcs_object,
        gcs_conn_id=gcs_conn_id,
        sheets_conn_id=sheets_conn_id,
    )
    # Airflow operators define execute(context); calling it with no argument
    # raises TypeError. An empty context is passed because this runs outside
    # a scheduler-managed task instance.
    sheets_to_gcs.execute(context={})
# Run the transfer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    airflow_task()
以上是一个示例代码片段:该代码为默认停止信号 SIGTERM 注册了处理函数,在收到 SIGTERM 时向所有子进程转发 SIGINT,以尽量避免使用 GoogleSheetsToGCSOperator 时发生 Negsignal.SIGKILL 错误。需要注意的是,SIGKILL 本身无法被捕获或处理;如果该错误是由内存不足导致任务被系统强制终止引起的,还需要为任务分配更多内存。