在Flink程序中,可以通过配置以下参数来解决此问题:
conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.policy", true);
conf.set("dfs.client.block.write.replace-datanode-on-failure.best-effort", "true");
conf.set("dfs.client.max.block.acquire.failures", "1");
其中,dfs.client.block.write.replace-datanode-on-failure.policy配置设置为true,表示在写入数据节点出现故障时,会将该节点替换为其他可用节点。dfs.client.block.write.replace-datanode-on-failure.best-effort配置设置为true,表示在出现故障时,Flink会尽力将节点替换为其他可用节点。dfs.client.max.block.acquire.failures配置设置为1,表示在获取数据节点失败后,Flink会尝试重新获取数据节点一次。
通过配置以上参数,可以大大减少Flink程序写入HDFS时出现RpcException异常的概率。