In the previous two articles we walked through troubleshooting on-heap and off-heap memory overruns in Spark; this time let's look at a real-world Flink case.
The job in question runs in production and its logic is fairly simple: read data from Kafka, perform an asynchronous lookup, and write the results to HDFS:
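The original job code is not reproduced in this post; what follows is a minimal sketch of such a topology, reusing the PushItemSinkMain / AsyncFeatureQuery names that show up in the stack traces later on. The Kafka topic, HDFS path, timeout, and the lookup itself are illustrative placeholders, not the production code.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class PushItemSinkMain {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Source: consume records from Kafka (topic and connection settings are placeholders)
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProps.setProperty("group.id", "push-item-sink");
        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer<>("push_item_topic", new SimpleStringSchema(), kafkaProps));

        // 2. Async enrichment: this is where the GZIP-decompressing lambda seen in the
        //    stack traces below runs (timeout and capacity values are illustrative)
        DataStream<String> enriched = AsyncDataStream.unorderedWait(
                input, new AsyncFeatureQuery(), 30, TimeUnit.SECONDS, 100);

        // 3. Sink: write the enriched records to HDFS as row-format files
        enriched.addSink(StreamingFileSink
                .forRowFormat(new Path("hdfs:///path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build());

        env.execute("PushItemSinkMain");
    }

    /** Skeleton of the async lookup; the real query logic is omitted. */
    public static class AsyncFeatureQuery extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> queryFeature(input))              // remote lookup (omitted)
                    .thenAccept(bytes -> resultFuture.complete(          // in the real job this lambda
                            Collections.singleton(new String(bytes))));  // decompresses the response
        }

        private byte[] queryFeature(String key) {
            return new byte[0]; // placeholder
        }
    }
}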
Within one or two days of starting, the job would be killed for exceeding the memory limit of its YARN container:
2024-05-06 00:16:36,280 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e10_1634010428940_125776_01_000003 is terminated. Diagnostics: Container [pid=122806,containerID=container_e10_1634010428940_125776_01_000003] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 6.2 GB of 8.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_e10_1634010428940_125776_01_000003 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 122920 122806 122806 122806 (java) 1926809 110163 6567804928 1048262 /usr/local/jdk8/bin/java -Xmx2383706830 -Xms2383706830 -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
The exception log helpfully prints the process tree for us. Unlike the Spark memory-overuse case in the previous article, here it really is the main task process whose RSS memory exceeded the limit.
The natural instinct is to use strace to analyze the off-heap allocations. Unfortunately, the kernel on the production machines is too old (2.6.x) to run a recent strace, and older strace versions cannot print stack traces. So we bring in another Java debugging workhorse: async-profiler. It can profile CPU usage, heap allocations, and other common events, and it can render flame graphs, which makes it very handy for bottleneck analysis and memory-leak hunting.
In this case the off-heap memory usage was growing slowly but steadily, so an off-heap leak was all but certain. async-profiler has no event that directly traces off-heap allocations, but a Flink task's heap size is fixed (the JVM runs with -Xms equal to -Xmx, as the command line above shows), so once the heap and the other memory pools have been touched, new page faults mostly come from off-heap allocations. We can therefore use the page-faults event to observe off-heap memory requests indirectly:
After the job has been running for a while (so that the various memory pools have been initialized), run:
./async-profiler-3.0-linux-x64/bin/asprof -d 30 -e page-faults 141817
which produces the following profile:
--- Execution profile ---
Total samples : 317585
GC_active : 6 (0.00%)
unknown_Java : 2 (0.00%)
--- 73948 total (23.28%), 73948 samples
[ 0] __memmove_ssse3_back
[ 1] inflate
[ 2] Java_java_util_zip_Inflater_inflateBytes
[ 3] java.util.zip.Inflater.inflateBytes
[ 4] java.util.zip.Inflater.inflate
[ 5] java.util.zip.InflaterInputStream.read
[ 6] java.util.zip.GZIPInputStream.read
[ 7] java.io.FilterInputStream.read
[ 8] com.my.GZIPUtils.uncompress
[ 9] com.my.PushItemSinkMain$AsyncFeatureQuery.lambda$asyncInvoke$0
[10] com.my.PushItemSinkMain$AsyncFeatureQuery$$Lambda$457/156993146.accept
[11] java.util.concurrent.CompletableFuture.uniAccept
[12] java.util.concurrent.CompletableFuture$UniAccept.tryFire
[13] java.util.concurrent.CompletableFuture.postComplete
[14] java.util.concurrent.CompletableFuture$AsyncSupply.run
[15] java.util.concurrent.ThreadPoolExecutor.runWorker
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run
[17] java.lang.Thread.run
--- 64119 total (20.19%), 64119 samples
[ 0] __memmove_ssse3_back
[ 1] inflate
[ 2] Java_java_util_zip_Inflater_inflateBytes
[ 3] java.util.zip.Inflater.inflateBytes
[ 4] java.util.zip.Inflater.inflate
[ 5] java.util.zip.InflaterInputStream.read
[ 6] java.util.zip.GZIPInputStream.read
[ 7] java.io.FilterInputStream.read
[ 8] com.my.GZIPUtils.uncompress
[ 9] com.my.PushItemSinkMain$AsyncFeatureQuery.lambda$asyncInvoke$0
[10] com.my.PushItemSinkMain$AsyncFeatureQuery$$Lambda$457/156993146.accept
[11] java.util.concurrent.CompletableFuture.uniAccept
[12] java.util.concurrent.CompletableFuture$UniAccept.tryFire
[13] java.util.concurrent.CompletableFuture.postComplete
[14] java.util.concurrent.CompletableFuture$AsyncSupply.run
[15] java.util.concurrent.ThreadPoolExecutor.runWorker
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run
[17] java.lang.Thread.run
--- 48081 total (15.14%), 48081 samples
[ 0] inflateResetKeep
[ 1] inflateInit2_
[ 2] Java_java_util_zip_Inflater_init
[ 3] java.util.zip.Inflater.init
[ 4] java.util.zip.Inflater.<init>
[ 5] java.util.zip.GZIPInputStream.<init>
[ 6] java.util.zip.GZIPInputStream.<init>
[ 7] com.my.GZIPUtils.uncompress
[ 8] com.my.PushItemSinkMain$AsyncFeatureQuery.lambda$asyncInvoke$0
[ 9] com.my.PushItemSinkMain$AsyncFeatureQuery$$Lambda$457/156993146.accept
[10] java.util.concurrent.CompletableFuture.uniAccept
[11] java.util.concurrent.CompletableFuture$UniAccept.tryFire
[12] java.util.concurrent.CompletableFuture.postComplete
[13] java.util.concurrent.CompletableFuture$AsyncSupply.run
[14] java.util.concurrent.ThreadPoolExecutor.runWorker
[15] java.util.concurrent.ThreadPoolExecutor$Worker.run
[16] java.lang.Thread.run
As the output shows, async-profiler can decode the Java stack, which makes it straightforward to pinpoint the root cause: the vast majority of the page faults come from the com.my.GZIPUtils class. A look at its code reveals a very basic mistake:
public static byte[] uncompress(byte[] bytes) {
    if (bytes == null || bytes.length == 0) {
        return null;
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    try {
        GZIPInputStream ungzip = new GZIPInputStream(in); // the ungzip stream is never closed
        byte[] buffer = new byte[256];
        int n;
        while ((n = ungzip.read(buffer)) >= 0) {
            out.write(buffer, 0, n);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return out.toByteArray();
}
Because the ungzip stream is never closed, the native memory allocated by the underlying Inflater is never released, and the process eventually blows past its memory limit. Inflater does override finalize() to release the off-heap memory when the on-heap object is garbage collected, but there is no guarantee about when finalization runs (indeed, a heap dump shows a large number of Inflater objects waiting to be finalized), so it is best to close the GZIPInputStream yourself.
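For reference, one way to fix it (a sketch, not necessarily the exact patch that went out) is to open the streams in a try-with-resources block: closing the GZIPInputStream calls Inflater.end() on its internal Inflater, which frees the native zlib buffers immediately instead of leaving them to the finalizer.

public static byte[] uncompress(byte[] bytes) {
    if (bytes == null || bytes.length == 0) {
        return null;
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // try-with-resources closes the streams even when an exception is thrown,
    // so the Inflater's native memory is released deterministically
    try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
         GZIPInputStream ungzip = new GZIPInputStream(in)) {
        byte[] buffer = new byte[256];
        int n;
        while ((n = ungzip.read(buffer)) >= 0) {
            out.write(buffer, 0, n);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return out.toByteArray();
}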