Issue #1
问题描述
使用 StreamingFileSink,报错 fileNotFoundException
File does not exist: /user/xxxx/xxx_realtime_sample/tfrecord/2023011202/.part-32-474.inprogress.e9b18821-adf3-4e45-b5e9-dacd1abe32c3
原因分析
Flink 作业在写入过程中, 内部发生了重启, 但下次恢复时, 由于其他定时任务, 对应时刻的文件刚好被 mv 到了其他的目录,导致出现了问题
解决方式
mv 文件时,要避开正在写入的文件, 只处理已经 commit 的文件即可
Issue #2
问题描述
使用 StreamingFileSink 时, 报错 DFSClient_NONMAPREDUCE_-xxxx is already the current lease holder.
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/xxxx/xxx_realtime_sample/tfrecord/2023011202/.part-220-474.inprogress.6746e5ad-1f94-4345-82c8-8d7251394cc5 for DFSClient_NONMAPREDUCE_-640776598_106_application_1667456877234_4942861 on [xxxx](http://xxxxx) because DFSClient_NONMAPREDUCE_-640776598_106_application_1667456877234_4942861 is already the current lease holder.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2783)
at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:125)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2867)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:914)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:562)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:533)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1121)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1049)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1805)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048
原因分析
private static void safelyTruncateFile(
final FileSystem fileSystem, final Path path, final HadoopFsRecoverable recoverable)
throws IOException {
ensureTruncateInitialized();
revokeLeaseByFileSystem(fileSystem, path);
// truncate back and append
boolean truncated;
try {
truncated = truncate(fileSystem, path, recoverable.offset());
} catch (Exception e) {
throw new IOException("Problem while truncating file: " + path, e);
}
if (!truncated) {
// Truncate did not complete immediately, we must wait for
// the operation to complete and release the lease.
revokeLeaseByFileSystem(fileSystem, path);
}
}
Flink 作业在恢复时, StreamFileSink 会进行 truncate 操作, 调用 truncate 操作后,如果判断没有立刻 truncate 成功, 会调用 revokeLeaseByFileSystem 来等待 truncate 操作,但在这次调用中,抛出了如上异常导致问题。
原因分析 (by HDFS dev 同学分析)
- 在执行truncate之前先调用_revokeLeaseByFileSystem_(fileSystem, path),该方法最终会调用 dfs.recoverLease(path)。如果文件lease还没有超时(Flink中默认文件lease超时时间为100s)会进行循环等待,直到文件关闭或者lease达到超时时间退出。然后开始执行truncate。
- truncate可能并不会马上结束,这里分两种情况,如果truncate的点正好位于block的边界,truncate能够直接完成,此时truncate返回true。如果truncate的点不是block的边界,就会进入Block Recovery流程,此时truncate返回false。
- 在Flink中,如果truncate返回了false,则会继续调用_revokeLeaseByFileSystem_(fileSystem, path)循环等待,直到满足上述条件退出。接下来就会调用append方法开始往 in-progress 文件中追加写
- 这个作业并发比较多,100s的超时时间会经常出问题,所以调到了10分钟,但是还是报错。
- truncate跟手动调recoverLease虽然都会走recoverLeaseInternal,但是传的参数不同。看日志里,如果连续调两次,选出了不同的协调者,一前一后,而前面的比较慢,就会抛异常
解决方案
- 去掉第二次的 truncate , 改为循环等待判断文件是否处于 close 状态.