【OutOfMemory专题】3. Direct Memory泄漏如何排查

上两篇文章中分别介绍了Spark堆内和堆外内存溢出的排查案例,这次我们来看一个Flink任务的真实案例。

有一个线上任务,代码逻辑相对简单,从kafka读取数据,进行异步查询后落地数据到hdfs:

此任务启动后1-2天就会出现内存超出Yarn container限制而被终止的异常:

2024-05-06 00:16:36,280 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e10_1634010428940_125776_01_000003 is terminated. Diagnostics: Container [pid=122806,containerID=container_e10_1634010428940_125776_01_000003] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 6.2 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e10_1634010428940_125776_01_000003 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 122920 122806 122806 122806 (java) 1926809 110163 6567804928 1048262 /usr/local/jdk8/bin/java -Xmx2383706830 -Xms2383706830 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456

异常日志中已经贴心的帮我们打印了当前的进程树。从日志可以看出,和上一篇Spark内存超限的原因不同,这里的确是任务主进程的RSS内存发生了超限。

很自然的想到用strace来分析堆外内存分配情况,可惜线上环境的内核版本太低(2.6.x),不支持高版本strace,低版本的strace又不支持打印栈信息,这里我们就请出另一个Java debug利器:async profiler,通过async profiler我们可以进行cpu、堆内存分配等常用的debug分析,还可以生成火焰图,对于程序的瓶颈分析、内存泄漏分析等场景十分有用。

在这个案例中,堆外内存的使用量一直在缓慢增长,基本可以确认是堆外内存产生了泄漏,async profiler没有直接提供对于堆外内存分配的追踪方式,但是由于Flink任务的堆内存大小固定,我们可以利用这一特性,通过page-faults事件来间接感知到代码中对于堆外内存的申请:

等待任务启动一段时间后(等待各个内存池内存初始化后),执行以下命令:

./async-profiler-3.0-linux-x64/bin/asprof -d 30 -e page-faults 141817

得到以下分析结果:

--- Execution profile ---
Total samples       : 317585
GC_active           : 6 (0.00%)
unknown_Java        : 2 (0.00%)

--- 73948 total (23.28%), 73948 samples
  [ 0] __memmove_ssse3_back
  [ 1] inflate
  [ 2] Java_java_util_zip_Inflater_inflateBytes
  [ 3] java.util.zip.Inflater.inflateBytes
  [ 4] java.util.zip.Inflater.inflate
  [ 5] java.util.zip.InflaterInputStream.read
  [ 6] java.util.zip.GZIPInputStream.read
  [ 7] java.io.FilterInputStream.read
  [ 8] com.my.GZIPUtils.uncompress
  [ 9] com.my.PushItemSinkMain$AsyncFeatureQuery.lambda$asyncInvoke$0
  [10] com.my.PushItemSinkMain$AsyncFeatureQuery$$Lambda$457/156993146.accept
  [11] java.util.concurrent.CompletableFuture.uniAccept
  [12] java.util.concurrent.CompletableFuture$UniAccept.tryFire
  [13] java.util.concurrent.CompletableFuture.postComplete
  [14] java.util.concurrent.CompletableFuture$AsyncSupply.run
  [15] java.util.concurrent.ThreadPoolExecutor.runWorker
  [16] java.util.concurrent.ThreadPoolExecutor$Worker.run
  [17] java.lang.Thread.run

--- 64119 total (20.19%), 64119 samples
  [ 0] __memmove_ssse3_back
  [ 1] inflate
  [ 2] Java_java_util_zip_Inflater_inflateBytes
  [ 3] java.util.zip.Inflater.inflateBytes
  [ 4] java.util.zip.Inflater.inflate
  [ 5] java.util.zip.InflaterInputStream.read
  [ 6] java.util.zip.GZIPInputStream.read
  [ 7] java.io.FilterInputStream.read
  [ 8] com.my.GZIPUtils.uncompress
  [ 9] com.my.PushItemSinkMain$AsyncFeatureQuery.lambda$asyncInvoke$0
  [10] com.my.PushItemSinkMain$AsyncFeatureQuery$$Lambda$457/156993146.accept
  [11] java.util.concurrent.CompletableFuture.uniAccept
  [12] java.util.concurrent.CompletableFuture$UniAccept.tryFire
  [13] java.util.concurrent.CompletableFuture.postComplete
  [14] java.util.concurrent.CompletableFuture$AsyncSupply.run
  [15] java.util.concurrent.ThreadPoolExecutor.runWorker
  [16] java.util.concurrent.ThreadPoolExecutor$Worker.run
  [17] java.lang.Thread.run

--- 48081 total (15.14%), 48081 samples
  [ 0] inflateResetKeep
  [ 1] inflateInit2_
  [ 2] Java_java_util_zip_Inflater_init
  [ 3] java.util.zip.Inflater.init
  [ 4] java.util.zip.Inflater.<init>
  [ 5] java.util.zip.GZIPInputStream.<init>
  [ 6] java.util.zip.GZIPInputStream.<init>
  [ 7] com.my.GZIPUtils.uncompress
  [ 8] com.my.PushItemSinkMain$AsyncFeatureQuery.lambda$asyncInvoke$0
  [ 9] com.my.PushItemSinkMain$AsyncFeatureQuery$$Lambda$457/156993146.accept
  [10] java.util.concurrent.CompletableFuture.uniAccept
  [11] java.util.concurrent.CompletableFuture$UniAccept.tryFire
  [12] java.util.concurrent.CompletableFuture.postComplete
  [13] java.util.concurrent.CompletableFuture$AsyncSupply.run
  [14] java.util.concurrent.ThreadPoolExecutor.runWorker
  [15] java.util.concurrent.ThreadPoolExecutor$Worker.run
  [16] java.lang.Thread.run

可以看到,async-profiler支持解析java栈,能清晰定位到root cause。

从结果可以分析出:产生的缺页异常大部分都来自com.my.GZIPUtils这个类,分析代码发现这是一个很基础的错误:

    public static byte[] uncompress(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        try {
            GZIPInputStream ungzip = new GZIPInputStream(in);   //这里的ungzip对象没有及时close掉
            byte[] buffer = new byte[256];
            int n;
            while ((n = ungzip.read(buffer)) >= 0) {
                out.write(buffer, 0, n);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return out.toByteArray();
    }

由于ungzip对象没有及时close,导致Inflater调用的native代码申请的内存没有释放,最终导致了内存溢出。虽然Inflater在finalize方法定义了堆内对象被GC回收时释放堆外内存,但由于finalize的及时性是没有保证的(通过heap dump也可以看到堆内有大量Inflater对象在等待finalize执行),所以最好还是手动关闭GZIPInputStream。

修改代码后进程RSS内存不再上涨:

看看其他朋友如何评价的:

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注