Troubleshooting a Throughput Drop in a Production Flink Job

1. Background

A production Flink ETL job suddenly fired a Kafka consumer-lag alert. In earlier load tests a parallelism of 150 had been enough to keep up, and the job was running in production with a parallelism of 200, yet the lag kept growing.

Kafka monitoring showed a sudden traffic spike. After talking to the ops team we confirmed it was caused by one Broker node going down, so we raised the job's parallelism to 400 to consume faster. The lag still did not come down: throughput was only about 20% higher than at parallelism 200, and it fluctuated heavily.

2. An operator bottleneck? A GC problem?

At this point we began to suspect a performance bottleneck in the job itself. The Flink Dashboard showed that some subtasks were consuming very few records, so the first guess was that the RPC call inside the operator's processing logic had become slower. Measuring it online with Arthas showed that the call took about 0.6 ms on healthy nodes but 1.2 to 20 ms on the slow ones.

We were thrilled, thinking we had found the problem, but checking the RPC server-side latency showed the server was completely healthy...

So we turned to the abnormal nodes themselves. The Dashboard showed their GC time was clearly higher than on healthy nodes, yet after enabling GC logging the amount of memory reclaimed per collection turned out to be roughly the same. The Arthas dashboard then showed that the system load on these nodes was very high, and this was the real reason the throughput of individual nodes had dropped.
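As a side note (not something we did during the incident), the same load signal shown by the Arthas dashboard can also be read from inside the JVM with the standard OperatingSystemMXBean; a minimal sketch:

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class LoadProbe {
    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        // 1-minute system load average of the host (or -1.0 if the platform does not report it).
        double load = os.getSystemLoadAverage();
        int cores = os.getAvailableProcessors();
        System.out.printf("loadAvg=%.2f, cores=%d, perCore=%.2f%n", load, cores, load / cores);
    }
}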

3. Why did it drag down the throughput of the whole job?

The nodes in the YARN cluster use cgroups to isolate CPU and memory checks to isolate memory, but there is no isolation at all for IO. Since each node runs a different mix of workloads, load imbalance is easy to hit. But why would a throughput drop on a single node reduce the throughput of the entire job?

I had always assumed that data transfer between an upstream task and its downstream tasks was independent per downstream task, i.e. one blocked downstream task would not affect the others. Reading the source code showed that this assumption is wrong:

When a record is written to downstream tasks, a ChannelSelector picks the channel it is written to:

public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    //......
    @Override
    public void emit(T record) throws IOException, InterruptedException {
        // The ChannelSelector decides which downstream channel (i.e. which downstream subtask) gets this record.
        emit(record, channelSelector.selectChannel(record));
    }
}

The write needs to acquire a buffer for the selected channel. Although the number of shared buffers each output channel may hold is bounded, running out of buffers on any single channel blocks the write:

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    // Non-blocking attempt first...
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
    if (builder == null) {
        // ...otherwise block until a buffer for this channel becomes available,
        // and record the wait as idle time.
        long start = System.currentTimeMillis();
        builder = targetPartition.getBufferBuilder(targetChannel);
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}

In other words, when some node in the cluster performs badly, the upstream operator blocks while emitting records and overall throughput drops; once the backlog is drained, writes stop blocking and throughput recovers, and then the cycle repeats. This matches the heavy throughput fluctuation we observed.

4. How to mitigate it?

Here is a stopgap: since slow nodes drag down the whole cluster, we can plug in a custom partitioning scheme that distributes records according to how much each downstream channel can currently absorb:

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicLoadBalancing {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicLoadBalancing.class);

    // Utility class; not meant to be instantiated.
    private DynamicLoadBalancing() {
    }

    public static <R> DataStream<R> rebalance(DataStream<R> dataStream) throws NoSuchFieldException, IllegalAccessException {
        String name = "loadbalancer_" + UUID.randomUUID();
        return dataStream.flatMap(new RichFlatMapFunction<R, R>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                // Walk down via reflection: StreamingRuntimeContext -> RuntimeEnvironment -> Task
                // -> StreamTask -> its RecordWriterDelegate, to reach this task's output subpartitions.
                StreamingRuntimeContext context = (StreamingRuntimeContext) this.getRuntimeContext();
                Field envField = StreamingRuntimeContext.class.getDeclaredField("taskEnvironment");
                envField.setAccessible(true);
                RuntimeEnvironment taskEnvironment = (RuntimeEnvironment) envField.get(context);
                Field taskField = RuntimeEnvironment.class.getDeclaredField("containingTask");
                taskField.setAccessible(true);
                Task containingTask = (Task) taskField.get(taskEnvironment);
                Field invokableField = Task.class.getDeclaredField("invokable");
                invokableField.setAccessible(true);
                StreamTask<?, ?> streamTask = (StreamTask<?, ?>) invokableField.get(containingTask);
                Field recordWriterField = StreamTask.class.getDeclaredField("recordWriter");
                recordWriterField.setAccessible(true);
                RecordWriterDelegate<SerializationDelegate<StreamRecord<?>>> recordWriter = (RecordWriterDelegate) recordWriterField.get(streamTask);
                // Collect this task's RecordWriter(s): a task has either a single writer
                // or multiple writers (one per downstream edge).
                List<RecordWriter> writerList = new ArrayList<>();
                if (recordWriter instanceof SingleRecordWriter) {
                    SingleRecordWriter singleRecordWriter = (SingleRecordWriter) recordWriter;
                    writerList.add(singleRecordWriter.getRecordWriter(0));
                } else if (recordWriter instanceof MultipleRecordWriters) {
                    MultipleRecordWriters multipleRecordWriters = (MultipleRecordWriters) recordWriter;
                    Field writersField = MultipleRecordWriters.class.getDeclaredField("recordWriters");
                    writersField.setAccessible(true);
                    writerList.addAll((List) writersField.get(multipleRecordWriters));
                } else {
                    throw new RuntimeException("Unexpected RecordWriterDelegate type: " + recordWriter.getClass());
                }

                // For each writer, hand the task's output subpartitions to the custom partitioner
                // so that it can inspect their backlog at runtime.
                for (RecordWriter writer : writerList) {
                    ChannelSelectorRecordWriter channelSelectorRecordWriter = (ChannelSelectorRecordWriter) writer;
                    Field selectorField = ChannelSelectorRecordWriter.class.getDeclaredField("channelSelector");
                    selectorField.setAccessible(true);
                    CustomPartitionerWrapper channelSelector = (CustomPartitionerWrapper) selectorField.get(channelSelectorRecordWriter);
                    Field partitionerField = CustomPartitionerWrapper.class.getDeclaredField("partitioner");
                    partitionerField.setAccessible(true);
                    LoadBasedPartitioner partitioner = (LoadBasedPartitioner) partitionerField.get(channelSelector);
                    Field targetPartition = RecordWriter.class.getDeclaredField("targetPartition");
                    targetPartition.setAccessible(true);
                    PipelinedResultPartition resultPartitionWriter = (PipelinedResultPartition) targetPartition.get(channelSelectorRecordWriter);
                    Field subPartitionsField = BufferWritingResultPartition.class.getDeclaredField("subpartitions");
                    subPartitionsField.setAccessible(true);
                    partitioner.setSubWriters((ResultSubpartition[]) subPartitionsField.get(resultPartitionWriter));
                }

            }

            @Override
            public void flatMap(R value, Collector<R> out) throws Exception {
                // Pass-through: this operator only exists so that open() can wire up the partitioner.
                out.collect(value);
            }
        }).name(name).partitionCustom(new LoadBasedPartitioner(), value -> value);
    }

    /**
     * Partitioner that steers each record to the output channel whose subpartition currently
     * has the smallest backlog, starting from a rotating index so idle channels are used evenly.
     */
    static class LoadBasedPartitioner<R> implements Partitioner<R> {
        // Filled in reflectively by open(): the subpartitions of this task's result partition.
        private volatile ResultSubpartition[] subWritersArr;
        private int index = 0;

        LoadBasedPartitioner() {
        }

        @Override
        public int partition(R key, int numPartitions) {
            ResultSubpartition[] currentSubWritersArr = this.subWritersArr;
            if (currentSubWritersArr == null || numPartitions == 0 || currentSubWritersArr.length != numPartitions) {
                throw new RuntimeException("Subpartitions not initialized or their count does not match numPartitions");
            }
            Integer minBufferInBacklog = null;
            Integer minBufferInBacklogIndex = null;
            // Rotate the starting point so that ties do not always favor the same channel.
            this.index = (this.index + 1) % numPartitions;

            for (int i = 0; i < numPartitions; ++i) {
                ResultSubpartition partition = currentSubWritersArr[(this.index + i) % numPartitions];
                int currentBacklog = ((PipelinedSubpartition) partition).getBuffersInBacklog();
                if (currentBacklog == 0) {
                    // An empty subpartition can absorb data immediately; pick it right away.
                    return partition.getSubPartitionIndex();
                }

                if (minBufferInBacklog == null || currentBacklog < minBufferInBacklog) {
                    minBufferInBacklog = currentBacklog;
                    minBufferInBacklogIndex = partition.getSubPartitionIndex();
                }
            }

            // Otherwise fall back to the channel with the smallest backlog seen.
            return minBufferInBacklogIndex;
        }

        public void setSubWriters(ResultSubpartition[] subWriters) {
            this.subWritersArr = subWriters;
        }
    }
}
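For completeness, a minimal sketch of how the helper might be wired into a job. The sequence source, the doubling map and the job name are placeholders for illustration only (the real job reads from Kafka), and it assumes Flink 1.12+ where fromSequence is available:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DynamicLoadBalancingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A trivial bounded source standing in for the real Kafka source.
        DataStream<Long> source = env.fromSequence(0L, 1_000_000L);

        // Insert the backlog-aware repartitioning in front of the expensive operator,
        // so that records are steered towards the subpartitions with the smallest backlog.
        DataStream<Long> balanced = DynamicLoadBalancing.rebalance(source);

        balanced.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value * 2; // stands in for the real ETL logic
            }
        }).print();

        env.execute("dynamic-load-balancing-demo");
    }
}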

5. What is wrong with this stopgap?

For stateless operators this kind of load balancing works. In stateful jobs, however, changing the partitioning also means the corresponding state has to be migrated, which is a much more invasive change; even the elastic scaling proposals from the Flink community involve restarting the job, which has a significant impact on stability. I will revisit this topic if a better solution turns up.
