要将 Apache Airflow 的凭证存储在 OpenMetadata 中，可以在 OpenMetadata 中创建一个名为“airflow”的集合，并使用 Airflow 的连接（connection）名称作为实例的名称。使用以下代码存储凭证：
from metadata.ingestion.models.userprofile import UserProfile
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_shim import OpenMetadata
from metadata.ingestion.source.sql.sql_source import SQLSourceConfig
from metadata.ingestion.source.sql.sql_metadata_extractor import SQLMetadataExtractor
from metadata.ingestion.api.common import ConfigModel
class AirflowMetadataExtractor(SQLMetadataExtractor):
    """SQL metadata extractor that also stores Airflow connection credentials.

    After the regular metadata extraction, the credentials found on the
    source config are written as properties of an instance in the
    ``"airflow"`` collection, named after the Airflow connection.

    NOTE(review): ``self.ometa`` is not assigned anywhere in this class; it
    is presumably provided by ``SQLMetadataExtractor`` -- confirm.
    """

    def __init__(self, config: SQLSourceConfig, metadata_config: MetadataServerConfig,
                 output_path: str, emitter=None, transformation_ctx=None):
        """Forward every argument unchanged to the parent extractor."""
        super().__init__(
            config,
            metadata_config,
            output_path,
            emitter=emitter,
            transformation_ctx=transformation_ctx,
        )

    def get_metadata(self):
        """Extract metadata and, when available, store the Airflow credentials.

        Returns:
            Whatever the parent ``get_metadata()`` returned, unmodified.
        """
        metadata = super().get_metadata()

        # NOTE(review): assumes the config object exposes a ``credentials``
        # attribute -- confirm against SQLSourceConfig.
        credentials = self.config.credentials
        if credentials and isinstance(credentials, ConfigModel):
            # Only fields that were explicitly set end up in the dict, so the
            # ``.get`` lookups below yield None for anything left unset.
            airflow_credentials = credentials.dict(exclude_unset=True)
            airflow_connection = airflow_credentials.get("connection")
            if airflow_connection:
                # The connection name doubles as the instance name inside the
                # "airflow" collection.
                instance = self.ometa.create_instance("airflow", airflow_connection)
                # NOTE(review): the password is stored as a plain property
                # value; confirm this is acceptable for the deployment.
                instance.properties = {
                    "username": airflow_credentials.get("username"),
                    "password": airflow_credentials.get("password"),
                }
                self.ometa.emit_instance(instance, sync=True)

        # Returned on every path, whether or not credentials were stored.
        return metadata
上述代码将创建一个名为“airflow”的集合，并使用 Airflow 的连接名称作为实例的名称，然后将凭证（用户名和密码）存储在该实例的属性中。