Slow Response in Large Flink Clusters: Analysis and Solution

2022-08-19


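While deploying a large Flink cluster, we found that the cluster was not responding normally: Flink was very slow to respond to console commands, and the Flink Web UI stalled while loading, either showing no content or showing only blank pages. This post describes how we solved the problem.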

First, the Flink logs showed a large number of failed hostname lookups:

2022-10-21 16:07:09,260 |flink-akka.actor.default-dispatcher-4|WARN |TaskManagerLocation - No hostname could be resolved for the IP address 172.xx.xx.xx, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.[org.apache.flink.runtime.taskmanager.TaskManagerLocation.getHostName(TaskManagerLocation.java:213)]
2022-10-21 16:07:09,275 |flink-akka.actor.default-dispatcher-4|WARN |TaskManagerLocation - No hostname could be resolved for the IP address 172.xx.xx.xx, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.[org.apache.flink.runtime.taskmanager.TaskManagerLocation.getHostName(TaskManagerLocation.java:213)]
2022-10-21 16:07:09,276 |flink-akka.actor.default-dispatcher-4|WARN |TaskManagerLocation - No hostname could be resolved for the IP address 172.xx.xx.xx, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.[org.apache.flink.runtime.taskmanager.TaskManagerLocation.getHostName(TaskManagerLocation.java:213)]

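Further analysis traced the cause to TaskManager registration. When Flink registers a TaskManager, it calls inetAddress.getCanonicalHostName() to obtain the hostname of each TaskManager machine. This method performs a reverse lookup of the hostname for the inetAddress IP address. If no hostname can be found, the call blocks at this point while the lookup is retried many times, so the Web UI stalls once the number of TaskManagers grows large.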

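This is a defect in Flink 1.12 and earlier versions. It is also described in a Tencent Cloud developer blog post, 《修复 Flink Kubernetes 资源分配慢》 ("Fixing slow resource allocation in Flink on Kubernetes"): the reverse hostname lookup should have been designed to be asynchronous or lazy, but the code calls it synchronously, which blocks TaskManager registration.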
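To illustrate what "lazy" means here, the following is a minimal sketch and not the fix that Flink itself adopted. The class name LazyHostName and its resolver parameter are hypothetical names introduced for this example. The idea is that constructing the object performs no lookup, and the blocking reverse lookup only runs, once, when the hostname is first read.

```java
import java.net.InetAddress;
import java.util.function.Function;

/** Hypothetical sketch: defers the reverse lookup until the hostname is first read. */
final class LazyHostName {
    private final InetAddress address;
    private final Function<InetAddress, String> resolver;
    private volatile String cached; // stays null until get() is called for the first time

    LazyHostName(InetAddress address, Function<InetAddress, String> resolver) {
        this.address = address;
        this.resolver = resolver;
    }

    /** Convenience factory that uses the JDK's blocking reverse lookup as the resolver. */
    static LazyHostName of(InetAddress address) {
        return new LazyHostName(address, InetAddress::getCanonicalHostName);
    }

    /** Resolves on first use and caches the result; later calls return the cached value. */
    String get() {
        String result = cached;
        if (result == null) {
            synchronized (this) {
                result = cached;
                if (result == null) {
                    result = resolver.apply(address);
                    cached = result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Assumes args[0] is an IP literal, so getByName does not perform a lookup itself.
        LazyHostName name = LazyHostName.of(InetAddress.getByName(args[0]));
        System.out.println("Constructed without any reverse lookup");
        System.out.println(name.get()); // the blocking lookup happens here
    }
}
```

With this shape, a registration path that only stores the object never waits on the resolver; only code that actually needs the hostname pays for the lookup.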

The relevant code is in the TaskManagerLocation class in the flink-runtime module:

/**
 * Gets the fully qualified hostname of the TaskManager based on the network address.
 *
 * @param inetAddress the network address that the TaskManager binds its sockets to
 * @return fully qualified hostname of the TaskManager
 */
private static String getFqdnHostName(InetAddress inetAddress) {
   String fqdnHostName;
   try {
      fqdnHostName = inetAddress.getCanonicalHostName();
   } catch (Throwable t) {
      LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
         "for HDFS files) may be non-local when the canonical hostname is missing.");
      LOG.debug("getCanonicalHostName() Exception:", t);
      fqdnHostName = inetAddress.getHostAddress();
   }

   return fqdnHostName;
}

/**
 * Gets the hostname of the TaskManager based on the network address.
 *
 * @param inetAddress the network address that the TaskManager binds its sockets to
 * @return hostname of the TaskManager
 */
public static String getHostName(InetAddress inetAddress) {
   String hostName;
   String fqdnHostName = getFqdnHostName(inetAddress);

   if (fqdnHostName.equals(inetAddress.getHostAddress())) {
      // this happens when the name lookup fails, either due to an exception,
      // or because no hostname can be found for the address
      // take IP textual representation
      hostName = fqdnHostName;
      LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
         + "Local input split assignment (such as for HDFS files) may be impacted.", inetAddress.getHostAddress());
   } else {
      hostName = NetUtils.getHostnameFromFQDN(fqdnHostName);
   }

   return hostName;
}

The problem can be reproduced with a simple test program:

import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostnameTest {
    public static void main(String[] args) throws UnknownHostException {

        String ip = args[0];
        System.out.println("Hostname for ip " + ip + " is:");

        String hostname = InetAddress.getByName(ip).getCanonicalHostName();
        System.out.println(hostname);

    }
}

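Running this program to look up the hostname of an unknown IP shows an obvious stall. The same program also makes it easy to verify the fix: after the IP-to-hostname mapping is added to the local /etc/hosts file, the stall disappears and the program immediately returns the hostname specified in the hosts file.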
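To put a number on the stall, the test program can be extended to time each lookup. This is only a sketch: the class name HostnameTiming and its helper method are hypothetical, and the command-line arguments are assumed to be IP literals. The "resolved" check mirrors the comparison Flink makes in getHostName, where a result equal to the textual IP means the lookup failed.

```java
import java.net.InetAddress;
import java.util.function.Function;

/** Hypothetical helper that times reverse lookups for a list of IP addresses. */
final class HostnameTiming {

    /** Result of one timed lookup. */
    static final class Timed {
        final String hostname;
        final long millis;

        Timed(String hostname, long millis) {
            this.hostname = hostname;
            this.millis = millis;
        }
    }

    /** Runs the resolver once and records how long it took in milliseconds. */
    static Timed time(InetAddress address, Function<InetAddress, String> resolver) {
        long start = System.nanoTime();
        String hostname = resolver.apply(address);
        long millis = (System.nanoTime() - start) / 1_000_000;
        return new Timed(hostname, millis);
    }

    public static void main(String[] args) throws Exception {
        for (String ip : args) {
            InetAddress address = InetAddress.getByName(ip);
            Timed t = time(address, InetAddress::getCanonicalHostName);
            // A result equal to the textual IP means no hostname was found.
            boolean resolved = !t.hostname.equals(address.getHostAddress());
            System.out.println(ip + " -> " + t.hostname + " (" + t.millis + " ms, "
                    + (resolved ? "resolved" : "NOT resolved") + ")");
        }
    }
}
```

Running it before and after editing /etc/hosts, with the IPs of the TaskManager machines as arguments, should make the difference visible per address.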

Therefore, configuring the /etc/hosts file on every compute node resolves the problem.
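For a cluster with many nodes, the hosts entries can be generated rather than typed by hand. The sketch below is one possible way to do that; the class name HostsEntries, the IP addresses, and the hostnames are all made up for illustration and should be replaced with the real node inventory. It prints one line per node in the usual hosts file layout of an IP address followed by a hostname.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that formats IP-to-hostname pairs as /etc/hosts lines. */
final class HostsEntries {

    /** Formats one "IP<TAB>hostname" line per entry, in the map's iteration order. */
    static String format(Map<String, String> ipToHostname) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : ipToHostname.entrySet()) {
            sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Example addresses and names only; not taken from the cluster described above.
        Map<String, String> nodes = new LinkedHashMap<>();
        nodes.put("172.16.0.11", "flink-tm-1");
        nodes.put("172.16.0.12", "flink-tm-2");
        System.out.print(format(nodes));
    }
}
```

The printed lines can then be appended to /etc/hosts on each compute node using whatever configuration management the cluster already has.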