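To avoid uploading external JARs every time you submit a Flink job, you can submit the job to an already-running Flink cluster (cluster/session mode). Here is an example: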
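```java
// Assumes Flink 1.11+ (1.x) with flink-clients and flink-yarn on the classpath.
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.io.File;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class FlinkJobSubmitter {
    public static void main(String[] args) throws Exception {
        // Parse command-line arguments, e.g. --yarn.application.id application_xxx
        ParameterTool params = ParameterTool.fromArgs(args);

        // Load flink-conf.yaml from FLINK_CONF_DIR (if set), then override options
        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        flinkConfig.set(CoreOptions.DEFAULT_PARALLELISM, 4);
        // ID of the already-running YARN session cluster to submit to
        flinkConfig.set(YarnConfigOptions.APPLICATION_ID, params.getRequired("yarn.application.id"));

        // External dependency referenced by URL instead of being uploaded with the job.
        // The file must exist at this path on the client and on every cluster node.
        List<URL> classpaths = Collections.singletonList(
                new File("/path/to/external.jar").toURI().toURL());

        // Build the user program; the arguments are forwarded to the job's main()
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/job.jar"))
                .setUserClassPaths(classpaths)
                .setConfiguration(flinkConfig)
                .setArguments(args)
                .build();

        YarnClusterClientFactory clientFactory = new YarnClusterClientFactory();
        ApplicationId applicationId = clientFactory.getClusterId(flinkConfig);

        // Connect to the existing cluster; both resources are closed automatically
        try (YarnClusterDescriptor descriptor = clientFactory.createClusterDescriptor(flinkConfig);
             ClusterClient<ApplicationId> clusterClient =
                     descriptor.retrieve(applicationId).getClusterClient()) {
            // Run the job's main() to build the JobGraph; classpaths are recorded, not uploaded
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                    program, flinkConfig, flinkConfig.get(CoreOptions.DEFAULT_PARALLELISM), false);
            // The future completes once the job is accepted, not when it finishes
            JobID jobId = clusterClient.submitJob(jobGraph).get();
            System.out.println("Submitted job " + jobId);
        } catch (ProgramInvocationException e) {
            System.err.println("Failed to build the Flink job from the JAR.");
            e.printStackTrace();
        } finally {
            program.close();
        }
    }
}
```

The code above assumes a YARN session cluster is already running and takes its application ID from the command line (`--yarn.application.id`). `YarnClusterClientFactory` creates a `YarnClusterDescriptor`, whose `retrieve()` method returns a `ClusterClient` for that existing cluster, so no new cluster is deployed per job. The external JAR is passed to `PackagedProgram.Builder.setUserClassPaths()`: unlike the job JAR, which is uploaded with every submission, a classpath entry is only recorded in the `JobGraph` as a URL and loaded from that location, so the file must already exist at the same path on the client and on every cluster node (for example on a shared mount). `PackagedProgramUtils.createJobGraph()` runs the job's `main()` to build the `JobGraph`, and `ClusterClient.submitJob()` submits it without waiting for the job to finish. The command-line arguments are forwarded to the job via `setArguments()`, where the job can parse them with `ParameterTool` and register them with `setGlobalJobParameters()`. Finally, try-with-resources closes the cluster client and the descriptor.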
In practice, replace `/path/to/external.jar` and `/path/to/job.jar` in the code with the actual paths of your external JAR and job JAR.
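Instead of hard-coding a single external JAR path, the list of URLs can be built from every JAR in one directory. The following is a minimal plain-Java sketch, assuming the directory is available at the same path on the client and on every cluster node (for example a shared mount); `SharedJarClasspath` and `jarUrls` are hypothetical names for illustration, not Flink APIs.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: turns a shared directory of JARs into classpath URLs.
public class SharedJarClasspath {

    /**
     * Returns a file: URL for every regular *.jar file directly inside dir,
     * sorted by path so the classpath order is the same on every run.
     */
    public static List<URL> jarUrls(Path dir) throws IOException {
        List<Path> jars = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path p : stream) {
                // Skip directories or other entries that merely end in ".jar"
                if (Files.isRegularFile(p)) {
                    jars.add(p.toAbsolutePath());
                }
            }
        }
        Collections.sort(jars);

        List<URL> urls = new ArrayList<>();
        for (Path jar : jars) {
            urls.add(jar.toUri().toURL()); // absolute file: URL
        }
        return urls;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: SharedJarClasspath <shared-lib-dir>");
            return;
        }
        for (URL url : jarUrls(Paths.get(args[0]))) {
            System.out.println(url);
        }
    }
}
```

Sorting the paths keeps the class-loading order stable between submissions, which avoids surprises when two JARs happen to contain the same class.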