在 Apache Flink SQL 中,建议使用 Flink 的安全模块来存储凭据和其他机密信息。Flink 的安全模块提供了一个加密的键值存储,可用于存储用户凭据、数据库凭据等敏感数据(该说法及下文示例所用的 API 请对照所用 Flink 版本的官方文档确认)。下面是一个简单的示例,演示如何将用户名和密码保存到 Flink 安全存储中,并再将其读取出来:
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.security.DynamicConfigurationLoader;
import org.apache.flink.runtime.security.KerberosConfLoader;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.modules.JaasModule;
import org.apache.flink.runtime.security.modules.PlainModule;
import org.apache.flink.security.credentials.*;
import org.apache.flink.util.StringUtils;
import java.io.File;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
/**
 * Demo that stores a username/password pair in a credentials provider and reads it back.
 *
 * <p>NOTE(review): the security calls used below ({@code registerModule},
 * {@code loadFromDirectory}, {@code getCredentialsProvider}, {@code SecureString},
 * {@code UsernamePasswordCredentials}) are taken as-is from the original example and
 * could not be confirmed from this file alone -- verify them against the Flink version
 * in use before relying on this code.
 */
public class FlinkSecretsDemo {

    /**
     * Configures the security modules, stores one credential, then retrieves and prints it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from the URL conversion or from any of the security calls
     */
    public static void main(String[] args) throws Exception {
        // Initialize Flink's security module
        SecurityConfiguration sc = new SecurityConfiguration();
        sc.registerModule(JaasModule.class);
        sc.registerModule(PlainModule.class);
        URL securityDir = new File("/path/to/flink/security").toURI().toURL();
        sc.loadFromDirectory(securityDir);

        // Create credentials provider
        CredentialsProvider credentialsProvider = SecurityUtils
                .getInstalledContext()
                .getCredentialsProvider();

        // Store username and password
        String username = "myuser";
        String password = "mypassword";
        // The password is plain text, not a hex string, so a hex decoder is the wrong
        // conversion for it. Encode it as UTF-8 so the bytes match the UTF-8 decode
        // performed when the password is read back below.
        byte[] encodedPassword = password.getBytes(StandardCharsets.UTF_8);
        SecureString securePassword = new SecureString(encodedPassword);
        credentialsProvider.addCredentials(
                username, new UsernamePasswordCredentials(username, securePassword));

        // Retrieve credentials
        Credentials credentials = credentialsProvider.getCredentials(username);
        UsernamePasswordCredentials stored = (UsernamePasswordCredentials) credentials;
        System.out.println("Username: " + stored.getUsername());
        // WARNING: printing a secret is for demonstration only; never write passwords
        // to stdout or logs in real code.
        System.out.println("Password: " + stored.getPassword().toString(StandardCharsets.UTF_8));
    }
}
此示例演示了如