Issue #1

Problem description

After STOP_WITH_SAVEPOINT is triggered and the savepoint completes, the job does not exit normally.

Root cause analysis

Cause of the symptom: inspecting the tasks' runtime state shows that some of the tasks are still in the RUNNING state.

Stack trace of the stuck task:

"Legacy Source Thread - Source: Custom File Source (1/1)#4" #3929 prio=5 os_prio=0 tid=0x00007f05f020a800 nid=0x2dcdf waiting on condition [0x00007f058a851000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:226)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:138)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

The File Source task stays in the sleeping state and never exits, which blocks the whole job from exiting.

The code that creates the File Source:


env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, Time.minutes(30).toMilliseconds(), xxxxxx)

It is safe to conclude that monitor-interval is set too long: the FileSource spends long stretches inside sleep and cannot react to the cancel logic in time.
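The effect is reproducible outside Flink with a toy loop (class and method names are made up, and the 30-minute interval is shrunk to a few hundred ms for illustration): flipping a volatile running flag does not wake a thread that is already inside Thread.sleep(); the thread only observes the flag after the full sleep elapses.

```java
public class SleepyCancelDemo {
    private static volatile boolean isRunning = true;

    /**
     * Starts a monitor-style loop that sleeps intervalMs per iteration,
     * flips the running flag 50 ms in (simulating cancel), and returns
     * how long the thread actually took to exit.
     */
    public static long cancelLatencyMillis(long intervalMs) {
        isRunning = true;
        long start = System.currentTimeMillis();
        Thread worker = new Thread(() -> {
            while (isRunning) {
                try {
                    Thread.sleep(intervalMs); // the flag is not re-checked until sleep returns
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
        try {
            Thread.sleep(50);
            isRunning = false; // "cancel" arrives while the worker is mid-sleep
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return System.currentTimeMillis() - start;
    }
}
```

With intervalMs scaled back up to the job's 30-minute monitor-interval, the same mechanics explain why the task lingered in RUNNING long after the savepoint had completed.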

Solution

  1. Shorten monitor-interval so the FileSource task notices that the job has entered the finished state and exits promptly.

  2. Change the FileSource logic so that cancellation is delivered promptly.

Current logic: after each scan for new files, the thread sleeps until the next loop iteration:

    @Override
    public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context)
            throws Exception {
        Path p = new Path(path);
        FileSystem fileSystem = FileSystem.get(p.toUri());
        if (!fileSystem.exists(p)) {
            throw new FileNotFoundException("The provided file path " + path + " does not exist.");
        }

        checkpointLock = context.getCheckpointLock();
        switch (watchType) {
            case PROCESS_CONTINUOUSLY:
                while (isRunning) {
                    synchronized (checkpointLock) {
                        monitorDirAndForwardSplits(fileSystem, context);
                    }
                    Thread.sleep(interval);
                }

                // here we do not need to set the running to false and the
                // globalModificationTime to Long.MAX_VALUE because to arrive here,
                // either close() or cancel() have already been called, so this
                // is already done.

                break;
        ....
    }


    @Override
    public void close() throws Exception {
        super.close();

        if (checkpointLock != null) {
            synchronized (checkpointLock) {
                globalModificationTime = Long.MAX_VALUE;
                isRunning = false;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Closed File Monitoring Source for path: " + path + ".");
        }
    }

Possible fixes:

  1. On close(), interrupt the source thread so the sleep is broken immediately.
  2. Split the single long sleep into shorter slices, e.g. one minute each, and check the task's RUNNING state before each slice.
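Option 2 could be sketched like this (a hypothetical rewrite for illustration, not the actual Flink patch; class and method names are made up):

```java
public class SlicedSleepMonitor {
    private volatile boolean isRunning = true;
    private final long sliceMs; // e.g. 60_000 in production: re-check once a minute

    public SlicedSleepMonitor(long sliceMs) {
        this.sliceMs = sliceMs;
    }

    /**
     * Waits up to intervalMs before the next directory scan, but re-checks
     * the cancel flag after every slice, so cancellation latency is bounded
     * by sliceMs rather than by the full monitor interval. Returns the
     * elapsed time in ms.
     */
    public long awaitNextScan(long intervalMs) {
        long start = System.currentTimeMillis();
        long deadline = start + intervalMs;
        while (isRunning) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            try {
                Thread.sleep(Math.min(remaining, sliceMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return System.currentTimeMillis() - start;
    }

    public void cancel() {
        isRunning = false;
    }
}
```

Option 1 (interrupting the source thread from close()) removes the latency entirely, but needs care around the checkpoint lock so the interrupt cannot land inside monitorDirAndForwardSplits.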

Issue #2

Problem description

Writing to Doris through the Flink connector fails. Error message:

org.apache.doris.flink.exception.StreamLoadException: stream load error: status: -1, resp msg: status is not TEMPORARY_REDIRECT 307, status: 400, resp content: failed to load audit via AuditLoader plugin with label: flink_connector_20230215_124216

Relevant code:

    private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
            conn.addRequestProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
        }
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }


    feConn = getConnection(loadUrlStr, label);
    int status = feConn.getResponseCode();
    // fe sends back HTTP response code TEMPORARY_REDIRECT 307 and the new be location
    if (status != 307) {
        throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
    }
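For clarity, the redirect check can be isolated as a pure function (a hypothetical helper mirroring the snippet above): a stream-load PUT first goes to the FE, which is expected to answer 307 with a Location header naming a BE. Any other status, such as the 400 in this error, means the FE rejected the request itself before a BE was ever involved.

```java
public final class StreamLoadRedirect {
    private StreamLoadRedirect() {}

    /**
     * Mirrors the check above: the FE's response to a stream-load PUT must be
     * a 307 redirect whose Location points at a BE. Any other status means
     * the FE refused the request (auth, headers, or load properties) before
     * redirecting, so the body of the load never reached a BE.
     */
    public static String requireBeRedirect(int status, String location) {
        if (status != 307) {
            throw new IllegalStateException(
                    "status is not TEMPORARY_REDIRECT 307, status: " + status);
        }
        return location;
    }
}
```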

Cause

The column separator is misconfigured: the code uses "\01" (the 0x01 control byte) as the column separator, and with this separator the load fails immediately with HTTP 400. Why the Doris server rejects it still needs further investigation on the server side.

Fix

Change the column separator.
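A sketch of the fix (the column_separator property name comes from the Doris stream-load API; whether your Doris version accepts the hex escape form is an assumption to verify): either switch to a separator the server accepts, or keep 0x01 but pass it using Doris' documented hex escape syntax, sending the literal text \x01 rather than embedding the raw control byte in the HTTP header.

```java
import java.util.Properties;

public class SeparatorConfig {
    /**
     * Stream-load properties with the column separator spelled in Doris' hex
     * escape form: the four characters \x01, which the FE parses into the
     * 0x01 byte, instead of the raw control byte itself.
     */
    public static Properties streamLoadProps() {
        Properties props = new Properties();
        props.setProperty("column_separator", "\\x01"); // literal text: \x01
        return props;
    }
}
```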

Issue #3

Problem description

A Flink SQL batch join produces unexpected results. The SQL in question:


SELECT t1.id1, t1.f2, t1.f3
FROM t1
LEFT JOIN t2
WHERE t2.`date` = 20230214

Inspecting the execution plan of this SQL shows that the LEFT JOIN is always optimized into an INNER JOIN.

Cause analysis

The condition in the WHERE clause filters the result of the JOIN. Because the joined rows are filtered on t2's date column, every surviving row must have a non-NULL t2.date, so the LEFT JOIN can legally be optimized into an INNER JOIN. The actual business requirement, however, was to select only the 20230214 partition of t2 and then join.
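The planner's reasoning can be mimicked in plain Java (a toy model, not Flink code; the table contents are made up): after a LEFT JOIN, unmatched t1 rows carry a NULL t2.date, and the post-join filter t2.date = 20230214 discards exactly those rows, so preserving them in the first place buys nothing.

```java
import java.util.*;
import java.util.stream.*;

public class LeftJoinFilterDemo {
    record Row(int id, Integer date) {}

    /** LEFT JOIN t1 with t2 on id, then apply the post-join WHERE filter. */
    public static List<Row> leftJoinThenFilter(List<Integer> t1Ids, Map<Integer, Integer> t2Dates) {
        return t1Ids.stream()
                .map(id -> new Row(id, t2Dates.get(id)))               // no match -> date is null
                .filter(r -> r.date() != null && r.date() == 20230214) // WHERE t2.`date` = 20230214
                .collect(Collectors.toList());
    }
}
```

An INNER JOIN followed by the same filter yields the identical result set, which is why the rewrite is semantically safe; it just is not what the business logic intended.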

Fix

Rewrite the SQL so that the filter is applied before the JOIN:

SELECT t1.id1, t1.f2, t1.f3
FROM t1
LEFT JOIN (select * from t2 where `date`=20230214) t3