Issue #1

问题描述

使用 StreamingFileSink,报错 fileNotFoundException

File does not exist: /user/xxxx/xxx_realtime_sample/tfrecord/2023011202/.part-32-474.inprogress.e9b18821-adf3-4e45-b5e9-dacd1abe32c3

原因分析

Flink 作业在写入过程中, 内部发生了重启, 但下次恢复时, 由于其他定时任务, 对应时刻的文件刚好被 mv 到了其他的目录,导致出现了问题

解决方式

mv 文件时,要避开正在写入的文件, 只处理已经 commit 的文件即可

Issue #2

问题描述

使用 StreamingFileSink 时, 报错 DFSClient_NONMAPREDUCE_-xxxx is already the current lease holder.

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/xxxx/xxx_realtime_sample/tfrecord/2023011202/.part-220-474.inprogress.6746e5ad-1f94-4345-82c8-8d7251394cc5 for DFSClient_NONMAPREDUCE_-640776598_106_application_1667456877234_4942861 on [xxxx](http://xxxxx) because DFSClient_NONMAPREDUCE_-640776598_106_application_1667456877234_4942861 is already the current lease holder.

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2783)

at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:125)

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2867)

at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:914)

at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:562)

at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:533)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)

at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1121)

at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1049)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1805)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048

原因分析

private static void safelyTruncateFile(  
        final FileSystem fileSystem, final Path path, final HadoopFsRecoverable recoverable)  
        throws IOException {  
  
    ensureTruncateInitialized();  
  
    revokeLeaseByFileSystem(fileSystem, path);  
  
    // truncate back and append  
    boolean truncated;  
    try {  
        truncated = truncate(fileSystem, path, recoverable.offset());  
    } catch (Exception e) {  
        throw new IOException("Problem while truncating file: " + path, e);  
    }  
  
    if (!truncated) {  
        // Truncate did not complete immediately, we must wait for  
        // the operation to complete and release the lease.        
        revokeLeaseByFileSystem(fileSystem, path);  
    }  
}

Flink 作业在恢复时, StreamFileSink 会进行 truncate 操作, 调用 truncate 操作后,如果判断没有立刻 truncate 成功, 会调用 revokeLeaseByFileSystem 来等待 truncate 操作,但在这次调用中,抛出了如上异常导致问题。

原因分析 (by HDFS dev 同学分析)

  1. 在执行truncate之前先调用_revokeLeaseByFileSystem_(fileSystem, path),该方法最终会调用 dfs.recoverLease(path)。如果文件lease还没有超时(Flink中默认文件lease超时时间为100s)会进行循环等待,直到文件关闭或者lease达到超时时间退出。然后开始执行truncate。
  2. truncate可能并不会马上结束,这里分两种情况,如果truncate的点正好位于block的边界,truncate能够直接完成,此时truncate返回true。如果truncate的点不是block的边界,就会进入Block Recovery流程,此时truncate返回false。
  3. 在Flink中,如果truncate返回了false,则会继续调用_revokeLeaseByFileSystem_(fileSystem, path)循环等待,直到满足上述条件退出。接下来就会调用append方法开始往 in-progress 文件中追加写
  4. 这个作业并发比较多,100s的超时时间会经常出问题,所以调到了10分钟,但是还是报错。
  5. truncate跟手动调recoverLease虽然都会走recoverLeaseInternal,但是传的参数不同。看日志里,如果连续调两次,选出了不同的协调者,一前一后,而前面的比较慢,就会抛异常

解决方案

  1. 去掉第二次的 truncate , 改为循环等待判断文件是否处于 close 状态.