1. Background
A Flink ETL job in production suddenly triggered a Kafka consumer-lag alert. During load testing a parallelism of 150 had been enough, and the job runs with a parallelism of 200 in production, yet the lag kept growing.
Kafka monitoring showed a sudden traffic spike, and after checking with the ops team we confirmed it was caused by a Broker node going down. We then raised the job parallelism to 400 to consume faster, but the lag still did not drop: throughput was only about 20% higher than at parallelism 200 and fluctuated heavily.
2. An operator bottleneck? A GC problem?
At this point we suspected a performance bottleneck in the job itself. The Flink Dashboard showed that some subtasks were consuming far fewer records than the rest, so the first guess was that the RPC call inside the operator's processing logic had become slower. Measuring online with Arthas showed that the call took about 0.6 ms on normal nodes, but 1.2-20 ms on the slow ones.
We thought we had found the culprit, but the RPC server-side latency turned out to be completely normal...
So we started looking at the abnormal nodes themselves. The Dashboard showed their GC time was clearly higher than that of normal nodes, yet after turning on GC logging the amount of memory reclaimed was not very different. The Arthas dashboard finally revealed that the system load on these machines was extremely high, and that was the real reason the throughput of individual subtasks dropped.
3. Why does this drag down the throughput of the whole Job?
Nodes in the YARN cluster isolate CPU with cgroups and enforce memory limits through memory checks, but there is no isolation for IO at all. Since each node runs a different mix of tasks, uneven load across nodes is easy to get. But why would a throughput drop on a single node pull down the throughput of the whole Job?
I used to assume that data transfer from an upstream operator Task to its downstream Tasks was independent per channel, i.e. that one blocked downstream Task would not affect the others. Reading the source code shows that this assumption is wrong:
When writing a record to downstream Tasks, a ChannelSelector first picks the channel to write to:
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    // ......
    @Override
    public void emit(T record) throws IOException, InterruptedException {
        emit(record, channelSelector.selectChannel(record));
    }
}
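For reference, a channel selector is just a mapping from a record to an output channel index. Below is a minimal round-robin sketch modeled on Flink's RebalancePartitioner; it is simplified for illustration and is not the actual Flink source. Note that the choice depends only on a counter, never on how fast the downstream subtask is consuming.
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;

public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
    private int numberOfChannels;
    private int nextChannel;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // random start so parallel upstream tasks do not all begin with channel 0
        this.nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(T record) {
        // purely round-robin: the target channel ignores the record content and
        // ignores how busy the downstream task behind that channel is
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}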
Writing a record requires obtaining a buffer for the target channel. Even though the buffers shared between different output channels are bounded, a buffer shortage on any single channel is enough to block the writer:
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
    if (builder == null) {
        long start = System.currentTimeMillis();
        // blocks until a buffer for this channel becomes available again
        builder = targetPartition.getBufferBuilder(targetChannel);
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}
In other words, when some node in the cluster performs badly, the upstream operator blocks while writing to it and overall throughput drops; once the backlog on that node has been consumed, writes stop blocking and throughput recovers, and then the cycle repeats.
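To make the consequence concrete, here is a tiny stand-alone illustration (plain Java, not Flink code) of a single writer thread with one bounded queue per downstream channel. As soon as the queue of the slow channel fills up, the only writer thread blocks, and even the fast channel stops receiving records:
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SingleWriterBlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> slowChannel = new ArrayBlockingQueue<>(2);  // never drained
        BlockingQueue<Integer> fastChannel = new ArrayBlockingQueue<>(2);  // drained quickly
        List<BlockingQueue<Integer>> channels = List.of(slowChannel, fastChannel);

        // a fast downstream consumer that keeps the fast channel almost empty
        Thread fastConsumer = new Thread(() -> {
            try {
                while (true) {
                    fastChannel.take();
                }
            } catch (InterruptedException ignored) {
            }
        });
        fastConsumer.setDaemon(true);
        fastConsumer.start();

        // a single writer emitting round-robin, like one upstream subtask
        for (int record = 0; record < 100; record++) {
            int target = record % channels.size();
            channels.get(target).put(record);  // stalls once the slow channel is full
            System.out.println("emitted record " + record + " to channel " + target);
        }
        // output stops after a handful of records: the put() on the slow channel
        // blocks the only writer thread, so the fast channel stops getting data too
    }
}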
4. How to mitigate it?
Here is a temporary workaround: since slow nodes drag down the whole job, we can plug in a custom partitioning scheme that distributes records according to how much data each downstream channel can actually process, using the backlog of each subpartition as the signal:
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Temporary workaround: re-partitions records based on the backlog of each
 * downstream subpartition, so that slow subtasks receive less data.
 */
public class DynamicLoadBalancing {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicLoadBalancing.class);

    public DynamicLoadBalancing() {
    }

    public static <R> DataStream<R> rebalance(DataStream<R> dataStream) throws NoSuchFieldException, IllegalAccessException {
        String name = "loadbalancer_" + UUID.randomUUID();
        return dataStream.flatMap(new RichFlatMapFunction<R, R>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                // Walk from the runtime context down to this task's output subpartitions
                // via reflection and hand them to the partitioner, so that it can read
                // each channel's backlog at runtime.
                StreamingRuntimeContext context = (StreamingRuntimeContext) this.getRuntimeContext();
                Field envField = StreamingRuntimeContext.class.getDeclaredField("taskEnvironment");
                envField.setAccessible(true);
                RuntimeEnvironment taskEnvironment = (RuntimeEnvironment) envField.get(context);
                Field taskField = RuntimeEnvironment.class.getDeclaredField("containingTask");
                taskField.setAccessible(true);
                Task containingTask = (Task) taskField.get(taskEnvironment);
                Field invokableField = Task.class.getDeclaredField("invokable");
                invokableField.setAccessible(true);
                StreamTask<?, ?> streamTask = (StreamTask) invokableField.get(containingTask);
                Field recordWriterField = StreamTask.class.getDeclaredField("recordWriter");
                recordWriterField.setAccessible(true);
                RecordWriterDelegate<SerializationDelegate<StreamRecord<?>>> recordWriter =
                        (RecordWriterDelegate) recordWriterField.get(streamTask);
                List<RecordWriter> writerList = new ArrayList<>();
                if (recordWriter instanceof SingleRecordWriter) {
                    SingleRecordWriter singleRecordWriter = (SingleRecordWriter) recordWriter;
                    writerList.add(singleRecordWriter.getRecordWriter(0));
                } else if (recordWriter instanceof MultipleRecordWriters) {
                    MultipleRecordWriters multipleRecordWriters = (MultipleRecordWriters) recordWriter;
                    Field writersField = MultipleRecordWriters.class.getDeclaredField("recordWriters");
                    writersField.setAccessible(true);
                    writerList.addAll((List) writersField.get(multipleRecordWriters));
                } else {
                    throw new RuntimeException("unexpected RecordWriterDelegate type");
                }
                for (RecordWriter writer : writerList) {
                    ChannelSelectorRecordWriter channelSelectorRecordWriter = (ChannelSelectorRecordWriter) writer;
                    Field selectorField = ChannelSelectorRecordWriter.class.getDeclaredField("channelSelector");
                    selectorField.setAccessible(true);
                    CustomPartitionerWrapper channelSelector = (CustomPartitionerWrapper) selectorField.get(channelSelectorRecordWriter);
                    Field partitionerField = CustomPartitionerWrapper.class.getDeclaredField("partitioner");
                    partitionerField.setAccessible(true);
                    LoadBasedPartitioner partitioner = (LoadBasedPartitioner) partitionerField.get(channelSelector);
                    Field targetPartition = RecordWriter.class.getDeclaredField("targetPartition");
                    targetPartition.setAccessible(true);
                    PipelinedResultPartition resultPartitionWriter = (PipelinedResultPartition) targetPartition.get(channelSelectorRecordWriter);
                    Field subPartitionsField = BufferWritingResultPartition.class.getDeclaredField("subpartitions");
                    subPartitionsField.setAccessible(true);
                    partitioner.setSubWriters((ResultSubpartition[]) subPartitionsField.get(resultPartitionWriter));
                }
            }

            @Override
            public void flatMap(R value, Collector<R> out) throws Exception {
                out.collect(value);
            }
        }).name(name).partitionCustom(new LoadBasedPartitioner(), (value) -> value);
    }

    static class LoadBasedPartitioner<R> implements Partitioner<R> {
        private volatile ResultSubpartition[] subWritersArr;
        private int index = 0;

        LoadBasedPartitioner() {
        }

        @Override
        public int partition(R key, int numPartitions) {
            // Start from a rotating index, return the first channel with no backlog,
            // otherwise fall back to the channel with the smallest backlog.
            ResultSubpartition[] currentSubWritersArr = this.subWritersArr;
            if (currentSubWritersArr != null && currentSubWritersArr.length == numPartitions && numPartitions != 0) {
                Integer minBufferInBacklog = null;
                Integer minBufferInBacklogIndex = null;
                this.index = (this.index + 1) % numPartitions;
                for (int i = 0; i < numPartitions; ++i) {
                    ResultSubpartition partition = currentSubWritersArr[(this.index + i) % numPartitions];
                    int currentBacklog = ((PipelinedSubpartition) partition).getBuffersInBacklog();
                    if (currentBacklog == 0) {
                        return partition.getSubPartitionIndex();
                    }
                    if (minBufferInBacklog == null || currentBacklog < minBufferInBacklog) {
                        minBufferInBacklog = currentBacklog;
                        minBufferInBacklogIndex = partition.getSubPartitionIndex();
                    }
                }
                return minBufferInBacklogIndex;
            } else {
                throw new RuntimeException("subpartitions not initialized or count does not match numPartitions");
            }
        }

        public void setSubWriters(ResultSubpartition[] subWriters) {
            this.subWritersArr = subWriters;
        }
    }
}
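For completeness, here is a minimal sketch of how the helper could be wired into a job. The source, sink and job name here are placeholders for illustration, not the production ETL job:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LoadBalancedJobExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // placeholder source; in the real job this would be the Kafka source of the ETL task
        DataStream<String> source = env.fromElements("a", "b", "c");
        // insert the backlog-aware repartitioning right before the expensive operator,
        // so records are steered away from subpartitions that are piling up
        DataStream<String> balanced = DynamicLoadBalancing.rebalance(source);
        balanced.print();
        env.execute("load-balanced-etl-example");
    }
}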
5. What are the downsides of this workaround?
For stateless operators this kind of load balancing works fine, but in stateful jobs changing the partitioning also means the corresponding state has to be migrated, which is a far more invasive change. Even the elastic rescaling proposals from the Flink community involve restarting the job, which has a significant impact on stability. If a better solution turns up, I will revisit this topic.